1
0
Fork 0
forked from External/grumble

Add the freezer package.

This commit is contained in:
Mikkel Krautz 2011-08-27 13:46:14 +02:00
parent ceb3524b49
commit ff299b8aa4
7 changed files with 860 additions and 0 deletions

13
pkg/freezer/Makefile Normal file
View file

@ -0,0 +1,13 @@
include $(GOROOT)/src/Make.inc
TARG = grumble/freezer
GOFILES = \
writer.go \
walker.go \
types.go \
types.pb.go \
error.go
include $(GOROOT)/src/Make.pkg
include $(GOROOT)/src/pkg/goprotobuf.googlecode.com/hg/Make.protobuf

21
pkg/freezer/error.go Normal file
View file

@ -0,0 +1,21 @@
// Copyright (c) 2011 The Grumble Authors
// The use of this source code is goverened by a BSD-style
// license that can be found in the LICENSE-file.
package freezer
import "os"
// Writer errors
var (
ErrTxGroupFull = os.NewError("transction group is full")
ErrTxGroupValueTooBig = os.NewError("value too big to put inside the txgroup")
)
// Walker errors
var (
ErrUnexpectedEndOfRecord = os.NewError("unexpected end of record")
ErrCRC32Mismatch = os.NewError("CRC32 mismatch")
ErrRemainingBytesForRecord = os.NewError("remaining bytes in record")
ErrRecordTooBig = os.NewError("the record in the file is too big")
)

280
pkg/freezer/freezer_test.go Normal file
View file

