remove registeredChannelsMutex

This moves channel registration to an eventual consistency model,
where the in-memory datastructures (Channel and ChannelManager)
are the exclusive source of truth, and updates to them get persisted
asynchronously to the DB.
This commit is contained in:
Shivaram Lingamneni 2017-11-08 22:19:50 -05:00
parent d5832bf765
commit d4cb15354f
7 changed files with 318 additions and 247 deletions

View file

@ -4,9 +4,9 @@
package irc
import (
"errors"
"fmt"
"strconv"
"sync"
"time"
"encoding/json"
@ -14,6 +14,9 @@ import (
"github.com/tidwall/buntdb"
)
// this is exclusively the *persistence* layer for channel registration;
// channel creation/tracking/destruction is in channelmanager.go
const (
keyChannelExists = "channel.exists %s"
keyChannelName = "channel.name %s" // stores the 'preferred name' of the channel, not casemapped
@ -28,7 +31,18 @@ const (
)
var (
errChanExists = errors.New("Channel already exists")
channelKeyStrings = []string{
keyChannelExists,
keyChannelName,
keyChannelRegTime,
keyChannelFounder,
keyChannelTopic,
keyChannelTopicSetBy,
keyChannelTopicSetTime,
keyChannelBanlist,
keyChannelExceptlist,
keyChannelInvitelist,
}
)
// RegisteredChannel holds details about a given registered channel.
@ -53,62 +67,142 @@ type RegisteredChannel struct {
Invitelist []string
}
// deleteChannelNoMutex deletes a given channel from our store.
func (server *Server) deleteChannelNoMutex(tx *buntdb.Tx, channelKey string) {
tx.Delete(fmt.Sprintf(keyChannelExists, channelKey))
server.registeredChannels[channelKey] = nil
type ChannelRegistry struct {
// this serializes operations of the form (read channel state, synchronously persist it);
// this is enough to guarantee eventual consistency of the database with the
// ChannelManager and Channel objects, which are the source of truth.
// Wwe could use the buntdb RW transaction lock for this purpose but we share
// that with all the other modules, so let's not.
sync.Mutex // tier 2
server *Server
channels map[string]*RegisteredChannel
}
// loadChannelNoMutex loads a channel from the store.
func (server *Server) loadChannelNoMutex(tx *buntdb.Tx, channelKey string) *RegisteredChannel {
// return loaded chan if it already exists
if server.registeredChannels[channelKey] != nil {
return server.registeredChannels[channelKey]
func NewChannelRegistry(server *Server) *ChannelRegistry {
return &ChannelRegistry{
server: server,
}
_, err := tx.Get(fmt.Sprintf(keyChannelExists, channelKey))
if err == buntdb.ErrNotFound {
// chan does not already exist, return
}
// StoreChannel obtains a consistent view of a channel, then persists it to the store.
func (reg *ChannelRegistry) StoreChannel(channel *Channel, includeLists bool) {
if !reg.server.ChannelRegistrationEnabled() {
return
}
reg.Lock()
defer reg.Unlock()
key := channel.NameCasefolded()
info := channel.ExportRegistration(includeLists)
if info.Founder == "" {
// sanity check, don't try to store an unregistered channel
return
}
reg.server.store.Update(func(tx *buntdb.Tx) error {
reg.saveChannel(tx, key, info, includeLists)
return nil
})
}
// LoadChannel loads a channel from the store.
func (reg *ChannelRegistry) LoadChannel(nameCasefolded string) (info *RegisteredChannel) {
if !reg.server.ChannelRegistrationEnabled() {
return nil
}
// channel exists, load it
name, _ := tx.Get(fmt.Sprintf(keyChannelName, channelKey))
regTime, _ := tx.Get(fmt.Sprintf(keyChannelRegTime, channelKey))
regTimeInt, _ := strconv.ParseInt(regTime, 10, 64)
founder, _ := tx.Get(fmt.Sprintf(keyChannelFounder, channelKey))
topic, _ := tx.Get(fmt.Sprintf(keyChannelTopic, channelKey))
topicSetBy, _ := tx.Get(fmt.Sprintf(keyChannelTopicSetBy, channelKey))
topicSetTime, _ := tx.Get(fmt.Sprintf(keyChannelTopicSetTime, channelKey))
topicSetTimeInt, _ := strconv.ParseInt(topicSetTime, 10, 64)
banlistString, _ := tx.Get(fmt.Sprintf(keyChannelBanlist, channelKey))
exceptlistString, _ := tx.Get(fmt.Sprintf(keyChannelExceptlist, channelKey))
invitelistString, _ := tx.Get(fmt.Sprintf(keyChannelInvitelist, channelKey))
channelKey := nameCasefolded
// nice to have: do all JSON (de)serialization outside of the buntdb transaction
reg.server.store.View(func(tx *buntdb.Tx) error {
_, err := tx.Get(fmt.Sprintf(keyChannelExists, channelKey))
if err == buntdb.ErrNotFound {
// chan does not already exist, return
return nil
}
var banlist []string
_ = json.Unmarshal([]byte(banlistString), &banlist)
var exceptlist []string
_ = json.Unmarshal([]byte(exceptlistString), &exceptlist)
var invitelist []string
_ = json.Unmarshal([]byte(invitelistString), &invitelist)
// channel exists, load it
name, _ := tx.Get(fmt.Sprintf(keyChannelName, channelKey))
regTime, _ := tx.Get(fmt.Sprintf(keyChannelRegTime, channelKey))
regTimeInt, _ := strconv.ParseInt(regTime, 10, 64)
founder, _ := tx.Get(fmt.Sprintf(keyChannelFounder, channelKey))
topic, _ := tx.Get(fmt.Sprintf(keyChannelTopic, channelKey))
topicSetBy, _ := tx.Get(fmt.Sprintf(keyChannelTopicSetBy, channelKey))
topicSetTime, _ := tx.Get(fmt.Sprintf(keyChannelTopicSetTime, channelKey))
topicSetTimeInt, _ := strconv.ParseInt(topicSetTime, 10, 64)
banlistString, _ := tx.Get(fmt.Sprintf(keyChannelBanlist, channelKey))
exceptlistString, _ := tx.Get(fmt.Sprintf(keyChannelExceptlist, channelKey))
invitelistString, _ := tx.Get(fmt.Sprintf(keyChannelInvitelist, channelKey))
chanInfo := RegisteredChannel{
Name: name,
RegisteredAt: time.Unix(regTimeInt, 0),
Founder: founder,
Topic: topic,
TopicSetBy: topicSetBy,
TopicSetTime: time.Unix(topicSetTimeInt, 0),
Banlist: banlist,
Exceptlist: exceptlist,
Invitelist: invitelist,
}
server.registeredChannels[channelKey] = &chanInfo
var banlist []string
_ = json.Unmarshal([]byte(banlistString), &banlist)
var exceptlist []string
_ = json.Unmarshal([]byte(exceptlistString), &exceptlist)
var invitelist []string
_ = json.Unmarshal([]byte(invitelistString), &invitelist)
return &chanInfo
info = &RegisteredChannel{
Name: name,
RegisteredAt: time.Unix(regTimeInt, 0),
Founder: founder,
Topic: topic,
TopicSetBy: topicSetBy,
TopicSetTime: time.Unix(topicSetTimeInt, 0),
Banlist: banlist,
Exceptlist: exceptlist,
Invitelist: invitelist,
}
return nil
})
return info
}
// saveChannelNoMutex saves a channel to the store.
func (server *Server) saveChannelNoMutex(tx *buntdb.Tx, channelKey string, channelInfo RegisteredChannel) {
// Rename handles the persistence part of a channel rename: the channel is
// persisted under its new name, and the old name is cleaned up if necessary.
func (reg *ChannelRegistry) Rename(channel *Channel, casefoldedOldName string) {
if !reg.server.ChannelRegistrationEnabled() {
return
}
reg.Lock()
defer reg.Unlock()
includeLists := true
oldKey := casefoldedOldName
key := channel.NameCasefolded()
info := channel.ExportRegistration(includeLists)
if info.Founder == "" {
return
}
reg.server.store.Update(func(tx *buntdb.Tx) error {
reg.deleteChannel(tx, oldKey, info)
reg.saveChannel(tx, key, info, includeLists)
return nil
})
}
// delete a channel, unless it was overwritten by another registration of the same channel
func (reg *ChannelRegistry) deleteChannel(tx *buntdb.Tx, key string, info RegisteredChannel) {
_, err := tx.Get(fmt.Sprintf(keyChannelExists, key))
if err == nil {
regTime, _ := tx.Get(fmt.Sprintf(keyChannelRegTime, key))
regTimeInt, _ := strconv.ParseInt(regTime, 10, 64)
registeredAt := time.Unix(regTimeInt, 0)
founder, _ := tx.Get(fmt.Sprintf(keyChannelFounder, key))
// to see if we're deleting the right channel, confirm the founder and the registration time
if founder == info.Founder && registeredAt == info.RegisteredAt {
for _, keyFmt := range channelKeyStrings {
tx.Delete(fmt.Sprintf(keyFmt, key))
}
}
}
}
// saveChannel saves a channel to the store.
func (reg *ChannelRegistry) saveChannel(tx *buntdb.Tx, channelKey string, channelInfo RegisteredChannel, includeLists bool) {
tx.Set(fmt.Sprintf(keyChannelExists, channelKey), "1", nil)
tx.Set(fmt.Sprintf(keyChannelName, channelKey), channelInfo.Name, nil)
tx.Set(fmt.Sprintf(keyChannelRegTime, channelKey), strconv.FormatInt(channelInfo.RegisteredAt.Unix(), 10), nil)
@ -117,12 +211,12 @@ func (server *Server) saveChannelNoMutex(tx *buntdb.Tx, channelKey string, chann
tx.Set(fmt.Sprintf(keyChannelTopicSetBy, channelKey), channelInfo.TopicSetBy, nil)
tx.Set(fmt.Sprintf(keyChannelTopicSetTime, channelKey), strconv.FormatInt(channelInfo.TopicSetTime.Unix(), 10), nil)
banlistString, _ := json.Marshal(channelInfo.Banlist)
tx.Set(fmt.Sprintf(keyChannelBanlist, channelKey), string(banlistString), nil)
exceptlistString, _ := json.Marshal(channelInfo.Exceptlist)
tx.Set(fmt.Sprintf(keyChannelExceptlist, channelKey), string(exceptlistString), nil)
invitelistString, _ := json.Marshal(channelInfo.Invitelist)
tx.Set(fmt.Sprintf(keyChannelInvitelist, channelKey), string(invitelistString), nil)
server.registeredChannels[channelKey] = &channelInfo
if includeLists {
banlistString, _ := json.Marshal(channelInfo.Banlist)
tx.Set(fmt.Sprintf(keyChannelBanlist, channelKey), string(banlistString), nil)
exceptlistString, _ := json.Marshal(channelInfo.Exceptlist)
tx.Set(fmt.Sprintf(keyChannelExceptlist, channelKey), string(exceptlistString), nil)
invitelistString, _ := json.Marshal(channelInfo.Invitelist)
tx.Set(fmt.Sprintf(keyChannelInvitelist, channelKey), string(invitelistString), nil)
}
}