diff --git a/go.mod b/go.mod index 1a3c7291..ffaef0db 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210106201702-d17ef3fcc3ff + github.com/aler9/gortsplib v0.0.0-20210106212707-7b8b8e7c84d6 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index 4815f029..12482af2 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210106201702-d17ef3fcc3ff h1:rXe4QSWV7QwDaOW1NCqEOa7T4p5N86Q13urvo82TuPg= -github.com/aler9/gortsplib v0.0.0-20210106201702-d17ef3fcc3ff/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= +github.com/aler9/gortsplib v0.0.0-20210106212707-7b8b8e7c84d6 h1:rqUzaMn1Yg2LK/yr+gaRXnMC/o1ciZuW4wZubzO8BGo= +github.com/aler9/gortsplib v0.0.0-20210106212707-7b8b8e7c84d6/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/internal/client/client.go b/internal/client/client.go index 680232d9..3afdf45a 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -15,7 +15,6 @@ import ( "github.com/aler9/gortsplib/pkg/auth" "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" - "github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/externalcmd" @@ -24,10 +23,8 @@ import ( ) const ( - checkStreamInterval = 5 * time.Second - receiverReportInterval = 10 * time.Second - sessionID = "12345678" - pauseAfterAuthError = 2 * time.Second + sessionID = "12345678" + pauseAfterAuthError = 2 * time.Second ) type describeData struct { @@ -69,22 +66,17 @@ type Client struct { conn *gortsplib.ServerConn parent Parent - path Path - authUser string - authPass string - authValidator *auth.Validator - authFailures int - rtcpReceivers map[int]*rtcpreceiver.RTCPReceiver - udpLastFrameTimes []*int64 - onReadCmd *externalcmd.Cmd - onPublishCmd *externalcmd.Cmd + path Path + authUser string + authPass string + authValidator *auth.Validator + authFailures int + onReadCmd *externalcmd.Cmd + onPublishCmd *externalcmd.Cmd // in describeData chan describeData // from path terminate chan struct{} - - backgroundRecordTerminate chan struct{} - backgroundRecordDone chan struct{} } // New allocates a Client. @@ -110,7 +102,6 @@ func New( stats: stats, conn: conn, parent: parent, - rtcpReceivers: make(map[int]*rtcpreceiver.RTCPReceiver), terminate: make(chan struct{}), } @@ -282,11 +273,6 @@ func (c *Client) run() { } } - for trackID, t := range tracks { - clockRate, _ := t.ClockRate() - c.rtcpReceivers[trackID] = rtcpreceiver.New(nil, clockRate) - } - c.path = path return &base.Response{ @@ -498,12 +484,6 @@ func (c *Client) run() { }, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath) } - if c.conn.TracksLen() != c.path.SourceTrackCount() { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("not all tracks have been setup") - } - c.startRecord() return &base.Response{ @@ -538,16 +518,7 @@ func (c *Client) run() { return } - if *c.conn.TracksProtocol() == gortsplib.StreamProtocolUDP { - now := time.Now() - atomic.StoreInt64(c.udpLastFrameTimes[trackID], now.Unix()) - c.rtcpReceivers[trackID].ProcessFrame(now, streamType, payload) - c.path.OnFrame(trackID, streamType, payload) - - } else { - c.rtcpReceivers[trackID].ProcessFrame(time.Now(), streamType, payload) - c.path.OnFrame(trackID, streamType, payload) - } + c.path.OnFrame(trackID, streamType, payload) } readDone := c.conn.Read(gortsplib.ServerConnReadHandlers{ @@ -718,97 +689,20 @@ func (c *Client) startRecord() { return "tracks" }(), *c.conn.TracksProtocol()) - if *c.conn.TracksProtocol() == gortsplib.StreamProtocolUDP { - c.udpLastFrameTimes = make([]*int64, c.conn.TracksLen()) - for trackID := range c.conn.Tracks() { - v := time.Now().Unix() - c.udpLastFrameTimes[trackID] = &v - } - } - if c.path.Conf().RunOnPublish != "" { c.onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{ Path: c.path.Name(), Port: strconv.FormatInt(int64(c.rtspPort), 10), }) } - - c.backgroundRecordTerminate = make(chan struct{}) - c.backgroundRecordDone = make(chan struct{}) - - if *c.conn.TracksProtocol() == gortsplib.StreamProtocolUDP { - go c.backgroundRecordUDP() - } else { - go c.backgroundRecordTCP() - } } func (c *Client) stopRecord() { - close(c.backgroundRecordTerminate) - <-c.backgroundRecordDone - if c.path.Conf().RunOnPublish != "" { c.onPublishCmd.Close() } } -func (c *Client) backgroundRecordUDP() { - defer close(c.backgroundRecordDone) - - checkStreamTicker := time.NewTicker(checkStreamInterval) - defer checkStreamTicker.Stop() - - receiverReportTicker := time.NewTicker(receiverReportInterval) - defer receiverReportTicker.Stop() - - for { - select { - case <-checkStreamTicker.C: - now := time.Now() - for _, lastUnix := range c.udpLastFrameTimes { - last := time.Unix(atomic.LoadInt64(lastUnix), 0) - - if now.Sub(last) >= c.readTimeout { - c.log(logger.Info, "ERR: no UDP packets received recently (maybe there's a firewall/NAT in between)") - c.conn.Close() - return - } - } - - case <-receiverReportTicker.C: - now := time.Now() - for trackID := range c.conn.Tracks() { - r := c.rtcpReceivers[trackID].Report(now) - c.conn.WriteFrame(trackID, gortsplib.StreamTypeRTP, r) - } - - case <-c.backgroundRecordTerminate: - return - } - } -} - -func (c *Client) backgroundRecordTCP() { - defer close(c.backgroundRecordDone) - - receiverReportTicker := time.NewTicker(receiverReportInterval) - defer receiverReportTicker.Stop() - - for { - select { - case <-receiverReportTicker.C: - now := time.Now() - for trackID := range c.conn.Tracks() { - r := c.rtcpReceivers[trackID].Report(now) - c.conn.WriteFrame(trackID, gortsplib.StreamTypeRTCP, r) - } - - case <-c.backgroundRecordTerminate: - return - } - } -} - // OnReaderFrame implements path.Reader. func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []byte) { if !c.conn.HasTrack(trackID) {