From 0cdae40fe30a0dbe57311f5c16a38f13abd64267 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sun, 12 Oct 2025 11:02:14 +0200 Subject: [PATCH] estimate absolute timestamp more precisely (#5078) When the absolute timestamp of incoming frames was not available, it was filled with the current timestamp, which is influenced by latency over time. This mechanism is replaced by an algorithm that detects when latency is the lowest, stores the current timestamp and uses it as reference throughout the rest of the stream. --- internal/core/path.go | 9 +- internal/defs/path.go | 2 + internal/ntpestimator/estimator.go | 45 ++++++ internal/ntpestimator/estimator_test.go | 32 +++++ internal/protocols/hls/to_stream.go | 130 +++++++++--------- internal/protocols/mpegts/to_stream.go | 33 ++--- internal/protocols/rtmp/from_stream.go | 2 - internal/protocols/rtmp/to_stream.go | 35 ++--- internal/protocols/rtsp/to_stream.go | 2 +- internal/protocols/webrtc/from_stream_test.go | 2 +- internal/protocols/webrtc/incoming_track.go | 36 ++--- internal/protocols/webrtc/peer_connection.go | 14 +- internal/protocols/webrtc/to_stream.go | 54 +++++++- internal/protocols/webrtc/to_stream_test.go | 4 +- internal/protocols/whip/client.go | 28 ++-- internal/protocols/whip/client_test.go | 4 +- internal/servers/rtmp/conn.go | 1 + internal/servers/rtsp/session.go | 1 + internal/servers/srt/conn.go | 1 + internal/servers/webrtc/server_test.go | 2 +- internal/servers/webrtc/session.go | 5 +- internal/staticsources/mpegts/source.go | 1 + internal/staticsources/rtmp/source.go | 1 + internal/staticsources/rtp/source.go | 3 +- internal/staticsources/rtsp/source.go | 1 + internal/staticsources/srt/source.go | 1 + internal/staticsources/webrtc/source.go | 6 +- internal/stream/stream.go | 2 + internal/stream/stream_format.go | 15 +- internal/stream/stream_media.go | 2 + 30 files changed, 296 insertions(+), 178 deletions(-) create mode 100644 internal/ntpestimator/estimator.go create mode 100644 internal/ntpestimator/estimator_test.go diff --git a/internal/core/path.go b/internal/core/path.go index 4fcbd469..ed6e376b 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -377,7 +377,7 @@ func (pa *path) doReloadConf(newConf *conf.Path) { } func (pa *path) doSourceStaticSetReady(req defs.PathSourceStaticSetReadyReq) { - err := pa.setReady(req.Desc, req.GenerateRTPPackets) + err := pa.setReady(req.Desc, req.GenerateRTPPackets, req.FillNTP) if err != nil { req.Res <- defs.PathSourceStaticSetReadyRes{Err: err} return @@ -474,7 +474,7 @@ func (pa *path) doAddPublisher(req defs.PathAddPublisherReq) { pa.source = req.Author pa.publisherQuery = req.AccessRequest.Query - err := pa.setReady(req.Desc, req.GenerateRTPPackets) + err := pa.setReady(req.Desc, req.GenerateRTPPackets, req.FillNTP) if err != nil { pa.source = nil req.Res <- defs.PathAddPublisherRes{Err: err} @@ -684,12 +684,13 @@ func (pa *path) onDemandPublisherStop(reason string) { pa.onDemandPublisherState = pathOnDemandStateInitial } -func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { +func (pa *path) setReady(desc *description.Session, generateRTPPackets bool, fillNTP bool) error { pa.stream = &stream.Stream{ WriteQueueSize: pa.writeQueueSize, RTPMaxPayloadSize: pa.rtpMaxPayloadSize, Desc: desc, - GenerateRTPPackets: allocateEncoder, + GenerateRTPPackets: generateRTPPackets, + FillNTP: fillNTP, Parent: pa.source, } err := pa.stream.Initialize() diff --git a/internal/defs/path.go b/internal/defs/path.go index 592063d0..bac443c6 100644 --- a/internal/defs/path.go +++ b/internal/defs/path.go @@ -67,6 +67,7 @@ type PathAddPublisherReq struct { Author Publisher Desc *description.Session GenerateRTPPackets bool + FillNTP bool ConfToCompare *conf.Path AccessRequest PathAccessRequest Res chan PathAddPublisherRes @@ -108,6 +109,7 @@ type PathSourceStaticSetReadyRes struct { type PathSourceStaticSetReadyReq struct { Desc *description.Session GenerateRTPPackets bool + FillNTP bool Res chan PathSourceStaticSetReadyRes } diff --git a/internal/ntpestimator/estimator.go b/internal/ntpestimator/estimator.go new file mode 100644 index 00000000..fa234683 --- /dev/null +++ b/internal/ntpestimator/estimator.go @@ -0,0 +1,45 @@ +// Package ntpestimator contains a NTP estimator. +package ntpestimator + +import ( + "time" +) + +var timeNow = time.Now + +func multiplyAndDivide(v, m, d time.Duration) time.Duration { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +// Estimator is a NTP estimator. +type Estimator struct { + ClockRate int + + refNTP time.Time + refPTS int64 +} + +var zero = time.Time{} + +// Estimate returns estimated NTP. +func (e *Estimator) Estimate(pts int64) time.Time { + now := timeNow() + + if e.refNTP.Equal(zero) { + e.refNTP = now + e.refPTS = pts + return now + } + + computed := e.refNTP.Add((multiplyAndDivide(time.Duration(pts-e.refPTS), time.Second, time.Duration(e.ClockRate)))) + + if computed.After(now) { + e.refNTP = now + e.refPTS = pts + return now + } + + return computed +} diff --git a/internal/ntpestimator/estimator_test.go b/internal/ntpestimator/estimator_test.go new file mode 100644 index 00000000..4929fcf2 --- /dev/null +++ b/internal/ntpestimator/estimator_test.go @@ -0,0 +1,32 @@ +package ntpestimator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEstimator(t *testing.T) { + e := &Estimator{ClockRate: 90000} + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC) } + ntp := e.Estimate(90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 8, 0, time.UTC) } + ntp = e.Estimate(2 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 8, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 10, 0, time.UTC) } + ntp = e.Estimate(3 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC) } + ntp = e.Estimate(4 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 15, 0, time.UTC) } + ntp = e.Estimate(5 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 10, 0, time.UTC), ntp) +} diff --git a/internal/protocols/hls/to_stream.go b/internal/protocols/hls/to_stream.go index 6c4c2865..919e6337 100644 --- a/internal/protocols/hls/to_stream.go +++ b/internal/protocols/hls/to_stream.go @@ -9,6 +9,7 @@ import ( "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/ntpestimator" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -32,55 +33,54 @@ func multiplyAndDivide(v, m, d int64) int64 { func ToStream( c *gohlslib.Client, tracks []*gohlslib.Track, - stream **stream.Stream, + strm **stream.Stream, ) ([]*description.Media, error) { var ntpStat ntpState var ntpStatMutex sync.Mutex - handleNTP := func(track *gohlslib.Track) time.Time { - ntpStatMutex.Lock() - defer ntpStatMutex.Unlock() - - switch ntpStat { - case ntpStateInitial: - ntp, avail := c.AbsoluteTime(track) - if !avail { - ntpStat = ntpStateUnavailable - return time.Now() - } - - ntpStat = ntpStateAvailable - return ntp - - case ntpStateAvailable: - ntp, avail := c.AbsoluteTime(track) - if !avail { - panic("should not happen") - } - - return ntp - - case ntpStateUnavailable: - _, avail := c.AbsoluteTime(track) - if avail { - (*stream).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it") - ntpStat = ntpStateDegraded - } - - return time.Now() - - default: // ntpStateDegraded - return time.Now() - } - } - var medias []*description.Media //nolint:prealloc for _, track := range tracks { - var medi *description.Media - clockRate := track.ClockRate + ctrack := track + ntpEstimator := &ntpestimator.Estimator{ClockRate: track.ClockRate} - switch tcodec := track.Codec.(type) { + handleNTP := func(pts int64) time.Time { + ntpStatMutex.Lock() + defer ntpStatMutex.Unlock() + + switch ntpStat { + case ntpStateInitial: + ntp, avail := c.AbsoluteTime(ctrack) + if !avail { + ntpStat = ntpStateUnavailable + return ntpEstimator.Estimate(pts) + } + ntpStat = ntpStateAvailable + return ntp + + case ntpStateAvailable: + ntp, avail := c.AbsoluteTime(ctrack) + if !avail { + panic("should not happen") + } + return ntp + + case ntpStateUnavailable: + _, avail := c.AbsoluteTime(ctrack) + if avail { + (*strm).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it") + ntpStat = ntpStateDegraded + } + return ntpEstimator.Estimate(pts) + + default: // ntpStateDegraded + return ntpEstimator.Estimate(pts) + } + } + + var medi *description.Media + + switch tcodec := ctrack.Codec.(type) { case *codecs.AV1: medi = &description.Media{ Type: description.MediaTypeVideo, @@ -90,10 +90,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataAV1(track, func(pts int64, tu [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataAV1(ctrack, func(pts int64, tu [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadAV1(tu), }) }) @@ -107,10 +107,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataVP9(track, func(pts int64, frame []byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataVP9(ctrack, func(pts int64, frame []byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadVP9(frame), }) }) @@ -127,10 +127,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataH26x(ctrack, func(pts int64, _ int64, au [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadH265(au), }) }) @@ -147,10 +147,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataH26x(ctrack, func(pts int64, _ int64, au [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadH264(au), }) }) @@ -165,10 +165,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataOpus(track, func(pts int64, packets [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataOpus(ctrack, func(pts int64, packets [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadOpus(packets), }) }) @@ -186,10 +186,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataMPEG4Audio(ctrack, func(pts int64, aus [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadMPEG4Audio(aus), }) }) diff --git a/internal/protocols/mpegts/to_stream.go b/internal/protocols/mpegts/to_stream.go index 3e8e087b..ec9b45df 100644 --- a/internal/protocols/mpegts/to_stream.go +++ b/internal/protocols/mpegts/to_stream.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "reflect" - "time" "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" @@ -24,7 +23,7 @@ var errNoSupportedCodecs = errors.New( // ToStream maps a MPEG-TS stream to a MediaMTX stream. func ToStream( r *EnhancedReader, - stream **stream.Stream, + strm **stream.Stream, l logger.Writer, ) ([]*description.Media, error) { var medias []*description.Media //nolint:prealloc @@ -48,8 +47,7 @@ func ToStream( r.OnDataH265(track, func(pts int64, _ int64, au [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadH265(au), }) @@ -68,8 +66,7 @@ func ToStream( r.OnDataH264(track, func(pts int64, _ int64, au [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadH264(au), }) @@ -87,8 +84,7 @@ func ToStream( r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadMPEG4Video(frame), }) @@ -104,8 +100,7 @@ func ToStream( r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadMPEG1Video(frame), }) @@ -124,8 +119,7 @@ func ToStream( r.OnDataOpus(track, func(pts int64, packets [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), Payload: unit.PayloadOpus(packets), }) @@ -142,8 +136,7 @@ func ToStream( r.OnDataKLV(track, func(pts int64, uni []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, Payload: unit.PayloadKLV(uni), }) @@ -165,8 +158,7 @@ func ToStream( r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), Payload: unit.PayloadMPEG4Audio(aus), }) @@ -217,8 +209,7 @@ func ToStream( return err } - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, Payload: unit.PayloadMPEG4AudioLATM(buf), }) @@ -238,8 +229,7 @@ func ToStream( r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadMPEG1Audio(frames), }) @@ -259,8 +249,7 @@ func ToStream( r.OnDataAC3(track, func(pts int64, frame []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), Payload: unit.PayloadAC3{frame}, }) diff --git a/internal/protocols/rtmp/from_stream.go b/internal/protocols/rtmp/from_stream.go index 8f1bd9b0..91cccb22 100644 --- a/internal/protocols/rtmp/from_stream.go +++ b/internal/protocols/rtmp/from_stream.go @@ -267,8 +267,6 @@ func FromStream( } case *format.MPEG1Audio: - // TODO: check sample rate and layer, - // unfortunately they are not available at this stage. r.OnData( media, forma, diff --git a/internal/protocols/rtmp/to_stream.go b/internal/protocols/rtmp/to_stream.go index 9e12ba2d..c98c9c53 100644 --- a/internal/protocols/rtmp/to_stream.go +++ b/internal/protocols/rtmp/to_stream.go @@ -31,7 +31,10 @@ func fourCCToString(c message.FourCC) string { } // ToStream maps a RTMP stream to a MediaMTX stream. -func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media, error) { +func ToStream( + r *gortmplib.Reader, + strm **stream.Stream, +) ([]*description.Media, error) { var medias []*description.Media for _, track := range r.Tracks() { @@ -46,8 +49,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataAV1(ttrack, func(pts time.Duration, tu [][]byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadAV1(tu), }) @@ -61,8 +63,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataVP9(ttrack, func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadVP9(frame), }) @@ -76,8 +77,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataH265(ttrack, func(pts time.Duration, _ time.Duration, au [][]byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadH265(au), }) @@ -91,8 +91,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataH264(ttrack, func(pts time.Duration, _ time.Duration, au [][]byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadH264(au), }) @@ -106,8 +105,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataOpus(ttrack, func(pts time.Duration, packet []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadOpus{packet}, }) @@ -121,8 +119,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataMPEG4Audio(ttrack, func(pts time.Duration, au []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadMPEG4Audio{au}, }) @@ -136,8 +133,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataMPEG1Audio(ttrack, func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadMPEG1Audio{frame}, }) @@ -151,8 +147,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataAC3(ttrack, func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadAC3{frame}, }) @@ -166,8 +161,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataG711(ttrack, func(pts time.Duration, samples []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadG711(samples), }) @@ -181,8 +175,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataLPCM(ttrack, func(pts time.Duration, samples []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadLPCM(samples), }) diff --git a/internal/protocols/rtsp/to_stream.go b/internal/protocols/rtsp/to_stream.go index 44f7647e..bbf277e8 100644 --- a/internal/protocols/rtsp/to_stream.go +++ b/internal/protocols/rtsp/to_stream.go @@ -49,7 +49,7 @@ func ToStream( handleNTP := func(pkt *rtp.Packet) (time.Time, bool) { switch ntpStat { case ntpStateReplace: - return time.Now(), true + return time.Time{}, true case ntpStateInitial: ntp, avail := source.PacketNTP(cmedi, pkt) diff --git a/internal/protocols/webrtc/from_stream_test.go b/internal/protocols/webrtc/from_stream_test.go index cfb75b6c..c3a4a413 100644 --- a/internal/protocols/webrtc/from_stream_test.go +++ b/internal/protocols/webrtc/from_stream_test.go @@ -182,7 +182,7 @@ func TestFromStreamResampleOpus(t *testing.T) { n := 0 var ts uint32 - tracks[0].OnPacketRTP = func(pkt *rtp.Packet, _ time.Time) { + tracks[0].OnPacketRTP = func(pkt *rtp.Packet) { n++ switch n { diff --git a/internal/protocols/webrtc/incoming_track.go b/internal/protocols/webrtc/incoming_track.go index 9917d21a..45b18f81 100644 --- a/internal/protocols/webrtc/incoming_track.go +++ b/internal/protocols/webrtc/incoming_track.go @@ -237,22 +237,21 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ // IncomingTrack is an incoming track. type IncomingTrack struct { - OnPacketRTP func(*rtp.Packet, time.Time) + OnPacketRTP func(*rtp.Packet) - useAbsoluteTimestamp bool - track *webrtc.TrackRemote - receiver *webrtc.RTPReceiver - writeRTCP func([]rtcp.Packet) error - log logger.Writer - rtpPacketsReceived *uint64 - rtpPacketsLost *uint64 + track *webrtc.TrackRemote + receiver *webrtc.RTPReceiver + writeRTCP func([]rtcp.Packet) error + log logger.Writer + rtpPacketsReceived *uint64 + rtpPacketsLost *uint64 packetsLost *counterdumper.CounterDumper rtcpReceiver *rtpreceiver.Receiver } func (t *IncomingTrack) initialize() { - t.OnPacketRTP = func(*rtp.Packet, time.Time) {} + t.OnPacketRTP = func(*rtp.Packet) {} } // Codec returns the track codec. @@ -361,30 +360,23 @@ func (t *IncomingTrack) start() { atomic.AddUint64(t.rtpPacketsReceived, uint64(len(packets))) - var ntp time.Time - if t.useAbsoluteTimestamp { - var avail bool - ntp, avail = t.rtcpReceiver.PacketNTP(pkt.Timestamp) - if !avail { - t.log.Log(logger.Warn, "received RTP packet without absolute time, skipping it") - continue - } - } else { - ntp = time.Now() - } - for _, pkt := range packets { // sometimes Chrome sends empty RTP packets. ignore them. if len(pkt.Payload) == 0 { continue } - t.OnPacketRTP(pkt, ntp) + t.OnPacketRTP(pkt) } } }() } +// PacketNTP returns the packet NTP. +func (t *IncomingTrack) PacketNTP(pkt *rtp.Packet) (time.Time, bool) { + return t.rtcpReceiver.PacketNTP(pkt.Timestamp) +} + func (t *IncomingTrack) close() { if t.packetsLost != nil { t.packetsLost.Stop() diff --git a/internal/protocols/webrtc/peer_connection.go b/internal/protocols/webrtc/peer_connection.go index 6519acb1..8f5b22b1 100644 --- a/internal/protocols/webrtc/peer_connection.go +++ b/internal/protocols/webrtc/peer_connection.go @@ -144,7 +144,6 @@ type PeerConnection struct { STUNGatherTimeout conf.Duration Publish bool OutgoingTracks []*OutgoingTrack - UseAbsoluteTimestamp bool Log logger.Writer wr *webrtc.PeerConnection @@ -698,13 +697,12 @@ func (co *PeerConnection) GatherIncomingTracks() error { case pair := <-co.incomingTrack: t := &IncomingTrack{ - useAbsoluteTimestamp: co.UseAbsoluteTimestamp, - track: pair.track, - receiver: pair.receiver, - writeRTCP: co.wr.WriteRTCP, - log: co.Log, - rtpPacketsReceived: co.rtpPacketsReceived, - rtpPacketsLost: co.rtpPacketsLost, + track: pair.track, + receiver: pair.receiver, + writeRTCP: co.wr.WriteRTCP, + log: co.Log, + rtpPacketsReceived: co.rtpPacketsReceived, + rtpPacketsLost: co.rtpPacketsLost, } t.initialize() co.incomingTracks = append(co.incomingTracks, t) diff --git a/internal/protocols/webrtc/to_stream.go b/internal/protocols/webrtc/to_stream.go index 3150a8a2..983567be 100644 --- a/internal/protocols/webrtc/to_stream.go +++ b/internal/protocols/webrtc/to_stream.go @@ -9,11 +9,21 @@ import ( "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/bluenviron/gortsplib/v5/pkg/rtptime" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/pion/rtp" "github.com/pion/webrtc/v4" ) +type ntpState int + +const ( + ntpStateInitial ntpState = iota + ntpStateReplace + ntpStateAvailable +) + var errNoSupportedCodecsTo = errors.New( "the stream doesn't contain any supported codec, which are currently " + "AV1, VP9, VP8, H265, H264, Opus, G722, G711, LPCM") @@ -21,7 +31,9 @@ var errNoSupportedCodecsTo = errors.New( // ToStream maps a WebRTC connection to a MediaMTX stream. func ToStream( pc *PeerConnection, - stream **stream.Stream, + pathConf *conf.Path, + strm **stream.Stream, + log logger.Writer, ) ([]*description.Media, error) { var medias []*description.Media //nolint:prealloc timeDecoder := &rtptime.GlobalDecoder{} @@ -142,13 +154,49 @@ func ToStream( Formats: []format.Format{forma}, } - track.OnPacketRTP = func(pkt *rtp.Packet, ntp time.Time) { + var ntpStat ntpState + + if !pathConf.UseAbsoluteTimestamp { + ntpStat = ntpStateReplace + } + + handleNTP := func(pkt *rtp.Packet) (time.Time, bool) { + switch ntpStat { + case ntpStateReplace: + return time.Time{}, true + + case ntpStateInitial: + ntp, avail := track.PacketNTP(pkt) + if !avail { + log.Log(logger.Warn, "received RTP packet without absolute time, skipping it") + return time.Time{}, false + } + + ntpStat = ntpStateAvailable + return ntp, true + + default: // ntpStateAvailable + ntp, avail := track.PacketNTP(pkt) + if !avail { + panic("should not happen") + } + + return ntp, true + } + } + + track.OnPacketRTP = func(pkt *rtp.Packet) { pts, ok := timeDecoder.Decode(track, pkt) if !ok { return } - (*stream).WriteRTPPacket(medi, forma, pkt, ntp, pts) + ntp, ok := handleNTP(pkt) + if !ok { + return + } + + (*strm).WriteRTPPacket(medi, forma, pkt, ntp, pts) } medias = append(medias, medi) diff --git a/internal/protocols/webrtc/to_stream_test.go b/internal/protocols/webrtc/to_stream_test.go index 66767295..afa5437d 100644 --- a/internal/protocols/webrtc/to_stream_test.go +++ b/internal/protocols/webrtc/to_stream_test.go @@ -15,7 +15,7 @@ import ( func TestToStreamNoSupportedCodecs(t *testing.T) { pc := &PeerConnection{} - _, err := ToStream(pc, nil) + _, err := ToStream(pc, &conf.Path{}, nil, nil) require.Equal(t, errNoSupportedCodecsTo, err) } @@ -406,7 +406,7 @@ func TestToStream(t *testing.T) { require.NoError(t, err) var stream *stream.Stream - medias, err := ToStream(pc2, &stream) + medias, err := ToStream(pc2, &conf.Path{}, &stream, nil) require.NoError(t, err) require.Equal(t, ca.out, medias[0].Formats[0]) }) diff --git a/internal/protocols/whip/client.go b/internal/protocols/whip/client.go index 03057a64..6d6c92db 100644 --- a/internal/protocols/whip/client.go +++ b/internal/protocols/whip/client.go @@ -26,12 +26,11 @@ const ( // Client is a WHIP client. type Client struct { - URL *url.URL - Publish bool - OutgoingTracks []*webrtc.OutgoingTrack - HTTPClient *http.Client - UseAbsoluteTimestamp bool - Log logger.Writer + URL *url.URL + Publish bool + OutgoingTracks []*webrtc.OutgoingTrack + HTTPClient *http.Client + Log logger.Writer pc *webrtc.PeerConnection patchIsSupported bool @@ -45,15 +44,14 @@ func (c *Client) Initialize(ctx context.Context) error { } c.pc = &webrtc.PeerConnection{ - LocalRandomUDP: true, - ICEServers: iceServers, - IPsFromInterfaces: true, - HandshakeTimeout: conf.Duration(10 * time.Second), - TrackGatherTimeout: conf.Duration(2 * time.Second), - Publish: c.Publish, - OutgoingTracks: c.OutgoingTracks, - UseAbsoluteTimestamp: c.UseAbsoluteTimestamp, - Log: c.Log, + LocalRandomUDP: true, + ICEServers: iceServers, + IPsFromInterfaces: true, + HandshakeTimeout: conf.Duration(10 * time.Second), + TrackGatherTimeout: conf.Duration(2 * time.Second), + Publish: c.Publish, + OutgoingTracks: c.OutgoingTracks, + Log: c.Log, } err = c.pc.Start() if err != nil { diff --git a/internal/protocols/whip/client_test.go b/internal/protocols/whip/client_test.go index 24a9f3d8..f7dd409a 100644 --- a/internal/protocols/whip/client_test.go +++ b/internal/protocols/whip/client_test.go @@ -225,7 +225,7 @@ func TestClientRead(t *testing.T) { for i, track := range cl.IncomingTracks() { ci := i - track.OnPacketRTP = func(_ *rtp.Packet, _ time.Time) { + track.OnPacketRTP = func(_ *rtp.Packet) { close(recv[ci]) } } @@ -340,7 +340,7 @@ func TestClientPublish(t *testing.T) { for i, track := range pc.IncomingTracks() { ci := i - track.OnPacketRTP = func(_ *rtp.Packet, _ time.Time) { + track.OnPacketRTP = func(_ *rtp.Packet) { close(recv[ci]) } } diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index c1083fa4..1d2609c9 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -242,6 +242,7 @@ func (c *conn) runPublish() error { Author: c, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, AccessRequest: defs.PathAccessRequest{ Name: pathName, Query: c.rconn.URL.RawQuery, diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 47b2f83a..7e4100ea 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -311,6 +311,7 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons Author: s, Desc: s.rsession.AnnouncedDescription(), GenerateRTPPackets: false, + FillNTP: !s.pathConf.UseAbsoluteTimestamp, ConfToCompare: s.pathConf, AccessRequest: defs.PathAccessRequest{ Name: s.rsession.Path()[1:], diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index 50f36a5f..9ae931da 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -225,6 +225,7 @@ func (c *conn) runPublishReader(sconn srt.Conn, streamID *streamID, pathConf *co Author: c, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, ConfToCompare: pathConf, AccessRequest: defs.PathAccessRequest{ Name: streamID.path, diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index 05119140..aae15d2c 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -567,7 +567,7 @@ func TestServerRead(t *testing.T) { done := make(chan struct{}) - wc.IncomingTracks()[0].OnPacketRTP = func(pkt *rtp.Packet, _ time.Time) { + wc.IncomingTracks()[0].OnPacketRTP = func(pkt *rtp.Packet) { select { case <-done: default: diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 2d8ada23..07c3210d 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -168,7 +168,6 @@ func (s *session) runPublish() (int, error) { TrackGatherTimeout: s.trackGatherTimeout, STUNGatherTimeout: s.stunGatherTimeout, Publish: false, - UseAbsoluteTimestamp: pathConf.UseAbsoluteTimestamp, Log: s, } err = pc.Start() @@ -234,7 +233,7 @@ func (s *session) runPublish() (int, error) { var stream *stream.Stream - medias, err := webrtc.ToStream(pc, &stream) + medias, err := webrtc.ToStream(pc, pathConf, &stream, s) if err != nil { return 0, err } @@ -244,6 +243,7 @@ func (s *session) runPublish() (int, error) { Author: s, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: false, + FillNTP: !pathConf.UseAbsoluteTimestamp, ConfToCompare: pathConf, AccessRequest: defs.PathAccessRequest{ Name: s.req.pathName, @@ -312,7 +312,6 @@ func (s *session) runRead() (int, error) { TrackGatherTimeout: s.trackGatherTimeout, STUNGatherTimeout: s.stunGatherTimeout, Publish: true, - UseAbsoluteTimestamp: path.SafeConf().UseAbsoluteTimestamp, Log: s, } diff --git a/internal/staticsources/mpegts/source.go b/internal/staticsources/mpegts/source.go index b2912bf8..95b4995b 100644 --- a/internal/staticsources/mpegts/source.go +++ b/internal/staticsources/mpegts/source.go @@ -120,6 +120,7 @@ func (s *Source) runReader(nc net.Conn) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, }) if res.Err != nil { return res.Err diff --git a/internal/staticsources/rtmp/source.go b/internal/staticsources/rtmp/source.go index 731deb48..90bd03fe 100644 --- a/internal/staticsources/rtmp/source.go +++ b/internal/staticsources/rtmp/source.go @@ -117,6 +117,7 @@ func (s *Source) runReader(ctx context.Context, u *url.URL, fingerprint string) res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, }) if res.Err != nil { conn.Close() diff --git a/internal/staticsources/rtp/source.go b/internal/staticsources/rtp/source.go index 6c08aabd..fada03c8 100644 --- a/internal/staticsources/rtp/source.go +++ b/internal/staticsources/rtp/source.go @@ -149,6 +149,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: desc, GenerateRTPPackets: false, + FillNTP: true, }) if res.Err != nil { return res.Err @@ -171,7 +172,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { continue } - stream.WriteRTPPacket(media, forma, &pkt, time.Now(), pts) + stream.WriteRTPPacket(media, forma, &pkt, time.Time{}, pts) } } diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index 36036f08..4e0091a3 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -195,6 +195,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: desc, GenerateRTPPackets: false, + FillNTP: !params.Conf.UseAbsoluteTimestamp, }) if res.Err != nil { return res.Err diff --git a/internal/staticsources/srt/source.go b/internal/staticsources/srt/source.go index c8bdb031..12d1435b 100644 --- a/internal/staticsources/srt/source.go +++ b/internal/staticsources/srt/source.go @@ -111,6 +111,7 @@ func (s *Source) runReader(sconn srt.Conn) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, }) if res.Err != nil { return res.Err diff --git a/internal/staticsources/webrtc/source.go b/internal/staticsources/webrtc/source.go index dfa3211b..3182005c 100644 --- a/internal/staticsources/webrtc/source.go +++ b/internal/staticsources/webrtc/source.go @@ -58,8 +58,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { Timeout: time.Duration(s.ReadTimeout), Transport: tr, }, - UseAbsoluteTimestamp: params.Conf.UseAbsoluteTimestamp, - Log: s, + Log: s, } err = client.Initialize(params.Context) if err != nil { @@ -68,7 +67,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { var stream *stream.Stream - medias, err := webrtc.ToStream(client.PeerConnection(), &stream) + medias, err := webrtc.ToStream(client.PeerConnection(), params.Conf, &stream, s) if err != nil { client.Close() //nolint:errcheck return err @@ -77,6 +76,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { rres := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: !params.Conf.UseAbsoluteTimestamp, }) if rres.Err != nil { client.Close() //nolint:errcheck diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 34a0e3be..d066cfea 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -23,6 +23,7 @@ type Stream struct { RTPMaxPayloadSize int Desc *description.Session GenerateRTPPackets bool + FillNTP bool Parent logger.Writer bytesReceived *uint64 @@ -61,6 +62,7 @@ func (s *Stream) Initialize() error { rtpMaxPayloadSize: s.RTPMaxPayloadSize, media: media, generateRTPPackets: s.GenerateRTPPackets, + fillNTP: s.FillNTP, processingErrors: s.processingErrors, parent: s.Parent, } diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index bf41dae8..30ad5296 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -11,6 +11,7 @@ import ( "github.com/bluenviron/mediamtx/internal/codecprocessor" "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/ntpestimator" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -26,11 +27,13 @@ type streamFormat struct { rtpMaxPayloadSize int format format.Format generateRTPPackets bool + fillNTP bool processingErrors *counterdumper.CounterDumper parent logger.Writer - proc codecprocessor.Processor - onDatas map[*Reader]OnDataFunc + proc codecprocessor.Processor + ntpEstimator *ntpestimator.Estimator + onDatas map[*Reader]OnDataFunc } func (sf *streamFormat) initialize() error { @@ -42,6 +45,10 @@ func (sf *streamFormat) initialize() error { return err } + sf.ntpEstimator = &ntpestimator.Estimator{ + ClockRate: sf.format.ClockRate(), + } + return nil } @@ -80,6 +87,10 @@ func (sf *streamFormat) writeRTPPacket( } func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u *unit.Unit) { + if sf.fillNTP { + u.NTP = sf.ntpEstimator.Estimate(u.PTS) + } + size := unitSize(u) atomic.AddUint64(s.bytesReceived, size) diff --git a/internal/stream/stream_media.go b/internal/stream/stream_media.go index b1d1e183..a0037c05 100644 --- a/internal/stream/stream_media.go +++ b/internal/stream/stream_media.go @@ -11,6 +11,7 @@ type streamMedia struct { rtpMaxPayloadSize int media *description.Media generateRTPPackets bool + fillNTP bool processingErrors *counterdumper.CounterDumper parent logger.Writer @@ -25,6 +26,7 @@ func (sm *streamMedia) initialize() error { rtpMaxPayloadSize: sm.rtpMaxPayloadSize, format: forma, generateRTPPackets: sm.generateRTPPackets, + fillNTP: sm.fillNTP, processingErrors: sm.processingErrors, parent: sm.parent, }