diff --git a/Makefile b/Makefile index 89f692ea..381c4e45 100644 --- a/Makefile +++ b/Makefile @@ -65,10 +65,18 @@ endef export DOCKERFILE_RUN define CONFIG_RUN +#rtspPort: 8555 +#rtpPort: 8002 +#rtcpPort: 8003 + paths: all: readUser: test readPass: tast + + proxied: + source: rtsp://192.168.10.1/unicast + sourceProtocol: udp endef export CONFIG_RUN diff --git a/conf.yml b/conf.yml index c8f861b4..ce9563c3 100644 --- a/conf.yml +++ b/conf.yml @@ -1,20 +1,20 @@ # supported stream protocols (the handshake is always performed with TCP) protocols: [udp, tcp] -# port of the TCP rtsp listener +# port of the TCP RTSP listener rtspPort: 8554 -# port of the UDP rtp listener +# port of the UDP RTP listener rtpPort: 8000 -# port of the UDP rtcp listener +# port of the UDP RTCP listener rtcpPort: 8001 -# timeout of read operations -readTimeout: 5s -# timeout of write operations -writeTimeout: 5s # script to run when a client connects preScript: # script to run when a client disconnects postScript: +# timeout of read operations +readTimeout: 5s +# timeout of write operations +writeTimeout: 5s # enable pprof on port 9999 to monitor performance pprof: false diff --git a/go.mod b/go.mod index 92f6ac75..dc990e93 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,8 @@ require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68 - github.com/stretchr/testify v1.4.0 + github.com/pion/rtcp v1.2.3 + github.com/stretchr/testify v1.5.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.2.2 gortc.io/sdp v0.18.2 diff --git a/go.sum b/go.sum index 680e4d8e..301632ac 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68 h1:apyYugiG/luHl0X github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68/go.mod h1:sL64nUkmrTVhlT/GCaxRXyI2Xk7m8XSdw5Uv8xKGPdc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= +github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -13,6 +15,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/main.go b/main.go index bfc3bd53..48596f43 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "regexp" "time" + "github.com/aler9/gortsplib" "gopkg.in/alecthomas/kingpin.v2" "gopkg.in/yaml.v2" "gortc.io/sdp" @@ -223,9 +224,9 @@ type publisher interface { type program struct { conf *conf protocols map[streamProtocol]struct{} - tcpl *serverTcpListener - udplRtp *serverUdpListener - udplRtcp *serverUdpListener + rtspl *serverTcpListener + rtpl *serverUdpListener + rtcpl *serverUdpListener clients map[*serverClient]struct{} streamers []*streamer publishers map[string]publisher @@ -387,24 +388,24 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) { http.DefaultServeMux = http.NewServeMux() } - p.udplRtp, err = newServerUdpListener(p, conf.RtpPort, _TRACK_FLOW_TYPE_RTP) + p.rtpl, err = newServerUdpListener(p, conf.RtpPort, _TRACK_FLOW_TYPE_RTP) if err != nil { return nil, err } - p.udplRtcp, err = newServerUdpListener(p, conf.RtcpPort, _TRACK_FLOW_TYPE_RTCP) + p.rtcpl, err = newServerUdpListener(p, conf.RtcpPort, _TRACK_FLOW_TYPE_RTCP) if err != nil { return nil, err } - p.tcpl, err = newServerTcpListener(p) + p.rtspl, err = newServerTcpListener(p) if err != nil { return nil, err } - go p.udplRtp.run() - go p.udplRtcp.run() - go p.tcpl.run() + go p.rtpl.run() + go p.rtcpl.run() + go p.rtspl.run() for _, s := range p.streamers { go s.run() } @@ -548,7 +549,7 @@ outer: continue } - client.udpLastFrameTime = time.Now() + client.rtcpReceivers[trackId].onFrame(evt.trackFlowType, evt.buf) p.forwardFrame(client.path, trackId, evt.trackFlowType, evt.buf) case programEventClientFrameTcp: @@ -613,9 +614,9 @@ outer: s.close() } - p.tcpl.close() - p.udplRtcp.close() - p.udplRtp.close() + p.rtspl.close() + p.rtcpl.close() + p.rtpl.close() for c := range p.clients { c.close() @@ -659,31 +660,42 @@ func (p *program) findPublisher(addr *net.UDPAddr, trackFlowType trackFlowType) } func (p *program) forwardFrame(path string, trackId int, trackFlowType trackFlowType, frame []byte) { - for c := range p.clients { - if c.path == path && c.state == _CLIENT_STATE_PLAY { - if c.streamProtocol == _STREAM_PROTOCOL_UDP { + for client := range p.clients { + if client.path == path && client.state == _CLIENT_STATE_PLAY { + if client.streamProtocol == _STREAM_PROTOCOL_UDP { if trackFlowType == _TRACK_FLOW_TYPE_RTP { - p.udplRtp.write(&udpAddrBufPair{ + p.rtpl.write(&udpAddrBufPair{ addr: &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: c.streamTracks[trackId].rtpPort, + IP: client.ip(), + Zone: client.zone(), + Port: client.streamTracks[trackId].rtpPort, }, buf: frame, }) } else { - p.udplRtcp.write(&udpAddrBufPair{ + p.rtcpl.write(&udpAddrBufPair{ addr: &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: c.streamTracks[trackId].rtcpPort, + IP: client.ip(), + Zone: client.zone(), + Port: client.streamTracks[trackId].rtcpPort, }, buf: frame, }) } } else { - c.writeFrame(trackFlowTypeToInterleavedChannel(trackId, trackFlowType), frame) + channel := trackFlowTypeToInterleavedChannel(trackId, trackFlowType) + + buf := client.writeBuf.swap() + buf = buf[:len(frame)] + copy(buf, frame) + + client.events <- serverClientEventFrameTcp{ + frame: &gortsplib.InterleavedFrame{ + Channel: channel, + Content: buf, + }, + } } } } diff --git a/server-client.go b/server-client.go index 23dff185..508cfff5 100644 --- a/server-client.go +++ b/server-client.go @@ -14,10 +14,21 @@ import ( ) const ( - _UDP_CHECK_STREAM_INTERVAL = 5 * time.Second - _UDP_STREAM_DEAD_AFTER = 10 * time.Second + _CLIENT_CHECK_STREAM_INTERVAL = 5 * time.Second + _CLIENT_STREAM_DEAD_AFTER = 15 * time.Second + _CLIENT_RECEIVER_REPORT_INTERVAL = 10 * time.Second ) +type serverClientEvent interface { + isServerClientEvent() +} + +type serverClientEventFrameTcp struct { + frame *gortsplib.InterleavedFrame +} + +func (serverClientEventFrameTcp) isServerClientEvent() {} + type serverClientState int const ( @@ -53,24 +64,24 @@ func (cs serverClientState) String() string { } type serverClient struct { - p *program - conn *gortsplib.ConnServer - state serverClientState - path string - authUser string - authPass string - authHelper *gortsplib.AuthServer - authFailures int - streamSdpText []byte // only if publisher - streamSdpParsed *sdp.Message // only if publisher - streamProtocol streamProtocol - streamTracks []*track - udpLastFrameTime time.Time - readBuf *doubleBuffer - writeBuf *doubleBuffer + p *program + conn *gortsplib.ConnServer + state serverClientState + path string + authUser string + authPass string + authHelper *gortsplib.AuthServer + authFailures int + streamSdpText []byte // only if publisher + streamSdpParsed *sdp.Message // only if publisher + streamProtocol streamProtocol + streamTracks []*track + rtcpReceivers []*rtcpReceiver + readBuf *doubleBuffer + writeBuf *doubleBuffer - writeChan chan *gortsplib.InterleavedFrame // only if state = _CLIENT_STATE_PLAY - done chan struct{} + events chan serverClientEvent + done chan struct{} } func newServerClient(p *program, nconn net.Conn) *serverClient { @@ -201,31 +212,45 @@ outer: func (c *serverClient) runPlay() bool { if c.streamProtocol == _STREAM_PROTOCOL_TCP { - writeDone := make(chan struct{}) + readDone := make(chan error) go func() { - defer close(writeDone) - for frame := range c.writeChan { - c.conn.WriteInterleavedFrame(frame) + buf := make([]byte, 2048) + for { + _, err := c.conn.NetConn().Read(buf) + if err != nil { + readDone <- err + break + } } }() - buf := make([]byte, 2048) + outer: for { - _, err := c.conn.NetConn().Read(buf) - if err != nil { + select { + case err := <-readDone: if err != io.EOF { c.log("ERR: %s", err) } - break + break outer + + case rawEvt := <-c.events: + switch evt := rawEvt.(type) { + case serverClientEventFrameTcp: + c.conn.WriteInterleavedFrame(evt.frame) + } } } + go func() { + for range c.events { + } + }() + done := make(chan struct{}) c.p.events <- programEventClientClose{done, c} <-done - close(c.writeChan) - <-writeDone + close(c.events) } else { for { @@ -255,83 +280,147 @@ func (c *serverClient) runRecord() bool { if c.streamProtocol == _STREAM_PROTOCOL_TCP { frame := &gortsplib.InterleavedFrame{} - outer: + readDone := make(chan error) + go func() { + for { + frame.Content = c.readBuf.swap() + frame.Content = frame.Content[:cap(frame.Content)] + recv, err := c.conn.ReadInterleavedFrameOrRequest(frame) + if err != nil { + readDone <- err + break + } + + switch recvt := recv.(type) { + case *gortsplib.InterleavedFrame: + trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel) + if trackId >= len(c.streamTracks) { + c.log("ERR: invalid track id '%d'", trackId) + readDone <- nil + break + } + + c.rtcpReceivers[trackId].onFrame(trackFlowType, frame.Content) + c.p.events <- programEventClientFrameTcp{ + c.path, + trackId, + trackFlowType, + frame.Content, + } + + case *gortsplib.Request: + err := c.handleRequest(recvt) + if err != nil { + readDone <- nil + break + } + } + } + }() + + checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL) + receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL) + + outer1: for { - frame.Content = c.readBuf.swap() - frame.Content = frame.Content[:cap(frame.Content)] - recv, err := c.conn.ReadInterleavedFrameOrRequest(frame) - if err != nil { - if err != io.EOF { + select { + case err := <-readDone: + if err != nil && err != io.EOF { c.log("ERR: %s", err) } - break outer - } + break outer1 - switch recvt := recv.(type) { - case *gortsplib.InterleavedFrame: - trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel) - - if trackId >= len(c.streamTracks) { - c.log("ERR: invalid track id '%d'", trackId) - break outer + case <-checkStreamTicker.C: + for trackId := range c.streamTracks { + if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= _CLIENT_STREAM_DEAD_AFTER { + c.log("ERR: stream is dead") + c.conn.NetConn().Close() + <-readDone + break outer1 + } } - c.p.events <- programEventClientFrameTcp{ - c.path, - trackId, - trackFlowType, - frame.Content, - } + case <-receiverReportTicker.C: + for trackId := range c.streamTracks { + channel := trackFlowTypeToInterleavedChannel(trackId, _TRACK_FLOW_TYPE_RTCP) - case *gortsplib.Request: - err := c.handleRequest(recvt) - if err != nil { - break outer + frame := c.rtcpReceivers[trackId].report() + c.conn.WriteInterleavedFrame(&gortsplib.InterleavedFrame{ + Channel: channel, + Content: frame, + }) } } } - done := make(chan struct{}) - c.p.events <- programEventClientClose{done, c} - <-done + checkStreamTicker.Stop() + receiverReportTicker.Stop() } else { - c.udpLastFrameTime = time.Now() - - udpCheckStreamTicker := time.NewTicker(_UDP_CHECK_STREAM_INTERVAL) - udpCheckStreamDone := make(chan struct{}) + readDone := make(chan error) go func() { - defer close(udpCheckStreamDone) - for range udpCheckStreamTicker.C { - if time.Since(c.udpLastFrameTime) >= _UDP_STREAM_DEAD_AFTER { - c.log("ERR: stream is dead") - c.conn.NetConn().Close() + for { + req, err := c.conn.ReadRequest() + if err != nil { + readDone <- err + break + } + + err = c.handleRequest(req) + if err != nil { + readDone <- nil // err is not needed break } } }() + checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL) + receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL) + + outer2: for { - req, err := c.conn.ReadRequest() - if err != nil { - if err != io.EOF { + select { + case err := <-readDone: + if err != nil && err != io.EOF { c.log("ERR: %s", err) } - break - } + break outer2 - err = c.handleRequest(req) - if err != nil { - break + case <-checkStreamTicker.C: + for trackId := range c.streamTracks { + if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= _CLIENT_STREAM_DEAD_AFTER { + c.log("ERR: stream is dead") + c.conn.NetConn().Close() + <-readDone + break outer2 + } + } + + case <-receiverReportTicker.C: + for trackId := range c.streamTracks { + frame := c.rtcpReceivers[trackId].report() + c.p.rtcpl.writeChan <- &udpAddrBufPair{ + addr: &net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: c.streamTracks[trackId].rtcpPort, + }, + buf: frame, + } + } } } - done := make(chan struct{}) - c.p.events <- programEventClientClose{done, c} - <-done + checkStreamTicker.Stop() + receiverReportTicker.Stop() + } - udpCheckStreamTicker.Stop() - <-udpCheckStreamDone + done := make(chan struct{}) + c.p.events <- programEventClientClose{done, c} + <-done + + for trackId := range c.streamTracks { + c.rtcpReceivers[trackId].close() } return false @@ -342,17 +431,6 @@ func (c *serverClient) close() { <-c.done } -func (c *serverClient) writeFrame(channel uint8, inbuf []byte) { - buf := c.writeBuf.swap() - buf = buf[:len(inbuf)] - copy(buf, inbuf) - - c.writeChan <- &gortsplib.InterleavedFrame{ - Channel: channel, - Content: buf, - } -} - func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) { c.log("ERR: %s", err) @@ -532,7 +610,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { c.p.events <- programEventClientDescribe{path, res} sdp := <-res if sdp == nil { - c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("no one is streaming on path '%s'", path)) + c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path)) return errClientTerminate } @@ -906,8 +984,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { }, }) - c.writeBuf = newDoubleBuffer(2048) - c.writeChan = make(chan *gortsplib.InterleavedFrame) + if c.streamProtocol == _STREAM_PROTOCOL_TCP { + c.writeBuf = newDoubleBuffer(2048) + c.events = make(chan serverClientEvent) + } // set state res = make(chan error) @@ -948,6 +1028,11 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { }, }) + c.rtcpReceivers = make([]*rtcpReceiver, len(c.streamTracks)) + for trackId := range c.streamTracks { + c.rtcpReceivers[trackId] = newRtcpReceiver() + } + res := make(chan error) c.p.events <- programEventClientRecord{res, c} <-res diff --git a/server-udpl.go b/server-udpl.go index 8aaab5bb..0e27f699 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -54,7 +54,9 @@ func (l *serverUdpListener) log(format string, args ...interface{}) { } func (l *serverUdpListener) run() { + writeDone := make(chan struct{}) go func() { + defer close(writeDone) for w := range l.writeChan { l.nconn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout)) l.nconn.WriteTo(w.buf, w.addr) @@ -76,6 +78,7 @@ func (l *serverUdpListener) run() { } close(l.writeChan) + <-writeDone close(l.done) } diff --git a/streamer-udpl.go b/streamer-udpl.go index 9f102602..a7686454 100644 --- a/streamer-udpl.go +++ b/streamer-udpl.go @@ -16,7 +16,8 @@ type streamerUdpListener struct { running bool readBuf *doubleBuffer - done chan struct{} + writeChan chan *udpAddrBufPair + done chan struct{} } func newStreamerUdpListener(p *program, port int, streamer *streamer, @@ -36,6 +37,7 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer, publisherIp: publisherIp, nconn: nconn, readBuf: newDoubleBuffer(2048), + writeChan: make(chan *udpAddrBufPair), done: make(chan struct{}), } @@ -43,19 +45,28 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer, } func (l *streamerUdpListener) close() { - l.nconn.Close() - - if l.running { - <-l.done - } + l.nconn.Close() // close twice } func (l *streamerUdpListener) start() { - l.running = true go l.run() } +func (l *streamerUdpListener) stop() { + l.nconn.Close() + <-l.done +} + func (l *streamerUdpListener) run() { + writeDone := make(chan struct{}) + go func() { + defer close(writeDone) + for w := range l.writeChan { + l.nconn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout)) + l.nconn.WriteTo(w.buf, w.addr) + } + }() + for { buf := l.readBuf.swap() n, addr, err := l.nconn.ReadFromUDP(buf) @@ -67,10 +78,12 @@ func (l *streamerUdpListener) run() { continue } - l.streamer.udpLastFrameTime = time.Now() - + l.streamer.rtcpReceivers[l.trackId].onFrame(l.trackFlowType, buf[:n]) l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.trackFlowType, buf[:n]} } + close(l.writeChan) + <-writeDone + close(l.done) } diff --git a/streamer.go b/streamer.go index c25c7174..90c1b457 100644 --- a/streamer.go +++ b/streamer.go @@ -14,30 +14,29 @@ import ( ) const ( - _DIAL_TIMEOUT = 10 * time.Second - _RETRY_INTERVAL = 5 * time.Second - _CHECK_STREAM_INTERVAL = 6 * time.Second - _STREAM_DEAD_AFTER = 5 * time.Second - _KEEPALIVE_INTERVAL = 60 * time.Second + _STREAMER_RETRY_INTERVAL = 5 * time.Second + _STREAMER_CHECK_STREAM_INTERVAL = 5 * time.Second + _STREAMER_STREAM_DEAD_AFTER = 15 * time.Second + _STREAMER_KEEPALIVE_INTERVAL = 60 * time.Second + _STREAMER_RECEIVER_REPORT_INTERVAL = 10 * time.Second ) type streamerUdpListenerPair struct { - udplRtp *streamerUdpListener - udplRtcp *streamerUdpListener + rtpl *streamerUdpListener + rtcpl *streamerUdpListener } type streamer struct { - p *program - path string - ur *url.URL - proto streamProtocol - ready bool - clientSdpParsed *sdp.Message - serverSdpText []byte - serverSdpParsed *sdp.Message - firstTime bool - udpLastFrameTime time.Time - readBuf *doubleBuffer + p *program + path string + ur *url.URL + proto streamProtocol + ready bool + clientSdpParsed *sdp.Message + serverSdpText []byte + serverSdpParsed *sdp.Message + rtcpReceivers []*rtcpReceiver + readBuf *doubleBuffer terminate chan struct{} done chan struct{} @@ -82,7 +81,6 @@ func newStreamer(p *program, path string, source string, sourceProtocol string) path: path, ur: ur, proto: proto, - firstTime: true, readBuf: newDoubleBuffer(512 * 1024), terminate: make(chan struct{}), done: make(chan struct{}), @@ -113,30 +111,26 @@ func (s *streamer) run() { if !ok { break } + + t := time.NewTimer(_STREAMER_RETRY_INTERVAL) + select { + case <-s.terminate: + break + case <-t.C: + } } close(s.done) } func (s *streamer) do() bool { - if s.firstTime { - s.firstTime = false - } else { - t := time.NewTimer(_RETRY_INTERVAL) - select { - case <-s.terminate: - return false - case <-t.C: - } - } - s.log("initializing with protocol %s", s.proto) var nconn net.Conn var err error dialDone := make(chan struct{}) go func() { - nconn, err = net.DialTimeout("tcp", s.ur.Host, _DIAL_TIMEOUT) + nconn, err = net.DialTimeout("tcp", s.ur.Host, s.p.conf.ReadTimeout) close(dialDone) }() @@ -251,16 +245,16 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { defer func() { for _, pair := range streamerUdpListenerPairs { - pair.udplRtp.close() - pair.udplRtcp.close() + pair.rtpl.close() + pair.rtcpl.close() } }() for i, media := range s.clientSdpParsed.Medias { var rtpPort int var rtcpPort int - var udplRtp *streamerUdpListener - var udplRtcp *streamerUdpListener + var rtpl *streamerUdpListener + var rtcpl *streamerUdpListener func() { for { // choose two consecutive ports in range 65536-10000 @@ -269,16 +263,16 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { rtcpPort = rtpPort + 1 var err error - udplRtp, err = newStreamerUdpListener(s.p, rtpPort, s, i, + rtpl, err = newStreamerUdpListener(s.p, rtpPort, s, i, _TRACK_FLOW_TYPE_RTP, publisherIp) if err != nil { continue } - udplRtcp, err = newStreamerUdpListener(s.p, rtcpPort, s, i, + rtcpl, err = newStreamerUdpListener(s.p, rtcpPort, s, i, _TRACK_FLOW_TYPE_RTCP, publisherIp) if err != nil { - udplRtp.close() + rtpl.close() continue } @@ -338,23 +332,23 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { }) if err != nil { s.log("ERR: %s", err) - udplRtp.close() - udplRtcp.close() + rtpl.close() + rtcpl.close() return true } if res.StatusCode != gortsplib.StatusOK { s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage) - udplRtp.close() - udplRtcp.close() + rtpl.close() + rtcpl.close() return true } tsRaw, ok := res.Header["Transport"] if !ok || len(tsRaw) != 1 { s.log("ERR: transport header not provided") - udplRtp.close() - udplRtcp.close() + rtpl.close() + rtcpl.close() return true } @@ -362,17 +356,17 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { rtpServerPort, rtcpServerPort := th.GetPorts("server_port") if rtpServerPort == 0 { s.log("ERR: server ports not provided") - udplRtp.close() - udplRtcp.close() + rtpl.close() + rtcpl.close() return true } - udplRtp.publisherPort = rtpServerPort - udplRtcp.publisherPort = rtcpServerPort + rtpl.publisherPort = rtpServerPort + rtcpl.publisherPort = rtcpServerPort streamerUdpListenerPairs = append(streamerUdpListenerPairs, streamerUdpListenerPair{ - udplRtp: udplRtp, - udplRtcp: udplRtcp, + rtpl: rtpl, + rtcpl: rtcpl, }) } @@ -395,30 +389,32 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { return true } - for _, pair := range streamerUdpListenerPairs { - pair.udplRtp.start() - pair.udplRtcp.start() + s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.Medias)) + for trackId := range s.clientSdpParsed.Medias { + s.rtcpReceivers[trackId] = newRtcpReceiver() } - tickerSendKeepalive := time.NewTicker(_KEEPALIVE_INTERVAL) - defer tickerSendKeepalive.Stop() + for _, pair := range streamerUdpListenerPairs { + pair.rtpl.start() + pair.rtcpl.start() + } - s.udpLastFrameTime = time.Now() - tickerCheckStream := time.NewTicker(_CHECK_STREAM_INTERVAL) - defer tickerCheckStream.Stop() + sendKeepaliveTicker := time.NewTicker(_STREAMER_KEEPALIVE_INTERVAL) + checkStreamTicker := time.NewTicker(_STREAMER_CHECK_STREAM_INTERVAL) + receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL) s.p.events <- programEventStreamerReady{s} - defer func() { - s.p.events <- programEventStreamerNotReady{s} - }() + var ret bool +outer: for { select { case <-s.terminate: - return false + ret = false + break outer - case <-tickerSendKeepalive.C: + case <-sendKeepaliveTicker.C: _, err = conn.WriteRequest(&gortsplib.Request{ Method: gortsplib.OPTIONS, Url: &url.URL{ @@ -429,16 +425,50 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { }) if err != nil { s.log("ERR: %s", err) - return true + ret = true + break outer } - case <-tickerCheckStream.C: - if time.Since(s.udpLastFrameTime) >= _STREAM_DEAD_AFTER { - s.log("ERR: stream is dead") - return true + case <-checkStreamTicker.C: + for trackId := range s.clientSdpParsed.Medias { + if time.Since(s.rtcpReceivers[trackId].lastFrameTime()) >= _STREAMER_STREAM_DEAD_AFTER { + s.log("ERR: stream is dead") + ret = true + break outer + } + } + + case <-receiverReportTicker.C: + for trackId := range s.clientSdpParsed.Medias { + frame := s.rtcpReceivers[trackId].report() + streamerUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{ + addr: &net.UDPAddr{ + IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, + Zone: conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone, + Port: streamerUdpListenerPairs[trackId].rtcpl.publisherPort, + }, + buf: frame, + } } } } + + sendKeepaliveTicker.Stop() + checkStreamTicker.Stop() + receiverReportTicker.Stop() + + s.p.events <- programEventStreamerNotReady{s} + + for _, pair := range streamerUdpListenerPairs { + pair.rtpl.stop() + pair.rtcpl.stop() + } + + for trackId := range s.clientSdpParsed.Medias { + s.rtcpReceivers[trackId].close() + } + + return ret } func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { @@ -536,7 +566,7 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { frame := &gortsplib.InterleavedFrame{} -outer: +outer1: for { frame.Content = s.readBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] @@ -553,18 +583,19 @@ outer: s.log("ERR: PLAY returned code %d (%s)", recvt.StatusCode, recvt.StatusMessage) return true } - break outer + break outer1 case *gortsplib.InterleavedFrame: // ignore the frames sent before the response } } - s.p.events <- programEventStreamerReady{s} + s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.Medias)) + for trackId := range s.clientSdpParsed.Medias { + s.rtcpReceivers[trackId] = newRtcpReceiver() + } - defer func() { - s.p.events <- programEventStreamerNotReady{s} - }() + s.p.events <- programEventStreamerReady{s} chanConnError := make(chan struct{}) go func() { @@ -580,16 +611,60 @@ outer: trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel) + s.rtcpReceivers[trackId].onFrame(trackFlowType, frame.Content) s.p.events <- programEventStreamerFrame{s, trackId, trackFlowType, frame.Content} } }() - select { - case <-s.terminate: - return false - case <-chanConnError: - return true + checkStreamTicker := time.NewTicker(_STREAMER_CHECK_STREAM_INTERVAL) + receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL) + + var ret bool + +outer2: + for { + select { + case <-s.terminate: + ret = false + break outer2 + + case <-chanConnError: + ret = true + break outer2 + + case <-checkStreamTicker.C: + for trackId := range s.clientSdpParsed.Medias { + if time.Since(s.rtcpReceivers[trackId].lastFrameTime()) >= _STREAMER_STREAM_DEAD_AFTER { + s.log("ERR: stream is dead") + ret = true + break outer2 + } + } + + case <-receiverReportTicker.C: + for trackId := range s.clientSdpParsed.Medias { + frame := s.rtcpReceivers[trackId].report() + + channel := trackFlowTypeToInterleavedChannel(trackId, _TRACK_FLOW_TYPE_RTCP) + + conn.WriteInterleavedFrame(&gortsplib.InterleavedFrame{ + Channel: channel, + Content: frame, + }) + } + } } + + checkStreamTicker.Stop() + receiverReportTicker.Stop() + + s.p.events <- programEventStreamerNotReady{s} + + for trackId := range s.clientSdpParsed.Medias { + s.rtcpReceivers[trackId].close() + } + + return ret } func (s *streamer) close() { diff --git a/utils.go b/utils.go index 9942dc4c..688d43d7 100644 --- a/utils.go +++ b/utils.go @@ -2,7 +2,11 @@ package main import ( "fmt" + "math/rand" "net" + "time" + + "github.com/pion/rtcp" ) func parseIpCidrList(in []string) ([]interface{}, error) { @@ -90,3 +94,139 @@ func (db *doubleBuffer) swap() []byte { db.curBuf = !db.curBuf return ret } + +type rtcpReceiverEvent interface { + isRtpReceiverEvent() +} + +type rtcpReceiverEventFrameRtp struct { + sequenceNumber uint16 +} + +func (rtcpReceiverEventFrameRtp) isRtpReceiverEvent() {} + +type rtcpReceiverEventFrameRtcp struct { + ssrc uint32 + ntpTimeMiddle uint32 +} + +func (rtcpReceiverEventFrameRtcp) isRtpReceiverEvent() {} + +type rtcpReceiverEventLastFrameTime struct { + res chan time.Time +} + +func (rtcpReceiverEventLastFrameTime) isRtpReceiverEvent() {} + +type rtcpReceiverEventReport struct { + res chan []byte +} + +func (rtcpReceiverEventReport) isRtpReceiverEvent() {} + +type rtcpReceiverEventTerminate struct{} + +func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {} + +type rtcpReceiver struct { + events chan rtcpReceiverEvent + done chan struct{} +} + +func newRtcpReceiver() *rtcpReceiver { + rr := &rtcpReceiver{ + events: make(chan rtcpReceiverEvent), + done: make(chan struct{}), + } + go rr.run() + return rr +} + +func (rr *rtcpReceiver) run() { + lastFrameTime := time.Now() + publisherSSRC := uint32(0) + receiverSSRC := rand.Uint32() + sequenceNumberCycles := uint16(0) + lastSequenceNumber := uint16(0) + lastSenderReport := uint32(0) + +outer: + for rawEvt := range rr.events { + switch evt := rawEvt.(type) { + case rtcpReceiverEventFrameRtp: + if evt.sequenceNumber < lastSequenceNumber { + sequenceNumberCycles += 1 + } + lastSequenceNumber = evt.sequenceNumber + lastFrameTime = time.Now() + + case rtcpReceiverEventFrameRtcp: + publisherSSRC = evt.ssrc + lastSenderReport = evt.ntpTimeMiddle + + case rtcpReceiverEventLastFrameTime: + evt.res <- lastFrameTime + + case rtcpReceiverEventReport: + rr := &rtcp.ReceiverReport{ + SSRC: receiverSSRC, + Reports: []rtcp.ReceptionReport{ + { + SSRC: publisherSSRC, + LastSequenceNumber: uint32(sequenceNumberCycles)<<8 | uint32(lastSequenceNumber), + LastSenderReport: lastSenderReport, + }, + }, + } + frame, _ := rr.Marshal() + evt.res <- frame + + case rtcpReceiverEventTerminate: + break outer + } + } + + close(rr.events) + + close(rr.done) +} + +func (rr *rtcpReceiver) close() { + rr.events <- rtcpReceiverEventTerminate{} + <-rr.done +} + +func (rr *rtcpReceiver) onFrame(trackFlowType trackFlowType, buf []byte) { + if trackFlowType == _TRACK_FLOW_TYPE_RTP { + // extract sequence number of first frame + if len(buf) >= 3 { + sequenceNumber := uint16(uint16(buf[2])<<8 | uint16(buf[1])) + rr.events <- rtcpReceiverEventFrameRtp{sequenceNumber} + } + + } else { + frames, err := rtcp.Unmarshal(buf) + if err == nil { + for _, frame := range frames { + if senderReport, ok := (frame).(*rtcp.SenderReport); ok { + rr.events <- rtcpReceiverEventFrameRtcp{ + senderReport.SSRC, + uint32(senderReport.NTPTime >> 16), + } + } + } + } + } +} + +func (rr *rtcpReceiver) lastFrameTime() time.Time { + res := make(chan time.Time) + rr.events <- rtcpReceiverEventLastFrameTime{res} + return <-res +} + +func (rr *rtcpReceiver) report() []byte { + res := make(chan []byte) + rr.events <- rtcpReceiverEventReport{res} + return <-res +}