From 8b3cd43a51adfa7228c9e33f0f95d91174e8c549 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 17 Apr 2021 13:55:52 +0200 Subject: [PATCH] RTSP client: fix race condition during computation of RTP-Info that prevented clients from reading frames (#353) --- internal/clientrtsp/client.go | 6 +-- internal/streamproc/streamproc.go | 62 ++++++++++++++----------------- main_sourcertsp_test.go | 3 ++ 3 files changed, 34 insertions(+), 37 deletions(-) diff --git a/internal/clientrtsp/client.go b/internal/clientrtsp/client.go index 843a79c6..2cea2787 100644 --- a/internal/clientrtsp/client.go +++ b/internal/clientrtsp/client.go @@ -334,7 +334,7 @@ func (c *Client) run() { // add RTP-Info var ri headers.RTPInfo for trackID, ti := range res.TrackInfos { - if !ti.Initialized { + if ti.LastTimeNTP == 0 { continue } @@ -351,8 +351,8 @@ func (c *Client) run() { } clockRate, _ := track.ClockRate() - ts := uint32(uint64(time.Since(ti.NTPTime).Seconds()*float64(clockRate)) + - uint64(ti.RTPTime)) + ts := uint32(uint64(ti.LastTimeRTP) + + uint64(time.Since(time.Unix(ti.LastTimeNTP, 0)).Seconds()*float64(clockRate))) lsn := ti.LastSequenceNumber ri = append(ri, &headers.RTPInfoEntry{ diff --git a/internal/streamproc/streamproc.go b/internal/streamproc/streamproc.go index e9c0a1b6..b438d3e4 100644 --- a/internal/streamproc/streamproc.go +++ b/internal/streamproc/streamproc.go @@ -2,7 +2,6 @@ package streamproc import ( "encoding/binary" - "sync" "sync/atomic" "time" @@ -16,68 +15,63 @@ type Path interface { // TrackInfo contains infos about a track. type TrackInfo struct { - Initialized bool LastSequenceNumber uint16 - RTPTime uint32 - NTPTime time.Time + LastTimeRTP uint32 + LastTimeNTP int64 } -type trackInfo struct { - initialized bool +type track struct { lastSequenceNumber uint32 - timeMutex sync.Mutex - rtpTime uint32 - ntpTime time.Time + lastTimeRTP uint32 + lastTimeNTP int64 } // StreamProc is a stream processor, an intermediate layer between a source and a path. type StreamProc struct { - path Path - trackInfos []trackInfo + path Path + tracks []*track } // New allocates a StreamProc. func New(path Path, tracksLen int) *StreamProc { - return &StreamProc{ - path: path, - trackInfos: make([]trackInfo, tracksLen), + sp := &StreamProc{ + path: path, } + + sp.tracks = make([]*track, tracksLen) + for i := range sp.tracks { + sp.tracks[i] = &track{} + } + + return sp } // TrackInfos returns infos about the tracks of the stream. func (sp *StreamProc) TrackInfos() []TrackInfo { - ret := make([]TrackInfo, len(sp.trackInfos)) - - for trackID := range sp.trackInfos { - sp.trackInfos[trackID].timeMutex.Lock() + ret := make([]TrackInfo, len(sp.tracks)) + for trackID, track := range sp.tracks { ret[trackID] = TrackInfo{ - Initialized: sp.trackInfos[trackID].initialized, - LastSequenceNumber: uint16(atomic.LoadUint32(&sp.trackInfos[trackID].lastSequenceNumber)), - RTPTime: sp.trackInfos[trackID].rtpTime, - NTPTime: sp.trackInfos[trackID].ntpTime, + LastSequenceNumber: uint16(atomic.LoadUint32(&track.lastSequenceNumber)), + LastTimeRTP: atomic.LoadUint32(&track.lastTimeRTP), + LastTimeNTP: atomic.LoadInt64(&track.lastTimeNTP), } - sp.trackInfos[trackID].timeMutex.Unlock() } - return ret } // OnFrame processes a frame. func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { if streamType == gortsplib.StreamTypeRTP && len(payload) >= 8 { + track := sp.tracks[trackID] + // store last sequence number sequenceNumber := binary.BigEndian.Uint16(payload[2 : 2+2]) - atomic.StoreUint32(&sp.trackInfos[trackID].lastSequenceNumber, uint32(sequenceNumber)) + atomic.StoreUint32(&track.lastSequenceNumber, uint32(sequenceNumber)) - // store time mapping - if !sp.trackInfos[trackID].initialized { - timestamp := binary.BigEndian.Uint32(payload[4 : 4+4]) - sp.trackInfos[trackID].timeMutex.Lock() - sp.trackInfos[trackID].initialized = true - sp.trackInfos[trackID].rtpTime = timestamp - sp.trackInfos[trackID].ntpTime = time.Now() - sp.trackInfos[trackID].timeMutex.Unlock() - } + // store last RTP time and correspondent NTP time + timestamp := binary.BigEndian.Uint32(payload[4 : 4+4]) + atomic.StoreUint32(&track.lastTimeRTP, timestamp) + atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix()) } sp.path.OnSPFrame(trackID, streamType, payload) diff --git a/main_sourcertsp_test.go b/main_sourcertsp_test.go index 4aa1dc6a..43eca6fc 100644 --- a/main_sourcertsp_test.go +++ b/main_sourcertsp_test.go @@ -321,4 +321,7 @@ func TestSourceRTSPRTPInfo(t *testing.T) { Timestamp: (*dest.RTPInfo())[1].Timestamp, }, }, dest.RTPInfo()) + + require.Less(t, uint32(756436454), *(*dest.RTPInfo())[0].Timestamp) + require.Less(t, uint32(156457686), *(*dest.RTPInfo())[1].Timestamp) }