diff --git a/grumble.go b/grumble.go index 1bd5b65..471dcb4 100644 --- a/grumble.go +++ b/grumble.go @@ -10,10 +10,14 @@ import ( "flag" "fmt" "gob" + "io" + "io/ioutil" "os" "log" "sqlite" "path/filepath" + "regexp" + "time" ) var help *bool = flag.Bool("help", false, "Show this help") @@ -158,21 +162,53 @@ func main() { servers := make(map[int64]*Server) for _, name := range names { - log.Printf("Loading server %v", name) - s, err := NewServerFromFrozen(filepath.Join(*datadir, name)) - if err != nil { - log.Fatalf("Unable to load server: %s", err.String()) + if matched, _ := regexp.MatchString("^[0-9]+$", name); matched { + log.Printf("Loading server %v", name) + s, err := NewServerFromFrozen(filepath.Join(*datadir, name)) + if err != nil { + log.Fatalf("Unable to load server: %s", err.String()) + } + servers[s.Id] = s + go s.ListenAndMurmur() } - - servers[s.Id] = s - go s.ListenAndMurmur() } if len(servers) > 0 { - // Sleep. - sleeper := make(chan int) - zzz := <-sleeper - if zzz > 0 { + ticker := time.NewTicker(10e9) // 10 secs + for { + select { + case <-ticker.C: + for sid, server := range servers { + r := server.FreezeServer() + if err != nil { + log.Panicf("Unable to freeze server %v", sid) + } + f, err := ioutil.TempFile(*datadir, fmt.Sprintf("%v_", sid)) + if err != nil { + log.Panicf("Unable to open file: %", err.String()) + } + nwritten, err := io.Copy(f, r) + if err != nil { + log.Panicf("Unable to copy frozen server data: %v bytes, err=%v", nwritten, err) + } + err = r.Close() + if err != nil { + log.Panicf("Unable to freeze server: %v", err) + } + err = f.Sync() + if err != nil { + log.Panicf("Unable to sync frozen file: %v", err) + } + err = f.Close() + if err != nil { + log.Panicf("Unable to freeze server: %v", err) + } + err = os.Rename(f.Name(), filepath.Join(*datadir, fmt.Sprintf("%v", sid))) + if err != nil { + log.Panicf("Unable to freeze server: %v", err) + } + } + } } } } diff --git a/server.go b/server.go index c9f4415..3780d68 100644 --- a/server.go +++ b/server.go @@ -12,13 +12,16 @@ import ( "net" "bufio" "bytes" + "compress/gzip" "encoding/binary" "encoding/hex" "sync" "goprotobuf.googlecode.com/hg/proto" "mumbleproto" "cryptstate" + "gob" "hash" + "io" "rand" "strings" ) @@ -48,6 +51,7 @@ type Server struct { incoming chan *Message udpsend chan *Message voicebroadcast chan *VoiceBroadcast + freezeRequest chan *freezeRequest // Signals to the server that a client has been successfully // authenticated. @@ -84,6 +88,11 @@ type Server struct { aclcache ACLCache } +type freezeRequest struct { + done chan bool + readCloser io.ReadCloser +} + // Allocate a new Murmur instance func NewServer(id int64, addr string, port int) (s *Server, err os.Error) { s = new(Server) @@ -103,6 +112,7 @@ func NewServer(id int64, addr string, port int) (s *Server, err os.Error) { s.incoming = make(chan *Message) s.udpsend = make(chan *Message) s.voicebroadcast = make(chan *VoiceBroadcast) + s.freezeRequest = make(chan *freezeRequest) s.clientAuthenticated = make(chan *Client) s.MaxBandwidth = 300000 @@ -326,10 +336,45 @@ func (server *Server) handler() { // server info. case client := <-server.clientAuthenticated: server.finishAuthenticate(client) + + // Synchonized freeze requests + case req := <-server.freezeRequest: + fs, err := server.Freeze() + if err != nil { + log.Panicf("Unable to freeze the server") + } + go server.handleFreezeRequest(req, &fs) } } } +func (server *Server) handleFreezeRequest(freq *freezeRequest, fs *frozenServer) { + pr, pw := io.Pipe() + + freq.readCloser = pr + freq.done <- true + + zw, err := gzip.NewWriterLevel(pw, gzip.BestCompression) + if err != nil { + if err = pw.CloseWithError(err); err != nil { + log.Panicf("Unable to close PipeWriter: %v", err.String()) + } + return + } + + enc := gob.NewEncoder(zw) + err = enc.Encode(fs) + if err != nil { + if err = pw.CloseWithError(err); err != nil { + log.Panicf("Unable to close PipeWriter: %v", err.String()) + } + } + + if err = pw.CloseWithError(zw.Close()); err != nil { + log.Panicf("Unable to close PipeWriter: %v", err.String()) + } +} + // Handle an Authenticate protobuf message. This is handled in a separate // goroutine to allow for remote authenticators that are slow to respond. // @@ -936,6 +981,15 @@ func (server *Server) userEnterChannel(client *Client, channel *Channel, usersta } } +// Create a point-in-time snapshot of Server and make it +// accessible through the returned io.ReadCloser. +func (s *Server) FreezeServer() io.ReadCloser { + fr := &freezeRequest{done:make(chan bool)} + s.freezeRequest <- fr + <-fr.done + return fr.readCloser +} + // The accept loop of the server. func (s *Server) ListenAndMurmur() { // Launch the event handler goroutine