From ff299b8aa477174585452518833e7b0112264087 Mon Sep 17 00:00:00 2001 From: Mikkel Krautz Date: Sat, 27 Aug 2011 13:46:14 +0200 Subject: [PATCH] Add the freezer package. --- pkg/freezer/Makefile | 13 ++ pkg/freezer/error.go | 21 +++ pkg/freezer/freezer_test.go | 280 ++++++++++++++++++++++++++++++++++++ pkg/freezer/types.go | 17 +++ pkg/freezer/types.proto | 78 ++++++++++ pkg/freezer/walker.go | 257 +++++++++++++++++++++++++++++++++ pkg/freezer/writer.go | 194 +++++++++++++++++++++++++ 7 files changed, 860 insertions(+) create mode 100644 pkg/freezer/Makefile create mode 100644 pkg/freezer/error.go create mode 100644 pkg/freezer/freezer_test.go create mode 100644 pkg/freezer/types.go create mode 100644 pkg/freezer/types.proto create mode 100644 pkg/freezer/walker.go create mode 100644 pkg/freezer/writer.go diff --git a/pkg/freezer/Makefile b/pkg/freezer/Makefile new file mode 100644 index 0000000..e2ccce1 --- /dev/null +++ b/pkg/freezer/Makefile @@ -0,0 +1,13 @@ +include $(GOROOT)/src/Make.inc + +TARG = grumble/freezer +GOFILES = \ + writer.go \ + walker.go \ + types.go \ + types.pb.go \ + error.go + +include $(GOROOT)/src/Make.pkg +include $(GOROOT)/src/pkg/goprotobuf.googlecode.com/hg/Make.protobuf + diff --git a/pkg/freezer/error.go b/pkg/freezer/error.go new file mode 100644 index 0000000..eeb6472 --- /dev/null +++ b/pkg/freezer/error.go @@ -0,0 +1,21 @@ +// Copyright (c) 2011 The Grumble Authors +// The use of this source code is goverened by a BSD-style +// license that can be found in the LICENSE-file. + +package freezer + +import "os" + +// Writer errors +var ( + ErrTxGroupFull = os.NewError("transction group is full") + ErrTxGroupValueTooBig = os.NewError("value too big to put inside the txgroup") +) + +// Walker errors +var ( + ErrUnexpectedEndOfRecord = os.NewError("unexpected end of record") + ErrCRC32Mismatch = os.NewError("CRC32 mismatch") + ErrRemainingBytesForRecord = os.NewError("remaining bytes in record") + ErrRecordTooBig = os.NewError("the record in the file is too big") +) \ No newline at end of file diff --git a/pkg/freezer/freezer_test.go b/pkg/freezer/freezer_test.go new file mode 100644 index 0000000..2425c18 --- /dev/null +++ b/pkg/freezer/freezer_test.go @@ -0,0 +1,280 @@ +// Copyright (c) 2011 The Grumble Authors +// The use of this source code is goverened by a BSD-style +// license that can be found in the LICENSE-file. + +package freezer + +import ( + "bytes" + "encoding/binary" + "goprotobuf.googlecode.com/hg/proto" + "hash/crc32" + "io" + "math" + "os" + "testing" +) + +var testValues []interface{} = []interface{} { + &ConfigKeyValuePair{Key: proto.String("Foo")}, + &BanList{Bans: []*Ban{ &Ban{ Mask: proto.Uint32(32) } } }, + &User{Id: proto.Uint32(0), Name: proto.String("SuperUser")}, + &UserRemove{Id: proto.Uint32(0)}, + &Channel{Id: proto.Uint32(0), Name: proto.String("RootChannel")}, + &ChannelRemove{Id: proto.Uint32(0)}, +} + +// Generate a byet slice representing an entry in a Tx record +func genTxValue(kind uint16, val []byte) (chunk []byte, crc32sum uint32, err os.Error) { + buf := new(bytes.Buffer) + + err = binary.Write(buf, binary.LittleEndian, kind) + if err != nil { + return nil, 0, err + } + + err = binary.Write(buf, binary.LittleEndian, uint16(len(val))) + if err != nil { + return nil, 0, err + } + + _, err = buf.Write(val) + if err != nil { + return nil, 0, err + } + + summer := crc32.NewIEEE() + _, err = summer.Write(val) + if err != nil { + return nil, 0, err + } + + return buf.Bytes(), summer.Sum32(), nil +} + +// Generate the header of a Tx record +func genTestCaseHeader(chunk []byte, numops uint32, crc32sum uint32) (r io.Reader, err os.Error) { + buf := new(bytes.Buffer) + + err = binary.Write(buf, binary.LittleEndian, uint32(4 + 4 + len(chunk))) + if err != nil { + return nil, err + } + + err = binary.Write(buf, binary.LittleEndian, numops) + if err != nil { + return nil, err + } + + err = binary.Write(buf, binary.LittleEndian, crc32sum) + if err != nil { + return nil, err + } + + _, err = buf.Write(chunk) + if err != nil { + return nil, err + } + + return buf, nil +} + +// Test that the Walker and the Writer agree on the +// protocol. +func TestCreation(t *testing.T) { + l, err := NewLogFile("creation.log") + if err != nil { + t.Error(err) + return + } + l.Close() + os.Remove("creation.log") +} + +func TestLogging(t *testing.T) { + l, err := NewLogFile("logging.log") + if err != nil { + t.Error(err) + return + } + defer l.Close() + defer os.Remove("logging.log") + + for _, val := range testValues { + err = l.Put(val) + if err != nil { + t.Error(err) + return + } + } + + walker, err := NewFileWalker("logging.log") + if err != nil { + t.Error(err) + return + } + + i := 0 + for { + entries, err := walker.Next() + if err == os.EOF { + break + } else if err != nil { + t.Error(err) + return + } + if len(entries) != 1 { + t.Error("> 1 entry in log tx") + return + } + val := entries[0] + if !proto.Equal(val, testValues[i]) { + t.Error("proto message mismatch") + } + i += 1 + } +} + +// Check that we correctly catch CRC32 mismatches +func TestCRC32MismatchLog(t *testing.T) { + chunk, _, err := genTxValue(0xff, []byte{0xff, 0xff, 0xff, 0xff, 0xff}) + if err != nil { + t.Error(err) + } + + buf, err := genTestCaseHeader(chunk, 1, 0xcafebabe) + if err != nil { + t.Error(err) + } + + walker, err := NewReaderWalker(buf) + if err != nil { + t.Error(err) + } + + _, err = walker.Next() + if err != ErrCRC32Mismatch { + t.Errorf("exepcted CRC32 mismatch, got %v", err) + } + _, err = walker.Next() + if err != os.EOF { + t.Errorf("expected EOF, got %v", err) + } +} + +// Test that unknown TxGroup values are not attempted to be +// decoded. +func TestUnknownTypeDecode(t *testing.T) { + buf, crc32sum, err := genTxValue(0xfa, []byte{0xfa, 0xfa, 0xfa}) + if err != nil { + t.Error(err) + } + + r, err := genTestCaseHeader(buf, 1, crc32sum) + if err != nil { + t.Error(err) + } + + walker, err := NewReaderWalker(r) + if err != nil { + t.Error(err) + } + + entries, err := walker.Next() + // The bytes above should not decode to anything useful + // (because they have an unknown type kind) + if len(entries) != 0 && err != nil { + t.Errorf("expected empty entries and non-nil err (got %v entries and %v)", len(entries), err) + } + _, err = walker.Next() + if err != os.EOF { + t.Errorf("expected EOF, got %v", err) + } +} + +// Test a TxRecord with some trailing bytes +func TestTrailingBytesTxRecord(t *testing.T) { + buf, _, err := genTxValue(0xfa, []byte{0xff, 0xff, 0xff}) + // Add some trailing bytes to the tx record + buf = append(buf, byte(0xff)) + buf = append(buf, byte(0xff)) + buf = append(buf, byte(0xff)) + + summer := crc32.NewIEEE() + _, err = summer.Write(buf) + if err != nil { + t.Error(err) + } + crc32sum := summer.Sum32() + + r, err := genTestCaseHeader(buf, 1, crc32sum) + if err != nil { + t.Error(err) + } + + walker, err := NewReaderWalker(r) + if err != nil { + t.Error(err) + } + + _, err = walker.Next() + if err != ErrRemainingBytesForRecord { + t.Error(err) + } + + _, err = walker.Next() + if err != os.EOF { + t.Errorf("expected EOF, got %v", err) + } +} + +// Test that we check for TxRecords that are too big. +// A TxRecord can hold 255 entries, and each of those can be +// up to 16KB. +func TestTooBigTxRecord(t *testing.T) { + bigValue := make([]byte, math.MaxUint16*math.MaxUint8 + 4) + r, err := genTestCaseHeader(bigValue, 1, 0) + if err != nil { + t.Error(err) + } + + walker, err := NewReaderWalker(r) + if err != nil { + t.Error(err) + } + + _, err = walker.Next() + if err != ErrRecordTooBig { + t.Errorf("expected ErrRecordTooBig, got %v", err) + } +} + +// Test that we correctly enforce the 255 entry limit of TxGroups. +func TestTxGroupCapacityEnforcement(t *testing.T) { + l, err := NewLogFile("capacity-enforcement.log") + if err != nil { + t.Error(err) + return + } + defer l.Close() + defer os.Remove("capacity-enforcement.log") + + tx := l.BeginTx() + if err != nil { + t.Error(err) + } + + for i := 0; i <= 255; i++ { + entry := testValues[i % len(testValues)] + err = tx.Put(entry) + if err != nil { + t.Error(err) + } + } + + entry := testValues[0] + err = tx.Put(entry) + if err != ErrTxGroupFull { + t.Error(err) + } +} \ No newline at end of file diff --git a/pkg/freezer/types.go b/pkg/freezer/types.go new file mode 100644 index 0000000..6b3c009 --- /dev/null +++ b/pkg/freezer/types.go @@ -0,0 +1,17 @@ +// Copyright (c) 2011 The Grumble Authors +// The use of this source code is goverened by a BSD-style +// license that can be found in the LICENSE-file. + +package freezer + +type typeKind uint32 + +const ( + ServerType typeKind = iota + ConfigKeyValuePairType + BanListType + UserType + UserRemoveType + ChannelType + ChannelRemoveType +) \ No newline at end of file diff --git a/pkg/freezer/types.proto b/pkg/freezer/types.proto new file mode 100644 index 0000000..9bc03c1 --- /dev/null +++ b/pkg/freezer/types.proto @@ -0,0 +1,78 @@ +package freezer; + +option optimize_for = SPEED; + +message Server { + repeated ConfigKeyValuePair config = 2; + optional BanList ban_list = 3; + repeated Channel channels = 4; + repeated User users = 5; +} + +message ConfigKeyValuePair { + required string key = 1; + optional string value = 2; +} + +message Ban { + optional bytes ip = 1; + optional uint32 mask = 2; + optional string username = 3; + optional string cert_hash = 4; + optional string reason = 5; + optional int64 start = 6; + optional uint32 duration = 7; +} + +message BanList { + repeated Ban bans = 1; +} + +message User { + optional uint32 id = 1; + optional string name = 2; + optional string password = 3; + optional string cert_hash = 4; + optional string email = 5; + optional string texture_blob = 6; + optional string comment_blob = 7; + optional uint32 last_channel_id = 8; + optional uint64 last_active = 9; +} + +message UserRemove { + optional uint32 id = 1; +} + +message Channel { + optional uint32 id = 1; + optional string name = 2; + optional uint32 parent_id = 3; + optional int64 position = 4; + optional bool inherit_acl = 5; + repeated uint32 links = 6; + repeated ACL acl = 7; + repeated Group groups = 8; + optional string description_blob = 9; +} + +message ChannelRemove { + optional uint32 id = 1; +} + +message ACL { + optional uint32 user_id = 1; + optional string group = 2; + optional bool apply_here = 3; + optional bool apply_subs = 4; + optional uint32 allow = 5; + optional uint32 deny = 6; +} + +message Group { + optional string name = 1; + optional bool inherit = 2; + optional bool inheritable = 3; + repeated uint32 add = 4; + repeated uint32 remove = 5; +} \ No newline at end of file diff --git a/pkg/freezer/walker.go b/pkg/freezer/walker.go new file mode 100644 index 0000000..6edef19 --- /dev/null +++ b/pkg/freezer/walker.go @@ -0,0 +1,257 @@ +// Copyright (c) 2011 The Grumble Authors +// The use of this source code is goverened by a BSD-style +// license that can be found in the LICENSE-file. + +package freezer + +import ( + "encoding/binary" + "goprotobuf.googlecode.com/hg/proto" + "hash" + "hash/crc32" + "io" + "math" + "os" +) + +// Checks whether the error err is an EOF +// error. +func isEOF(err os.Error) bool { + if err == os.EOF || err == io.ErrUnexpectedEOF { + return true + } + return false +} + +// Type Walker implements a method for +// iterating the transaction groups of an +// immutable Log. +type Walker struct { + r io.Reader +} + +// Type txReader imlpements a checksumming reader, intended +// for reading transaction groups of a Log. +// +// Besides auto-checksumming the read content, it also +// keeps track of the amount of consumed bytes. +type txReader struct { + r io.Reader + crc32 hash.Hash32 + consumed int +} + +// Create a new txReader for reading a transaction group +// from the log. +func newTxReader(r io.Reader) *txReader { + txr := new(txReader) + txr.r = r + txr.crc32 = crc32.NewIEEE() + return txr +} + +// walkReader's Read method. Reads from walkReader's Reader +// and checksums while reading. +func (txr *txReader) Read(p []byte) (n int, err os.Error) { + n, err = txr.r.Read(p) + if err != nil && err != os.EOF { + return + } + txr.consumed += n + + _, crc32err := txr.crc32.Write(p) + if crc32err != nil { + return n, crc32err + } + + return n, err +} + +// Sum32 returns the IEEE-style CRC32 checksum +// of the data read by the walkReader. +func (txr *txReader) Sum32() uint32 { + return txr.crc32.Sum32() +} + +// Consumed returns the amount of bytes consumed by +// the walkReader. +func (txr *txReader) Consumed() int { + return txr.consumed +} + +// Create a new Walker that iterates over the entries of the given log file. +func NewFileWalker(fn string) (walker *Walker, err os.Error) { + f, err := os.Open(fn) + if err != nil { + return nil, err + } + + walker = new(Walker) + walker.r = f + + return walker, nil +} + +// Create a new Walker that iterates over the log entries of a given Reader. +func NewReaderWalker(r io.Reader) (walker *Walker, err os.Error) { + walker = new(Walker) + walker.r = r + return walker, nil +} + +// Next returns the next transaction group in the log as a slice of +// pointers to the protobuf-serialized log entries. +// +// This method will only attempt to serialize types with type identifiers +// that this package knows of. In case an unknown type identifier is found +// in a transaction group, it is silently ignored (it's skipped). +// +// On error, Next returns a nil slice and a non-nil err. +// When the end of the file is reached, Next returns nil, os.EOF. +func (walker *Walker) Next() (entries []interface{}, err os.Error) { + var ( + remainBytes uint32 + remainOps uint32 + crcsum uint32 + kind uint16 + length uint16 + ) + + err = binary.Read(walker.r, binary.LittleEndian, &remainBytes) + if isEOF(err) { + return nil, os.EOF + } else if err != nil { + return nil, err + } + + if remainBytes < 8 { + return nil, ErrUnexpectedEndOfRecord + } + if remainBytes-8 > math.MaxUint8*math.MaxUint16 { + return nil, ErrRecordTooBig + } + + err = binary.Read(walker.r, binary.LittleEndian, &remainOps) + if isEOF(err) { + return nil, ErrUnexpectedEndOfRecord + } else if err != nil { + return nil, err + } + + err = binary.Read(walker.r, binary.LittleEndian, &crcsum) + if isEOF(err) { + return nil, ErrUnexpectedEndOfRecord + } else if err != nil { + return nil, err + } + + remainBytes -= 8 + reader := newTxReader(walker.r) + + for remainOps > 0 { + err = binary.Read(reader, binary.LittleEndian, &kind) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + + err = binary.Read(reader, binary.LittleEndian, &length) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + + buf := make([]byte, length) + _, err = io.ReadFull(reader, buf) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + + switch typeKind(kind) { + case ServerType: + server := &Server{} + err = proto.Unmarshal(buf, &server) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + entries = append(entries, server) + case ConfigKeyValuePairType: + cfg := &ConfigKeyValuePair{} + err = proto.Unmarshal(buf, cfg) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + entries = append(entries, cfg) + case BanListType: + banlist := &BanList{} + err = proto.Unmarshal(buf, banlist) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + entries = append(entries, banlist) + case UserType: + user := &User{} + err = proto.Unmarshal(buf, user) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + entries = append(entries, user) + case UserRemoveType: + userRemove := &UserRemove{} + err = proto.Unmarshal(buf, userRemove) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + entries = append(entries, userRemove) + case ChannelType: + channel := &Channel{} + err = proto.Unmarshal(buf, channel) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + entries = append(entries, channel) + case ChannelRemoveType: + channelRemove := &ChannelRemove{} + err = proto.Unmarshal(buf, channelRemove) + if isEOF(err) { + break + } else if err != nil { + return nil, err + } + entries = append(entries, channelRemove) + } + + remainOps -= 1 + continue + } + + if isEOF(err) { + return nil, ErrUnexpectedEndOfRecord + } + + if reader.Consumed() != int(remainBytes) { + return nil, ErrRemainingBytesForRecord + } + + if reader.Sum32() != crcsum { + return nil, ErrCRC32Mismatch + } + + return entries, nil +} diff --git a/pkg/freezer/writer.go b/pkg/freezer/writer.go new file mode 100644 index 0000000..2a6d369 --- /dev/null +++ b/pkg/freezer/writer.go @@ -0,0 +1,194 @@ +// Copyright (c) 2011 The Grumble Authors +// The use of this source code is goverened by a BSD-style +// license that can be found in the LICENSE-file. + +// Package freezer implements a persistence layer for Grumble. +package freezer + +// The freezer package exports types that can be persisted to disk, +// both as part of a full server snapshot, and as part of a log of state changes. +// +// The freezer package also implements an append-only log writer that can be used +// to serialize the freezer types to disk in atomic entities called transactions +// records. +// +// A Walker type that can be used to iterate over the different transaction records +// of a log file is also provided. + +import ( + "bytes" + "encoding/binary" + "goprotobuf.googlecode.com/hg/proto" + "hash" + "hash/crc32" + "io" + "math" + "os" +) + +// Log implements an append-only log for flattened +// protobuf-encoded log entries. +// +// These log entries are typically state-change deltas +// for a Grumble server's main data strutures. +// +// The log supports atomic transactions. Transaction groups +// are persisted to disk with a checksum that covers the +// whole transaction group. In case of a failure, none of the +// entries of a transaction will be applied. +type Log struct { + wc io.WriteCloser +} + +// Type LogTx represents a transaction in the log. +// Transactions can be used to group several changes into an +// atomic entity in the log file. +type LogTx struct { + log *Log + crc hash.Hash32 + buf *bytes.Buffer + numops int +} + +// Create a new log file +func NewLogFile(fn string) (*Log, os.Error) { + f, err := os.Create(fn) + if err != nil { + return nil, err + } + + log := new(Log) + log.wc = f + + return log, nil +} + +// Close a Log +func (log *Log) Close() os.Error { + return log.wc.Close() +} + +// Append a log entry +// +// This method implicitly creates a transaction +// group for this single Put operation. It is merely +// a convenience wrapper. +func (log *Log) Put(value interface{}) (err os.Error) { + tx := log.BeginTx() + err = tx.Put(value) + if err != nil { + return err + } + return tx.Commit() +} + +// Begin a transaction +func (log *Log) BeginTx() *LogTx { + tx := &LogTx{} + tx.log = log + tx.buf = new(bytes.Buffer) + tx.crc = crc32.NewIEEE() + return tx +} + +// Append a log entry to the transaction. +// The transaction's log entries will not be persisted to +// the log until the Commit has been called on the transaction. +func (tx *LogTx) Put(value interface{}) (err os.Error) { + var ( + buf []byte + kind typeKind + ) + + if tx.numops > 255 { + return ErrTxGroupFull + } + + switch val := value.(type) { + case *Server: + kind = ServerType + buf, err = proto.Marshal(val) + case *ConfigKeyValuePair: + kind = ConfigKeyValuePairType + buf, err = proto.Marshal(val) + case *BanList: + kind = BanListType + buf, err = proto.Marshal(val) + case *User: + kind = UserType + buf, err = proto.Marshal(val) + case *UserRemove: + kind = UserRemoveType + buf, err = proto.Marshal(val) + case *Channel: + kind = ChannelType + buf, err = proto.Marshal(val) + case *ChannelRemove: + kind = ChannelRemoveType + buf, err = proto.Marshal(val) + default: + panic("Attempt to put an unknown type") + } + + if err != nil { + return err + } + + if len(buf) > math.MaxUint16 { + return ErrTxGroupValueTooBig + } + + w := io.MultiWriter(tx.buf, tx.crc) + + err = binary.Write(w, binary.LittleEndian, uint16(kind)) + if err != nil { + return err + } + + err = binary.Write(w, binary.LittleEndian, uint16(len(buf))) + if err != nil { + return err + } + + _, err = w.Write(buf) + if err != nil { + return err + } + + tx.numops += 1; + + return nil +} + +// Commit all changes of the transaction to the log +// as a single atomic entry. +func (tx *LogTx) Commit() (err os.Error) { + buf := new(bytes.Buffer) + + err = binary.Write(buf, binary.LittleEndian, uint32(4 + 4 + tx.buf.Len())) + if err != nil { + return err + } + + err = binary.Write(buf, binary.LittleEndian, uint32(tx.numops)) + if err != nil { + return err + } + + err = binary.Write(buf, binary.LittleEndian, tx.crc.Sum32()) + if err != nil { + return err + } + + _, err = buf.Write(tx.buf.Bytes()) + if err != nil { + return err + } + + _, err = tx.log.wc.Write(buf.Bytes()) + if err != nil { + return err + } + + return nil +} \ No newline at end of file