1
0
Fork 0
forked from External/ergo
This commit is contained in:
Shivaram Lingamneni 2018-03-17 21:32:12 -04:00
parent 8fd1446627
commit d1f5c59eef
6 changed files with 34 additions and 23 deletions

View file

@ -29,7 +29,7 @@ type Socket struct {
conn net.Conn
reader *bufio.Reader
MaxSendQBytes uint64
maxSendQBytes int
// coordination system for asynchronous writes
buffer []byte
@ -41,11 +41,11 @@ type Socket struct {
}
// NewSocket returns a new Socket.
func NewSocket(conn net.Conn, maxSendQBytes uint64) Socket {
func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket {
return Socket{
conn: conn,
reader: bufio.NewReader(conn),
MaxSendQBytes: maxSendQBytes,
reader: bufio.NewReaderSize(conn, maxReadQBytes),
maxSendQBytes: maxSendQBytes,
lineToSendExists: make(chan bool, 1),
}
}
@ -92,10 +92,13 @@ func (socket *Socket) Read() (string, error) {
return "", io.EOF
}
lineBytes, err := socket.reader.ReadBytes('\n')
lineBytes, isPrefix, err := socket.reader.ReadLine()
if isPrefix {
return "", errReadQ
}
// convert bytes to string
line := string(lineBytes[:])
line := string(lineBytes)
// read last message properly (such as ERROR/QUIT/etc), just fail next reads/writes
if err == io.EOF {
@ -108,7 +111,7 @@ func (socket *Socket) Read() (string, error) {
return "", err
}
return strings.TrimRight(line, "\r\n"), nil
return line, nil
}
// Write sends the given string out of Socket.
@ -116,7 +119,7 @@ func (socket *Socket) Write(data string) (err error) {
socket.Lock()
if socket.closed {
err = io.EOF
} else if uint64(len(data)+len(socket.buffer)) > socket.MaxSendQBytes {
} else if len(data)+len(socket.buffer) > socket.maxSendQBytes {
socket.sendQExceeded = true
err = errSendQExceeded
} else {