1
0
Fork 0
forked from External/ergo

draft/resume-0.2 implementation, message history support

This commit is contained in:
Shivaram Lingamneni 2018-11-26 05:23:27 -05:00
parent 70364f5f67
commit a0bf548fc5
28 changed files with 1294 additions and 317 deletions

View file

@ -146,6 +146,38 @@ func (socket *Socket) Write(data []byte) (err error) {
return
}
// BlockingWrite sends the given string out of Socket. Requirements:
// 1. MUST block until the message is sent
// 2. MUST bypass sendq (calls to BlockingWrite cannot, on their own, cause a sendq overflow)
// 3. MUST provide mutual exclusion for socket.conn.Write
// 4. MUST respect the same ordering guarantees as Write (i.e., if a call to Write that sends
// message m1 happens-before a call to BlockingWrite that sends message m2,
// m1 must be sent on the wire before m2
// Callers MUST be writing to the client's socket from the client's own goroutine;
// other callers must use the nonblocking Write call instead. Otherwise, a client
// with a slow/unreliable connection risks stalling the progress of the system as a whole.
func (socket *Socket) BlockingWrite(data []byte) (err error) {
if len(data) == 0 {
return
}
// blocking acquire of the trylock
socket.writerSemaphore.Acquire()
defer socket.writerSemaphore.Release()
// first, flush any buffered data, to preserve the ordering guarantees
closed := socket.performWrite()
if closed {
return io.EOF
}
_, err = socket.conn.Write(data)
if err != nil {
socket.finalize()
}
return
}
// wakeWriter starts the goroutine that actually performs the write, without blocking
func (socket *Socket) wakeWriter() {
if socket.writerSemaphore.TryAcquire() {
@ -199,7 +231,8 @@ func (socket *Socket) send() {
}
// write the contents of the buffer, then see if we need to close
func (socket *Socket) performWrite() {
// returns whether we closed
func (socket *Socket) performWrite() (closed bool) {
// retrieve the buffered data, clear the buffer
socket.Lock()
buffers := socket.buffers
@ -214,10 +247,14 @@ func (socket *Socket) performWrite() {
shouldClose := (err != nil) || socket.closed || socket.sendQExceeded
socket.Unlock()
if !shouldClose {
return
if shouldClose {
socket.finalize()
}
return shouldClose
}
// mark closed and send final data. you must be holding the semaphore to call this:
func (socket *Socket) finalize() {
// mark the socket closed (if someone hasn't already), then write error lines
socket.Lock()
socket.closed = true