1
0
Fork 0
forked from External/ergo

refactor of channel persistence to use UUIDs

This commit is contained in:
Shivaram Lingamneni 2023-01-04 05:06:21 -05:00
parent bceae9b739
commit 7ce0636276
18 changed files with 804 additions and 653 deletions

View file

@ -14,6 +14,8 @@ import (
"strings"
"time"
"github.com/ergochat/ergo/irc/bunt"
"github.com/ergochat/ergo/irc/datastore"
"github.com/ergochat/ergo/irc/modes"
"github.com/ergochat/ergo/irc/utils"
@ -21,12 +23,19 @@ import (
)
const (
// 'version' of the database schema
keySchemaVersion = "db.version"
// latest schema of the db
latestDbSchema = 22
// TODO migrate metadata keys as well
keyCloakSecret = "crypto.cloak_secret"
// 'version' of the database schema
// latest schema of the db
latestDbSchema = 23
)
var (
schemaVersionUUID = utils.UUID{0, 255, 85, 13, 212, 10, 191, 121, 245, 152, 142, 89, 97, 141, 219, 87} // AP9VDdQKv3n1mI5ZYY3bVw
cloakSecretUUID = utils.UUID{170, 214, 184, 208, 116, 181, 67, 75, 161, 23, 233, 16, 113, 251, 94, 229} // qta40HS1Q0uhF-kQcfte5Q
keySchemaVersion = bunt.BuntKey(datastore.TableMetadata, schemaVersionUUID)
keyCloakSecret = bunt.BuntKey(datastore.TableMetadata, cloakSecretUUID)
)
type SchemaChanger func(*Config, *buntdb.Tx) error
@ -99,10 +108,7 @@ func openDatabaseInternal(config *Config, allowAutoupgrade bool) (db *buntdb.DB,
// read the current version string
var version int
err = db.View(func(tx *buntdb.Tx) (err error) {
vStr, err := tx.Get(keySchemaVersion)
if err == nil {
version, err = strconv.Atoi(vStr)
}
version, err = retrieveSchemaVersion(tx)
return err
})
if err != nil {
@ -130,6 +136,17 @@ func openDatabaseInternal(config *Config, allowAutoupgrade bool) (db *buntdb.DB,
}
}
func retrieveSchemaVersion(tx *buntdb.Tx) (version int, err error) {
if val, err := tx.Get(keySchemaVersion); err == nil {
return strconv.Atoi(val)
}
// legacy key:
if val, err := tx.Get("db.version"); err == nil {
return strconv.Atoi(val)
}
return 0, buntdb.ErrNotFound
}
func performAutoUpgrade(currentVersion int, config *Config) (err error) {
path := config.Datastore.Path
log.Printf("attempting to auto-upgrade schema from version %d to %d\n", currentVersion, latestDbSchema)
@ -167,8 +184,12 @@ func UpgradeDB(config *Config) (err error) {
var version int
err = store.Update(func(tx *buntdb.Tx) error {
for {
vStr, _ := tx.Get(keySchemaVersion)
version, _ = strconv.Atoi(vStr)
if version == 0 {
version, err = retrieveSchemaVersion(tx)
if err != nil {
return err
}
}
if version == latestDbSchema {
// success!
break
@ -183,11 +204,12 @@ func UpgradeDB(config *Config) (err error) {
if err != nil {
return err
}
_, _, err = tx.Set(keySchemaVersion, strconv.Itoa(change.TargetVersion), nil)
version = change.TargetVersion
_, _, err = tx.Set(keySchemaVersion, strconv.Itoa(version), nil)
if err != nil {
return err
}
log.Printf("successfully updated schema to version %d\n", change.TargetVersion)
log.Printf("successfully updated schema to version %d\n", version)
}
return nil
})
@ -198,19 +220,17 @@ func UpgradeDB(config *Config) (err error) {
return err
}
func LoadCloakSecret(db *buntdb.DB) (result string) {
db.View(func(tx *buntdb.Tx) error {
result, _ = tx.Get(keyCloakSecret)
return nil
})
return
func LoadCloakSecret(dstore datastore.Datastore) (result string, err error) {
val, err := dstore.Get(datastore.TableMetadata, cloakSecretUUID)
if err != nil {
return
}
return string(val), nil
}
func StoreCloakSecret(db *buntdb.DB, secret string) {
db.Update(func(tx *buntdb.Tx) error {
tx.Set(keyCloakSecret, secret, nil)
return nil
})
func StoreCloakSecret(dstore datastore.Datastore, secret string) {
// TODO error checking
dstore.Set(datastore.TableMetadata, cloakSecretUUID, []byte(secret), time.Time{})
}
func schemaChangeV1toV2(config *Config, tx *buntdb.Tx) error {
@ -1112,6 +1132,92 @@ func schemaChangeV21To22(config *Config, tx *buntdb.Tx) error {
return nil
}
// first phase of document-oriented database refactor: channels
func schemaChangeV22ToV23(config *Config, tx *buntdb.Tx) error {
keyChannelExists := "channel.exists "
var channelNames []string
tx.AscendGreaterOrEqual("", keyChannelExists, func(key, value string) bool {
if !strings.HasPrefix(key, keyChannelExists) {
return false
}
channelNames = append(channelNames, strings.TrimPrefix(key, keyChannelExists))
return true
})
for _, channelName := range channelNames {
channel, err := loadLegacyChannel(tx, channelName)
if err != nil {
log.Printf("error loading legacy channel %s: %v", channelName, err)
continue
}
channel.UUID = utils.GenerateUUIDv4()
newKey := bunt.BuntKey(datastore.TableChannels, channel.UUID)
j, err := json.Marshal(channel)
if err != nil {
log.Printf("error marshaling channel %s: %v", channelName, err)
continue
}
tx.Set(newKey, string(j), nil)
deleteLegacyChannel(tx, channelName)
}
// purges
keyChannelPurged := "channel.purged "
var purgeKeys []string
var channelPurges []ChannelPurgeRecord
tx.AscendGreaterOrEqual("", keyChannelPurged, func(key, value string) bool {
if !strings.HasPrefix(key, keyChannelPurged) {
return false
}
purgeKeys = append(purgeKeys, key)
cfname := strings.TrimPrefix(key, keyChannelPurged)
var record ChannelPurgeRecord
err := json.Unmarshal([]byte(value), &record)
if err != nil {
log.Printf("error unmarshaling channel purge for %s: %v", cfname, err)
return true
}
record.NameCasefolded = cfname
record.UUID = utils.GenerateUUIDv4()
channelPurges = append(channelPurges, record)
return true
})
for _, record := range channelPurges {
newKey := bunt.BuntKey(datastore.TableChannelPurges, record.UUID)
j, err := json.Marshal(record)
if err != nil {
log.Printf("error marshaling channel purge %s: %v", record.NameCasefolded, err)
continue
}
tx.Set(newKey, string(j), nil)
}
for _, purgeKey := range purgeKeys {
tx.Delete(purgeKey)
}
// clean up denormalized account-to-channels mapping
keyAccountChannels := "account.channels "
var accountToChannels []string
tx.AscendGreaterOrEqual("", keyAccountChannels, func(key, value string) bool {
if !strings.HasPrefix(key, keyAccountChannels) {
return false
}
accountToChannels = append(accountToChannels, key)
return true
})
for _, key := range accountToChannels {
tx.Delete(key)
}
// migrate cloak secret
val, _ := tx.Get("crypto.cloak_secret")
tx.Set(keyCloakSecret, val, nil)
// bump the legacy version key to mark the database as downgrade-incompatible
tx.Set("db.version", "23", nil)
return nil
}
func getSchemaChange(initialVersion int) (result SchemaChange, ok bool) {
for _, change := range allChanges {
if initialVersion == change.InitialVersion {
@ -1227,4 +1333,9 @@ var allChanges = []SchemaChange{
TargetVersion: 22,
Changer: schemaChangeV21To22,
},
{
InitialVersion: 22,
TargetVersion: 23,
Changer: schemaChangeV22ToV23,
},
}