forked from External/ergo
improvements to message replay code
This commit is contained in:
parent
939729a7c0
commit
b11bf503e7
10 changed files with 338 additions and 165 deletions
|
|
@ -23,8 +23,19 @@ const (
|
|||
// buffer will silently create a batch if required and label the outgoing messages as
|
||||
// necessary (or leave it off and simply tag the outgoing message).
|
||||
type ResponseBuffer struct {
|
||||
Label string
|
||||
batchID string
|
||||
Label string // label if this is a labeled response batch
|
||||
batchID string // ID of the labeled response batch, if one has been initiated
|
||||
batchType string // type of the labeled response batch (possibly `history` or `chathistory`)
|
||||
|
||||
// stack of batch IDs of nested batches, which are handled separately
|
||||
// from the underlying labeled-response batch. starting a new nested batch
|
||||
// unconditionally enqueues its batch start message; subsequent messages
|
||||
// are tagged with the nested batch ID, until nested batch end.
|
||||
// (the nested batch start itself may have no batch tag, or the batch tag of the
|
||||
// underlying labeled-response batch, or the batch tag of the next outermost
|
||||
// nested batch.)
|
||||
nestedBatches []string
|
||||
|
||||
messages []ircmsg.IrcMessage
|
||||
finalized bool
|
||||
target *Client
|
||||
|
|
@ -40,8 +51,9 @@ func GetLabel(msg ircmsg.IrcMessage) string {
|
|||
// NewResponseBuffer returns a new ResponseBuffer.
|
||||
func NewResponseBuffer(session *Session) *ResponseBuffer {
|
||||
return &ResponseBuffer{
|
||||
session: session,
|
||||
target: session.client,
|
||||
session: session,
|
||||
target: session.client,
|
||||
batchType: defaultBatchType,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -54,6 +66,9 @@ func (rb *ResponseBuffer) AddMessage(msg ircmsg.IrcMessage) {
|
|||
return
|
||||
}
|
||||
|
||||
if 0 < len(rb.nestedBatches) {
|
||||
msg.SetTag("batch", rb.nestedBatches[len(rb.nestedBatches)-1])
|
||||
}
|
||||
rb.messages = append(rb.messages, msg)
|
||||
}
|
||||
|
||||
|
|
@ -63,9 +78,11 @@ func (rb *ResponseBuffer) Add(tags map[string]string, prefix string, command str
|
|||
}
|
||||
|
||||
// AddFromClient adds a new message from a specific client to our queue.
|
||||
func (rb *ResponseBuffer) AddFromClient(msgid string, fromNickMask string, fromAccount string, tags map[string]string, command string, params ...string) {
|
||||
func (rb *ResponseBuffer) AddFromClient(time time.Time, msgid string, fromNickMask string, fromAccount string, tags map[string]string, command string, params ...string) {
|
||||
msg := ircmsg.MakeMessage(nil, fromNickMask, command, params...)
|
||||
msg.UpdateTags(tags)
|
||||
if rb.session.capabilities.Has(caps.MessageTags) {
|
||||
msg.UpdateTags(tags)
|
||||
}
|
||||
|
||||
// attach account-tag
|
||||
if rb.session.capabilities.Has(caps.AccountTag) && fromAccount != "*" {
|
||||
|
|
@ -75,6 +92,10 @@ func (rb *ResponseBuffer) AddFromClient(msgid string, fromNickMask string, fromA
|
|||
if len(msgid) > 0 && rb.session.capabilities.Has(caps.MessageTags) {
|
||||
msg.SetTag("draft/msgid", msgid)
|
||||
}
|
||||
// attach server-time
|
||||
if rb.session.capabilities.Has(caps.ServerTime) && !msg.HasTag("time") {
|
||||
msg.SetTag("time", time.UTC().Format(IRCv3TimestampFormat))
|
||||
}
|
||||
|
||||
rb.AddMessage(msg)
|
||||
}
|
||||
|
|
@ -82,33 +103,31 @@ func (rb *ResponseBuffer) AddFromClient(msgid string, fromNickMask string, fromA
|
|||
// AddSplitMessageFromClient adds a new split message from a specific client to our queue.
|
||||
func (rb *ResponseBuffer) AddSplitMessageFromClient(fromNickMask string, fromAccount string, tags map[string]string, command string, target string, message utils.SplitMessage) {
|
||||
if rb.session.capabilities.Has(caps.MaxLine) || message.Wrapped == nil {
|
||||
rb.AddFromClient(message.Msgid, fromNickMask, fromAccount, tags, command, target, message.Message)
|
||||
rb.AddFromClient(message.Time, message.Msgid, fromNickMask, fromAccount, tags, command, target, message.Message)
|
||||
} else {
|
||||
for _, messagePair := range message.Wrapped {
|
||||
rb.AddFromClient(messagePair.Msgid, fromNickMask, fromAccount, tags, command, target, messagePair.Message)
|
||||
rb.AddFromClient(message.Time, messagePair.Msgid, fromNickMask, fromAccount, tags, command, target, messagePair.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// InitializeBatch forcibly starts a batch of batch `batchType`.
|
||||
// ForceBatchStart forcibly starts a batch of batch `batchType`.
|
||||
// Normally, Send/Flush will decide automatically whether to start a batch
|
||||
// of type draft/labeled-response. This allows changing the batch type
|
||||
// and forcing the creation of a possibly empty batch.
|
||||
func (rb *ResponseBuffer) InitializeBatch(batchType string, blocking bool) {
|
||||
rb.sendBatchStart(batchType, blocking)
|
||||
func (rb *ResponseBuffer) ForceBatchStart(batchType string, blocking bool) {
|
||||
rb.batchType = batchType
|
||||
rb.sendBatchStart(blocking)
|
||||
}
|
||||
|
||||
func (rb *ResponseBuffer) sendBatchStart(batchType string, blocking bool) {
|
||||
func (rb *ResponseBuffer) sendBatchStart(blocking bool) {
|
||||
if rb.batchID != "" {
|
||||
// batch already initialized
|
||||
return
|
||||
}
|
||||
|
||||
// formerly this combined time.Now.UnixNano() in base 36 with an incrementing counter,
|
||||
// also in base 36. but let's just use a uuidv4-alike (26 base32 characters):
|
||||
rb.batchID = utils.GenerateSecretToken()
|
||||
|
||||
message := ircmsg.MakeMessage(nil, rb.target.server.name, "BATCH", "+"+rb.batchID, batchType)
|
||||
message := ircmsg.MakeMessage(nil, rb.target.server.name, "BATCH", "+"+rb.batchID, rb.batchType)
|
||||
if rb.Label != "" {
|
||||
message.SetTag(caps.LabelTagName, rb.Label)
|
||||
}
|
||||
|
|
@ -125,6 +144,50 @@ func (rb *ResponseBuffer) sendBatchEnd(blocking bool) {
|
|||
rb.session.SendRawMessage(message, blocking)
|
||||
}
|
||||
|
||||
// Starts a nested batch (see the ResponseBuffer struct definition for a description of
|
||||
// how this works)
|
||||
func (rb *ResponseBuffer) StartNestedBatch(batchType string, params ...string) (batchID string) {
|
||||
batchID = utils.GenerateSecretToken()
|
||||
msgParams := make([]string, len(params)+2)
|
||||
msgParams[0] = "+" + batchID
|
||||
msgParams[1] = batchType
|
||||
copy(msgParams[2:], params)
|
||||
rb.AddMessage(ircmsg.MakeMessage(nil, rb.target.server.name, "BATCH", msgParams...))
|
||||
rb.nestedBatches = append(rb.nestedBatches, batchID)
|
||||
return
|
||||
}
|
||||
|
||||
// Ends a nested batch
|
||||
func (rb *ResponseBuffer) EndNestedBatch(batchID string) {
|
||||
if batchID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if 0 == len(rb.nestedBatches) || rb.nestedBatches[len(rb.nestedBatches)-1] != batchID {
|
||||
rb.target.server.logger.Error("internal", "inconsistent batch nesting detected")
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
rb.nestedBatches = rb.nestedBatches[0 : len(rb.nestedBatches)-1]
|
||||
rb.AddMessage(ircmsg.MakeMessage(nil, rb.target.server.name, "BATCH", "-"+batchID))
|
||||
}
|
||||
|
||||
// Convenience to start a nested batch for history lines, at the highest level
|
||||
// supported by the client (`history`, `chathistory`, or no batch, in descending order).
|
||||
func (rb *ResponseBuffer) StartNestedHistoryBatch(params ...string) (batchID string) {
|
||||
var batchType string
|
||||
if rb.session.capabilities.Has(caps.EventPlayback) {
|
||||
batchType = "history"
|
||||
} else if rb.session.capabilities.Has(caps.Batch) {
|
||||
batchType = "chathistory"
|
||||
}
|
||||
if batchType != "" {
|
||||
batchID = rb.StartNestedBatch(batchType, params...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Send sends all messages in the buffer to the client.
|
||||
// Afterwards, the buffer is in an undefined state and MUST NOT be used further.
|
||||
// If `blocking` is true you MUST be sending to the client from its own goroutine.
|
||||
|
|
@ -158,7 +221,7 @@ func (rb *ResponseBuffer) flushInternal(final bool, blocking bool) error {
|
|||
if useLabel && !useBatch && len(rb.messages) == 1 && rb.batchID == "" {
|
||||
rb.messages[0].SetTag(caps.LabelTagName, rb.Label)
|
||||
} else if useBatch {
|
||||
rb.sendBatchStart(defaultBatchType, blocking)
|
||||
rb.sendBatchStart(blocking)
|
||||
}
|
||||
|
||||
// send each message out
|
||||
|
|
@ -168,8 +231,9 @@ func (rb *ResponseBuffer) flushInternal(final bool, blocking bool) error {
|
|||
message.SetTag("time", time.Now().UTC().Format(IRCv3TimestampFormat))
|
||||
}
|
||||
|
||||
// attach batch ID
|
||||
if rb.batchID != "" {
|
||||
// attach batch ID, unless this message was part of a nested batch and is
|
||||
// already tagged
|
||||
if rb.batchID != "" && !message.HasTag("batch") {
|
||||
message.SetTag("batch", rb.batchID)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue