1
0
Fork 0
forked from External/ergo

upgrade buntdb

This commit is contained in:
Shivaram Lingamneni 2021-08-03 01:46:20 -04:00
parent 1389d89a9b
commit c5a9916302
15 changed files with 519 additions and 288 deletions

View file

@ -69,6 +69,7 @@ type DB struct {
keys *btree.BTree // a tree of all item ordered by key
exps *btree.BTree // a tree of items ordered by expiration
idxs map[string]*index // the index trees.
insIdxs []*index // a reuse buffer for gathering indexes
flushes int // a count of the number of disk flushes
closed bool // set when the database has been closed
config Config // the database configuration
@ -139,8 +140,8 @@ type exctx struct {
func Open(path string) (*DB, error) {
db := &DB{}
// initialize trees and indexes
db.keys = btree.New(lessCtx(nil))
db.exps = btree.New(lessCtx(&exctx{db}))
db.keys = btreeNew(lessCtx(nil))
db.exps = btreeNew(lessCtx(&exctx{db}))
db.idxs = make(map[string]*index)
// initialize default configuration
db.config = Config{
@ -200,10 +201,11 @@ func (db *DB) Save(wr io.Writer) error {
defer db.mu.RUnlock()
// use a buffered writer and flush every 4MB
var buf []byte
now := time.Now()
// iterate through every item in the database and write it to the buffer
btreeAscend(db.keys, func(item interface{}) bool {
dbi := item.(*dbItem)
buf = dbi.writeSetTo(buf)
buf = dbi.writeSetTo(buf, now)
if len(buf) > 1024*1024*4 {
// flush when buffer is over 4MB
_, err = wr.Write(buf)
@ -283,7 +285,7 @@ func (idx *index) clearCopy() *index {
}
// initialize with empty trees
if nidx.less != nil {
nidx.btr = btree.New(lessCtx(nidx))
nidx.btr = btreeNew(lessCtx(nidx))
}
if nidx.rect != nil {
nidx.rtr = rtred.New(nidx)
@ -295,7 +297,7 @@ func (idx *index) clearCopy() *index {
func (idx *index) rebuild() {
// initialize trees
if idx.less != nil {
idx.btr = btree.New(lessCtx(idx))
idx.btr = btreeNew(lessCtx(idx))
}
if idx.rect != nil {
idx.rtr = rtred.New(idx)
@ -454,16 +456,23 @@ func (db *DB) SetConfig(config Config) error {
// will be replaced with the new one, and return the previous item.
func (db *DB) insertIntoDatabase(item *dbItem) *dbItem {
var pdbi *dbItem
// Generate a list of indexes that this item will be inserted in to.
idxs := db.insIdxs
for _, idx := range db.idxs {
if idx.match(item.key) {
idxs = append(idxs, idx)
}
}
prev := db.keys.Set(item)
if prev != nil {
// A previous item was removed from the keys tree. Let's
// fully delete this item from all indexes.
pdbi = prev.(*dbItem)
if pdbi.opts != nil && pdbi.opts.ex {
// Remove it from the exipres tree.
// Remove it from the expires tree.
db.exps.Delete(pdbi)
}
for _, idx := range db.idxs {
for _, idx := range idxs {
if idx.btr != nil {
// Remove it from the btree index.
idx.btr.Delete(pdbi)
@ -479,10 +488,7 @@ func (db *DB) insertIntoDatabase(item *dbItem) *dbItem {
// expires tree
db.exps.Set(item)
}
for _, idx := range db.idxs {
if !idx.match(item.key) {
continue
}
for i, idx := range idxs {
if idx.btr != nil {
// Add new item to btree index.
idx.btr.Set(item)
@ -491,7 +497,11 @@ func (db *DB) insertIntoDatabase(item *dbItem) *dbItem {
// Add new item to rtree index.
idx.rtr.Insert(item)
}
// clear the index
idxs[i] = nil
}
// reuse the index list slice
db.insIdxs = idxs[:0]
// we must return the previous item to the caller.
return pdbi
}
@ -512,6 +522,9 @@ func (db *DB) deleteFromDatabase(item *dbItem) *dbItem {
db.exps.Delete(pdbi)
}
for _, idx := range db.idxs {
if !idx.match(pdbi.key) {
continue
}
if idx.btr != nil {
// Remove it from the btree index.
idx.btr.Delete(pdbi)
@ -672,6 +685,7 @@ func (db *DB) Shrink() error {
}
done = true
var n int
now := time.Now()
btreeAscendGreaterOrEqual(db.keys, &dbItem{key: pivot},
func(item interface{}) bool {
dbi := item.(*dbItem)
@ -681,7 +695,7 @@ func (db *DB) Shrink() error {
done = false
return false
}
buf = dbi.writeSetTo(buf)
buf = dbi.writeSetTo(buf, now)
n++
return true
},
@ -908,8 +922,8 @@ func (db *DB) readLoad(rd io.Reader, modTime time.Time) (n int64, err error) {
db.deleteFromDatabase(&dbItem{key: parts[1]})
} else if (parts[0][0] == 'f' || parts[0][0] == 'F') &&
strings.ToLower(parts[0]) == "flushdb" {
db.keys = btree.New(lessCtx(nil))
db.exps = btree.New(lessCtx(&exctx{db}))
db.keys = btreeNew(lessCtx(nil))
db.exps = btreeNew(lessCtx(&exctx{db}))
db.idxs = make(map[string]*index)
} else {
return totalSize, ErrInvalid
@ -941,11 +955,16 @@ func (db *DB) load() error {
return err
}
}
pos, err := db.file.Seek(n, 0)
if err != nil {
if _, err := db.file.Seek(n, 0); err != nil {
return err
}
db.lastaofsz = int(pos)
var estaofsz int
db.keys.Walk(func(items []interface{}) {
for _, v := range items {
estaofsz += v.(*dbItem).estAOFSetSize()
}
})
db.lastaofsz += estaofsz
return nil
}
@ -1054,8 +1073,8 @@ func (tx *Tx) DeleteAll() error {
}
// now reset the live database trees
tx.db.keys = btree.New(lessCtx(nil))
tx.db.exps = btree.New(lessCtx(&exctx{tx.db}))
tx.db.keys = btreeNew(lessCtx(nil))
tx.db.exps = btreeNew(lessCtx(&exctx{tx.db}))
tx.db.idxs = make(map[string]*index)
// finally re-create the indexes
@ -1165,12 +1184,13 @@ func (tx *Tx) Commit() error {
if tx.wc.rbkeys != nil {
tx.db.buf = append(tx.db.buf, "*1\r\n$7\r\nflushdb\r\n"...)
}
now := time.Now()
// Each committed record is written to disk
for key, item := range tx.wc.commitItems {
if item == nil {
tx.db.buf = (&dbItem{key: key}).writeDeleteTo(tx.db.buf)
} else {
tx.db.buf = item.writeSetTo(tx.db.buf)
tx.db.buf = item.writeSetTo(tx.db.buf, now)
}
}
// Flushing the buffer only once per transaction.
@ -1243,16 +1263,53 @@ type dbItem struct {
keyless bool // keyless item for scanning
}
// estIntSize returns the number of bytes needed to render x in
// base-10 ASCII, i.e. len(strconv.Itoa(x)). Unlike the previous
// version, negative values are estimated correctly (the original
// returned 0 for x < 0); callers currently pass only non-negative
// lengths and counts, so this is backward-compatible.
func estIntSize(x int) int {
	n := 1
	if x < 0 {
		n++ // leading '-' sign
		// Divide in the negative domain so math.MinInt (which has
		// no positive counterpart) is still handled correctly.
		for x < -9 {
			n++
			x /= 10
		}
		return n
	}
	for x > 9 {
		n++
		x /= 10
	}
	return n
}
// estArraySize estimates the encoded size of a RESP array header:
// a '*' marker, the decimal element count, and a trailing "\r\n".
func estArraySize(count int) int {
	const overhead = 1 + 2 // '*' plus "\r\n"
	return overhead + estIntSize(count)
}
// estBulkStringSize estimates the encoded size of a RESP bulk string:
// a '$' marker, the decimal payload length, "\r\n", the payload
// itself, and a closing "\r\n".
func estBulkStringSize(s string) int {
	const overhead = 1 + 2 + 2 // '$' plus the two "\r\n" terminators
	return overhead + estIntSize(len(s)) + len(s)
}
// estAOFSetSize estimates how many bytes the SET record for dbi will
// occupy in the append-only file, without actually encoding it. An
// item with an expiration produces a 5-argument command
// ("set" key value "ex" seconds); the seconds argument is assumed to
// encode as a two-byte bulk string.
func (dbi *dbItem) estAOFSetSize() (n int) {
	expires := dbi.opts != nil && dbi.opts.ex
	args := 3
	if expires {
		args = 5
	}
	n = estArraySize(args) +
		estBulkStringSize("set") +
		estBulkStringSize(dbi.key) +
		estBulkStringSize(dbi.val)
	if expires {
		n += estBulkStringSize("ex")
		n += estBulkStringSize("99") // estimate two byte bulk string
	}
	return n
}
// appendArray appends a RESP array header ("*<count>\r\n") to buf and
// returns the extended slice.
//
// Fix: the stripped diff left both the old strconv.FormatInt line and
// its strconv.AppendInt replacement in the body, so the count was
// appended twice; only the AppendInt form (which writes directly into
// buf without an intermediate string allocation) is kept.
func appendArray(buf []byte, count int) []byte {
	buf = append(buf, '*')
	buf = strconv.AppendInt(buf, int64(count), 10)
	return append(buf, '\r', '\n')
}
// appendBulkString appends a RESP bulk string ("$<len>\r\n<s>\r\n")
// to buf and returns the extended slice.
//
// Fix: the stripped diff left both the old strconv.FormatInt line and
// its strconv.AppendInt replacement in the body (writing the length
// twice), and a stray hunk header had been spliced into the function;
// only the AppendInt form is kept.
func appendBulkString(buf []byte, s string) []byte {
	buf = append(buf, '$')
	buf = strconv.AppendInt(buf, int64(len(s)), 10)
	buf = append(buf, '\r', '\n')
	buf = append(buf, s...)
	return append(buf, '\r', '\n')
}
// writeSetTo writes an item as a single SET record to the a bufio Writer.
func (dbi *dbItem) writeSetTo(buf []byte) []byte {
func (dbi *dbItem) writeSetTo(buf []byte, now time.Time) []byte {
if dbi.opts != nil && dbi.opts.ex {
ex := time.Until(dbi.opts.exat) / time.Second
ex := dbi.opts.exat.Sub(now) / time.Second
buf = appendArray(buf, 5)
buf = appendBulkString(buf, "set")
buf = appendBulkString(buf, dbi.key)
@ -2300,3 +2357,8 @@ func btreeDescendLessOrEqual(tr *btree.BTree, pivot interface{},
) {
tr.Descend(pivot, iter)
}
// btreeNew constructs a B-tree ordered by less. The non-concurrent
// variant is used because DB serializes all tree access under its own
// mutex, making the tree's internal locking redundant overhead.
func btreeNew(less func(a, b interface{}) bool) *btree.BTree {
	return btree.NewNonConcurrent(less)
}