Work-in-progress server start/stop.

This commit is contained in:
Mikkel Krautz 2011-11-12 01:52:10 +01:00
parent 03932c13cf
commit 8731716a05
7 changed files with 219 additions and 123 deletions

275
server.go
View file

@ -55,13 +55,17 @@ type KeyValuePair struct {
// A Murmur server instance
type Server struct {
Id int64
listener tls.Listener
address string
port int
udpconn *net.UDPConn
tlscfg *tls.Config
running bool
Id int64
tcpl *net.TCPListener
tlsl *tls.Listener
address string
port int
udpconn *net.UDPConn
tlscfg *tls.Config
bye chan bool
netwg sync.WaitGroup
running bool
incoming chan *Message
voicebroadcast chan *VoiceBroadcast
@ -148,6 +152,7 @@ func NewServer(id int64, addr string, port int) (s *Server, err error) {
s.hclients = make(map[string][]*Client)
s.hpclients = make(map[string]*Client)
s.bye = make(chan bool)
s.incoming = make(chan *Message)
s.voicebroadcast = make(chan *VoiceBroadcast)
s.cfgUpdate = make(chan *KeyValuePair)
@ -345,10 +350,13 @@ func (server *Server) UnlinkChannels(channel *Channel, other *Channel) {
// This is the synchronous handler goroutine.
// Important control channel messages are routed through this Goroutine
// to keep server state synchronized.
func (server *Server) handler() {
func (server *Server) handlerLoop() {
regtick := time.Tick((3600 + ((server.Id * 60) % 600)) * 1e9)
for {
select {
// We're done. Stop the server's event handler
case <-server.bye:
return
// Control channel messages
case msg := <-server.incoming:
client := msg.client
@ -863,19 +871,6 @@ func (server *Server) handleIncomingMessage(client *Client, msg *Message) {
}
}
func (s *Server) SetupUDP() (err error) {
addr := &net.UDPAddr{
net.ParseIP(s.address),
s.port,
}
s.udpconn, err = net.ListenUDP("udp", addr)
if err != nil {
return
}
return
}
// Send the content of buf as a UDP packet to addr.
func (s *Server) SendUDP(buf []byte, addr *net.UDPAddr) (err error) {
_, err = s.udpconn.WriteTo(buf, addr)
@ -883,13 +878,18 @@ func (s *Server) SendUDP(buf []byte, addr *net.UDPAddr) (err error) {
}
// Listen for and handle UDP packets.
func (server *Server) ListenUDP() {
func (server *Server) udpListenLoop() {
defer server.netwg.Done()
buf := make([]byte, UDPPacketSize)
for {
nread, remote, err := server.udpconn.ReadFrom(buf)
if err != nil {
// Not much to do here. This is bad, of course. Should we panic this server instance?
continue
if isTimeout(err) {
continue
} else {
return
}
}
udpaddr, ok := remote.(*net.UDPAddr)
@ -917,7 +917,7 @@ func (server *Server) ListenUDP() {
err = server.SendUDP(buffer.Bytes(), udpaddr)
if err != nil {
server.Print("Unable to write UDP packet: %v", err.Error())
return
}
} else {
@ -1153,93 +1153,180 @@ func (server *Server) FilterText(text string) (filtered string, err error) {
}
// The accept loop of the server.
func (s *Server) ListenAndMurmur() {
// Launch the event handler goroutine
go s.handler()
func (server *Server) acceptLoop() {
defer server.netwg.Done()
host := s.cfg.StringValue("Address")
if host != "" {
s.address = host
}
port := s.cfg.IntValue("Port")
if port != 0 {
s.port = port
}
s.running = true
// Setup our UDP listener and spawn our reader and writer goroutines
s.SetupUDP()
go s.ListenUDP()
// Create a new listening TLS socket.
certFn := filepath.Join(Args.DataDir, "cert.pem")
keyFn := filepath.Join(Args.DataDir, "key.pem")
cert, err := tls.LoadX509KeyPair(certFn, keyFn)
if err != nil {
s.Printf("Unable to load x509 key pair: %v", err)
return
}
cfg := new(tls.Config)
cfg.Certificates = append(cfg.Certificates, cert)
cfg.AuthenticateClient = true
s.tlscfg = cfg
tl, err := net.ListenTCP("tcp", &net.TCPAddr{
net.ParseIP(s.address),
s.port,
})
if err != nil {
s.Printf("Cannot bind: %s\n", err)
return
}
listener := tls.NewListener(tl, s.tlscfg)
s.Printf("Started: listening on %v", tl.Addr())
// Open a fresh freezer log
err = s.openFreezeLog()
if err != nil {
s.Fatal(err)
}
// Update server registration if needed.
go func() {
time.Sleep((60 + s.Id*10) * 1e9)
s.RegisterPublicServer()
}()
// The main accept loop. Basically, we block
// until we get a new client connection, and
// when we do get a new connection, we spawn
// a new Go-routine to handle the client.
for {
// New client connected
conn, err := listener.Accept()
conn, err := server.tlsl.Accept()
if err != nil {
s.Printf("Unable to accept new client: %v", err)
continue
if isTimeout(err) {
continue
} else {
return
}
}
// Is the client banned?
// fixme(mkrautz): Clean up expired bans
if s.IsBanned(conn) {
s.Printf("Rejected client %v: Banned", conn.RemoteAddr())
if server.IsBanned(conn) {
server.Printf("Rejected client %v: Banned", conn.RemoteAddr())
err := conn.Close()
if err != nil {
s.Printf("Unable to close connection: %v", err)
server.Printf("Unable to close connection: %v", err)
}
continue
}
// Create a new client connection from our *tls.Conn
// which wraps net.TCPConn.
err = s.NewClient(conn)
err = server.NewClient(conn)
if err != nil {
s.Printf("Unable to handle new client: %v", err)
server.Printf("Unable to handle new client: %v", err)
continue
}
}
}
// The isTimeout function checks whether a
// network error is a timeout.
func isTimeout(err error) bool {
if e, ok := err.(net.Error); ok {
return e.Timeout()
}
return false
}
// Start the server.
func (server *Server) Start() (err error) {
if server.running {
return errors.New("already running")
}
host := server.cfg.StringValue("Address")
if host != "" {
server.address = host
}
port := server.cfg.IntValue("Port")
if port != 0 {
server.port = port
}
// Setup our UDP listener
addr := &net.UDPAddr{
net.ParseIP(server.address),
server.port,
}
server.udpconn, err = net.ListenUDP("udp", addr)
if err != nil {
return err
}
err = server.udpconn.SetReadTimeout(1e9)
if err != nil {
return err
}
// Set up our TCP connection
server.tcpl, err = net.ListenTCP("tcp", &net.TCPAddr{
net.ParseIP(server.address),
server.port,
})
if err != nil {
return err
}
err = server.tcpl.SetTimeout(1e9)
if err != nil {
return err
}
// Wrap a TLS listener around the TCP connection
certFn := filepath.Join(Args.DataDir, "cert.pem")
keyFn := filepath.Join(Args.DataDir, "key.pem")
cert, err := tls.LoadX509KeyPair(certFn, keyFn)
if err != nil {
return err
}
server.tlscfg = &tls.Config{
Certificates: []tls.Certificate{cert},
AuthenticateClient: true,
}
server.tlsl = tls.NewListener(server.tcpl, server.tlscfg)
server.Printf("Started: listening on %v", server.tcpl.Addr())
server.running = true
// Open a fresh freezer log
err = server.openFreezeLog()
if err != nil {
server.Fatal(err)
}
// Launch the event handler goroutine
go server.handlerLoop()
// Add the two network receiver goroutines to the net waitgroup
// and launch them.
//
// We use the waitgroup to provide a blocking Stop() method
// for the servers. Each network goroutine defers a call to
// netwg.Done(). In the Stop() we close all the connections
// and call netwg.Wait() to wait for the goroutines to end.
server.netwg.Add(2)
go server.udpListenLoop()
go server.acceptLoop()
// Schedule a server registration update (if needed)
go func() {
time.Sleep((60 + server.Id*10) * 1e9)
server.RegisterPublicServer()
}()
return nil
}
// Stop the server.
func (server *Server) Stop() (err error) {
if !server.running {
return errors.New("server not running")
}
// Stop the handler goroutine and disconnect all
// clients
server.bye <- true
for _, client := range server.clients {
client.Disconnect()
}
// Close the TLS listener and the TCP listener
err = server.tlsl.Close()
if err != nil {
return err
}
err = server.tcpl.Close()
if err != nil {
return err
}
// Close the UDP connection
err = server.udpconn.Close()
if err != nil {
return err
}
// Since we'll (on some OSes) have to wait for the network
// goroutines to end, we might as well use the time to store
// a full server freeze to disk.
err = server.FreezeToFile()
if err != nil {
server.Fatal(err)
}
// Wait for the two network receiver
// goroutines end.
server.netwg.Wait()
server.running = false
server.Printf("Stopped")
return nil
}