From d338e04df78aab027e8e5a1fe5e06c88813149e5 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 23 Mar 2021 21:31:43 +0100 Subject: [PATCH] readd seqnum to RTP-Info (#233) --- go.mod | 2 +- go.sum | 4 +- internal/client/client.go | 2 +- internal/clientrtsp/client.go | 13 ++--- internal/path/path.go | 36 +------------ internal/streamproc/streamproc.go | 86 +++++++++++++++++++------------ main_clientrtsp_test.go | 21 +++++--- main_sourcertsp_test.go | 14 +++-- 8 files changed, 89 insertions(+), 89 deletions(-) diff --git a/go.mod b/go.mod index f73ad439..907c30c2 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027 + github.com/aler9/gortsplib v0.0.0-20210323202043-7e9e266054fc github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index 1e760edc..6c292ae0 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027 h1:cJkd74/wqKvjAUmvIoBElY12m+R2I0Pzk0UR14xyT0c= -github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= +github.com/aler9/gortsplib v0.0.0-20210323202043-7e9e266054fc h1:uxCYfD2G2vlGMjxB3sGP++PJP1sJFd6ABl2TTV0/r9g= +github.com/aler9/gortsplib v0.0.0-20210323202043-7e9e266054fc/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU= github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/internal/client/client.go b/internal/client/client.go index 493f6138..ee8d00d6 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -104,7 +104,7 @@ type RemoveReq struct { // PlayRes is a play response. type PlayRes struct { - TrackStartingPoints []streamproc.TrackStartingPoint + TrackInfos []streamproc.TrackInfo } // PlayReq is a play request. diff --git a/internal/clientrtsp/client.go b/internal/clientrtsp/client.go index 53b50210..b35b6d75 100644 --- a/internal/clientrtsp/client.go +++ b/internal/clientrtsp/client.go @@ -326,8 +326,8 @@ func (c *Client) run() { // add RTP-Info var ri headers.RTPInfo - for trackID, v := range res.TrackStartingPoints { - if !v.Filled { + for trackID, ti := range res.TrackInfos { + if !ti.Initialized { continue } @@ -345,13 +345,14 @@ func (c *Client) run() { u.AddControlAttribute("trackID=" + strconv.FormatInt(int64(trackID), 10)) clockRate, _ := track.ClockRate() - ts := uint32(uint64(time.Since(v.NTPTime).Seconds()*float64(clockRate)) + - uint64(v.RTPTime)) + ts := uint32(uint64(time.Since(ti.NTPTime).Seconds()*float64(clockRate)) + + uint64(ti.RTPTime)) + lsn := ti.LastSequenceNumber ri = append(ri, &headers.RTPInfoEntry{ URL: u, - SequenceNumber: 0, - Timestamp: ts, + SequenceNumber: &lsn, + Timestamp: &ts, }) } if len(ri) > 0 { diff --git a/internal/path/path.go b/internal/path/path.go index 3397d244..074505f9 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -77,7 +77,6 @@ type Path struct { setupPlayRequests []client.SetupPlayReq source source.Source sourceTracks gortsplib.Tracks - sourceTrackStartingPoints []streamproc.TrackStartingPoint sp *streamproc.StreamProc readers *readersMap onDemandCmd *externalcmd.Cmd @@ -101,7 +100,6 @@ type Path struct { clientRecord chan client.RecordReq clientPause chan client.PauseReq clientRemove chan client.RemoveReq - spSetStartingPoint chan streamproc.SetStartingPointReq terminate chan struct{} } @@ -146,7 +144,6 @@ func New( clientRecord: make(chan client.RecordReq), clientPause: make(chan client.PauseReq), clientRemove: make(chan client.RemoveReq), - spSetStartingPoint: make(chan streamproc.SetStartingPointReq), terminate: make(chan struct{}), } @@ -228,7 +225,6 @@ outer: case req := <-pa.extSourceSetReady: pa.sourceTracks = req.Tracks - pa.sourceTrackStartingPoints = make([]streamproc.TrackStartingPoint, len(req.Tracks)) pa.sp = streamproc.New(pa, len(req.Tracks)) pa.onSourceSetReady() req.Res <- source.ExtSetReadyRes{SP: pa.sp} @@ -269,9 +265,6 @@ outer: pa.clientsWg.Done() close(req.Res) - case req := <-pa.spSetStartingPoint: - pa.onSPSetStartingPoint(req) - case <-pa.terminate: pa.exhaustChannels() break outer @@ -330,7 +323,6 @@ outer: close(pa.clientRecord) close(pa.clientPause) close(pa.clientRemove) - close(pa.spSetStartingPoint) } func (pa *Path) exhaustChannels() { @@ -397,12 +389,6 @@ func (pa *Path) exhaustChannels() { pa.clientsWg.Done() close(req.Res) - - case _, ok := <-pa.spSetStartingPoint: - if !ok { - return - } - } } }() @@ -657,13 +643,7 @@ func (pa *Path) onClientPlay(req client.PlayReq) { pa.clients[req.Client] = clientStatePlay pa.readers.add(req.Client) - // clone - cl := make([]streamproc.TrackStartingPoint, len(pa.sourceTrackStartingPoints)) - for k, v := range pa.sourceTrackStartingPoints { - cl[k] = v - } - - req.Res <- client.PlayRes{cl} // nolint:govet + req.Res <- client.PlayRes{TrackInfos: pa.sp.TrackInfos()} } func (pa *Path) onClientAnnounce(req client.AnnounceReq) { @@ -713,7 +693,6 @@ func (pa *Path) onClientRecord(req client.RecordReq) { pa.clients[req.Client] = clientStateRecord pa.onSourceSetReady() - pa.sourceTrackStartingPoints = make([]streamproc.TrackStartingPoint, len(pa.sourceTracks)) pa.sp = streamproc.New(pa, len(pa.sourceTracks)) req.Res <- client.RecordRes{SP: pa.sp, Err: nil} @@ -740,14 +719,6 @@ func (pa *Path) onClientPause(req client.PauseReq) { close(req.Res) } -func (pa *Path) onSPSetStartingPoint(req streamproc.SetStartingPointReq) { - if req.SP != pa.sp { - return - } - - pa.sourceTrackStartingPoints[req.TrackID] = req.StartingPoint -} - func (pa *Path) scheduleSourceClose() { if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil { return @@ -855,11 +826,6 @@ func (pa *Path) OnClientPause(req client.PauseReq) { pa.clientPause <- req } -// OnSPSetStartingPoint is called by streamproc.StreamProc. -func (pa *Path) OnSPSetStartingPoint(req streamproc.SetStartingPointReq) { - pa.spSetStartingPoint <- req -} - // OnSPFrame is called by streamproc.StreamProc. func (pa *Path) OnSPFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { pa.readers.forwardFrame(trackID, streamType, payload) diff --git a/internal/streamproc/streamproc.go b/internal/streamproc/streamproc.go index 77c02200..e9c0a1b6 100644 --- a/internal/streamproc/streamproc.go +++ b/internal/streamproc/streamproc.go @@ -1,65 +1,83 @@ package streamproc import ( + "encoding/binary" + "sync" + "sync/atomic" "time" "github.com/aler9/gortsplib" - "github.com/pion/rtp" ) -// TrackStartingPoint is the starting point of a track. -type TrackStartingPoint struct { - Filled bool // used to avoid mutexes - RTPTime uint32 - NTPTime time.Time -} - // Path is implemented by path.path. type Path interface { - OnSPSetStartingPoint(SetStartingPointReq) OnSPFrame(int, gortsplib.StreamType, []byte) } -// SetStartingPointReq is a set starting point request. -type SetStartingPointReq struct { - SP *StreamProc - TrackID int - StartingPoint TrackStartingPoint +// TrackInfo contains infos about a track. +type TrackInfo struct { + Initialized bool + LastSequenceNumber uint16 + RTPTime uint32 + NTPTime time.Time +} + +type trackInfo struct { + initialized bool + lastSequenceNumber uint32 + timeMutex sync.Mutex + rtpTime uint32 + ntpTime time.Time } // StreamProc is a stream processor, an intermediate layer between a source and a path. type StreamProc struct { - path Path - startingPoints []TrackStartingPoint + path Path + trackInfos []trackInfo } // New allocates a StreamProc. func New(path Path, tracksLen int) *StreamProc { return &StreamProc{ - path: path, - startingPoints: make([]TrackStartingPoint, tracksLen), + path: path, + trackInfos: make([]trackInfo, tracksLen), } } +// TrackInfos returns infos about the tracks of the stream. +func (sp *StreamProc) TrackInfos() []TrackInfo { + ret := make([]TrackInfo, len(sp.trackInfos)) + + for trackID := range sp.trackInfos { + sp.trackInfos[trackID].timeMutex.Lock() + ret[trackID] = TrackInfo{ + Initialized: sp.trackInfos[trackID].initialized, + LastSequenceNumber: uint16(atomic.LoadUint32(&sp.trackInfos[trackID].lastSequenceNumber)), + RTPTime: sp.trackInfos[trackID].rtpTime, + NTPTime: sp.trackInfos[trackID].ntpTime, + } + sp.trackInfos[trackID].timeMutex.Unlock() + } + + return ret +} + // OnFrame processes a frame. func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { - if streamType == gortsplib.StreamTypeRTP && - !sp.startingPoints[trackID].Filled { - pkt := rtp.Packet{} - err := pkt.Unmarshal(payload) - if err != nil { - return + if streamType == gortsplib.StreamTypeRTP && len(payload) >= 8 { + // store last sequence number + sequenceNumber := binary.BigEndian.Uint16(payload[2 : 2+2]) + atomic.StoreUint32(&sp.trackInfos[trackID].lastSequenceNumber, uint32(sequenceNumber)) + + // store time mapping + if !sp.trackInfos[trackID].initialized { + timestamp := binary.BigEndian.Uint32(payload[4 : 4+4]) + sp.trackInfos[trackID].timeMutex.Lock() + sp.trackInfos[trackID].initialized = true + sp.trackInfos[trackID].rtpTime = timestamp + sp.trackInfos[trackID].ntpTime = time.Now() + sp.trackInfos[trackID].timeMutex.Unlock() } - - sp.startingPoints[trackID].Filled = true - sp.startingPoints[trackID].RTPTime = pkt.Timestamp - sp.startingPoints[trackID].NTPTime = time.Now() - - sp.path.OnSPSetStartingPoint(SetStartingPointReq{ - SP: sp, - TrackID: trackID, - StartingPoint: sp.startingPoints[trackID], - }) } sp.path.OnSPFrame(trackID, streamType, payload) diff --git a/main_clientrtsp_test.go b/main_clientrtsp_test.go index 385ad7d8..53c4ecb4 100644 --- a/main_clientrtsp_test.go +++ b/main_clientrtsp_test.go @@ -670,8 +670,11 @@ func TestClientRTSPRTPInfo(t *testing.T) { Host: ownDockerIP + ":8554", Path: "/teststream/trackID=0", }, - SequenceNumber: 0, - Timestamp: (*dest.RTPInfo())[0].Timestamp, + SequenceNumber: func() *uint16 { + v := uint16(556) + return &v + }(), + Timestamp: (*dest.RTPInfo())[0].Timestamp, }, }, dest.RTPInfo()) }() @@ -705,8 +708,11 @@ func TestClientRTSPRTPInfo(t *testing.T) { Host: ownDockerIP + ":8554", Path: "/teststream/trackID=0", }, - SequenceNumber: 0, - Timestamp: (*dest.RTPInfo())[0].Timestamp, + SequenceNumber: func() *uint16 { + v := uint16(556) + return &v + }(), + Timestamp: (*dest.RTPInfo())[0].Timestamp, }, &headers.RTPInfoEntry{ URL: &base.URL{ @@ -714,8 +720,11 @@ func TestClientRTSPRTPInfo(t *testing.T) { Host: ownDockerIP + ":8554", Path: "/teststream/trackID=1", }, - SequenceNumber: 0, - Timestamp: (*dest.RTPInfo())[1].Timestamp, + SequenceNumber: func() *uint16 { + v := uint16(87) + return &v + }(), + Timestamp: (*dest.RTPInfo())[1].Timestamp, }, }, dest.RTPInfo()) }() diff --git a/main_sourcertsp_test.go b/main_sourcertsp_test.go index 4528243c..ada3de5c 100644 --- a/main_sourcertsp_test.go +++ b/main_sourcertsp_test.go @@ -301,8 +301,11 @@ func TestSourceRTSPRTPInfo(t *testing.T) { Host: "127.0.1.2:8554", Path: "/proxied/trackID=0", }, - SequenceNumber: 0, - Timestamp: (*dest.RTPInfo())[0].Timestamp, + SequenceNumber: func() *uint16 { + v := uint16(87) + return &v + }(), + Timestamp: (*dest.RTPInfo())[0].Timestamp, }, &headers.RTPInfoEntry{ URL: &base.URL{ @@ -310,8 +313,11 @@ func TestSourceRTSPRTPInfo(t *testing.T) { Host: "127.0.1.2:8554", Path: "/proxied/trackID=1", }, - SequenceNumber: 0, - Timestamp: (*dest.RTPInfo())[1].Timestamp, + SequenceNumber: func() *uint16 { + v := uint16(34254) + return &v + }(), + Timestamp: (*dest.RTPInfo())[1].Timestamp, }, }, dest.RTPInfo()) }