From eb4f3319fd43a13e0e18ea1fabe9b2a52afaf89e Mon Sep 17 00:00:00 2001 From: Mikkel Krautz Date: Sat, 27 Aug 2011 15:15:23 +0200 Subject: [PATCH] Hook up new freezer code to the server. --- Makefile | 7 +- ctlrpc.go | 1 + freeze.go | 870 +++++++++++++++++++++++++++++++++++++------------ group.go | 2 +- grumble.go | 145 +-------- message.go | 44 ++- server.go | 110 ++----- signal_unix.go | 12 - 8 files changed, 744 insertions(+), 447 deletions(-) diff --git a/Makefile b/Makefile index 8713da6..aaf05a8 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ PACKAGES = \ pkg/sessionpool \ pkg/ban \ pkg/htmlfilter \ - pkg/sqlite + pkg/freezer GCFLAGS = \ -Ipkg/cryptstate/_obj \ @@ -29,7 +29,7 @@ GCFLAGS = \ -Ipkg/sessionpool/_obj \ -Ipkg/ban/_obj \ -Ipkg/htmlfilter/_obj \ - -Ipkg/sqlite/_obj + -Ipkg/freezer/_obj LDFLAGS = \ -Lpkg/cryptstate/_obj \ @@ -40,7 +40,7 @@ LDFLAGS = \ -Lpkg/sessionpool/_obj \ -Lpkg/ban/_obj \ -Lpkg/htmlfilter/_obj \ - -Lpkg/sqlite/_obj + -Lpkg/freezer/_obj GOFILES = \ grumble.go \ @@ -51,7 +51,6 @@ GOFILES = \ acl.go \ group.go \ user.go \ - murmurdb.go \ freeze.go \ gencert.go \ register.go \ diff --git a/ctlrpc.go b/ctlrpc.go index c176f57..6349cce 100644 --- a/ctlrpc.go +++ b/ctlrpc.go @@ -56,6 +56,7 @@ func (c *ControlRPC) SetConfig(in *KeyValuePair, out *KeyValuePair) os.Error { return os.NewError("no such server") } server.cfg.Set(in.Key, in.Value) + server.cfgUpdate <- in out.Id = in.Id out.Key = in.Key out.Value = in.Value diff --git a/freeze.go b/freeze.go index cd4ba4d..7f27a08 100644 --- a/freeze.go +++ b/freeze.go @@ -5,82 +5,43 @@ package main import ( - "compress/gzip" - "fmt" - "gob" + "goprotobuf.googlecode.com/hg/proto" "grumble/ban" + "grumble/freezer" "grumble/serverconf" - "io" "io/ioutil" + "log" + "mumbleproto" "os" - "runtime" + "path/filepath" + "strconv" ) -type frozenServer struct { - Id int "id" - SuperUserPassword string "supw" - Config map[string]string "config" - Bans []ban.Ban "bans" - Channels []frozenChannel "channels" - Users []frozenUser "users" -} +// Freeze a server to disk, and re-open the log, if needed. +// This must be called from within the Server's synchronous handler. +func (server *Server) FreezeToFile() (err os.Error) { + // Close the log file, if it's open + if server.freezelog != nil { + err = server.freezelog.Close() + if err != nil { + return err + } + } -type frozenUser struct { - Id uint32 "id" - Name string "name" - Password string "password" - CertHash string "cert_hash" - Email string "email" - TextureBlob string "texture_blob" - CommentBlob string "comment_blob" - LastChannelId int "last_channel_id" - LastActive uint64 "last_active" -} - -type frozenChannel struct { - Id int "id" - Name string "name" - ParentId int "parent_id" - Position int64 "position" - InheritACL bool "inherit_acl" - Links []int "links" - ACL []frozenACL "acl" - Groups []frozenGroup "groups" - DescriptionBlob string "description_blob" -} - -type frozenACL struct { - UserId int "user_id" - Group string "group" - ApplyHere bool "apply_here" - ApplySubs bool "apply_subs" - Allow uint32 "allow" - Deny uint32 "deny" -} - -type frozenGroup struct { - Name string "name" - Inherit bool "inherit" - Inheritable bool "inheritable" - Add []int "add" - Remove []int "remove" -} - -// Freeze a server and write it to a file. -func (server *Server) FreezeToFile(filename string) (err os.Error) { - r := server.FreezeServer() + // Make sure the whole server is synced to disk + fs, err := server.Freeze() if err != nil { return err } - f, err := ioutil.TempFile(*datadir, fmt.Sprintf("%v_", server.Id)) + f, err := ioutil.TempFile(filepath.Join(*datadir, strconv.Itoa64(server.Id)), ".main.fz_") if err != nil { return err } - _, err = io.Copy(f, r) + buf, err := proto.Marshal(fs) if err != nil { return err } - err = r.Close() + _, err = f.Write(buf) if err != nil { return err } @@ -92,48 +53,61 @@ func (server *Server) FreezeToFile(filename string) (err os.Error) { if err != nil { return err } - // Temporary non-atomic path - // We should probably use MoveFileEx instead, but I'm - // not sure whether that's atomic either. MoveFileTransacted - // would be prefered for Vista/Server 2008+. - if runtime.GOOS == "windows" { - err = os.Remove(filename + ".old") - if e, ok := err.(*os.PathError); ok { - if e.Error != os.ENOENT { - return err - } - } else if err != nil { - return err - } - err = os.Rename(filename, filename+".old") - if e, ok := err.(*os.LinkError); ok { - if e.Error != os.ENOENT { - return err - } - } else if err != nil { - return err - } - } - err = os.Rename(f.Name(), filename) + err = os.Rename(f.Name(), filepath.Join(*datadir, strconv.Itoa64(server.Id), "main.fz")) if err != nil { return err } - return + // Re-open a new log file + err = server.openFreezeLog() + if err != nil { + return err + } + + return nil } -// Freeze a server -func (server *Server) Freeze() (fs frozenServer, err os.Error) { - fs.Id = int(server.Id) - fs.Config = server.cfg.GetAll() - fs.SuperUserPassword = server.SuperUserPassword +// Open a new freeze log +func (server *Server) openFreezeLog() (err os.Error) { + logfn := filepath.Join(*datadir, strconv.Itoa64(server.Id), "log.fz") + err = os.Remove(logfn) + if pe, ok := err.(*os.PathError); ok && pe.Error == os.ENOENT { + // OK. File does not exist... + } else if err != nil { + return err + } + server.freezelog, err = freezer.NewLogFile(logfn) + if err != nil { + return err + } + return nil +} +// Freeze a server to a flattened protobuf-based structure ready to +// persist to disk. +func (server *Server) Freeze() (fs *freezer.Server, err os.Error) { + fs = new(freezer.Server) + + // Freeze all config kv-pairs + allCfg := server.cfg.GetAll() + for k, v := range allCfg { + fs.Config = append(fs.Config, &freezer.ConfigKeyValuePair{ + Key: proto.String(k), + Value: proto.String(v), + }) + } + + // Freeze all bans server.banlock.RLock() - fs.Bans = make([]ban.Ban, len(server.Bans)) - copy(fs.Bans, server.Bans) + fs.BanList = &freezer.BanList{} + fs.BanList.Bans = make([]*freezer.Ban, len(server.Bans)) + for i := 0; i < len(server.Bans); i++ { + fs.BanList.Bans[i] = FreezeBan(server.Bans[i]) + } server.banlock.RUnlock() - channels := []frozenChannel{} + // Freeze all channels + channels := []*freezer.Channel{} for _, c := range server.Channels { fc, err := c.Freeze() if err != nil { @@ -143,7 +117,8 @@ func (server *Server) Freeze() (fs frozenServer, err os.Error) { } fs.Channels = channels - users := []frozenUser{} + // Freeze all registered users + users := []*freezer.User{} for _, u := range server.Users { fu, err := u.Freeze() if err != nil { @@ -156,19 +131,67 @@ func (server *Server) Freeze() (fs frozenServer, err os.Error) { return } -// Freeze a channel -func (channel *Channel) Freeze() (fc frozenChannel, err os.Error) { - fc.Id = channel.Id - fc.Name = channel.Name - if channel.parent != nil { - fc.ParentId = channel.parent.Id - } else { - fc.ParentId = -1 - } - fc.Position = int64(channel.Position) - fc.InheritACL = channel.InheritACL +// Merge the contents of a freezer.BanList into the server's +// ban list. +func (s *Server) UnfreezeBanList(fblist *freezer.BanList) { + s.Bans = nil + for _, fb := range fblist.Bans { + ban := ban.Ban{} - acls := []frozenACL{} + ban.IP = fb.Ip + if fb.Mask != nil { + ban.Mask = int(*fb.Mask) + } + if fb.Username != nil { + ban.Username = *fb.Username + } + if fb.CertHash != nil { + ban.CertHash = *fb.CertHash + } + if fb.Reason != nil { + ban.Reason = *fb.Reason + } + if fb.Start != nil { + ban.Start = *fb.Start + } + if fb.Duration != nil { + ban.Duration = *fb.Duration + } + + s.Bans = append(s.Bans, ban) + } +} + +// Freeze a ban into a flattened protobuf-based struct +// ready to be persisted to disk. +func FreezeBan(ban ban.Ban) (fb *freezer.Ban) { + fb = new(freezer.Ban) + + fb.Ip = ban.IP + fb.Mask = proto.Uint32(uint32(ban.Mask)) + fb.Username = proto.String(ban.Username) + fb.CertHash = proto.String(ban.CertHash) + fb.Reason = proto.String(ban.Reason) + fb.Start = proto.Int64(ban.Start) + fb.Duration = proto.Uint32(ban.Duration) + return +} + +// Freeze a channel into a flattened protobuf-based struct +// ready to be persisted to disk. +func (channel *Channel) Freeze() (fc *freezer.Channel, err os.Error) { + fc = new(freezer.Channel) + + fc.Id = proto.Uint32(uint32(channel.Id)) + fc.Name = proto.String(channel.Name) + if channel.parent != nil { + fc.ParentId = proto.Uint32(uint32(channel.parent.Id)) + } + fc.Position = proto.Int64(int64(channel.Position)) + fc.InheritAcl = proto.Bool(channel.InheritACL) + + // Freeze the channel's ACLs + acls := []*freezer.ACL{} for _, acl := range channel.ACL { facl, err := acl.Freeze() if err != nil { @@ -176,9 +199,10 @@ func (channel *Channel) Freeze() (fc frozenChannel, err os.Error) { } acls = append(acls, facl) } - fc.ACL = acls + fc.Acl = acls - groups := []frozenGroup{} + // Freeze the channel's groups + groups := []*freezer.Group{} for _, grp := range channel.Groups { fgrp, err := grp.Freeze() if err != nil { @@ -188,160 +212,576 @@ func (channel *Channel) Freeze() (fc frozenChannel, err os.Error) { } fc.Groups = groups - links := []int{} + // Add linked channels + links := []uint32{} for cid, _ := range channel.Links { - links = append(links, cid) + links = append(links, uint32(cid)) } fc.Links = links - fc.DescriptionBlob = channel.DescriptionBlob + // Blobstore reference to the channel's description. + fc.DescriptionBlob = proto.String(channel.DescriptionBlob) return } -// Freeze a User -func (user *User) Freeze() (fu frozenUser, err os.Error) { - fu.Id = user.Id - fu.Name = user.Name - fu.CertHash = user.CertHash - fu.Email = user.Email - fu.TextureBlob = user.TextureBlob - fu.CommentBlob = user.CommentBlob - fu.LastChannelId = user.LastChannelId - fu.LastActive = user.LastActive - - return -} - -// Freeze a ChannelACL -func (acl *ChannelACL) Freeze() (facl frozenACL, err os.Error) { - facl.UserId = acl.UserId - facl.Group = acl.Group - facl.ApplyHere = acl.ApplyHere - facl.ApplySubs = acl.ApplySubs - facl.Allow = uint32(acl.Allow) - facl.Deny = uint32(acl.Deny) - - return -} - -// Freeze a Group -func (group *Group) Freeze() (fgrp frozenGroup, err os.Error) { - fgrp.Name = group.Name - fgrp.Inherit = group.Inherit - fgrp.Inheritable = group.Inheritable - fgrp.Add = group.AddUsers() - fgrp.Remove = group.RemoveUsers() - - return -} - -// Create a new Server from a frozen server -func NewServerFromFrozen(filename string) (s *Server, err os.Error) { - descFile, err := os.Open(filename) - if err != nil { - return nil, err +// Unfreeze unfreezes the contents of a freezer.Channel +// into a channel. +func (c *Channel) Unfreeze(fc *freezer.Channel) { + if fc.Name != nil { + c.Name = *fc.Name + } + if fc.Position != nil { + c.Position = int(*fc.Position) + } + if fc.InheritAcl != nil { + c.InheritACL = *fc.InheritAcl + } + if fc.DescriptionBlob != nil { + c.DescriptionBlob = *fc.DescriptionBlob } - defer descFile.Close() - zr, err := gzip.NewReader(descFile) + // Update ACLs + c.ACL = nil + for _, facl := range fc.Acl { + acl := NewChannelACL(c) + if facl.ApplyHere != nil { + acl.ApplyHere = *facl.ApplyHere + } + if facl.ApplySubs != nil { + acl.ApplySubs = *facl.ApplySubs + } + if facl.UserId != nil { + acl.UserId = int(*facl.UserId) + } else { + acl.UserId = -1 + } + if facl.Group != nil { + acl.Group = *facl.Group + } + if facl.Deny != nil { + acl.Deny = Permission(*facl.Deny) + } + if facl.Allow != nil { + acl.Allow = Permission(*facl.Allow) + } + c.ACL = append(c.ACL, acl) + } + + // Update groups + c.Groups = make(map[string]*Group) + for _, fgrp := range fc.Groups { + if fgrp.Name == nil { + continue + } + g := NewGroup(c, *fgrp.Name) + if fgrp.Inherit != nil { + g.Inherit = *fgrp.Inherit + } + if fgrp.Inheritable != nil { + g.Inheritable = *fgrp.Inheritable + } + for _, uid := range fgrp.Add { + g.Add[int(uid)] = true + } + for _, uid := range fgrp.Remove { + g.Remove[int(uid)] = true + } + c.Groups[g.Name] = g + } +} + +// Freeze a User into a flattened protobuf-based structure +// ready to be persisted to disk. +func (user *User) Freeze() (fu *freezer.User, err os.Error) { + fu = new(freezer.User) + + fu.Id = proto.Uint32(user.Id) + fu.Name = proto.String(user.Name) + fu.CertHash = proto.String(user.CertHash) + fu.Email = proto.String(user.Email) + fu.TextureBlob = proto.String(user.TextureBlob) + fu.CommentBlob = proto.String(user.CommentBlob) + fu.LastChannelId = proto.Uint32(uint32(user.LastChannelId)) + fu.LastActive = proto.Uint64(user.LastActive) + + return +} + +// Merge the contents of a frozen User into an existing user struct. +func (u *User) Unfreeze(fu *freezer.User) { + if fu.Name != nil { + u.Name = *fu.Name + } + if fu.CertHash != nil { + u.CertHash = *fu.CertHash + } + if fu.Email != nil { + u.Email = *fu.Email + } + if fu.TextureBlob != nil { + u.TextureBlob = *fu.TextureBlob + } + if fu.CommentBlob != nil { + u.CommentBlob = *fu.CommentBlob + } + if fu.LastChannelId != nil { + u.LastChannelId = int(*fu.LastChannelId) + } + if fu.LastActive != nil { + u.LastActive = *fu.LastActive + } +} + +// Freeze a ChannelACL into it a flattened protobuf-based structure +// ready to be persisted to disk. +func (acl *ChannelACL) Freeze() (facl *freezer.ACL, err os.Error) { + facl = new(freezer.ACL) + + if acl.UserId != -1 { + facl.UserId = proto.Uint32(uint32(acl.UserId)) + } else { + facl.Group = proto.String(acl.Group) + } + facl.ApplyHere = proto.Bool(acl.ApplyHere) + facl.ApplySubs = proto.Bool(acl.ApplySubs) + facl.Allow = proto.Uint32(uint32(acl.Allow)) + facl.Deny = proto.Uint32(uint32(acl.Deny)) + + return +} + +// Freeze a Group into a flattened protobuf-based structure +// ready to be persisted to disk. +func (group *Group) Freeze() (fgrp *freezer.Group, err os.Error) { + fgrp = new(freezer.Group) + + fgrp.Name = proto.String(group.Name) + fgrp.Inherit = proto.Bool(group.Inherit) + fgrp.Inheritable = proto.Bool(group.Inheritable) + + for _, id := range group.AddUsers() { + fgrp.Add = append(fgrp.Add, uint32(id)) + } + + for _, id := range group.RemoveUsers() { + fgrp.Remove = append(fgrp.Remove, uint32(id)) + } + + return +} + +// Create a new server from its on-disk representation. +// +// This will read a full serialized server (typically stored in +// a file called 'main.fz') from disk. It will also check for +// a log file ('log.fz') and iterate through the entries of the log +// file and apply the updates incrementally to the server. +// +// Once both the full server and the log file has been merged together +// in memory, a new full seralized server will be written and synced to +// disk, and the existing log file will be removed. +func NewServerFromFrozen(name string) (s *Server, err os.Error) { + id, err := strconv.Atoi64(name) if err != nil { return nil, err } - fs := new(frozenServer) - decoder := gob.NewDecoder(zr) - decoder.Decode(&fs) + path := filepath.Join(*datadir, name) + mainFile := filepath.Join(path, "main.fz") + logFile := filepath.Join(path, "log.fz") - s, err = NewServer(int64(fs.Id), "0.0.0.0", int(DefaultPort+fs.Id-1)) + r, err := os.Open(mainFile) + if err != nil { + return nil, err + } + defer r.Close() + + buf, err := ioutil.ReadAll(r) if err != nil { return nil, err } - if fs.Config != nil { - s.cfg = serverconf.New(fs.Config) + // Unmarshal the server from it's frozen state + fs := freezer.Server{} + err = proto.Unmarshal(buf, &fs) + if err != nil { + return nil, err } - s.SuperUserPassword = fs.SuperUserPassword - s.Bans = fs.Bans + // Create a config map from the frozen server. + cfgMap := map[string]string{} + for _, cfgEntry := range fs.Config { + if cfgEntry.Key != nil && cfgEntry.Value != nil { + cfgMap[*cfgEntry.Key] = *cfgEntry.Value + } + } + + s, err = NewServer(id, "0.0.0.0", int(DefaultPort+id-1)) + if err != nil { + return nil, err + } + s.cfg = serverconf.New(cfgMap) + + // Unfreeze the server's frozen bans. + s.UnfreezeBanList(fs.BanList) // Add all channels, but don't hook up parent/child relationships - // until all of them are loaded. + // until after we've walked the log file. No need to make it harder + // than it really is. + parents := make(map[uint32]uint32) for _, fc := range fs.Channels { - c := NewChannel(fc.Id, fc.Name) - c.Position = int(fc.Position) - c.InheritACL = fc.InheritACL - c.DescriptionBlob = fc.DescriptionBlob - - for _, facl := range fc.ACL { - acl := NewChannelACL(c) - acl.ApplyHere = facl.ApplyHere - acl.ApplySubs = facl.ApplySubs - acl.UserId = facl.UserId - acl.Group = facl.Group - acl.Deny = Permission(facl.Deny) - acl.Allow = Permission(facl.Allow) - c.ACL = append(c.ACL, acl) - } - for _, fgrp := range fc.Groups { - g := NewGroup(c, fgrp.Name) - g.Inherit = fgrp.Inherit - g.Inheritable = fgrp.Inheritable - for _, uid := range fgrp.Add { - g.Add[uid] = true - } - for _, uid := range fgrp.Remove { - g.Remove[uid] = true - } - c.Groups[g.Name] = g + // The frozen channel must contain an Id and a Name, + // since the server's frozen channels are guaranteed to + // not be deltas. + if fc.Id == nil || fc.Name == nil { + continue } - s.Channels[c.Id] = c - + // Create the channel on the server. + // Update the server's nextChanId field if it needs to be, + // to make sure the server doesn't re-use channel id's. + c := NewChannel(int(*fc.Id), *fc.Name) if c.Id >= s.nextChanId { s.nextChanId = c.Id + 1 } - } - // Hook up children with their parents. - for _, fc := range fs.Channels { - if fc.Id == 0 { - continue + // Update the channel with the contents of the freezer.Channel. + c.Unfreeze(fc) + + // Add the channel's id to the server's channel-id-map. + s.Channels[c.Id] = c + + // Mark the channel's parent + if fc.ParentId != nil { + parents[*fc.Id] = *fc.ParentId + } else { + parents[*fc.Id] = 0, false } - childChan, exists := s.Channels[fc.Id] - if !exists { - return nil, os.NewError("Non-existant child channel") - } - parentChan, exists := s.Channels[fc.ParentId] - if !exists { - return nil, os.NewError("Non-existant parent channel") - } - parentChan.AddChild(childChan) } // Add all users for _, fu := range fs.Users { - u, err := NewUser(fu.Id, fu.Name) + if fu.Id == nil && fu.Name == nil { + continue + } + u, err := NewUser(*fu.Id, *fu.Name) if err != nil { return nil, err } - - u.CertHash = fu.CertHash - u.Email = fu.Email - u.TextureBlob = fu.TextureBlob - u.CommentBlob = fu.CommentBlob - u.LastChannelId = fu.LastChannelId - u.LastActive = fu.LastActive - - s.Users[u.Id] = u if u.Id >= s.nextUserId { s.nextUserId = u.Id + 1 } + + // Merge the contents of the freezer.User into + // the user struct. + u.Unfreeze(fu) + + // Update the server's user maps to point correctly + // to the new user. + s.Users[u.Id] = u s.UserNameMap[u.Name] = u if len(u.CertHash) > 0 { s.UserCertMap[u.CertHash] = u } } + // Attempt to walk the stored log file + walker, err := freezer.NewFileWalker(logFile) + if err != nil { + return nil, err + } + + for { + values, err := walker.Next() + if err == os.EOF { + break + } else if err != nil { + return nil, err + } + + for _, val := range values { + switch val.(type) { + case *freezer.User: + fu := val.(*freezer.User) + // Check if it's a valid freezer.User message. It must at least + // have the Id field filled out for us to be able to do anything + // with it. Warn the admin if an illegal entry is encountered. + if fu.Id == nil { + log.Printf("Skipped User log entry: No id given.") + continue + } + + userId := *fu.Id + + // Determine whether the user already exists on the server or not. + // If the user already exists, this log entry simply updates the + // data for that user. + // If the user doesn't exist, we create it with the data given in + // this log entry. + user, ok := s.Users[userId] + if !ok { + // If no name is given in the log entry, skip this entry. + // Also, warn the admin. + if fu.Name == nil { + log.Printf("Skipped User creation log entry: No name given.") + continue + } + // Create the new user and increment the UserId + // counter for the server if needed. + user, err = NewUser(userId, *fu.Name) + if err != nil { + return nil, err + } + if user.Id >= s.nextUserId { + s.nextUserId = user.Id + 1 + } + } + + // Merge the contents of the frozen.User into the + // user struct. + user.Unfreeze(fu) + + // Update the various user maps in the server to + // be able to correctly look up the user. + s.Users[user.Id] = user + s.UserNameMap[user.Name] = user + if len(user.CertHash) > 0 { + s.UserCertMap[user.CertHash] = user + } + + case *freezer.UserRemove: + fu := val.(*freezer.UserRemove) + // Check for an invalid message and warn if appropriate. + if fu.Id == nil { + log.Printf("Skipped UserRemove log entry: No id given.") + continue + } + + userId := *fu.Id + + // Does this user even exist? + // Warn if we encounter an illegal delete op. + user, ok := s.Users[userId] + if ok { + // Clear the server maps. That should do it. + s.Users[userId] = nil, false + s.UserNameMap[user.Name] = nil, false + if len(user.CertHash) > 0 { + s.UserCertMap[user.CertHash] = nil, false + } + } else { + log.Printf("Skipped UserRemove log entry: No user for given id.") + continue + } + + case *freezer.Channel: + fc := val.(*freezer.Channel) + // Check whether the log entry is legal. + if fc.Id == nil { + log.Printf("Skipped Channel log entry: No id given.") + continue + } + + channelId := int(*fc.Id) + + channel, ok := s.Channels[channelId] + if !ok { + if fc.Name == nil { + log.Printf("Skipped Channel creation log entry: No name given.") + continue + } + // Add the channel and increment the server's + // nextChanId field to a consistent state. + channel = NewChannel(channelId, *fc.Name) + if channel.Id >= s.nextChanId { + s.nextChanId = channel.Id + 1 + } + } + + // Unfreeze the contents of the frozen channel + // into the existing or newly-created channel. + channel.Unfreeze(fc) + // Re-add it to the server's channel map (in case + // the channel was newly-created) + s.Channels[channelId] = channel + + // Mark the channel's parent + if fc.ParentId != nil { + parents[*fc.Id] = *fc.ParentId + } else { + parents[*fc.Id] = 0, false + } + + case *freezer.ChannelRemove: + fc := val.(*freezer.ChannelRemove) + if fc.Id == nil { + log.Printf("Skipped ChannelRemove log entry: No id given.") + continue + } + s.Channels[int(*fc.Id)] = nil + parents[*fc.Id] = 0, false + + case *freezer.BanList: + fbl := val.(*freezer.BanList) + s.UnfreezeBanList(fbl) + + case *freezer.ConfigKeyValuePair: + fcfg := val.(*freezer.ConfigKeyValuePair) + if fcfg.Key != nil { + // It's an update operation + if fcfg.Value != nil { + s.cfg.Set(*fcfg.Key, *fcfg.Value) + // It's a delete/reset operation. + } else { + s.cfg.Reset(*fcfg.Key) + } + } + } + } + } + + // Hook up children with their parents + for chanId, parentId := range parents { + childChan, exists := s.Channels[int(chanId)] + if !exists { + return nil, os.NewError("Non-existant child channel") + } + parentChan, exists := s.Channels[int(parentId)] + if !exists { + return nil, os.NewError("Non-existant parent channel") + } + parentChan.AddChild(childChan) + } + return s, nil } + +// Update the datastore with the user's current state. +func (server *Server) UpdateFrozenUser(user *User) { + fu, err := user.Freeze() + if err != nil { + server.Fatal(err) + } + err = server.freezelog.Put(fu) + if err != nil { + server.Fatal(err) + } +} + +// Mark a user as deleted in the datstore. +func (server *Server) DeleteFrozenUser(user *User) { + err := server.freezelog.Put(&freezer.UserRemove{Id: proto.Uint32(user.Id)}) + if err != nil { + server.Fatal(err) + } +} + +// Given a target channel and a ChannelState protocol message, create a freezer.Channel that +// only includes the values changed by the given ChannelState message. When done, write that +// frozen.Channel to the datastore. +func (server *Server) UpdateFrozenChannel(channel *Channel, state *mumbleproto.ChannelState) { + fc := &freezer.Channel{} + fc.Id = proto.Uint32(uint32(channel.Id)) + if state.Name != nil { + fc.Name = state.Name + } + if state.Parent != nil { + fc.ParentId = state.Parent + } + if len(state.LinksAdd) > 0 && len(state.LinksRemove) > 0 { + links := []uint32{} + for cid, _ := range channel.Links { + links = append(links, uint32(cid)) + } + fc.Links = links + } + if state.Position != nil { + fc.Position = proto.Int64(int64(*state.Position)) + } + if len(state.DescriptionHash) > 0 { + fc.DescriptionBlob = proto.String(channel.DescriptionBlob) + } + err := server.freezelog.Put(fc) + if err != nil { + server.Fatal(err) + } +} + +// Write a channel's ACL and Group data to disk. Mumble doesn't support +// incremental ACL updates and as such we must write all ACLs and groups +// to the datastore on each change. +func (server *Server) UpdateFrozenChannelACLs(channel *Channel) { + fc := &freezer.Channel{} + + fc.Id = proto.Uint32(uint32(channel.Id)) + fc.InheritAcl = proto.Bool(channel.InheritACL) + + acls := []*freezer.ACL{} + for _, acl := range channel.ACL { + facl, err := acl.Freeze() + if err != nil { + return + } + acls = append(acls, facl) + } + fc.Acl = acls + + groups := []*freezer.Group{} + for _, grp := range channel.Groups { + fgrp, err := grp.Freeze() + if err != nil { + return + } + groups = append(groups, fgrp) + } + fc.Groups = groups + + err := server.freezelog.Put(fc) + if err != nil { + server.Fatal(err) + } +} + +// Mark a channel as deleted in the datastore. +func (server *Server) DeleteFrozenChannel(channel *Channel) { + err := server.freezelog.Put(&freezer.ChannelRemove{Id: proto.Uint32(uint32(channel.Id))}) + if err != nil { + server.Fatal(err) + } +} + +// Write the server's banlist to the datastore. +func (server *Server) UpdateFrozenBans(bans []ban.Ban) { + fbl := &freezer.BanList{} + for _, ban := range server.Bans { + fbl.Bans = append(fbl.Bans, FreezeBan(ban)) + } + err := server.freezelog.Put(fbl) + if err != nil { + server.Fatal(err) + } +} + +// Write an updated config value to the datastore. +func (server *Server) UpdateConfig(key, value string) { + fcfg := &freezer.ConfigKeyValuePair{ + Key: proto.String(key), + Value: proto.String(value), + } + err := server.freezelog.Put(fcfg) + if err != nil { + server.Fatal(err) + } +} + +// Delete the config value identified by the given key from the datastore. +func (server *Server) ResetConfig(key string) { + fcfg := &freezer.ConfigKeyValuePair{ + Key: proto.String(key), + } + err := server.freezelog.Put(fcfg) + if err != nil { + server.Fatal(err) + } +} diff --git a/group.go b/group.go index dbd0a76..28c42d8 100644 --- a/group.go +++ b/group.go @@ -231,7 +231,7 @@ func GroupMemberCheck(current *Channel, aclchan *Channel, name string, client *C // Parse the groupname to extract the values we should use // for minpath (first argument), mindesc (second argument), // and maxdesc (third argument). - args := strings.Split(name, ",", 3) + args := strings.SplitN(name, ",", 3) nargs := len(args) if nargs == 3 { if len(args[2]) > 0 { diff --git a/grumble.go b/grumble.go index ed2d270..fd83952 100644 --- a/grumble.go +++ b/grumble.go @@ -5,20 +5,16 @@ package main import ( - "grumble/blobstore" "flag" "fmt" - "json" - "os" + "grumble/blobstore" "log" "net" - "sqlite" + "os" "path/filepath" "regexp" "rpc" "runtime" - "strconv" - "time" ) func defaultGrumbleDir() string { @@ -56,9 +52,6 @@ var datadir *string = flag.String("datadir", defaultDataDir(), "Directory to use var blobdir *string = flag.String("blobdir", defaultBlobDir(), "Directory to use for blob storage") var ctlnet *string = flag.String("ctlnet", defaultCtlNet(), "Network to use for ctl socket") var ctladdr *string = flag.String("ctladdr", defaultCtlAddr(), "Address to use for ctl socket") -var sqlitedb *string = flag.String("murmurdb", "", "Path to murmur.sqlite to import server structure from") -var jsonify *string = flag.String("jsonify", "", "Convert the frozen server at the specified path to JSON and output it to stdout") -var cleanup *bool = flag.Bool("clean", false, "Clean up existing data dir content before importing Murmur data") var gencert *bool = flag.Bool("gencert", false, "Generate a self-signed certificate for use with Grumble") var servers map[int64]*Server @@ -69,43 +62,6 @@ func Usage() { flag.PrintDefaults() } -func MurmurImport(filename string) (err os.Error) { - db, err := sqlite.Open(filename) - if err != nil { - panic(err.String()) - } - - stmt, err := db.Prepare("SELECT server_id FROM servers") - if err != nil { - panic(err.String()) - } - - var serverids []int64 - var sid int64 - for stmt.Next() { - stmt.Scan(&sid) - serverids = append(serverids, sid) - } - - log.Printf("Found servers: %v (%v servers)", serverids, len(serverids)) - - for _, sid := range serverids { - m, err := NewServerFromSQLite(sid, db) - if err != nil { - return err - } - - err = m.FreezeToFile(filepath.Join(*datadir, fmt.Sprintf("%v", sid))) - if err != nil { - return err - } - - log.Printf("Successfully imported server %v", sid) - } - - return -} - func main() { var err os.Error @@ -122,27 +78,6 @@ func main() { } } - // JSONify? - if len(*jsonify) > 0 { - server, err := NewServerFromFrozen(*jsonify) - if err != nil { - log.Fatalf("%v", err) - } - - frozenServer, err := server.Freeze() - if err != nil { - log.Fatalf("%v", err) - } - - enc := json.NewEncoder(os.Stdout) - err = enc.Encode(frozenServer) - if err != nil { - log.Fatalf("%v", err) - } - - return - } - log.SetPrefix("[G] ") log.Printf("Grumble") @@ -172,42 +107,6 @@ func main() { return } - // Should we import data from a Murmur SQLite file? - if len(*sqlitedb) > 0 { - f, err := os.Open(*datadir) - if err != nil { - log.Fatalf("Murmur import failed: %s", err.String()) - } - defer f.Close() - - names, err := f.Readdirnames(-1) - if err != nil { - log.Fatalf("Murmur import failed: %s", err.String()) - } - - if !*cleanup && len(names) > 0 { - log.Fatalf("Non-empty datadir. Refusing to import Murmur data.") - } - if *cleanup { - log.Printf("Cleaning up existing data directory") - for _, name := range names { - if err := os.Remove(filepath.Join(*datadir, name)); err != nil { - log.Fatalf("Unable to cleanup file: %s", name) - } - } - } - - log.Printf("Importing Murmur data from '%s'", *sqlitedb) - if err = MurmurImport(*sqlitedb); err != nil { - log.Fatalf("Murmur import failed: %s", err.String()) - } - - log.Printf("Import from Murmur SQLite database succeeded.") - log.Printf("Please restart Grumble to make use of the imported data.") - - return - } - f, err := os.Open(*datadir) if err != nil { log.Fatalf("Murmur import failed: %s", err.String()) @@ -223,27 +122,17 @@ func main() { for _, name := range names { if matched, _ := regexp.MatchString("^[0-9]+$", name); matched { log.Printf("Loading server %v", name) - s, err := NewServerFromFrozen(filepath.Join(*datadir, name)) + s, err := NewServerFromFrozen(name) if err != nil { - log.Fatalf("Unable to load server: %s", err.String()) + log.Fatalf("Unable to load server: %v", err) + } + err = s.FreezeToFile() + if err != nil { + log.Fatalf("Unable to freeze server to disk: %v", err) } servers[s.Id] = s go s.ListenAndMurmur() } - // win32 special-case - if matched, _ := regexp.MatchString("^[0-9]+.old$", name); matched { - sid, _ := strconv.Atoi64(name[0 : len(name)-4]) - _, exists := servers[sid] - if !exists { - log.Printf("Recovering lost server %v", name) - s, err := NewServerFromFrozen(filepath.Join(*datadir, name)) - if err != nil { - log.Fatalf("Unable to recover server: %S", err.String()) - } - servers[sid] = s - go s.ListenAndMurmur() - } - } } if len(servers) == 0 { @@ -253,6 +142,10 @@ func main() { } servers[s.Id] = s + + os.Mkdir(filepath.Join(*datadir, fmt.Sprintf("%v", 1)), 0750) + s.FreezeToFile() + go s.ListenAndMurmur() } @@ -269,19 +162,7 @@ func main() { go rpc.Accept(lis) if len(servers) > 0 { - ticker := time.NewTicker(10e9) // 10 secs go SignalHandler() - for { - select { - case <-ticker.C: - for sid, server := range servers { - err := server.FreezeToFile(filepath.Join(*datadir, fmt.Sprintf("%v", sid))) - if err != nil { - log.Printf("Unable to freeze server %v: %s", sid, err.String()) - continue - } - } - } - } + select {} } } diff --git a/message.go b/message.go index a720739..a8e34d3 100644 --- a/message.go +++ b/message.go @@ -180,6 +180,11 @@ func (server *Server) handleChannelRemoveMessage(client *Client, msg *Message) { return } + // Update datastore + if !channel.Temporary { + server.DeleteFrozenChannel(channel) + } + server.RemoveChannel(channel) } @@ -511,6 +516,11 @@ func (server *Server) handleChannelStateMessage(client *Client, msg *Message) { return client.Version >= 0x10202 }) } + + // Update channel in datastore + if !channel.Temporary { + server.UpdateFrozenChannel(channel, chanstate) + } } // Handle a user remove packet. This can either be a client disconnecting, or a @@ -558,6 +568,7 @@ func (server *Server) handleUserRemoveMessage(client *Client, msg *Message) { server.banlock.Lock() server.Bans = append(server.Bans, ban) + server.UpdateFrozenBans(server.Bans) server.banlock.Unlock() } @@ -820,7 +831,7 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) { userRegistrationChanged := false if userstate.UserId != nil { - uid, err := server.RegisterClient(client) + uid, err := server.RegisterClient(target) if err != nil { client.Printf("Unable to register: %v", err) userstate.UserId = nil @@ -903,6 +914,10 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) { server.Panic("Unable to broadcast UserState") } } + + if target.IsRegistered() { + server.UpdateFrozenUser(target.user) + } } func (server *Server) handleBanListMessage(client *Client, msg *Message) { @@ -964,6 +979,9 @@ func (server *Server) handleBanListMessage(client *Client, msg *Message) { } server.Bans = append(server.Bans, ban) } + + server.UpdateFrozenBans(server.Bans) + client.Printf("Banlist updated") } } @@ -1250,6 +1268,9 @@ func (server *Server) handleAclMessage(client *Client, msg *Message) { server.ClearACLCache() } + + // Update freezer + server.UpdateFrozenChannelACLs(channel) } } @@ -1261,6 +1282,8 @@ func (server *Server) handleQueryUsers(client *Client, msg *Message) { client.Panic(err.String()) } + server.Printf("in handleQueryUsers") + reply := &mumbleproto.QueryUsers{} for _, id := range query.Ids { @@ -1525,15 +1548,18 @@ func (server *Server) handleUserList(client *Client, msg *Message) { if uid == 0 { continue } - // De-register a user - if listUser.Name == nil { - server.RemoveRegistration(uid) - // Rename user - } else { - // todo(mkrautz): Validate name. - user, ok := server.Users[uid] - if ok { + user, ok := server.Users[uid] + if ok { + // De-register a user + if listUser.Name == nil { + server.RemoveRegistration(uid) + server.DeleteFrozenUser(user) + + // Rename user + } else { + // todo(mkrautz): Validate name. user.Name = *listUser.Name + server.UpdateFrozenUser(user) } } } diff --git a/server.go b/server.go index e4f4af6..11001b2 100644 --- a/server.go +++ b/server.go @@ -5,32 +5,30 @@ package main import ( - "log" - "crypto/tls" - "crypto/sha1" - "os" - "net" "bufio" "bytes" - "compress/gzip" "crypto/rand" + "crypto/sha1" + "crypto/tls" "encoding/binary" "encoding/hex" - "sync" - "goprotobuf.googlecode.com/hg/proto" - "mumbleproto" "fmt" - "gob" + "goprotobuf.googlecode.com/hg/proto" "grumble/ban" "grumble/blobstore" "grumble/cryptstate" + "grumble/freezer" "grumble/htmlfilter" "grumble/serverconf" "grumble/sessionpool" "hash" - "io" + "log" + "mumbleproto" + "net" + "os" "path/filepath" "strings" + "sync" "time" ) @@ -61,7 +59,8 @@ type Server struct { incoming chan *Message udpsend chan *Message voicebroadcast chan *VoiceBroadcast - freezeRequest chan *freezeRequest + freezeRequest chan bool + cfgUpdate chan *KeyValuePair // Signals to the server that a client has been successfully // authenticated. @@ -87,9 +86,6 @@ type Server struct { Channels map[int]*Channel nextChanId int - // Administration - SuperUserPassword string - // Users Users map[uint32]*User UserCertMap map[string]*User @@ -99,6 +95,9 @@ type Server struct { // Sessions pool *sessionpool.SessionPool + // Freezer + freezelog *freezer.Log + // ACL cache aclcache ACLCache @@ -123,11 +122,6 @@ func (lf clientLogForwarder) Write(incoming []byte) (int, os.Error) { return len(incoming), nil } -type freezeRequest struct { - done chan bool - readCloser io.ReadCloser -} - // Allocate a new Murmur instance func NewServer(id int64, addr string, port int) (s *Server, err os.Error) { s = new(Server) @@ -151,7 +145,8 @@ func NewServer(id int64, addr string, port int) (s *Server, err os.Error) { s.incoming = make(chan *Message) s.udpsend = make(chan *Message) s.voicebroadcast = make(chan *VoiceBroadcast) - s.freezeRequest = make(chan *freezeRequest) + s.freezeRequest = make(chan bool, 1) + s.cfgUpdate = make(chan *KeyValuePair) s.clientAuthenticated = make(chan *Client) s.Users[0], err = NewUser(0, "SuperUser") @@ -194,12 +189,15 @@ func (server *Server) SetSuperUserPassword(password string) { digest := hex.EncodeToString(hasher.Sum()) // Could be racy, but shouldn't really matter... - server.SuperUserPassword = "sha1$" + salt + "$" + digest + key := "SuperUserPassword" + val := "sha1$" + salt + "$" + digest + server.cfg.Set(key, val) + server.cfgUpdate <- &KeyValuePair{Key: key, Value: val} } // Check whether password matches the set SuperUser password. func (server *Server) CheckSuperUserPassword(password string) bool { - parts := strings.Split(server.SuperUserPassword, "$", -1) + parts := strings.Split(server.cfg.StringValue("SuperUserPassword"), "$") if len(parts) != 3 { return false } @@ -318,6 +316,7 @@ func (server *Server) AddChannel(name string) (channel *Channel) { channel = NewChannel(server.nextChanId, name) server.Channels[channel.Id] = channel server.nextChanId += 1 + return } @@ -327,6 +326,7 @@ func (server *Server) RemoveChanel(channel *Channel) { server.Printf("Attempted to remove root channel.") return } + server.Channels[channel.Id] = nil, false } @@ -373,12 +373,15 @@ func (server *Server) handler() { server.finishAuthenticate(client) // Synchonized freeze requests - case req := <-server.freezeRequest: - fs, err := server.Freeze() + case <-server.freezeRequest: + err := server.FreezeToFile() if err != nil { - server.Panicf("Unable to freeze the server") + server.Fatal(err) } - go server.handleFreezeRequest(req, &fs) + + // Disk freeze config update + case kvp := <-server.cfgUpdate: + server.UpdateConfig(kvp.Key, kvp.Value) // Server registration update // Tick every hour + a minute offset based on the server id. @@ -388,33 +391,6 @@ func (server *Server) handler() { } } -func (server *Server) handleFreezeRequest(freq *freezeRequest, fs *frozenServer) { - pr, pw := io.Pipe() - - freq.readCloser = pr - freq.done <- true - - zw, err := gzip.NewWriterLevel(pw, gzip.BestCompression) - if err != nil { - if err = pw.CloseWithError(err); err != nil { - server.Panicf("Unable to close PipeWriter: %v", err.String()) - } - return - } - - enc := gob.NewEncoder(zw) - err = enc.Encode(fs) - if err != nil { - if err = pw.CloseWithError(err); err != nil { - server.Panicf("Unable to close PipeWriter: %v", err.String()) - } - } - - if err = pw.CloseWithError(zw.Close()); err != nil { - server.Panicf("Unable to close PipeWriter: %v", err.String()) - } -} - // Handle an Authenticate protobuf message. This is handled in a separate // goroutine to allow for remote authenticators that are slow to respond. // @@ -1035,26 +1011,6 @@ func (server *Server) userEnterChannel(client *Client, channel *Channel, usersta } } -// Create a point-in-time snapshot of Server and make it -// accessible through the returned io.ReadCloser. -func (s *Server) FreezeServer() io.ReadCloser { - if !s.running { - fs, err := s.Freeze() - if err != nil { - s.Panicf("Unable to freeze the server") - } - fr := &freezeRequest{done: make(chan bool)} - go s.handleFreezeRequest(fr, &fs) - <-fr.done - return fr.readCloser - } - - fr := &freezeRequest{done: make(chan bool)} - s.freezeRequest <- fr - <-fr.done - return fr.readCloser -} - // Register a client on the server. func (s *Server) RegisterClient(client *Client) (uid uint32, err os.Error) { // Increment nextUserId only if registration succeeded. @@ -1248,6 +1204,12 @@ func (s *Server) ListenAndMurmur() { s.Printf("Started: listening on %v", tl.Addr()) + // Open a fresh freezer log + err = s.openFreezeLog() + if err != nil { + s.Fatal(err) + } + // Update server registration if needed. go func() { time.Sleep((60 + s.Id*10) * 1e9) diff --git a/signal_unix.go b/signal_unix.go index 6f51ff7..5d0dff2 100644 --- a/signal_unix.go +++ b/signal_unix.go @@ -5,11 +5,8 @@ package main import ( - "fmt" - "log" "os" "os/signal" - "path/filepath" ) func SignalHandler() { @@ -18,15 +15,6 @@ func SignalHandler() { if sig != os.SIGINT && sig != os.SIGTERM { continue } - for sid, s := range servers { - err := s.FreezeToFile(filepath.Join(*datadir, fmt.Sprintf("%v", sid))) - if err != nil { - log.Printf("Unable to freeze server %v: %s", sid, err.String()) - continue - } - log.Printf("Server %v frozen", sid) - } - os.Exit(0) } }