@ -0,0 +1,280 @@
// Copyright (c) 2011 The Grumble Authors
// The use of this source code is goverened by a BSD-style
// license that can be found in the LICENSE-file.
package freezer
import (
"bytes"
"encoding/binary"
"goprotobuf.googlecode.com/hg/proto"
"hash/crc32"
"io"
"math"
"os"
"testing"
)
var testValues []interface{} = []interface{} {
&ConfigKeyValuePair{Key: proto.String("Foo")},
&BanList{Bans: []*Ban{ &Ban{ Mask: proto.Uint32(32) } } },
&User{Id: proto.Uint32(0), Name: proto.String("SuperUser")},
&UserRemove{Id: proto.Uint32(0)},
&Channel{Id: proto.Uint32(0), Name: proto.String("RootChannel")},
&ChannelRemove{Id: proto.Uint32(0)},
}
// Generate a byet slice representing an entry in a Tx record
func genTxValue(kind uint16, val []byte) (chunk []byte, crc32sum uint32, err os.Error) {
buf := new(bytes.Buffer)
err = binary.Write(buf, binary.LittleEndian, kind)
if err != nil {
return nil, 0, err
}
err = binary.Write(buf, binary.LittleEndian, uint16(len(val)))
if err != nil {
return nil, 0, err
}
_, err = buf.Write(val)
if err != nil {
return nil, 0, err
}
summer := crc32.NewIEEE()
_, err = summer.Write(val)
if err != nil {
return nil, 0, err
}
return buf.Bytes(), summer.Sum32(), nil
}
// Generate the header of a Tx record
func genTestCaseHeader(chunk []byte, numops uint32, crc32sum uint32) (r io.Reader, err os.Error) {
buf := new(bytes.Buffer)
err = binary.Write(buf, binary.LittleEndian, uint32(4 + 4 + len(chunk)))
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, numops)
if err != nil {
return nil, err
}
err = binary.Write(buf, binary.LittleEndian, crc32sum)
if err != nil {
return nil, err
}
_, err = buf.Write(chunk)
if err != nil {
return nil, err
}
return buf, nil
}
// Test that the Walker and the Writer agree on the
// protocol.
func TestCreation(t *testing.T) {
l, err := NewLogFile("creation.log")
if err != nil {
t.Error(err)
return
}
l.Close()
os.Remove("creation.log")
}
func TestLogging(t *testing.T) {
l, err := NewLogFile("logging.log")
if err != nil {
t.Error(err)
return
}
defer l.Close()
defer os.Remove("logging.log")
for _, val := range testValues {
err = l.Put(val)
if err != nil {
t.Error(err)
return
}
}
walker, err := NewFileWalker("logging.log")
if err != nil {
t.Error(err)
return
}
i := 0
for {
entries, err := walker.Next()
if err == os.EOF {
break
} else if err != nil {
t.Error(err)
return
}
if len(entries) != 1 {
t.Error("> 1 entry in log tx")
return
}
val := entries[0]
if !proto.Equal(val, testValues[i]) {
t.Error("proto message mismatch")
}
i += 1
}
}
// Check that we correctly catch CRC32 mismatches
func TestCRC32MismatchLog(t *testing.T) {
chunk, _, err := genTxValue(0xff, []byte{0xff, 0xff, 0xff, 0xff, 0xff})
if err != nil {
t.Error(err)
}
buf, err := genTestCaseHeader(chunk, 1, 0xcafebabe)
if err != nil {
t.Error(err)
}
walker, err := NewReaderWalker(buf)
if err != nil {
t.Error(err)
}
_, err = walker.Next()
if err != ErrCRC32Mismatch {
t.Errorf("exepcted CRC32 mismatch, got %v", err)
}
_, err = walker.Next()
if err != os.EOF {
t.Errorf("expected EOF, got %v", err)
}
}
// Test that unknown TxGroup values are not attempted to be
// decoded.
func TestUnknownTypeDecode(t *testing.T) {
buf, crc32sum, err := genTxValue(0xfa, []byte{0xfa, 0xfa, 0xfa})
if err != nil {
t.Error(err)
}
r, err := genTestCaseHeader(buf, 1, crc32sum)
if err != nil {
t.Error(err)
}
walker, err := NewReaderWalker(r)
if err != nil {
t.Error(err)
}
entries, err := walker.Next()
// The bytes above should not decode to anything useful
// (because they have an unknown type kind)
if len(entries) != 0 && err != nil {
t.Errorf("expected empty entries and non-nil err (got %v entries and %v)", len(entries), err)
}
_, err = walker.Next()
if err != os.EOF {
t.Errorf("expected EOF, got %v", err)
}
}
// Test a TxRecord with some trailing bytes
func TestTrailingBytesTxRecord(t *testing.T) {
buf, _, err := genTxValue(0xfa, []byte{0xff, 0xff, 0xff})
// Add some trailing bytes to the tx record
buf = append(buf, byte(0xff))
buf = append(buf, byte(0xff))
buf = append(buf, byte(0xff))
summer := crc32.NewIEEE()
_, err = summer.Write(buf)
if err != nil {
t.Error(err)
}
crc32sum := summer.Sum32()
r, err := genTestCaseHeader(buf, 1, crc32sum)
if err != nil {
t.Error(err)
}
walker, err := NewReaderWalker(r)
if err != nil {
t.Error(err)
}
_, err = walker.Next()
if err != ErrRemainingBytesForRecord {
t.Error(err)
}
_, err = walker.Next()
if err != os.EOF {
t.Errorf("expected EOF, got %v", err)
}
}
// Test that we check for TxRecords that are too big.
// A TxRecord can hold 255 entries, and each of those can be
// up to 16KB.
func TestTooBigTxRecord(t *testing.T) {
bigValue := make([]byte, math.MaxUint16*math.MaxUint8 + 4)
r, err := genTestCaseHeader(bigValue, 1, 0)
if err != nil {
t.Error(err)
}
walker, err := NewReaderWalker(r)
if err != nil {
t.Error(err)
}
_, err = walker.Next()
if err != ErrRecordTooBig {
t.Errorf("expected ErrRecordTooBig, got %v", err)
}
}
// Test that we correctly enforce the 255 entry limit of TxGroups.
func TestTxGroupCapacityEnforcement(t *testing.T) {
l, err := NewLogFile("capacity-enforcement.log")
if err != nil {
t.Error(err)
return
}
defer l.Close()
defer os.Remove("capacity-enforcement.log")
tx := l.BeginTx()
if err != nil {
t.Error(err)
}
for i := 0; i <= 255; i++ {
entry := testValues[i % len(testValues)]
err = tx.Put(entry)
if err != nil {
t.Error(err)
}
}
entry := testValues[0]
err = tx.Put(entry)
if err != ErrTxGroupFull {
t.Error(err)
}
}

