Hook blobstore into Grumble.

This commit is contained in:
Mikkel Krautz 2011-04-11 17:55:11 +02:00
parent b3fec9315a
commit ed602e9d8c
8 changed files with 159 additions and 72 deletions

View file

@ -24,7 +24,7 @@ LDFLAGS = \
-Lpkg/cryptstate/_obj \ -Lpkg/cryptstate/_obj \
-Lpkg/packetdatastream/_obj \ -Lpkg/packetdatastream/_obj \
-Lpkg/mumbleproto/_obj \ -Lpkg/mumbleproto/_obj \
-Ipkg/blobstore/_obj \ -Lpkg/blobstore/_obj \
-Lpkg/sqlite/_obj -Lpkg/sqlite/_obj
GOFILES = \ GOFILES = \

View file

@ -4,6 +4,10 @@
package main package main
import (
"encoding/hex"
)
// A Mumble channel // A Mumble channel
type Channel struct { type Channel struct {
Id int Id int
@ -27,8 +31,7 @@ type Channel struct {
Links map[int]*Channel Links map[int]*Channel
// Blobs // Blobs
Description string DescriptionBlob string
DescriptionHash []byte
} }
func NewChannel(id int, name string) (channel *Channel) { func NewChannel(id int, name string) (channel *Channel) {
@ -65,3 +68,18 @@ func (channel *Channel) RemoveClient(client *Client) {
channel.clients[client.Session] = nil, false channel.clients[client.Session] = nil, false
client.Channel = nil client.Channel = nil
} }
// Does the channel have a description?
func (channel *Channel) HasDescription() bool {
return len(channel.DescriptionBlob) > 0
}
// Get the channel's blob hash as a byte slice for sending via a protobuf message.
// Returns nil if there is no blob.
func (channel *Channel) DescriptionBlobHashBytes() (buf []byte) {
buf, err := hex.DecodeString(channel.DescriptionBlob)
if err != nil {
return nil
}
return buf
}

View file

@ -60,12 +60,6 @@ type Client struct {
OSName string OSName string
OSVersion string OSVersion string
// Blobs
Comment string
CommentHash []byte
Texture []byte
TextureHash []byte
// Personal // Personal
Username string Username string
Session uint32 Session uint32
@ -527,17 +521,26 @@ func (client *Client) sendChannelList() {
} }
func (client *Client) sendChannelTree(channel *Channel) { func (client *Client) sendChannelTree(channel *Channel) {
// Start at the root channel.
log.Printf("sending channel ID=%i, NAME=%s", channel.Id, channel.Name)
chanstate := &mumbleproto.ChannelState{ chanstate := &mumbleproto.ChannelState{
ChannelId: proto.Uint32(uint32(channel.Id)), ChannelId: proto.Uint32(uint32(channel.Id)),
Name: proto.String(channel.Name), Name: proto.String(channel.Name),
Description: proto.String(channel.Description),
} }
if channel.parent != nil { if channel.parent != nil {
chanstate.Parent = proto.Uint32(uint32(channel.parent.Id)) chanstate.Parent = proto.Uint32(uint32(channel.parent.Id))
} }
if channel.HasDescription() {
if client.Version >= 0x10202 {
chanstate.DescriptionHash = channel.DescriptionBlobHashBytes()
} else {
buf, err := globalBlobstore.Get(channel.DescriptionBlob)
if err != nil {
panic("Blobstore error.")
}
chanstate.Description = proto.String(string(buf))
}
}
err := client.sendProtoMessage(MessageChannelState, chanstate) err := client.sendProtoMessage(MessageChannelState, chanstate)
if err != nil { if err != nil {
client.Panic(err.String()) client.Panic(err.String())

View file

@ -195,7 +195,7 @@ func NewServerFromFrozen(filename string) (s *Server, err os.Error) {
c := NewChannel(fc.Id, fc.Name) c := NewChannel(fc.Id, fc.Name)
c.Position = int(fc.Position) c.Position = int(fc.Position)
c.InheritACL = fc.InheritACL c.InheritACL = fc.InheritACL
c.DescriptionHash = []byte{} // fixme c.DescriptionBlob = fc.DescriptionBlob
for _, facl := range fc.ACL { for _, facl := range fc.ACL {
acl := NewChannelACL(c) acl := NewChannelACL(c)

View file

@ -5,6 +5,7 @@
package main package main
import ( import (
"blobstore"
"compress/gzip" "compress/gzip"
"flag" "flag"
"fmt" "fmt"
@ -23,6 +24,8 @@ var blobdir *string = flag.String("blobdir", "", "Directory to use for blob stor
var sqlitedb *string = flag.String("murmurdb", "", "Path to murmur.sqlite to import server structure from") var sqlitedb *string = flag.String("murmurdb", "", "Path to murmur.sqlite to import server structure from")
var cleanup *bool = flag.Bool("clean", false, "Clean up existing data dir content before importing Murmur data") var cleanup *bool = flag.Bool("clean", false, "Clean up existing data dir content before importing Murmur data")
var globalBlobstore *blobstore.BlobStore
func Usage() { func Usage() {
fmt.Fprintf(os.Stderr, "usage: grumble [options]\n") fmt.Fprintf(os.Stderr, "usage: grumble [options]\n")
flag.PrintDefaults() flag.PrintDefaults()
@ -82,6 +85,7 @@ func MurmurImport(filename string) (err os.Error) {
} }
func main() { func main() {
var err os.Error
flag.Parse() flag.Parse()
if *help == true { if *help == true {
Usage() Usage()
@ -98,7 +102,12 @@ func main() {
if len(*blobdir) == 0 { if len(*blobdir) == 0 {
*blobdir = filepath.Join(os.Getenv("HOME"), ".grumble", "blob") *blobdir = filepath.Join(os.Getenv("HOME"), ".grumble", "blob")
} }
log.Printf("Using blob directory: %s", *blobdir) log.Printf("Using blob directory: %s", *blobdir)
globalBlobstore, err = blobstore.NewBlobStore(*blobdir, true)
if err != nil {
log.Fatalf("Unable to initialize blobstore: %v", err.String())
}
// Should we import data from a Murmur SQLite file? // Should we import data from a Murmur SQLite file?
if len(*sqlitedb) > 0 { if len(*sqlitedb) > 0 {

View file

@ -9,7 +9,6 @@ import (
"mumbleproto" "mumbleproto"
"goprotobuf.googlecode.com/hg/proto" "goprotobuf.googlecode.com/hg/proto"
"net" "net"
"crypto/sha1"
"cryptstate" "cryptstate"
"fmt" "fmt"
) )
@ -234,22 +233,21 @@ func (server *Server) handleChannelStateMessage(client *Client, msg *Message) {
return return
} }
key := ""
if len(description) > 0 {
key, err = globalBlobstore.Put([]byte(description))
if err != nil {
log.Panicf("Blobstore error: %v", err.String())
}
}
// Add the new channel // Add the new channel
channel = server.AddChannel(name) channel = server.AddChannel(name)
channel.Description = description channel.DescriptionBlob = key
channel.Temporary = *chanstate.Temporary channel.Temporary = *chanstate.Temporary
channel.Position = int(*chanstate.Position) channel.Position = int(*chanstate.Position)
parent.AddChild(channel) parent.AddChild(channel)
// Generate description hash.
if len(channel.Description) >= 128 {
hash := sha1.New()
hash.Write([]byte(channel.Description))
channel.DescriptionHash = hash.Sum()
} else {
channel.DescriptionHash = []byte{}
}
// Add the creator to the channel's admin group // Add the creator to the channel's admin group
if client.IsRegistered() { if client.IsRegistered() {
grp := NewGroup(channel, "admin") grp := NewGroup(channel, "admin")
@ -282,10 +280,11 @@ func (server *Server) handleChannelStateMessage(client *Client, msg *Message) {
server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool { server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool {
return client.Version < 0x10202 return client.Version < 0x10202
}) })
// Remove description if client knows how to handle blobs. // Remove description if client knows how to handle blobs.
if len(channel.DescriptionHash) > 0 { if chanstate.Description != nil && channel.HasDescription() {
chanstate.Description = nil chanstate.Description = nil
chanstate.DescriptionHash = channel.DescriptionHash chanstate.DescriptionHash = channel.DescriptionBlobHashBytes()
} }
server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool { server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool {
return client.Version >= 0x10202 return client.Version >= 0x10202
@ -415,14 +414,11 @@ func (server *Server) handleChannelStateMessage(client *Client, msg *Message) {
// Description change // Description change
if chanstate.Description != nil { if chanstate.Description != nil {
// Generate description hash. key, err := globalBlobstore.Put([]byte(*chanstate.Description))
if len(channel.Description) >= 128 { if err != nil {
hash := sha1.New() log.Panicf("Blobstore error: %v", err.String())
hash.Write([]byte(channel.Description))
channel.DescriptionHash = hash.Sum()
} else {
channel.DescriptionHash = []byte{}
} }
channel.DescriptionBlob = key
} }
// Position change // Position change
@ -444,11 +440,13 @@ func (server *Server) handleChannelStateMessage(client *Client, msg *Message) {
server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool { server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool {
return client.Version < 0x10202 return client.Version < 0x10202
}) })
// Remove description blob when sending to 1.2.2 >= users. Only send the blob hash. // Remove description blob when sending to 1.2.2 >= users. Only send the blob hash.
if chanstate.Description != nil && len(channel.DescriptionHash) > 0 { if channel.HasDescription() {
chanstate.Description = nil chanstate.Description = nil
chanstate.DescriptionHash = channel.DescriptionHash chanstate.DescriptionHash = channel.DescriptionBlobHashBytes()
} }
chanstate.DescriptionHash = channel.DescriptionBlobHashBytes()
server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool { server.broadcastProtoMessageWithPredicate(MessageChannelState, chanstate, func(client *Client) bool {
return client.Version >= 0x10202 return client.Version >= 0x10202
}) })
@ -591,13 +589,7 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) {
} }
} }
// Check if the text is allowed. // todo(mkrautz): Check if the text is allowed.
// Only set the comment if it is different from the current
// user comment.
if comment == target.Comment {
userstate.Comment = nil
}
} }
// Texture change // Texture change
@ -646,14 +638,17 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) {
broadcast := false broadcast := false
if userstate.Texture != nil { if userstate.Texture != nil {
target.Texture = userstate.Texture key, err := globalBlobstore.Put(userstate.Texture)
if len(target.Texture) >= 128 { if err != nil {
hash := sha1.New() log.Panicf("Blobstore error: %v", err.String())
hash.Write(target.Texture)
target.TextureHash = hash.Sum()
} else {
target.TextureHash = []byte{}
} }
if target.user.TextureBlob != key {
target.user.TextureBlob = key
} else {
userstate.Texture = nil
}
broadcast = true broadcast = true
} }
@ -683,14 +678,17 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) {
} }
if userstate.Comment != nil { if userstate.Comment != nil {
target.Comment = *userstate.Comment key, err := globalBlobstore.Put([]byte(*userstate.Comment))
if len(target.Comment) >= 128 { if err != nil {
hash := sha1.New() log.Panicf("Blobstore error: %v", err.String())
hash.Write([]byte(target.Comment))
target.CommentHash = hash.Sum()
} else {
target.CommentHash = []byte{}
} }
if target.user.CommentBlob != key {
target.user.CommentBlob = key
} else {
userstate.Comment = nil
}
broadcast = true broadcast = true
} }
@ -754,11 +752,12 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) {
// Mumble and Murmur used qCompress and qUncompress from Qt to compress // Mumble and Murmur used qCompress and qUncompress from Qt to compress
// textures that were sent over the wire. We can use this to determine // textures that were sent over the wire. We can use this to determine
// whether a texture is a "new style" or an "old style" texture. // whether a texture is a "new style" or an "old style" texture.
texture := userstate.Texture
texlen := uint32(0) texlen := uint32(0)
if target.Texture != nil && len(target.Texture) > 4 { if texture != nil && len(texture) > 4 {
texlen = uint32(target.Texture[0])<<24 | uint32(target.Texture[1])<<16 | uint32(target.Texture[2])<<8 | uint32(target.Texture[3]) texlen = uint32(texture[0])<<24 | uint32(texture[1])<<16 | uint32(texture[2])<<8 | uint32(texture[3])
} }
if userstate.Texture != nil && len(target.Texture) > 4 && texlen != 600*60*4 { if texture != nil && len(texture) > 4 && texlen != 600*60*4 {
// The sent texture is a new-style texture. Strip it from the message // The sent texture is a new-style texture. Strip it from the message
// we send to pre-1.2.2 clients. // we send to pre-1.2.2 clients.
userstate.Texture = nil userstate.Texture = nil
@ -769,7 +768,7 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) {
log.Panic("Unable to broadcast UserState") log.Panic("Unable to broadcast UserState")
} }
// Re-add it to the message, so that 1.2.2+ clients *do* get the new-style texture. // Re-add it to the message, so that 1.2.2+ clients *do* get the new-style texture.
userstate.Texture = target.Texture userstate.Texture = texture
} else { } else {
// Old style texture. We can send the message as-is. // Old style texture. We can send the message as-is.
err := server.broadcastProtoMessageWithPredicate(MessageUserState, userstate, func(client *Client) bool { err := server.broadcastProtoMessageWithPredicate(MessageUserState, userstate, func(client *Client) bool {
@ -783,14 +782,15 @@ func (server *Server) handleUserStateMessage(client *Client, msg *Message) {
// If a texture hash is set on user, we transmit that instead of // If a texture hash is set on user, we transmit that instead of
// the texture itself. This allows the client to intelligently fetch // the texture itself. This allows the client to intelligently fetch
// the blobs that it does not already have in its local storage. // the blobs that it does not already have in its local storage.
if userstate != nil && len(target.TextureHash) > 0 { if userstate.Texture != nil && target.user.HasTexture() {
userstate.Texture = nil userstate.Texture = nil
userstate.TextureHash = target.TextureHash userstate.TextureHash = target.user.TextureBlobHashBytes()
} }
// Ditto for comments. // Ditto for comments.
if userstate.Comment != nil && len(target.CommentHash) > 0 { if userstate.Comment != nil && target.user.HasComment() {
userstate.Comment = nil userstate.Comment = nil
userstate.CommentHash = target.CommentHash userstate.CommentHash = target.user.CommentBlobHashBytes()
} }
err := server.broadcastProtoMessageWithPredicate(MessageUserState, userstate, func(client *Client) bool { err := server.broadcastProtoMessageWithPredicate(MessageUserState, userstate, func(client *Client) bool {
@ -1122,10 +1122,14 @@ func (server *Server) handleRequestBlob(client *Client, msg *Message) {
if len(blobreq.SessionTexture) > 0 { if len(blobreq.SessionTexture) > 0 {
for _, sid := range blobreq.SessionTexture { for _, sid := range blobreq.SessionTexture {
if target, ok := server.clients[sid]; ok { if target, ok := server.clients[sid]; ok {
if len(target.Texture) > 0 { if target.user.HasTexture() {
buf, err := globalBlobstore.Get(target.user.TextureBlob)
if err != nil {
log.Panicf("Blobstore error: %v", err.String())
}
userstate.Reset() userstate.Reset()
userstate.Session = proto.Uint32(uint32(target.Session)) userstate.Session = proto.Uint32(uint32(target.Session))
userstate.Texture = target.Texture userstate.Texture = buf
if err := client.sendProtoMessage(MessageUserState, userstate); err != nil { if err := client.sendProtoMessage(MessageUserState, userstate); err != nil {
client.Panic(err.String()) client.Panic(err.String())
return return
@ -1139,10 +1143,14 @@ func (server *Server) handleRequestBlob(client *Client, msg *Message) {
if len(blobreq.SessionComment) > 0 { if len(blobreq.SessionComment) > 0 {
for _, sid := range blobreq.SessionComment { for _, sid := range blobreq.SessionComment {
if target, ok := server.clients[sid]; ok { if target, ok := server.clients[sid]; ok {
if len(target.Comment) > 0 { if target.user.HasComment() {
buf, err := globalBlobstore.Get(target.user.CommentBlob)
if err != nil {
log.Panicf("Blobstore error: %v", err.String())
}
userstate.Reset() userstate.Reset()
userstate.Session = proto.Uint32(uint32(target.Session)) userstate.Session = proto.Uint32(uint32(target.Session))
userstate.Comment = proto.String(target.Comment) userstate.Comment = proto.String(string(buf))
if err := client.sendProtoMessage(MessageUserState, userstate); err != nil { if err := client.sendProtoMessage(MessageUserState, userstate); err != nil {
client.Panic(err.String()) client.Panic(err.String())
return return
@ -1158,10 +1166,14 @@ func (server *Server) handleRequestBlob(client *Client, msg *Message) {
if len(blobreq.ChannelDescription) > 0 { if len(blobreq.ChannelDescription) > 0 {
for _, cid := range blobreq.ChannelDescription { for _, cid := range blobreq.ChannelDescription {
if channel, ok := server.Channels[int(cid)]; ok { if channel, ok := server.Channels[int(cid)]; ok {
if len(channel.Description) > 0 { if channel.HasDescription() {
chanstate.Reset() chanstate.Reset()
buf, err := globalBlobstore.Get(channel.DescriptionBlob)
if err != nil {
log.Panicf("Blobstore error: %v", err.String())
}
chanstate.ChannelId = proto.Uint32(uint32(channel.Id)) chanstate.ChannelId = proto.Uint32(uint32(channel.Id))
chanstate.Description = proto.String(channel.Description) chanstate.Description = proto.String(string(buf))
if err := client.sendProtoMessage(MessageChannelState, chanstate); err != nil { if err := client.sendProtoMessage(MessageChannelState, chanstate); err != nil {
client.Panic(err.String()) client.Panic(err.String())
return return

View file

@ -105,7 +105,11 @@ func populateChannelsFromDatabase(server *Server, db *sqlite.Conn, parentId int)
return err return err
} }
c.Description = description key, err := globalBlobstore.Put([]byte(description))
if err != nil {
return err
}
c.DescriptionBlob = key
} }
if err := stmt.Reset(); err != nil { if err := stmt.Reset(); err != nil {
@ -315,6 +319,12 @@ func populateUsers(server *Server, db *sqlite.Conn) (err os.Error) {
return err return err
} }
key, err := globalBlobstore.Put(Texture)
if err != nil {
return err
}
user.TextureBlob = key
user.LastActive = uint64(LastActive) user.LastActive = uint64(LastActive)
user.LastChannelId = LastChannel user.LastChannelId = LastChannel
@ -353,7 +363,11 @@ func populateUsers(server *Server, db *sqlite.Conn) (err os.Error) {
case UserInfoEmail: case UserInfoEmail:
user.Email = Value user.Email = Value
case UserInfoComment: case UserInfoComment:
// unhandled key, err := globalBlobstore.Put([]byte(Value))
if err != nil {
return err
}
user.CommentBlob = key
case UserInfoHash: case UserInfoHash:
user.CertHash = Value user.CertHash = Value
case UserInfoLastActive: case UserInfoLastActive:

31
user.go
View file

@ -5,6 +5,7 @@
package main package main
import ( import (
"encoding/hex"
"os" "os"
) )
@ -37,3 +38,33 @@ func NewUser(id uint32, name string) (user *User, err os.Error) {
Name: name, Name: name,
},nil },nil
} }
// Does the channel have comment?
func (user *User) HasComment() bool {
return len(user.CommentBlob) > 0
}
// Get the hash of the user's comment blob as a byte slice for transmitting via a protobuf message.
// Returns nil if there is no such blob.
func (user *User) CommentBlobHashBytes() (buf []byte) {
buf, err := hex.DecodeString(user.CommentBlob)
if err != nil {
return nil
}
return buf
}
// Does the user have a texture?
func (user *User) HasTexture() bool {
return len(user.TextureBlob) > 0
}
// Get the hash of the user's texture blob as a byte slice for transmitting via a protobuf message.
// Returns nil if there is no such blob.
func (user *User) TextureBlobHashBytes() (buf []byte) {
buf, err := hex.DecodeString(user.TextureBlob)
if err != nil {
return nil
}
return buf
}