1
0
Fork 0
forked from External/grumble

Rudimentary server freezing.

This commit is contained in:
Mikkel Krautz 2011-04-25 17:38:17 +02:00
parent b54cbd02be
commit adeb1b2a54
2 changed files with 101 additions and 11 deletions

View file

@ -10,10 +10,14 @@ import (
"flag"
"fmt"
"gob"
"io"
"io/ioutil"
"os"
"log"
"sqlite"
"path/filepath"
"regexp"
"time"
)
var help *bool = flag.Bool("help", false, "Show this help")
@ -158,21 +162,53 @@ func main() {
servers := make(map[int64]*Server)
for _, name := range names {
log.Printf("Loading server %v", name)
s, err := NewServerFromFrozen(filepath.Join(*datadir, name))
if err != nil {
log.Fatalf("Unable to load server: %s", err.String())
if matched, _ := regexp.MatchString("^[0-9]+$", name); matched {
log.Printf("Loading server %v", name)
s, err := NewServerFromFrozen(filepath.Join(*datadir, name))
if err != nil {
log.Fatalf("Unable to load server: %s", err.String())
}
servers[s.Id] = s
go s.ListenAndMurmur()
}
servers[s.Id] = s
go s.ListenAndMurmur()
}
if len(servers) > 0 {
// Sleep.
sleeper := make(chan int)
zzz := <-sleeper
if zzz > 0 {
ticker := time.NewTicker(10e9) // 10 secs
for {
select {
case <-ticker.C:
for sid, server := range servers {
r := server.FreezeServer()
if err != nil {
log.Panicf("Unable to freeze server %v", sid)
}
f, err := ioutil.TempFile(*datadir, fmt.Sprintf("%v_", sid))
if err != nil {
log.Panicf("Unable to open file: %", err.String())
}
nwritten, err := io.Copy(f, r)
if err != nil {
log.Panicf("Unable to copy frozen server data: %v bytes, err=%v", nwritten, err)
}
err = r.Close()
if err != nil {
log.Panicf("Unable to freeze server: %v", err)
}
err = f.Sync()
if err != nil {
log.Panicf("Unable to sync frozen file: %v", err)
}
err = f.Close()
if err != nil {
log.Panicf("Unable to freeze server: %v", err)
}
err = os.Rename(f.Name(), filepath.Join(*datadir, fmt.Sprintf("%v", sid)))
if err != nil {
log.Panicf("Unable to freeze server: %v", err)
}
}
}
}
}
}

View file

@ -12,13 +12,16 @@ import (
"net"
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"encoding/hex"
"sync"
"goprotobuf.googlecode.com/hg/proto"
"mumbleproto"
"cryptstate"
"gob"
"hash"
"io"
"rand"
"strings"
)
@ -48,6 +51,7 @@ type Server struct {
incoming chan *Message
udpsend chan *Message
voicebroadcast chan *VoiceBroadcast
freezeRequest chan *freezeRequest
// Signals to the server that a client has been successfully
// authenticated.
@ -84,6 +88,11 @@ type Server struct {
aclcache ACLCache
}
type freezeRequest struct {
done chan bool
readCloser io.ReadCloser
}
// Allocate a new Murmur instance
func NewServer(id int64, addr string, port int) (s *Server, err os.Error) {
s = new(Server)
@ -103,6 +112,7 @@ func NewServer(id int64, addr string, port int) (s *Server, err os.Error) {
s.incoming = make(chan *Message)
s.udpsend = make(chan *Message)
s.voicebroadcast = make(chan *VoiceBroadcast)
s.freezeRequest = make(chan *freezeRequest)
s.clientAuthenticated = make(chan *Client)
s.MaxBandwidth = 300000
@ -326,10 +336,45 @@ func (server *Server) handler() {
// server info.
case client := <-server.clientAuthenticated:
server.finishAuthenticate(client)
// Synchonized freeze requests
case req := <-server.freezeRequest:
fs, err := server.Freeze()
if err != nil {
log.Panicf("Unable to freeze the server")
}
go server.handleFreezeRequest(req, &fs)
}
}
}
func (server *Server) handleFreezeRequest(freq *freezeRequest, fs *frozenServer) {
pr, pw := io.Pipe()
freq.readCloser = pr
freq.done <- true
zw, err := gzip.NewWriterLevel(pw, gzip.BestCompression)
if err != nil {
if err = pw.CloseWithError(err); err != nil {
log.Panicf("Unable to close PipeWriter: %v", err.String())
}
return
}
enc := gob.NewEncoder(zw)
err = enc.Encode(fs)
if err != nil {
if err = pw.CloseWithError(err); err != nil {
log.Panicf("Unable to close PipeWriter: %v", err.String())
}
}
if err = pw.CloseWithError(zw.Close()); err != nil {
log.Panicf("Unable to close PipeWriter: %v", err.String())
}
}
// Handle an Authenticate protobuf message. This is handled in a separate
// goroutine to allow for remote authenticators that are slow to respond.
//
@ -936,6 +981,15 @@ func (server *Server) userEnterChannel(client *Client, channel *Channel, usersta
}
}
// Create a point-in-time snapshot of Server and make it
// accessible through the returned io.ReadCloser.
func (s *Server) FreezeServer() io.ReadCloser {
fr := &freezeRequest{done:make(chan bool)}
s.freezeRequest <- fr
<-fr.done
return fr.readCloser
}
// The accept loop of the server.
func (s *Server) ListenAndMurmur() {
// Launch the event handler goroutine