17
pkg/freezer/types.go Normal file
View file

@ -0,0 +1,17 @@
// Copyright (c) 2011 The Grumble Authors
// The use of this source code is goverened by a BSD-style
// license that can be found in the LICENSE-file.
package freezer
type typeKind uint32
const (
ServerType typeKind = iota
ConfigKeyValuePairType
BanListType
UserType
UserRemoveType
ChannelType
ChannelRemoveType
)

78
pkg/freezer/types.proto Normal file
View file

@ -0,0 +1,78 @@
package freezer;
option optimize_for = SPEED;
message Server {
repeated ConfigKeyValuePair config = 2;
optional BanList ban_list = 3;
repeated Channel channels = 4;
repeated User users = 5;
}
message ConfigKeyValuePair {
required string key = 1;
optional string value = 2;
}
message Ban {
optional bytes ip = 1;
optional uint32 mask = 2;
optional string username = 3;
optional string cert_hash = 4;
optional string reason = 5;
optional int64 start = 6;
optional uint32 duration = 7;
}
message BanList {
repeated Ban bans = 1;
}
message User {
optional uint32 id = 1;
optional string name = 2;
optional string password = 3;
optional string cert_hash = 4;
optional string email = 5;
optional string texture_blob = 6;
optional string comment_blob = 7;
optional uint32 last_channel_id = 8;
optional uint64 last_active = 9;
}
message UserRemove {
optional uint32 id = 1;
}
message Channel {
optional uint32 id = 1;
optional string name = 2;
optional uint32 parent_id = 3;
optional int64 position = 4;
optional bool inherit_acl = 5;
repeated uint32 links = 6;
repeated ACL acl = 7;
repeated Group groups = 8;
optional string description_blob = 9;
}
message ChannelRemove {
optional uint32 id = 1;
}
message ACL {
optional uint32 user_id = 1;
optional string group = 2;
optional bool apply_here = 3;
optional bool apply_subs = 4;
optional uint32 allow = 5;
optional uint32 deny = 6;
}
message Group {
optional string name = 1;
optional bool inherit = 2;
optional bool inheritable = 3;
repeated uint32 add = 4;
repeated uint32 remove = 5;
}

257
pkg/freezer/walker.go Normal file
View file

