diff --git a/server-client.go b/server-client.go index 53a53293..1e54d399 100644 --- a/server-client.go +++ b/server-client.go @@ -67,12 +67,8 @@ type serverClient struct { streamTracks []*track udpLastFrameTime time.Time udpCheckStreamTicker *time.Ticker - readBuf1 []byte - readBuf2 []byte - readCurBuf bool - writeBuf1 []byte - writeBuf2 []byte - writeCurBuf bool + readBuf *doubleBuffer + writeBuf *doubleBuffer writeChan chan *gortsplib.InterleavedFrame done chan struct{} @@ -87,10 +83,8 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { WriteTimeout: p.conf.WriteTimeout, }), state: _CLIENT_STATE_STARTING, - readBuf1: make([]byte, 0, 512*1024), - readBuf2: make([]byte, 0, 512*1024), - writeBuf1: make([]byte, 2048), - writeBuf2: make([]byte, 2048), + readBuf: newDoubleBuffer(512 * 1024), + writeBuf: newDoubleBuffer(2048), writeChan: make(chan *gortsplib.InterleavedFrame), done: make(chan struct{}), } @@ -182,16 +176,9 @@ func (c *serverClient) close() { } func (c *serverClient) writeFrame(channel uint8, inbuf []byte) { - var buf []byte - if !c.writeCurBuf { - buf = c.writeBuf1 - } else { - buf = c.writeBuf2 - } - + buf := c.writeBuf.swap() buf = buf[:len(inbuf)] copy(buf, inbuf) - c.writeCurBuf = !c.writeCurBuf c.writeChan <- &gortsplib.InterleavedFrame{ Channel: channel, @@ -868,15 +855,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { if c.streamProtocol == _STREAM_PROTOCOL_TCP { frame := &gortsplib.InterleavedFrame{} for { - if !c.readCurBuf { - frame.Content = c.readBuf1 - } else { - frame.Content = c.readBuf2 - } - + frame.Content = c.readBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] - c.readCurBuf = !c.readCurBuf - recv, err := c.conn.ReadInterleavedFrameOrRequest(frame) if err != nil { if err != io.EOF { diff --git a/server-udpl.go b/server-udpl.go index 3d3957d3..44de8059 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -14,12 +14,8 @@ type serverUdpListener struct { p *program nconn *net.UDPConn trackFlowType trackFlowType - readBuf1 []byte - readBuf2 []byte - readCurBuf bool - writeBuf1 []byte - writeBuf2 []byte - writeCurBuf bool + readBuf *doubleBuffer + writeBuf *doubleBuffer writeChan chan *udpAddrFramePair done chan struct{} @@ -37,10 +33,8 @@ func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*s p: p, nconn: nconn, trackFlowType: trackFlowType, - readBuf1: make([]byte, 2048), - readBuf2: make([]byte, 2048), - writeBuf1: make([]byte, 2048), - writeBuf2: make([]byte, 2048), + readBuf: newDoubleBuffer(2048), + writeBuf: newDoubleBuffer(2048), writeChan: make(chan *udpAddrFramePair), done: make(chan struct{}), } @@ -68,14 +62,7 @@ func (l *serverUdpListener) run() { }() for { - var buf []byte - if !l.readCurBuf { - buf = l.readBuf1 - } else { - buf = l.readBuf2 - } - l.readCurBuf = !l.readCurBuf - + buf := l.readBuf.swap() n, addr, err := l.nconn.ReadFromUDP(buf) if err != nil { break @@ -99,16 +86,9 @@ func (l *serverUdpListener) close() { } func (l *serverUdpListener) write(addr *net.UDPAddr, inbuf []byte) { - var buf []byte - if !l.writeCurBuf { - buf = l.writeBuf1 - } else { - buf = l.writeBuf2 - } - + buf := l.writeBuf.swap() buf = buf[:len(inbuf)] copy(buf, inbuf) - l.writeCurBuf = !l.writeCurBuf l.writeChan <- &udpAddrFramePair{ addr: addr, diff --git a/streamer-udpl.go b/streamer-udpl.go index 00765767..1c2b1e58 100644 --- a/streamer-udpl.go +++ b/streamer-udpl.go @@ -14,9 +14,7 @@ type streamerUdpListener struct { publisherPort int nconn *net.UDPConn running bool - readBuf1 []byte - readBuf2 []byte - readCurBuf bool + readBuf *doubleBuffer lastFrameTime time.Time done chan struct{} @@ -38,8 +36,7 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer, trackFlowType: trackFlowType, publisherIp: publisherIp, nconn: nconn, - readBuf1: make([]byte, 2048), - readBuf2: make([]byte, 2048), + readBuf: newDoubleBuffer(2048), lastFrameTime: time.Now(), done: make(chan struct{}), } @@ -62,14 +59,7 @@ func (l *streamerUdpListener) start() { func (l *streamerUdpListener) run() { for { - var buf []byte - if !l.readCurBuf { - buf = l.readBuf1 - } else { - buf = l.readBuf2 - } - l.readCurBuf = !l.readCurBuf - + buf := l.readBuf.swap() n, addr, err := l.nconn.ReadFromUDP(buf) if err != nil { break diff --git a/streamer.go b/streamer.go index da0858dd..4cf99925 100644 --- a/streamer.go +++ b/streamer.go @@ -36,9 +36,7 @@ type streamer struct { serverSdpText []byte serverSdpParsed *sdp.Message firstTime bool - readBuf1 []byte - readBuf2 []byte - readCurBuf bool + readBuf *doubleBuffer terminate chan struct{} done chan struct{} @@ -84,8 +82,7 @@ func newStreamer(p *program, path string, source string, sourceProtocol string) ur: ur, proto: proto, firstTime: true, - readBuf1: make([]byte, 0, 512*1024), - readBuf2: make([]byte, 0, 512*1024), + readBuf: newDoubleBuffer(512 * 1024), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -553,14 +550,8 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { outer: for { - if !s.readCurBuf { - frame.Content = s.readBuf1 - } else { - frame.Content = s.readBuf2 - } - + frame.Content = s.readBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] - s.readCurBuf = !s.readCurBuf recv, err := conn.ReadInterleavedFrameOrResponse(frame) if err != nil { @@ -590,15 +581,8 @@ outer: chanConnError := make(chan struct{}) go func() { for { - if !s.readCurBuf { - frame.Content = s.readBuf1 - } else { - frame.Content = s.readBuf2 - } - + frame.Content = s.readBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] - s.readCurBuf = !s.readCurBuf - err := conn.ReadInterleavedFrame(frame) if err != nil { s.log("ERR: %s", err) diff --git a/utils.go b/utils.go index 4525734d..422d1e95 100644 --- a/utils.go +++ b/utils.go @@ -49,3 +49,27 @@ func trackFlowTypeToInterleavedChannel(id int, trackFlowType trackFlowType) uint } return uint8((id * 2) + 1) } + +type doubleBuffer struct { + buf1 []byte + buf2 []byte + curBuf bool +} + +func newDoubleBuffer(size int) *doubleBuffer { + return &doubleBuffer{ + buf1: make([]byte, size), + buf2: make([]byte, size), + } +} + +func (db *doubleBuffer) swap() []byte { + var ret []byte + if !db.curBuf { + ret = db.buf1 + } else { + ret = db.buf2 + } + db.curBuf = !db.curBuf + return ret +}