1
0
Fork 0
forked from External/ergo

history replay enhancements

This commit is contained in:
Shivaram Lingamneni 2018-12-28 13:45:55 -05:00
parent f3d138d909
commit 2c7c8fbaf9
18 changed files with 264 additions and 255 deletions

View file

@ -32,9 +32,12 @@ type Item struct {
// this is the uncasefolded account name, if there's no account it should be set to "*"
AccountName string
Message utils.SplitMessage
Msgid string
// for non-privmsg items, we may stuff some other data in here
Msgid string
}
type Predicate func(item Item) (matches bool)
// Buffer is a ring buffer holding message/event history for a channel or user
type Buffer struct {
sync.RWMutex
@ -85,7 +88,7 @@ func (list *Buffer) Add(item Item) {
}
if item.Time.IsZero() {
item.Time = time.Now()
item.Time = time.Now().UTC()
}
list.Lock()
@ -112,6 +115,12 @@ func (list *Buffer) Add(item Item) {
list.buffer[pos] = item
}
func reverse(results []Item) {
for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
results[i], results[j] = results[j], results[i]
}
}
// Between returns all history items with a time `after` <= time <= `before`,
// with an indication of whether the results are complete or are missing items
// because some of that period was discarded. A zero value of `before` is considered
@ -126,51 +135,55 @@ func (list *Buffer) Between(after, before time.Time) (results []Item, complete b
complete = after.Equal(list.lastDiscarded) || after.After(list.lastDiscarded)
satisfies := func(item Item) bool {
return (after.IsZero() || item.Time.After(after)) && (before.IsZero() || item.Time.Before(before))
}
return list.matchInternal(satisfies, 0), complete
}
// Match returns all history items such that `predicate` returns true for them.
// Items are considered in reverse insertion order, up to a total of `limit` matches.
// `predicate` MAY be a closure that maintains its own state across invocations;
// it MUST NOT acquire any locks or otherwise do anything weird.
func (list *Buffer) Match(predicate Predicate, limit int) (results []Item) {
if !list.Enabled() {
return
}
list.RLock()
defer list.RUnlock()
return list.matchInternal(predicate, limit)
}
// you must be holding the read lock to call this
func (list *Buffer) matchInternal(predicate Predicate, limit int) (results []Item) {
if list.start == -1 {
return
}
satisfies := func(itime time.Time) bool {
return (after.IsZero() || itime.After(after)) && (before.IsZero() || itime.Before(before))
}
// TODO: if we can guarantee that the insertion order is also the monotonic clock order,
// then this can do a single allocation and use binary search and 1-2 copy calls
pos := list.prev(list.end)
for {
if satisfies(list.buffer[pos].Time) {
if predicate(list.buffer[pos]) {
results = append(results, list.buffer[pos])
}
if pos == list.start {
if pos == list.start || (limit != 0 && len(results) == limit) {
break
}
pos = list.prev(pos)
}
// reverse the results
for i, j := 0, len(results)-1; i < j; i, j = i+1, j-1 {
results[i], results[j] = results[j], results[i]
}
// TODO sort by time instead?
reverse(results)
return
}
// All returns all available history items as a slice
func (list *Buffer) All() (results []Item) {
list.RLock()
defer list.RUnlock()
if list.start == -1 {
return
}
results = make([]Item, list.length())
if list.start < list.end {
copy(results, list.buffer[list.start:list.end])
} else {
initialSegment := copy(results, list.buffer[list.start:])
copy(results[initialSegment:], list.buffer[:list.end])
}
return
// Latest returns the items most recently added, up to `limit`. If `limit` is 0,
// it returns all items.
func (list *Buffer) Latest(limit int) (results []Item) {
matchAll := func(item Item) bool { return true }
return list.Match(matchAll, limit)
}
// LastDiscarded returns the latest time of any entry that was evicted