@ -0,0 +1,257 @@
// Copyright (c) 2011 The Grumble Authors
// The use of this source code is goverened by a BSD-style
// license that can be found in the LICENSE-file.
package freezer
import (
"encoding/binary"
"goprotobuf.googlecode.com/hg/proto"
"hash"
"hash/crc32"
"io"
"math"
"os"
)
// Checks whether the error err is an EOF
// error.
func isEOF(err os.Error) bool {
if err == os.EOF || err == io.ErrUnexpectedEOF {
return true
}
return false
}
// Type Walker implements a method for
// iterating the transaction groups of an
// immutable Log.
type Walker struct {
r io.Reader
}
// Type txReader imlpements a checksumming reader, intended
// for reading transaction groups of a Log.
//
// Besides auto-checksumming the read content, it also
// keeps track of the amount of consumed bytes.
type txReader struct {
r io.Reader
crc32 hash.Hash32
consumed int
}
// Create a new txReader for reading a transaction group
// from the log.
func newTxReader(r io.Reader) *txReader {
txr := new(txReader)
txr.r = r
txr.crc32 = crc32.NewIEEE()
return txr
}
// walkReader's Read method. Reads from walkReader's Reader
// and checksums while reading.
func (txr *txReader) Read(p []byte) (n int, err os.Error) {
n, err = txr.r.Read(p)
if err != nil && err != os.EOF {
return
}
txr.consumed += n
_, crc32err := txr.crc32.Write(p)
if crc32err != nil {
return n, crc32err
}
return n, err
}
// Sum32 returns the IEEE-style CRC32 checksum
// of the data read by the walkReader.
func (txr *txReader) Sum32() uint32 {
return txr.crc32.Sum32()
}
// Consumed returns the amount of bytes consumed by
// the walkReader.
func (txr *txReader) Consumed() int {
return txr.consumed
}
// Create a new Walker that iterates over the entries of the given log file.
func NewFileWalker(fn string) (walker *Walker, err os.Error) {
f, err := os.Open(fn)
if err != nil {
return nil, err
}
walker = new(Walker)
walker.r = f
return walker, nil
}
// Create a new Walker that iterates over the log entries of a given Reader.
func NewReaderWalker(r io.Reader) (walker *Walker, err os.Error) {
walker = new(Walker)
walker.r = r
return walker, nil
}
// Next returns the next transaction group in the log as a slice of
// pointers to the protobuf-serialized log entries.
//
// This method will only attempt to serialize types with type identifiers
// that this package knows of. In case an unknown type identifier is found
// in a transaction group, it is silently ignored (it's skipped).
//
// On error, Next returns a nil slice and a non-nil err.
// When the end of the file is reached, Next returns nil, os.EOF.
func (walker *Walker) Next() (entries []interface{}, err os.Error) {
var (
remainBytes uint32
remainOps uint32
crcsum uint32
kind uint16
length uint16
)
err = binary.Read(walker.r, binary.LittleEndian, &remainBytes)
if isEOF(err) {
return nil, os.EOF
} else if err != nil {
return nil, err
}
if remainBytes < 8 {
return nil, ErrUnexpectedEndOfRecord
}
if remainBytes-8 > math.MaxUint8*math.MaxUint16 {
return nil, ErrRecordTooBig
}
err = binary.Read(walker.r, binary.LittleEndian, &remainOps)
if isEOF(err) {
return nil, ErrUnexpectedEndOfRecord
} else if err != nil {
return nil, err
}
err = binary.Read(walker.r, binary.LittleEndian, &crcsum)
if isEOF(err) {
return nil, ErrUnexpectedEndOfRecord
} else if err != nil {
return nil, err
}
remainBytes -= 8
reader := newTxReader(walker.r)
for remainOps > 0 {
err = binary.Read(reader, binary.LittleEndian, &kind)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
err = binary.Read(reader, binary.LittleEndian, &length)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
buf := make([]byte, length)
_, err = io.ReadFull(reader, buf)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
switch typeKind(kind) {
case ServerType:
server := &Server{}
err = proto.Unmarshal(buf, &server)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
entries = append(entries, server)
case ConfigKeyValuePairType:
cfg := &ConfigKeyValuePair{}
err = proto.Unmarshal(buf, cfg)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
entries = append(entries, cfg)
case BanListType:
banlist := &BanList{}
err = proto.Unmarshal(buf, banlist)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
entries = append(entries, banlist)
case UserType:
user := &User{}
err = proto.Unmarshal(buf, user)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
entries = append(entries, user)
case UserRemoveType:
userRemove := &UserRemove{}
err = proto.Unmarshal(buf, userRemove)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
entries = append(entries, userRemove)
case ChannelType:
channel := &Channel{}
err = proto.Unmarshal(buf, channel)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
entries = append(entries, channel)
case ChannelRemoveType:
channelRemove := &ChannelRemove{}
err = proto.Unmarshal(buf, channelRemove)
if isEOF(err) {
break
} else if err != nil {
return nil, err
}
entries = append(entries, channelRemove)
}
remainOps -= 1
continue
}
if isEOF(err) {
return nil, ErrUnexpectedEndOfRecord
}
if reader.Consumed() != int(remainBytes) {
return nil, ErrRemainingBytesForRecord
}
if reader.Sum32() != crcsum {
return nil, ErrCRC32Mismatch
}
return entries, nil
}

194
pkg/freezer/writer.go Normal file
View file

