initial implementation of bouncer functionality

This commit is contained in:
Shivaram Lingamneni 2019-04-12 00:08:46 -04:00
parent a8f04ecc4d
commit c2faeed4b5
19 changed files with 733 additions and 441 deletions

View file

@ -50,26 +50,16 @@ type Client struct {
accountName string // display name of the account: uncasefolded, '*' if not logged in
atime time.Time
awayMessage string
capabilities caps.Set
capState caps.State
capVersion caps.Version
certfp string
channels ChannelSet
ctime time.Time
exitedSnomaskSent bool
fakelag Fakelag
flags modes.ModeSet
hasQuit bool
hops int
hostname string
idletimer IdleTimer
invitedTo map[string]bool
isDestroyed bool
isTor bool
isQuitting bool
languages []string
loginThrottle connection_limits.GenericThrottle
maxlenRest uint32
nick string
nickCasefolded string
nickMaskCasefolded string
@ -78,7 +68,6 @@ type Client struct {
oper *Oper
preregNick string
proxiedIP net.IP // actual remote IP if using the PROXY protocol
quitMessage string
rawHostname string
realname string
realIP net.IP
@ -91,13 +80,64 @@ type Client struct {
sentPassCommand bool
server *Server
skeleton string
socket *Socket
sessions []*Session
stateMutex sync.RWMutex // tier 1
username string
vhost string
history *history.Buffer
}
// Session is an individual client connection to the server (TCP connection
// and associated per-connection data, such as capabilities). There is a
// many-one relationship between sessions and clients.
type Session struct {
client *Client
socket *Socket
idletimer IdleTimer
fakelag Fakelag
quitMessage string
capabilities caps.Set
maxlenRest uint32
capState caps.State
capVersion caps.Version
// TODO track per-connection real IP, proxied IP, and hostname here,
// so we can list attached sessions and their details
}
// sets the session quit message, if there isn't one already
func (sd *Session) SetQuitMessage(message string) (set bool) {
if message == "" {
if sd.quitMessage == "" {
sd.quitMessage = "Connection closed"
return true
} else {
return false
}
} else {
sd.quitMessage = message
return true
}
}
// set the negotiated message length based on session capabilities
func (session *Session) SetMaxlenRest() {
maxlenRest := 512
if session.capabilities.Has(caps.MaxLine) {
maxlenRest = session.client.server.Config().Limits.LineLen.Rest
}
atomic.StoreUint32(&session.maxlenRest, uint32(maxlenRest))
}
// allow the negotiated message length limit to be read without locks; this is a convenience
// so that Session.SendRawMessage doesn't have to acquire any Client locks
func (session *Session) MaxlenRest() int {
return int(atomic.LoadUint32(&session.maxlenRest))
}
// WhoWas is the subset of client details needed to answer a WHOWAS query
type WhoWas struct {
nick string
@ -125,32 +165,35 @@ func RunNewClient(server *Server, conn clientConn) {
// give them 1k of grace over the limit:
socket := NewSocket(conn.Conn, fullLineLenLimit+1024, config.Server.MaxSendQBytes)
client := &Client{
atime: now,
capState: caps.NoneState,
capVersion: caps.Cap301,
channels: make(ChannelSet),
ctime: now,
isTor: conn.IsTor,
languages: server.Languages().Default(),
atime: now,
channels: make(ChannelSet),
ctime: now,
isTor: conn.IsTor,
languages: server.Languages().Default(),
loginThrottle: connection_limits.GenericThrottle{
Duration: config.Accounts.LoginThrottling.Duration,
Limit: config.Accounts.LoginThrottling.MaxAttempts,
},
server: server,
socket: socket,
accountName: "*",
nick: "*", // * is used until actual nick is given
nickCasefolded: "*",
nickMaskString: "*", // * is used until actual nick is given
history: history.NewHistoryBuffer(config.History.ClientLength),
}
client.recomputeMaxlens()
session := &Session{
client: client,
socket: socket,
capVersion: caps.Cap301,
capState: caps.NoneState,
}
session.SetMaxlenRest()
client.sessions = []*Session{session}
if conn.IsTLS {
client.SetMode(modes.TLS, true)
// error is not useful to us here anyways so we can ignore it
client.certfp, _ = client.socket.CertFP()
client.certfp, _ = socket.CertFP()
}
if conn.IsTor {
@ -168,7 +211,7 @@ func RunNewClient(server *Server, conn clientConn) {
}
}
client.run()
client.run(session)
}
func (client *Client) doIdentLookup(conn net.Conn) {
@ -214,10 +257,10 @@ func (client *Client) isAuthorized(config *Config) bool {
return !config.Accounts.RequireSasl.Enabled || saslSent || utils.IPInNets(client.IP(), config.Accounts.RequireSasl.exemptedNets)
}
func (client *Client) resetFakelag() {
var flc FakelagConfig = client.server.Config().Fakelag
flc.Enabled = flc.Enabled && !client.HasRoleCapabs("nofakelag")
client.fakelag.Initialize(flc)
func (session *Session) resetFakelag() {
var flc FakelagConfig = session.client.server.Config().Fakelag
flc.Enabled = flc.Enabled && !session.client.HasRoleCapabs("nofakelag")
session.fakelag.Initialize(flc)
}
// IP returns the IP address of this client.
@ -244,28 +287,7 @@ func (client *Client) IPString() string {
// command goroutine
//
func (client *Client) recomputeMaxlens() int {
maxlenRest := 512
if client.capabilities.Has(caps.MaxLine) {
maxlenRest = client.server.Limits().LineLen.Rest
}
atomic.StoreUint32(&client.maxlenRest, uint32(maxlenRest))
return maxlenRest
}
// allow these negotiated length limits to be read without locks; this is a convenience
// so that Client.Send doesn't have to acquire any Client locks
func (client *Client) MaxlenRest() int {
return int(atomic.LoadUint32(&client.maxlenRest))
}
func (client *Client) run() {
var err error
var isExiting bool
var line string
var msg ircmsg.IrcMessage
func (client *Client) run(session *Session) {
defer func() {
if r := recover(); r != nil {
@ -278,27 +300,30 @@ func (client *Client) run() {
}
}
// ensure client connection gets closed
client.destroy(false)
client.destroy(false, session)
}()
client.idletimer.Initialize(client)
session.idletimer.Initialize(session)
session.resetFakelag()
client.nickTimer.Initialize(client)
client.resetFakelag()
isReattach := client.Registered()
// don't reset the nick timer during a reattach
if !isReattach {
client.nickTimer.Initialize(client)
}
firstLine := true
for {
maxlenRest := client.recomputeMaxlens()
maxlenRest := session.MaxlenRest()
line, err = client.socket.Read()
line, err := session.socket.Read()
if err != nil {
quitMessage := "connection closed"
if err == errReadQ {
quitMessage = "readQ exceeded"
}
client.Quit(quitMessage)
client.Quit(quitMessage, session)
break
}
@ -307,10 +332,10 @@ func (client *Client) run() {
}
// special-cased handling of PROXY protocol, see `handleProxyCommand` for details:
if firstLine {
if !isReattach && firstLine {
firstLine = false
if strings.HasPrefix(line, "PROXY") {
err = handleProxyCommand(client.server, client, line)
err = handleProxyCommand(client.server, client, session, line)
if err != nil {
break
} else {
@ -319,14 +344,14 @@ func (client *Client) run() {
}
}
msg, err = ircmsg.ParseLineStrict(line, true, maxlenRest)
msg, err := ircmsg.ParseLineStrict(line, true, maxlenRest)
if err == ircmsg.ErrorLineIsEmpty {
continue
} else if err == ircmsg.ErrorLineTooLong {
client.Send(nil, client.server.name, ERR_INPUTTOOLONG, client.Nick(), client.t("Input line too long"))
continue
} else if err != nil {
client.Quit(client.t("Received malformed line"))
client.Quit(client.t("Received malformed line"), session)
break
}
@ -340,13 +365,24 @@ func (client *Client) run() {
continue
}
isExiting = cmd.Run(client.server, client, msg)
if isExiting || client.isQuitting {
isExiting := cmd.Run(client.server, client, session, msg)
if isExiting {
break
} else if session.client != client {
// bouncer reattach
session.playReattachMessages()
go session.client.run(session)
break
}
}
}
func (session *Session) playReattachMessages() {
for _, channel := range session.client.Channels() {
channel.playJoinForSession(session)
}
}
//
// idle, quit, timers and timeouts
//
@ -359,9 +395,8 @@ func (client *Client) Active() {
}
// Ping sends the client a PING message.
func (client *Client) Ping() {
client.Send(nil, "", "PING", client.nick)
func (session *Session) Ping() {
session.Send(nil, "", "PING", session.client.Nick())
}
// tryResume tries to resume if the client asked us to.
@ -400,6 +435,11 @@ func (client *Client) tryResume() (success bool) {
return
}
if 1 < len(oldClient.Sessions()) {
client.Send(nil, server.name, "RESUME", "ERR", client.t("Cannot resume a client with multiple attached sessions"))
return
}
err := server.clients.Resume(client, oldClient)
if err != nil {
client.Send(nil, server.name, "RESUME", "ERR", client.t("Cannot resume connection"))
@ -467,17 +507,19 @@ func (client *Client) tryResume() (success bool) {
// send quit/resume messages to friends
for friend := range friends {
if friend.capabilities.Has(caps.Resume) {
if timestamp.IsZero() {
friend.Send(nil, oldNickmask, "RESUMED", username, hostname)
for _, session := range friend.Sessions() {
if session.capabilities.Has(caps.Resume) {
if timestamp.IsZero() {
session.Send(nil, oldNickmask, "RESUMED", username, hostname)
} else {
session.Send(nil, oldNickmask, "RESUMED", username, hostname, timestampString)
}
} else {
friend.Send(nil, oldNickmask, "RESUMED", username, hostname, timestampString)
}
} else {
if client.resumeDetails.HistoryIncomplete {
friend.Send(nil, oldNickmask, "QUIT", fmt.Sprintf(friend.t("Client reconnected (up to %d seconds of history lost)"), gapSeconds))
} else {
friend.Send(nil, oldNickmask, "QUIT", fmt.Sprintf(friend.t("Client reconnected")))
if client.resumeDetails.HistoryIncomplete {
session.Send(nil, oldNickmask, "QUIT", fmt.Sprintf(friend.t("Client reconnected (up to %d seconds of history lost)"), gapSeconds))
} else {
session.Send(nil, oldNickmask, "QUIT", fmt.Sprintf(friend.t("Client reconnected")))
}
}
}
}
@ -509,17 +551,17 @@ func (client *Client) tryResumeChannels() {
if !details.Timestamp.IsZero() {
now := time.Now()
items, complete := client.history.Between(details.Timestamp, now, false, 0)
rb := NewResponseBuffer(client)
rb := NewResponseBuffer(client.Sessions()[0])
client.replayPrivmsgHistory(rb, items, complete)
rb.Send(true)
}
details.OldClient.destroy(true)
details.OldClient.destroy(true, nil)
}
func (client *Client) replayPrivmsgHistory(rb *ResponseBuffer, items []history.Item, complete bool) {
nick := client.Nick()
serverTime := client.capabilities.Has(caps.ServerTime)
serverTime := rb.session.capabilities.Has(caps.ServerTime)
for _, item := range items {
var command string
switch item.Type {
@ -661,37 +703,27 @@ func (client *Client) ModeString() (str string) {
}
// Friends refers to clients that share a channel with this client.
func (client *Client) Friends(capabs ...caps.Capability) ClientSet {
friends := make(ClientSet)
func (client *Client) Friends(capabs ...caps.Capability) (result map[*Session]bool) {
result = make(map[*Session]bool)
// make sure that I have the right caps
hasCaps := true
for _, capab := range capabs {
if !client.capabilities.Has(capab) {
hasCaps = false
break
// look at the client's own sessions
for _, session := range client.Sessions() {
if session.capabilities.HasAll(capabs...) {
result[session] = true
}
}
if hasCaps {
friends.Add(client)
}
for _, channel := range client.Channels() {
for _, member := range channel.Members() {
// make sure they have all the required caps
hasCaps = true
for _, capab := range capabs {
if !member.capabilities.Has(capab) {
hasCaps = false
break
for _, session := range member.Sessions() {
if session.capabilities.HasAll(capabs...) {
result[session] = true
}
}
if hasCaps {
friends.Add(member)
}
}
}
return friends
return
}
func (client *Client) SetOper(oper *Oper) {
@ -816,47 +848,88 @@ func (client *Client) RplISupport(rb *ResponseBuffer) {
// Quit sets the given quit message for the client.
// (You must ensure separately that destroy() is called, e.g., by returning `true` from
// the command handler or calling it yourself.)
func (client *Client) Quit(message string) {
func (client *Client) Quit(message string, session *Session) {
setFinalData := func(sess *Session) {
message := sess.quitMessage
var finalData []byte
// #364: don't send QUIT lines to unregistered clients
if client.registered {
quitMsg := ircmsg.MakeMessage(nil, client.nickMaskString, "QUIT", message)
finalData, _ = quitMsg.LineBytesStrict(false, 512)
}
errorMsg := ircmsg.MakeMessage(nil, "", "ERROR", message)
errorMsgBytes, _ := errorMsg.LineBytesStrict(false, 512)
finalData = append(finalData, errorMsgBytes...)
sess.socket.SetFinalData(finalData)
}
client.stateMutex.Lock()
alreadyQuit := client.isQuitting
if !alreadyQuit {
client.isQuitting = true
client.quitMessage = message
}
registered := client.registered
prefix := client.nickMaskString
client.stateMutex.Unlock()
defer client.stateMutex.Unlock()
if alreadyQuit {
return
var sessions []*Session
if session != nil {
sessions = []*Session{session}
} else {
sessions = client.sessions
}
var finalData []byte
// #364: don't send QUIT lines to unregistered clients
if registered {
quitMsg := ircmsg.MakeMessage(nil, prefix, "QUIT", message)
finalData, _ = quitMsg.LineBytesStrict(false, 512)
for _, session := range sessions {
if session.SetQuitMessage(message) {
setFinalData(session)
}
}
errorMsg := ircmsg.MakeMessage(nil, "", "ERROR", message)
errorMsgBytes, _ := errorMsg.LineBytesStrict(false, 512)
finalData = append(finalData, errorMsgBytes...)
client.socket.SetFinalData(finalData)
}
// destroy gets rid of a client, removes them from server lists etc.
func (client *Client) destroy(beingResumed bool) {
// if `session` is nil, destroys the client unconditionally, removing all sessions;
// otherwise, destroys one specific session, only destroying the client if it
// has no more sessions.
func (client *Client) destroy(beingResumed bool, session *Session) {
var sessionsToDestroy []*Session
// allow destroy() to execute at most once
client.stateMutex.Lock()
isDestroyed := client.isDestroyed
client.isDestroyed = true
quitMessage := client.quitMessage
nickMaskString := client.nickMaskString
accountName := client.accountName
alreadyDestroyed := len(client.sessions) == 0
sessionRemoved := false
var remainingSessions int
if session == nil {
sessionRemoved = !alreadyDestroyed
sessionsToDestroy = client.sessions
client.sessions = nil
remainingSessions = 0
} else {
sessionRemoved, remainingSessions = client.removeSession(session)
if sessionRemoved {
sessionsToDestroy = []*Session{session}
}
}
var quitMessage string
if 0 < len(sessionsToDestroy) {
quitMessage = sessionsToDestroy[0].quitMessage
}
client.stateMutex.Unlock()
if isDestroyed {
if alreadyDestroyed || !sessionRemoved {
return
}
for _, session := range sessionsToDestroy {
if session.client != client {
// session has been attached to a new client; do not destroy it
continue
}
session.idletimer.Stop()
session.socket.Close()
// send quit/error message to client if they haven't been sent already
client.Quit("", session)
}
if remainingSessions != 0 {
return
}
@ -871,9 +944,6 @@ func (client *Client) destroy(beingResumed bool) {
client.server.logger.Debug("quit", fmt.Sprintf("%s is no longer on the server", client.nick))
}
// send quit/error message to client if they haven't been sent already
client.Quit("Connection closed")
if !beingResumed {
client.server.whoWas.Append(client.WhoWas())
}
@ -916,13 +986,10 @@ func (client *Client) destroy(beingResumed bool) {
}
// clean up self
client.idletimer.Stop()
client.nickTimer.Stop()
client.server.accounts.Logout(client)
client.socket.Close()
// send quit messages to friends
if !beingResumed {
if client.Registered() {
@ -953,16 +1020,12 @@ func (client *Client) destroy(beingResumed bool) {
// SendSplitMsgFromClient sends an IRC PRIVMSG/NOTICE coming from a specific client.
// Adds account-tag to the line as well.
func (client *Client) SendSplitMsgFromClient(from *Client, tags map[string]string, command, target string, message utils.SplitMessage) {
client.sendSplitMsgFromClientInternal(false, time.Time{}, from.NickMaskString(), from.AccountName(), tags, command, target, message)
}
func (client *Client) sendSplitMsgFromClientInternal(blocking bool, serverTime time.Time, nickmask, accountName string, tags map[string]string, command, target string, message utils.SplitMessage) {
if client.capabilities.Has(caps.MaxLine) || message.Wrapped == nil {
client.sendFromClientInternal(blocking, serverTime, message.Msgid, nickmask, accountName, tags, command, target, message.Message)
func (session *Session) sendSplitMsgFromClientInternal(blocking bool, serverTime time.Time, nickmask, accountName string, tags map[string]string, command, target string, message utils.SplitMessage) {
if session.capabilities.Has(caps.MaxLine) || message.Wrapped == nil {
session.sendFromClientInternal(blocking, serverTime, message.Msgid, nickmask, accountName, tags, command, target, message.Message)
} else {
for _, messagePair := range message.Wrapped {
client.sendFromClientInternal(blocking, serverTime, messagePair.Msgid, nickmask, accountName, tags, command, target, messagePair.Message)
session.sendFromClientInternal(blocking, serverTime, messagePair.Msgid, nickmask, accountName, tags, command, target, messagePair.Message)
}
}
}
@ -976,22 +1039,32 @@ func (client *Client) SendFromClient(msgid string, from *Client, tags map[string
// this is SendFromClient, but directly exposing nickmask and accountName,
// for things like history replay and CHGHOST where they no longer (necessarily)
// correspond to the current state of a client
func (client *Client) sendFromClientInternal(blocking bool, serverTime time.Time, msgid string, nickmask, accountName string, tags map[string]string, command string, params ...string) error {
func (client *Client) sendFromClientInternal(blocking bool, serverTime time.Time, msgid string, nickmask, accountName string, tags map[string]string, command string, params ...string) (err error) {
for _, session := range client.Sessions() {
err_ := session.sendFromClientInternal(blocking, serverTime, msgid, nickmask, accountName, tags, command, params...)
if err_ != nil {
err = err_
}
}
return
}
func (session *Session) sendFromClientInternal(blocking bool, serverTime time.Time, msgid string, nickmask, accountName string, tags map[string]string, command string, params ...string) (err error) {
msg := ircmsg.MakeMessage(tags, nickmask, command, params...)
// attach account-tag
if client.capabilities.Has(caps.AccountTag) && accountName != "*" {
if session.capabilities.Has(caps.AccountTag) && accountName != "*" {
msg.SetTag("account", accountName)
}
// attach message-id
if msgid != "" && client.capabilities.Has(caps.MessageTags) {
if msgid != "" && session.capabilities.Has(caps.MessageTags) {
msg.SetTag("draft/msgid", msgid)
}
// attach server-time
if client.capabilities.Has(caps.ServerTime) {
if session.capabilities.Has(caps.ServerTime) {
msg.SetTag("time", time.Now().UTC().Format(IRCv3TimestampFormat))
}
return client.SendRawMessage(msg, blocking)
return session.SendRawMessage(msg, blocking)
}
var (
@ -1008,7 +1081,7 @@ var (
)
// SendRawMessage sends a raw message to the client.
func (client *Client) SendRawMessage(message ircmsg.IrcMessage, blocking bool) error {
func (session *Session) SendRawMessage(message ircmsg.IrcMessage, blocking bool) error {
// use dumb hack to force the last param to be a trailing param if required
var usedTrailingHack bool
if commandsThatMustUseTrailing[message.Command] && len(message.Params) > 0 {
@ -1021,19 +1094,19 @@ func (client *Client) SendRawMessage(message ircmsg.IrcMessage, blocking bool) e
}
// assemble message
maxlenRest := client.MaxlenRest()
maxlenRest := session.MaxlenRest()
line, err := message.LineBytesStrict(false, maxlenRest)
if err != nil {
logline := fmt.Sprintf("Error assembling message for sending: %v\n%s", err, debug.Stack())
client.server.logger.Error("internal", logline)
session.client.server.logger.Error("internal", logline)
message = ircmsg.MakeMessage(nil, client.server.name, ERR_UNKNOWNERROR, "*", "Error assembling message for sending")
message = ircmsg.MakeMessage(nil, session.client.server.name, ERR_UNKNOWNERROR, "*", "Error assembling message for sending")
line, _ := message.LineBytesStrict(false, 0)
if blocking {
client.socket.BlockingWrite(line)
session.socket.BlockingWrite(line)
} else {
client.socket.Write(line)
session.socket.Write(line)
}
return err
}
@ -1044,43 +1117,40 @@ func (client *Client) SendRawMessage(message ircmsg.IrcMessage, blocking bool) e
line = line[:len(line)-1]
}
if client.server.logger.IsLoggingRawIO() {
if session.client.server.logger.IsLoggingRawIO() {
logline := string(line[:len(line)-2]) // strip "\r\n"
client.server.logger.Debug("useroutput", client.nick, " ->", logline)
session.client.server.logger.Debug("useroutput", session.client.Nick(), " ->", logline)
}
if blocking {
return client.socket.BlockingWrite(line)
return session.socket.BlockingWrite(line)
} else {
return client.socket.Write(line)
return session.socket.Write(line)
}
}
// Send sends an IRC line to the client.
func (client *Client) Send(tags map[string]string, prefix string, command string, params ...string) error {
func (client *Client) Send(tags map[string]string, prefix string, command string, params ...string) (err error) {
for _, session := range client.Sessions() {
err_ := session.Send(tags, prefix, command, params...)
if err_ != nil {
err = err_
}
}
return
}
func (session *Session) Send(tags map[string]string, prefix string, command string, params ...string) (err error) {
msg := ircmsg.MakeMessage(tags, prefix, command, params...)
if client.capabilities.Has(caps.ServerTime) && !msg.HasTag("time") {
if session.capabilities.Has(caps.ServerTime) && !msg.HasTag("time") {
msg.SetTag("time", time.Now().UTC().Format(IRCv3TimestampFormat))
}
return client.SendRawMessage(msg, false)
return session.SendRawMessage(msg, false)
}
// Notice sends the client a notice from the server.
func (client *Client) Notice(text string) {
limit := 400
if client.capabilities.Has(caps.MaxLine) {
limit = client.server.Limits().LineLen.Rest - 110
}
lines := utils.WordWrap(text, limit)
// force blank lines to be sent if we receive them
if len(lines) == 0 {
lines = []string{""}
}
for _, line := range lines {
client.Send(nil, client.server.name, "NOTICE", client.nick, line)
}
client.Send(nil, client.server.name, "NOTICE", client.Nick(), text)
}
func (client *Client) addChannel(channel *Channel) {