1
0
Fork 0
forked from External/mediamtx

RTSP client: fix race condition during computation of RTP-Info that prevented clients from reading frames (#353)

This commit is contained in:
aler9 2021-04-17 13:55:52 +02:00
parent a6ed028227
commit 8b3cd43a51
3 changed files with 34 additions and 37 deletions

View file

@ -334,7 +334,7 @@ func (c *Client) run() {
// add RTP-Info
var ri headers.RTPInfo
for trackID, ti := range res.TrackInfos {
if !ti.Initialized {
if ti.LastTimeNTP == 0 {
continue
}
@ -351,8 +351,8 @@ func (c *Client) run() {
}
clockRate, _ := track.ClockRate()
ts := uint32(uint64(time.Since(ti.NTPTime).Seconds()*float64(clockRate)) +
uint64(ti.RTPTime))
ts := uint32(uint64(ti.LastTimeRTP) +
uint64(time.Since(time.Unix(ti.LastTimeNTP, 0)).Seconds()*float64(clockRate)))
lsn := ti.LastSequenceNumber
ri = append(ri, &headers.RTPInfoEntry{

View file

@ -2,7 +2,6 @@ package streamproc
import (
"encoding/binary"
"sync"
"sync/atomic"
"time"
@ -16,68 +15,63 @@ type Path interface {
// TrackInfo contains infos about a track.
type TrackInfo struct {
Initialized bool
LastSequenceNumber uint16
RTPTime uint32
NTPTime time.Time
LastTimeRTP uint32
LastTimeNTP int64
}
type trackInfo struct {
initialized bool
type track struct {
lastSequenceNumber uint32
timeMutex sync.Mutex
rtpTime uint32
ntpTime time.Time
lastTimeRTP uint32
lastTimeNTP int64
}
// StreamProc is a stream processor, an intermediate layer between a source and a path.
type StreamProc struct {
path Path
trackInfos []trackInfo
path Path
tracks []*track
}
// New allocates a StreamProc.
func New(path Path, tracksLen int) *StreamProc {
return &StreamProc{
path: path,
trackInfos: make([]trackInfo, tracksLen),
sp := &StreamProc{
path: path,
}
sp.tracks = make([]*track, tracksLen)
for i := range sp.tracks {
sp.tracks[i] = &track{}
}
return sp
}
// TrackInfos returns infos about the tracks of the stream.
func (sp *StreamProc) TrackInfos() []TrackInfo {
ret := make([]TrackInfo, len(sp.trackInfos))
for trackID := range sp.trackInfos {
sp.trackInfos[trackID].timeMutex.Lock()
ret := make([]TrackInfo, len(sp.tracks))
for trackID, track := range sp.tracks {
ret[trackID] = TrackInfo{
Initialized: sp.trackInfos[trackID].initialized,
LastSequenceNumber: uint16(atomic.LoadUint32(&sp.trackInfos[trackID].lastSequenceNumber)),
RTPTime: sp.trackInfos[trackID].rtpTime,
NTPTime: sp.trackInfos[trackID].ntpTime,
LastSequenceNumber: uint16(atomic.LoadUint32(&track.lastSequenceNumber)),
LastTimeRTP: atomic.LoadUint32(&track.lastTimeRTP),
LastTimeNTP: atomic.LoadInt64(&track.lastTimeNTP),
}
sp.trackInfos[trackID].timeMutex.Unlock()
}
return ret
}
// OnFrame processes a frame.
func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP && len(payload) >= 8 {
track := sp.tracks[trackID]
// store last sequence number
sequenceNumber := binary.BigEndian.Uint16(payload[2 : 2+2])
atomic.StoreUint32(&sp.trackInfos[trackID].lastSequenceNumber, uint32(sequenceNumber))
atomic.StoreUint32(&track.lastSequenceNumber, uint32(sequenceNumber))
// store time mapping
if !sp.trackInfos[trackID].initialized {
timestamp := binary.BigEndian.Uint32(payload[4 : 4+4])
sp.trackInfos[trackID].timeMutex.Lock()
sp.trackInfos[trackID].initialized = true
sp.trackInfos[trackID].rtpTime = timestamp
sp.trackInfos[trackID].ntpTime = time.Now()
sp.trackInfos[trackID].timeMutex.Unlock()
}
// store last RTP time and correspondent NTP time
timestamp := binary.BigEndian.Uint32(payload[4 : 4+4])
atomic.StoreUint32(&track.lastTimeRTP, timestamp)
atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix())
}
sp.path.OnSPFrame(trackID, streamType, payload)

View file

@ -321,4 +321,7 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
Timestamp: (*dest.RTPInfo())[1].Timestamp,
},
}, dest.RTPInfo())
require.Less(t, uint32(756436454), *(*dest.RTPInfo())[0].Timestamp)
require.Less(t, uint32(156457686), *(*dest.RTPInfo())[1].Timestamp)
}