@ -0,0 +1,194 @@
// Copyright (c) 2011 The Grumble Authors
// The use of this source code is goverened by a BSD-style
// license that can be found in the LICENSE-file.
// Package freezer implements a persistence layer for Grumble.
package freezer
// The freezer package exports types that can be persisted to disk,
// both as part of a full server snapshot, and as part of a log of state changes.
//
// The freezer package also implements an append-only log writer that can be used
// to serialize the freezer types to disk in atomic entities called transactions
// records.
//
// A Walker type that can be used to iterate over the different transaction records
// of a log file is also provided.
import (
"bytes"
"encoding/binary"
"goprotobuf.googlecode.com/hg/proto"
"hash"
"hash/crc32"
"io"
"math"
"os"
)
// Log implements an append-only log for flattened
// protobuf-encoded log entries.
//
// These log entries are typically state-change deltas
// for a Grumble server's main data strutures.
//
// The log supports atomic transactions. Transaction groups
// are persisted to disk with a checksum that covers the
// whole transaction group. In case of a failure, none of the
// entries of a transaction will be applied.
type Log struct {
wc io.WriteCloser
}
// Type LogTx represents a transaction in the log.
// Transactions can be used to group several changes into an
// atomic entity in the log file.
type LogTx struct {
log *Log
crc hash.Hash32
buf *bytes.Buffer
numops int
}
// Create a new log file
func NewLogFile(fn string) (*Log, os.Error) {
f, err := os.Create(fn)
if err != nil {
return nil, err
}
log := new(Log)
log.wc = f
return log, nil
}
// Close a Log
func (log *Log) Close() os.Error {
return log.wc.Close()
}
// Append a log entry
//
// This method implicitly creates a transaction
// group for this single Put operation. It is merely
// a convenience wrapper.
func (log *Log) Put(value interface{}) (err os.Error) {
tx := log.BeginTx()
err = tx.Put(value)
if err != nil {
return err
}
return tx.Commit()
}
// Begin a transaction
func (log *Log) BeginTx() *LogTx {
tx := &LogTx{}
tx.log = log
tx.buf = new(bytes.Buffer)
tx.crc = crc32.NewIEEE()
return tx
}
// Append a log entry to the transaction.
// The transaction's log entries will not be persisted to
// the log until the Commit has been called on the transaction.
func (tx *LogTx) Put(value interface{}) (err os.Error) {
var (
buf []byte
kind typeKind
)
if tx.numops > 255 {
return ErrTxGroupFull
}
switch val := value.(type) {
case *Server:
kind = ServerType
buf, err = proto.Marshal(val)
case *ConfigKeyValuePair:
kind = ConfigKeyValuePairType
buf, err = proto.Marshal(val)
case *BanList:
kind = BanListType
buf, err = proto.Marshal(val)
case *User:
kind = UserType
buf, err = proto.Marshal(val)
case *UserRemove:
kind = UserRemoveType
buf, err = proto.Marshal(val)
case *Channel:
kind = ChannelType
buf, err = proto.Marshal(val)
case *ChannelRemove:
kind = ChannelRemoveType
buf, err = proto.Marshal(val)
default:
panic("Attempt to put an unknown type")
}
if err != nil {
return err
}
if len(buf) > math.MaxUint16 {
return ErrTxGroupValueTooBig
}
w := io.MultiWriter(tx.buf, tx.crc)
err = binary.Write(w, binary.LittleEndian, uint16(kind))
if err != nil {
return err
}
err = binary.Write(w, binary.LittleEndian, uint16(len(buf)))
if err != nil {
return err
}
_, err = w.Write(buf)
if err != nil {
return err
}
tx.numops += 1;
return nil
}
// Commit all changes of the transaction to the log
// as a single atomic entry.
func (tx *LogTx) Commit() (err os.Error) {
buf := new(bytes.Buffer)
err = binary.Write(buf, binary.LittleEndian, uint32(4 + 4 + tx.buf.Len()))
if err != nil {
return err
}
err = binary.Write(buf, binary.LittleEndian, uint32(tx.numops))
if err != nil {
return err
}
err = binary.Write(buf, binary.LittleEndian, tx.crc.Sum32())
if err != nil {
return err
}
_, err = buf.Write(tx.buf.Bytes())
if err != nil {
return err
}
_, err = tx.log.wc.Write(buf.Bytes())
if err != nil {
return err
}
return nil
}