From 875cc89b9eee52e3a40676b319717e56731eeea6 Mon Sep 17 00:00:00 2001 From: Mikkel Krautz Date: Fri, 11 Nov 2011 21:08:32 +0100 Subject: [PATCH] Get rid of Client's sender goroutine. --- client.go | 128 +++++++++++++-------------------------- message.go | 24 ++++---- pkg/mumbleproto/types.go | 3 +- server.go | 18 +++--- 4 files changed, 62 insertions(+), 111 deletions(-) diff --git a/client.go b/client.go index 495902e..f308a03 100644 --- a/client.go +++ b/client.go @@ -6,6 +6,7 @@ package main import ( "bufio" + "bytes" "encoding/binary" "goprotobuf.googlecode.com/hg/proto" "grumble/blobstore" @@ -29,13 +30,10 @@ type Client struct { udpaddr *net.UDPAddr conn net.Conn reader *bufio.Reader - writer *bufio.Writer state int server *Server - msgchan chan *Message udprecv chan []byte - doneSending chan bool disconnected bool @@ -157,13 +155,6 @@ func (client *Client) disconnect(kicked bool) { close(client.clientReady) } - // Cleanly shut down the sender goroutine. This should be non-blocking - // since we're writing to a bufio.Writer. - // todo(mkrautz): Check whether that's the case? We do a flush, so maybe not. - client.msgchan <- nil - <-client.doneSending - close(client.msgchan) - client.Printf("Disconnected") client.conn.Close() } @@ -186,7 +177,7 @@ func (client *Client) RejectAuth(rejectType mumbleproto.Reject_RejectType, reaso reasonString = proto.String(reason) } - client.sendProtoMessage(&mumbleproto.Reject{ + client.sendMessage(&mumbleproto.Reject{ Type: mumbleproto.NewReject_RejectType(rejectType), Reason: reasonString, }) @@ -228,21 +219,6 @@ func (client *Client) readProtoMessage() (msg *Message, err error) { return } -// Send a protobuf-encoded message -func (c *Client) sendProtoMessage(msg interface{}) (err error) { - d, err := proto.Marshal(msg) - if err != nil { - return - } - - c.msgchan <- &Message{ - buf: d, - kind: mumbleproto.MessageType(msg), - } - - return -} - // Send permission denied by type func (c *Client) sendPermissionDeniedType(denyType mumbleproto.PermissionDenied_DenyType) { c.sendPermissionDeniedTypeUser(denyType, nil) @@ -256,31 +232,25 @@ func (c *Client) sendPermissionDeniedTypeUser(denyType mumbleproto.PermissionDen if user != nil { pd.Session = proto.Uint32(uint32(user.Session)) } - d, err := proto.Marshal(pd) + err := c.sendMessage(pd) if err != nil { - c.Panicf("%v", err) + c.Panicf("%v", err.Error()) return } - c.msgchan <- &Message{ - buf: d, - kind: mumbleproto.MessagePermissionDenied, - } } // Send permission denied by who, what, where func (c *Client) sendPermissionDenied(who *Client, where *Channel, what Permission) { - d, err := proto.Marshal(&mumbleproto.PermissionDenied{ + pd := &mumbleproto.PermissionDenied{ Permission: proto.Uint32(uint32(what)), ChannelId: proto.Uint32(uint32(where.Id)), Session: proto.Uint32(who.Session), Type: mumbleproto.NewPermissionDenied_DenyType(mumbleproto.PermissionDenied_Permission), - }) - if err != nil { - c.Panicf(err.Error()) } - c.msgchan <- &Message{ - buf: d, - kind: mumbleproto.MessagePermissionDenied, + err := c.sendMessage(pd) + if err != nil { + c.Panicf("%v", err.Error()) + return } } @@ -357,9 +327,7 @@ func (client *Client) sendUdp(msg *Message) { client.Printf("Sent UDP!") client.server.udpsend <- msg } else { - client.Printf("Sent TCP!") - msg.kind = mumbleproto.MessageUDPTunnel - client.msgchan <- msg + client.sendMessage(msg.buf) } } @@ -369,27 +337,38 @@ func (client *Client) sendUdp(msg *Message) { // This method should only be called from within the client's own // sender goroutine, since it serializes access to the underlying // buffered writer. -func (client *Client) sendMessage(msg *Message) error { - // Write message kind - err := binary.Write(client.writer, binary.BigEndian, msg.kind) +func (client *Client) sendMessage(msg interface{}) error { + buf := new(bytes.Buffer) + var ( + kind uint16 + msgData []byte + err error + ) + + kind = mumbleproto.MessageType(msg) + if kind == mumbleproto.MessageUDPTunnel { + msgData = msg.([]byte) + } else { + msgData, err = proto.Marshal(msg) + if err != nil { + return err + } + } + + err = binary.Write(buf, binary.BigEndian, kind) + if err != nil { + return err + } + err = binary.Write(buf, binary.BigEndian, uint32(len(msgData))) + if err != nil { + return err + } + _, err = buf.Write(msgData) if err != nil { return err } - // Message length - err = binary.Write(client.writer, binary.BigEndian, uint32(len(msg.buf))) - if err != nil { - return err - } - - // Message buffer itself - _, err = client.writer.Write(msg.buf) - if err != nil { - return err - } - - // Flush it, no need to keep it in the buffer for any longer. - err = client.writer.Flush() + _, err = client.conn.Write(buf.Bytes()) if err != nil { return err } @@ -397,31 +376,6 @@ func (client *Client) sendMessage(msg *Message) error { return nil } -// Sender Goroutine. The sender goroutine will initiate shutdown -// if it receives a nil Message. -// -// On shutdown, it will send a true boolean value on the client's -// doneSending channel. This allows the client to send all the messages -// that remain in it's buffer when the server has to force a disconnect. -func (client *Client) sender() { - defer func() { - client.doneSending <- true - }() - - for msg := range client.msgchan { - if msg == nil { - return - } - - err := client.sendMessage(msg) - if err != nil { - // fixme(mkrautz): This is a deadlock waiting to happen. - client.Panicf("Unable to send message to client") - return - } - } -} - // Receiver Goroutine func (client *Client) receiver() { for { @@ -480,7 +434,7 @@ func (client *Client) receiver() { // information we must send it our version information so it knows // what version of the protocol it should speak. if client.state == StateClientConnected { - client.sendProtoMessage(&mumbleproto.Version{ + client.sendMessage(&mumbleproto.Version{ Version: proto.Uint32(0x10203), Release: proto.String("Grumble"), }) @@ -570,7 +524,7 @@ func (client *Client) sendChannelTree(channel *Channel) { } chanstate.Links = links - err := client.sendProtoMessage(chanstate) + err := client.sendMessage(chanstate) if err != nil { client.Panicf("%v", err) } @@ -588,7 +542,7 @@ func (client *Client) cryptResync() { if requestElapsed > 5 { client.lastResync = time.Seconds() cryptsetup := &mumbleproto.CryptSetup{} - err := client.sendProtoMessage(cryptsetup) + err := client.sendMessage(cryptsetup) if err != nil { client.Panicf("%v", err) } diff --git a/message.go b/message.go index a120f02..04d46e7 100644 --- a/message.go +++ b/message.go @@ -57,7 +57,7 @@ func (server *Server) handleCryptSetup(client *Client, msg *Message) { if copy(cs.ClientNonce, client.crypt.EncryptIV[0:]) != aes.BlockSize { return } - client.sendProtoMessage(cs) + client.sendMessage(cs) } else { client.Printf("Received client nonce") if len(cs.ClientNonce) != aes.BlockSize { @@ -113,7 +113,7 @@ func (server *Server) handlePingMessage(client *Client, msg *Message) { client.TcpPackets = *ping.TcpPackets } - client.sendProtoMessage(&mumbleproto.Ping{ + client.sendMessage(&mumbleproto.Ping{ Timestamp: ping.Timestamp, Good: proto.Uint32(uint32(client.crypt.Good)), Late: proto.Uint32(uint32(client.crypt.Late)), @@ -917,7 +917,7 @@ func (server *Server) handleBanListMessage(client *Client, msg *Message) { entry.Duration = proto.Uint32(ban.Duration) banlist.Bans = append(banlist.Bans, entry) } - if err := client.sendProtoMessage(banlist); err != nil { + if err := client.sendMessage(banlist); err != nil { client.Panic("Unable to send BanList") } } else { @@ -1017,7 +1017,7 @@ func (server *Server) handleTextMessage(client *Client, msg *Message) { delete(clients, client.Session) for _, target := range clients { - target.sendProtoMessage(&mumbleproto.TextMessage{ + target.sendMessage(&mumbleproto.TextMessage{ Actor: proto.Uint32(client.Session), Message: txtmsg.Message, }) @@ -1147,7 +1147,7 @@ func (server *Server) handleAclMessage(client *Client, msg *Message) { reply.Groups = append(reply.Groups, mpgroup) } - if err := client.sendProtoMessage(reply); err != nil { + if err := client.sendMessage(reply); err != nil { client.Panic(err) return } @@ -1164,7 +1164,7 @@ func (server *Server) handleAclMessage(client *Client, msg *Message) { queryusers.Names = append(queryusers.Names, user.Name) } if len(queryusers.Ids) > 0 { - client.sendProtoMessage(queryusers) + client.sendMessage(queryusers) } // Set new groups and ACLs @@ -1272,7 +1272,7 @@ func (server *Server) handleQueryUsers(client *Client, msg *Message) { } } - if err := client.sendProtoMessage(reply); err != nil { + if err := client.sendMessage(reply); err != nil { client.Panic(err) return } @@ -1377,7 +1377,7 @@ func (server *Server) handleUserStatsMessage(client *Client, msg *Message) { // fixme(mkrautz): we don't do bandwidth tracking yet - if err := client.sendProtoMessage(stats); err != nil { + if err := client.sendMessage(stats); err != nil { client.Panic(err) return } @@ -1427,7 +1427,7 @@ func (server *Server) handleRequestBlob(client *Client, msg *Message) { userstate.Reset() userstate.Session = proto.Uint32(uint32(target.Session)) userstate.Texture = buf - if err := client.sendProtoMessage(userstate); err != nil { + if err := client.sendMessage(userstate); err != nil { client.Panic(err) return } @@ -1452,7 +1452,7 @@ func (server *Server) handleRequestBlob(client *Client, msg *Message) { userstate.Reset() userstate.Session = proto.Uint32(uint32(target.Session)) userstate.Comment = proto.String(string(buf)) - if err := client.sendProtoMessage(userstate); err != nil { + if err := client.sendMessage(userstate); err != nil { client.Panic(err) return } @@ -1476,7 +1476,7 @@ func (server *Server) handleRequestBlob(client *Client, msg *Message) { } chanstate.ChannelId = proto.Uint32(uint32(channel.Id)) chanstate.Description = proto.String(string(buf)) - if err := client.sendProtoMessage(chanstate); err != nil { + if err := client.sendMessage(chanstate); err != nil { client.Panic(err) return } @@ -1512,7 +1512,7 @@ func (server *Server) handleUserList(client *Client, msg *Message) { Name: proto.String(user.Name), }) } - if err := client.sendProtoMessage(userlist); err != nil { + if err := client.sendMessage(userlist); err != nil { client.Panic(err) return } diff --git a/pkg/mumbleproto/types.go b/pkg/mumbleproto/types.go index 8006f3e..5cf41e3 100644 --- a/pkg/mumbleproto/types.go +++ b/pkg/mumbleproto/types.go @@ -45,6 +45,7 @@ func MessageType(msg interface{}) uint16 { case *Version: return MessageVersion case *UDPTunnel: + case []byte: return MessageUDPTunnel case *Authenticate: return MessageAuthenticate @@ -94,4 +95,4 @@ func MessageType(msg interface{}) uint16 { return MessageServerConfig } panic("unknown type") -} \ No newline at end of file +} diff --git a/server.go b/server.go index 39a4e35..4205a6f 100644 --- a/server.go +++ b/server.go @@ -260,10 +260,9 @@ func (server *Server) NewClient(conn net.Conn) (err error) { client.server = server client.conn = conn client.reader = bufio.NewReader(client.conn) - client.writer = bufio.NewWriter(client.conn) + client.state = StateClientConnected - client.msgchan = make(chan *Message) client.udprecv = make(chan []byte) client.user = nil @@ -271,9 +270,6 @@ func (server *Server) NewClient(conn net.Conn) (err error) { go client.receiver() go client.udpreceiver() - client.doneSending = make(chan bool) - go client.sender() - return } @@ -505,7 +501,7 @@ func (server *Server) handleAuthenticate(client *Client, msg *Message) { // Send CryptState information to the client so it can establish an UDP connection, // if it wishes. client.lastResync = time.Seconds() - err = client.sendProtoMessage(&mumbleproto.CryptSetup{ + err = client.sendMessage(&mumbleproto.CryptSetup{ Key: client.crypt.RawKey[0:], ClientNonce: client.crypt.DecryptIV[0:], ServerNonce: client.crypt.EncryptIV[0:], @@ -629,12 +625,12 @@ func (server *Server) finishAuthenticate(client *Client) { perm.ClearCacheBit() sync.Permissions = proto.Uint64(uint64(perm)) } - if err := client.sendProtoMessage(sync); err != nil { + if err := client.sendMessage(sync); err != nil { client.Panicf("%v", err) return } - err := client.sendProtoMessage(&mumbleproto.ServerConfig{ + err := client.sendMessage(&mumbleproto.ServerConfig{ AllowHtml: proto.Bool(server.cfg.BoolValue("AllowHTML")), MessageLength: proto.Uint32(server.cfg.Uint32Value("MaxTextMessageLength")), ImageMessageLength: proto.Uint32(server.cfg.Uint32Value("MaxImageMessageLength")), @@ -780,7 +776,7 @@ func (server *Server) sendUserList(client *Client) { userstate.PluginIdentity = proto.String(connectedClient.PluginIdentity) } - err := client.sendProtoMessage(userstate) + err := client.sendMessage(userstate) if err != nil { // Server panic? continue @@ -800,7 +796,7 @@ func (server *Server) sendClientPermissions(client *Client, channel *Channel) { perm := server.aclcache.GetPermission(client, channel) // fixme(mkrautz): Cache which permissions we've already sent. - client.sendProtoMessage(&mumbleproto.PermissionQuery{ + client.sendMessage(&mumbleproto.PermissionQuery{ ChannelId: proto.Uint32(uint32(channel.Id)), Permissions: proto.Uint32(uint32(perm)), }) @@ -816,7 +812,7 @@ func (server *Server) broadcastProtoMessageWithPredicate(msg interface{}, client if client.state < StateClientAuthenticated { continue } - err := client.sendProtoMessage(msg) + err := client.sendMessage(msg) if err != nil { return err }