diff --git a/internal/core/path.go b/internal/core/path.go index 4fcbd469..ed6e376b 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -377,7 +377,7 @@ func (pa *path) doReloadConf(newConf *conf.Path) { } func (pa *path) doSourceStaticSetReady(req defs.PathSourceStaticSetReadyReq) { - err := pa.setReady(req.Desc, req.GenerateRTPPackets) + err := pa.setReady(req.Desc, req.GenerateRTPPackets, req.FillNTP) if err != nil { req.Res <- defs.PathSourceStaticSetReadyRes{Err: err} return @@ -474,7 +474,7 @@ func (pa *path) doAddPublisher(req defs.PathAddPublisherReq) { pa.source = req.Author pa.publisherQuery = req.AccessRequest.Query - err := pa.setReady(req.Desc, req.GenerateRTPPackets) + err := pa.setReady(req.Desc, req.GenerateRTPPackets, req.FillNTP) if err != nil { pa.source = nil req.Res <- defs.PathAddPublisherRes{Err: err} @@ -684,12 +684,13 @@ func (pa *path) onDemandPublisherStop(reason string) { pa.onDemandPublisherState = pathOnDemandStateInitial } -func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { +func (pa *path) setReady(desc *description.Session, generateRTPPackets bool, fillNTP bool) error { pa.stream = &stream.Stream{ WriteQueueSize: pa.writeQueueSize, RTPMaxPayloadSize: pa.rtpMaxPayloadSize, Desc: desc, - GenerateRTPPackets: allocateEncoder, + GenerateRTPPackets: generateRTPPackets, + FillNTP: fillNTP, Parent: pa.source, } err := pa.stream.Initialize() diff --git a/internal/defs/path.go b/internal/defs/path.go index 592063d0..bac443c6 100644 --- a/internal/defs/path.go +++ b/internal/defs/path.go @@ -67,6 +67,7 @@ type PathAddPublisherReq struct { Author Publisher Desc *description.Session GenerateRTPPackets bool + FillNTP bool ConfToCompare *conf.Path AccessRequest PathAccessRequest Res chan PathAddPublisherRes @@ -108,6 +109,7 @@ type PathSourceStaticSetReadyRes struct { type PathSourceStaticSetReadyReq struct { Desc *description.Session GenerateRTPPackets bool + FillNTP bool Res chan PathSourceStaticSetReadyRes } diff --git a/internal/ntpestimator/estimator.go b/internal/ntpestimator/estimator.go new file mode 100644 index 00000000..fa234683 --- /dev/null +++ b/internal/ntpestimator/estimator.go @@ -0,0 +1,45 @@ +// Package ntpestimator contains a NTP estimator. +package ntpestimator + +import ( + "time" +) + +var timeNow = time.Now + +func multiplyAndDivide(v, m, d time.Duration) time.Duration { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +// Estimator is a NTP estimator. +type Estimator struct { + ClockRate int + + refNTP time.Time + refPTS int64 +} + +var zero = time.Time{} + +// Estimate returns estimated NTP. +func (e *Estimator) Estimate(pts int64) time.Time { + now := timeNow() + + if e.refNTP.Equal(zero) { + e.refNTP = now + e.refPTS = pts + return now + } + + computed := e.refNTP.Add((multiplyAndDivide(time.Duration(pts-e.refPTS), time.Second, time.Duration(e.ClockRate)))) + + if computed.After(now) { + e.refNTP = now + e.refPTS = pts + return now + } + + return computed +} diff --git a/internal/ntpestimator/estimator_test.go b/internal/ntpestimator/estimator_test.go new file mode 100644 index 00000000..4929fcf2 --- /dev/null +++ b/internal/ntpestimator/estimator_test.go @@ -0,0 +1,32 @@ +package ntpestimator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEstimator(t *testing.T) { + e := &Estimator{ClockRate: 90000} + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC) } + ntp := e.Estimate(90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 8, 0, time.UTC) } + ntp = e.Estimate(2 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 8, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 10, 0, time.UTC) } + ntp = e.Estimate(3 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC) } + ntp = e.Estimate(4 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC), ntp) + + timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 15, 0, time.UTC) } + ntp = e.Estimate(5 * 90000) + require.Equal(t, time.Date(2003, 11, 4, 23, 15, 10, 0, time.UTC), ntp) +} diff --git a/internal/protocols/hls/to_stream.go b/internal/protocols/hls/to_stream.go index 6c4c2865..919e6337 100644 --- a/internal/protocols/hls/to_stream.go +++ b/internal/protocols/hls/to_stream.go @@ -9,6 +9,7 @@ import ( "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/ntpestimator" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -32,55 +33,54 @@ func multiplyAndDivide(v, m, d int64) int64 { func ToStream( c *gohlslib.Client, tracks []*gohlslib.Track, - stream **stream.Stream, + strm **stream.Stream, ) ([]*description.Media, error) { var ntpStat ntpState var ntpStatMutex sync.Mutex - handleNTP := func(track *gohlslib.Track) time.Time { - ntpStatMutex.Lock() - defer ntpStatMutex.Unlock() - - switch ntpStat { - case ntpStateInitial: - ntp, avail := c.AbsoluteTime(track) - if !avail { - ntpStat = ntpStateUnavailable - return time.Now() - } - - ntpStat = ntpStateAvailable - return ntp - - case ntpStateAvailable: - ntp, avail := c.AbsoluteTime(track) - if !avail { - panic("should not happen") - } - - return ntp - - case ntpStateUnavailable: - _, avail := c.AbsoluteTime(track) - if avail { - (*stream).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it") - ntpStat = ntpStateDegraded - } - - return time.Now() - - default: // ntpStateDegraded - return time.Now() - } - } - var medias []*description.Media //nolint:prealloc for _, track := range tracks { - var medi *description.Media - clockRate := track.ClockRate + ctrack := track + ntpEstimator := &ntpestimator.Estimator{ClockRate: track.ClockRate} - switch tcodec := track.Codec.(type) { + handleNTP := func(pts int64) time.Time { + ntpStatMutex.Lock() + defer ntpStatMutex.Unlock() + + switch ntpStat { + case ntpStateInitial: + ntp, avail := c.AbsoluteTime(ctrack) + if !avail { + ntpStat = ntpStateUnavailable + return ntpEstimator.Estimate(pts) + } + ntpStat = ntpStateAvailable + return ntp + + case ntpStateAvailable: + ntp, avail := c.AbsoluteTime(ctrack) + if !avail { + panic("should not happen") + } + return ntp + + case ntpStateUnavailable: + _, avail := c.AbsoluteTime(ctrack) + if avail { + (*strm).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it") + ntpStat = ntpStateDegraded + } + return ntpEstimator.Estimate(pts) + + default: // ntpStateDegraded + return ntpEstimator.Estimate(pts) + } + } + + var medi *description.Media + + switch tcodec := ctrack.Codec.(type) { case *codecs.AV1: medi = &description.Media{ Type: description.MediaTypeVideo, @@ -90,10 +90,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataAV1(track, func(pts int64, tu [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataAV1(ctrack, func(pts int64, tu [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadAV1(tu), }) }) @@ -107,10 +107,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataVP9(track, func(pts int64, frame []byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataVP9(ctrack, func(pts int64, frame []byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadVP9(frame), }) }) @@ -127,10 +127,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataH26x(ctrack, func(pts int64, _ int64, au [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadH265(au), }) }) @@ -147,10 +147,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataH26x(ctrack, func(pts int64, _ int64, au [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadH264(au), }) }) @@ -165,10 +165,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataOpus(track, func(pts int64, packets [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataOpus(ctrack, func(pts int64, packets [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadOpus(packets), }) }) @@ -186,10 +186,10 @@ func ToStream( } newClockRate := medi.Formats[0].ClockRate() - c.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: handleNTP(track), - PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)), + c.OnDataMPEG4Audio(ctrack, func(pts int64, aus [][]byte) { + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ + NTP: handleNTP(pts), + PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)), Payload: unit.PayloadMPEG4Audio(aus), }) }) diff --git a/internal/protocols/mpegts/to_stream.go b/internal/protocols/mpegts/to_stream.go index 3e8e087b..ec9b45df 100644 --- a/internal/protocols/mpegts/to_stream.go +++ b/internal/protocols/mpegts/to_stream.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "reflect" - "time" "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" @@ -24,7 +23,7 @@ var errNoSupportedCodecs = errors.New( // ToStream maps a MPEG-TS stream to a MediaMTX stream. func ToStream( r *EnhancedReader, - stream **stream.Stream, + strm **stream.Stream, l logger.Writer, ) ([]*description.Media, error) { var medias []*description.Media //nolint:prealloc @@ -48,8 +47,7 @@ func ToStream( r.OnDataH265(track, func(pts int64, _ int64, au [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadH265(au), }) @@ -68,8 +66,7 @@ func ToStream( r.OnDataH264(track, func(pts int64, _ int64, au [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadH264(au), }) @@ -87,8 +84,7 @@ func ToStream( r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadMPEG4Video(frame), }) @@ -104,8 +100,7 @@ func ToStream( r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadMPEG1Video(frame), }) @@ -124,8 +119,7 @@ func ToStream( r.OnDataOpus(track, func(pts int64, packets [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), Payload: unit.PayloadOpus(packets), }) @@ -142,8 +136,7 @@ func ToStream( r.OnDataKLV(track, func(pts int64, uni []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, Payload: unit.PayloadKLV(uni), }) @@ -165,8 +158,7 @@ func ToStream( r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), Payload: unit.PayloadMPEG4Audio(aus), }) @@ -217,8 +209,7 @@ func ToStream( return err } - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, Payload: unit.PayloadMPEG4AudioLATM(buf), }) @@ -238,8 +229,7 @@ func ToStream( r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP Payload: unit.PayloadMPEG1Audio(frames), }) @@ -259,8 +249,7 @@ func ToStream( r.OnDataAC3(track, func(pts int64, frame []byte) error { pts = td.Decode(pts) - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{ PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), Payload: unit.PayloadAC3{frame}, }) diff --git a/internal/protocols/rtmp/from_stream.go b/internal/protocols/rtmp/from_stream.go index 8f1bd9b0..91cccb22 100644 --- a/internal/protocols/rtmp/from_stream.go +++ b/internal/protocols/rtmp/from_stream.go @@ -267,8 +267,6 @@ func FromStream( } case *format.MPEG1Audio: - // TODO: check sample rate and layer, - // unfortunately they are not available at this stage. r.OnData( media, forma, diff --git a/internal/protocols/rtmp/to_stream.go b/internal/protocols/rtmp/to_stream.go index 9e12ba2d..c98c9c53 100644 --- a/internal/protocols/rtmp/to_stream.go +++ b/internal/protocols/rtmp/to_stream.go @@ -31,7 +31,10 @@ func fourCCToString(c message.FourCC) string { } // ToStream maps a RTMP stream to a MediaMTX stream. -func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media, error) { +func ToStream( + r *gortmplib.Reader, + strm **stream.Stream, +) ([]*description.Media, error) { var medias []*description.Media for _, track := range r.Tracks() { @@ -46,8 +49,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataAV1(ttrack, func(pts time.Duration, tu [][]byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadAV1(tu), }) @@ -61,8 +63,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataVP9(ttrack, func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadVP9(frame), }) @@ -76,8 +77,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataH265(ttrack, func(pts time.Duration, _ time.Duration, au [][]byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadH265(au), }) @@ -91,8 +91,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataH264(ttrack, func(pts time.Duration, _ time.Duration, au [][]byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadH264(au), }) @@ -106,8 +105,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataOpus(ttrack, func(pts time.Duration, packet []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadOpus{packet}, }) @@ -121,8 +119,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataMPEG4Audio(ttrack, func(pts time.Duration, au []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadMPEG4Audio{au}, }) @@ -136,8 +133,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataMPEG1Audio(ttrack, func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadMPEG1Audio{frame}, }) @@ -151,8 +147,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataAC3(ttrack, func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadAC3{frame}, }) @@ -166,8 +161,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataG711(ttrack, func(pts time.Duration, samples []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadG711(samples), }) @@ -181,8 +175,7 @@ func ToStream(r *gortmplib.Reader, stream **stream.Stream) ([]*description.Media medias = append(medias, medi) r.OnDataLPCM(ttrack, func(pts time.Duration, samples []byte) { - (*stream).WriteUnit(medi, ctrack, &unit.Unit{ - NTP: time.Now(), + (*strm).WriteUnit(medi, ctrack, &unit.Unit{ PTS: durationToTimestamp(pts, ctrack.ClockRate()), Payload: unit.PayloadLPCM(samples), }) diff --git a/internal/protocols/rtsp/to_stream.go b/internal/protocols/rtsp/to_stream.go index 44f7647e..bbf277e8 100644 --- a/internal/protocols/rtsp/to_stream.go +++ b/internal/protocols/rtsp/to_stream.go @@ -49,7 +49,7 @@ func ToStream( handleNTP := func(pkt *rtp.Packet) (time.Time, bool) { switch ntpStat { case ntpStateReplace: - return time.Now(), true + return time.Time{}, true case ntpStateInitial: ntp, avail := source.PacketNTP(cmedi, pkt) diff --git a/internal/protocols/webrtc/from_stream_test.go b/internal/protocols/webrtc/from_stream_test.go index cfb75b6c..c3a4a413 100644 --- a/internal/protocols/webrtc/from_stream_test.go +++ b/internal/protocols/webrtc/from_stream_test.go @@ -182,7 +182,7 @@ func TestFromStreamResampleOpus(t *testing.T) { n := 0 var ts uint32 - tracks[0].OnPacketRTP = func(pkt *rtp.Packet, _ time.Time) { + tracks[0].OnPacketRTP = func(pkt *rtp.Packet) { n++ switch n { diff --git a/internal/protocols/webrtc/incoming_track.go b/internal/protocols/webrtc/incoming_track.go index 9917d21a..45b18f81 100644 --- a/internal/protocols/webrtc/incoming_track.go +++ b/internal/protocols/webrtc/incoming_track.go @@ -237,22 +237,21 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ // IncomingTrack is an incoming track. type IncomingTrack struct { - OnPacketRTP func(*rtp.Packet, time.Time) + OnPacketRTP func(*rtp.Packet) - useAbsoluteTimestamp bool - track *webrtc.TrackRemote - receiver *webrtc.RTPReceiver - writeRTCP func([]rtcp.Packet) error - log logger.Writer - rtpPacketsReceived *uint64 - rtpPacketsLost *uint64 + track *webrtc.TrackRemote + receiver *webrtc.RTPReceiver + writeRTCP func([]rtcp.Packet) error + log logger.Writer + rtpPacketsReceived *uint64 + rtpPacketsLost *uint64 packetsLost *counterdumper.CounterDumper rtcpReceiver *rtpreceiver.Receiver } func (t *IncomingTrack) initialize() { - t.OnPacketRTP = func(*rtp.Packet, time.Time) {} + t.OnPacketRTP = func(*rtp.Packet) {} } // Codec returns the track codec. @@ -361,30 +360,23 @@ func (t *IncomingTrack) start() { atomic.AddUint64(t.rtpPacketsReceived, uint64(len(packets))) - var ntp time.Time - if t.useAbsoluteTimestamp { - var avail bool - ntp, avail = t.rtcpReceiver.PacketNTP(pkt.Timestamp) - if !avail { - t.log.Log(logger.Warn, "received RTP packet without absolute time, skipping it") - continue - } - } else { - ntp = time.Now() - } - for _, pkt := range packets { // sometimes Chrome sends empty RTP packets. ignore them. if len(pkt.Payload) == 0 { continue } - t.OnPacketRTP(pkt, ntp) + t.OnPacketRTP(pkt) } } }() } +// PacketNTP returns the packet NTP. +func (t *IncomingTrack) PacketNTP(pkt *rtp.Packet) (time.Time, bool) { + return t.rtcpReceiver.PacketNTP(pkt.Timestamp) +} + func (t *IncomingTrack) close() { if t.packetsLost != nil { t.packetsLost.Stop() diff --git a/internal/protocols/webrtc/peer_connection.go b/internal/protocols/webrtc/peer_connection.go index 6519acb1..8f5b22b1 100644 --- a/internal/protocols/webrtc/peer_connection.go +++ b/internal/protocols/webrtc/peer_connection.go @@ -144,7 +144,6 @@ type PeerConnection struct { STUNGatherTimeout conf.Duration Publish bool OutgoingTracks []*OutgoingTrack - UseAbsoluteTimestamp bool Log logger.Writer wr *webrtc.PeerConnection @@ -698,13 +697,12 @@ func (co *PeerConnection) GatherIncomingTracks() error { case pair := <-co.incomingTrack: t := &IncomingTrack{ - useAbsoluteTimestamp: co.UseAbsoluteTimestamp, - track: pair.track, - receiver: pair.receiver, - writeRTCP: co.wr.WriteRTCP, - log: co.Log, - rtpPacketsReceived: co.rtpPacketsReceived, - rtpPacketsLost: co.rtpPacketsLost, + track: pair.track, + receiver: pair.receiver, + writeRTCP: co.wr.WriteRTCP, + log: co.Log, + rtpPacketsReceived: co.rtpPacketsReceived, + rtpPacketsLost: co.rtpPacketsLost, } t.initialize() co.incomingTracks = append(co.incomingTracks, t) diff --git a/internal/protocols/webrtc/to_stream.go b/internal/protocols/webrtc/to_stream.go index 3150a8a2..983567be 100644 --- a/internal/protocols/webrtc/to_stream.go +++ b/internal/protocols/webrtc/to_stream.go @@ -9,11 +9,21 @@ import ( "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/bluenviron/gortsplib/v5/pkg/rtptime" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/pion/rtp" "github.com/pion/webrtc/v4" ) +type ntpState int + +const ( + ntpStateInitial ntpState = iota + ntpStateReplace + ntpStateAvailable +) + var errNoSupportedCodecsTo = errors.New( "the stream doesn't contain any supported codec, which are currently " + "AV1, VP9, VP8, H265, H264, Opus, G722, G711, LPCM") @@ -21,7 +31,9 @@ var errNoSupportedCodecsTo = errors.New( // ToStream maps a WebRTC connection to a MediaMTX stream. func ToStream( pc *PeerConnection, - stream **stream.Stream, + pathConf *conf.Path, + strm **stream.Stream, + log logger.Writer, ) ([]*description.Media, error) { var medias []*description.Media //nolint:prealloc timeDecoder := &rtptime.GlobalDecoder{} @@ -142,13 +154,49 @@ func ToStream( Formats: []format.Format{forma}, } - track.OnPacketRTP = func(pkt *rtp.Packet, ntp time.Time) { + var ntpStat ntpState + + if !pathConf.UseAbsoluteTimestamp { + ntpStat = ntpStateReplace + } + + handleNTP := func(pkt *rtp.Packet) (time.Time, bool) { + switch ntpStat { + case ntpStateReplace: + return time.Time{}, true + + case ntpStateInitial: + ntp, avail := track.PacketNTP(pkt) + if !avail { + log.Log(logger.Warn, "received RTP packet without absolute time, skipping it") + return time.Time{}, false + } + + ntpStat = ntpStateAvailable + return ntp, true + + default: // ntpStateAvailable + ntp, avail := track.PacketNTP(pkt) + if !avail { + panic("should not happen") + } + + return ntp, true + } + } + + track.OnPacketRTP = func(pkt *rtp.Packet) { pts, ok := timeDecoder.Decode(track, pkt) if !ok { return } - (*stream).WriteRTPPacket(medi, forma, pkt, ntp, pts) + ntp, ok := handleNTP(pkt) + if !ok { + return + } + + (*strm).WriteRTPPacket(medi, forma, pkt, ntp, pts) } medias = append(medias, medi) diff --git a/internal/protocols/webrtc/to_stream_test.go b/internal/protocols/webrtc/to_stream_test.go index 66767295..afa5437d 100644 --- a/internal/protocols/webrtc/to_stream_test.go +++ b/internal/protocols/webrtc/to_stream_test.go @@ -15,7 +15,7 @@ import ( func TestToStreamNoSupportedCodecs(t *testing.T) { pc := &PeerConnection{} - _, err := ToStream(pc, nil) + _, err := ToStream(pc, &conf.Path{}, nil, nil) require.Equal(t, errNoSupportedCodecsTo, err) } @@ -406,7 +406,7 @@ func TestToStream(t *testing.T) { require.NoError(t, err) var stream *stream.Stream - medias, err := ToStream(pc2, &stream) + medias, err := ToStream(pc2, &conf.Path{}, &stream, nil) require.NoError(t, err) require.Equal(t, ca.out, medias[0].Formats[0]) }) diff --git a/internal/protocols/whip/client.go b/internal/protocols/whip/client.go index 03057a64..6d6c92db 100644 --- a/internal/protocols/whip/client.go +++ b/internal/protocols/whip/client.go @@ -26,12 +26,11 @@ const ( // Client is a WHIP client. type Client struct { - URL *url.URL - Publish bool - OutgoingTracks []*webrtc.OutgoingTrack - HTTPClient *http.Client - UseAbsoluteTimestamp bool - Log logger.Writer + URL *url.URL + Publish bool + OutgoingTracks []*webrtc.OutgoingTrack + HTTPClient *http.Client + Log logger.Writer pc *webrtc.PeerConnection patchIsSupported bool @@ -45,15 +44,14 @@ func (c *Client) Initialize(ctx context.Context) error { } c.pc = &webrtc.PeerConnection{ - LocalRandomUDP: true, - ICEServers: iceServers, - IPsFromInterfaces: true, - HandshakeTimeout: conf.Duration(10 * time.Second), - TrackGatherTimeout: conf.Duration(2 * time.Second), - Publish: c.Publish, - OutgoingTracks: c.OutgoingTracks, - UseAbsoluteTimestamp: c.UseAbsoluteTimestamp, - Log: c.Log, + LocalRandomUDP: true, + ICEServers: iceServers, + IPsFromInterfaces: true, + HandshakeTimeout: conf.Duration(10 * time.Second), + TrackGatherTimeout: conf.Duration(2 * time.Second), + Publish: c.Publish, + OutgoingTracks: c.OutgoingTracks, + Log: c.Log, } err = c.pc.Start() if err != nil { diff --git a/internal/protocols/whip/client_test.go b/internal/protocols/whip/client_test.go index 24a9f3d8..f7dd409a 100644 --- a/internal/protocols/whip/client_test.go +++ b/internal/protocols/whip/client_test.go @@ -225,7 +225,7 @@ func TestClientRead(t *testing.T) { for i, track := range cl.IncomingTracks() { ci := i - track.OnPacketRTP = func(_ *rtp.Packet, _ time.Time) { + track.OnPacketRTP = func(_ *rtp.Packet) { close(recv[ci]) } } @@ -340,7 +340,7 @@ func TestClientPublish(t *testing.T) { for i, track := range pc.IncomingTracks() { ci := i - track.OnPacketRTP = func(_ *rtp.Packet, _ time.Time) { + track.OnPacketRTP = func(_ *rtp.Packet) { close(recv[ci]) } } diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index c1083fa4..1d2609c9 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -242,6 +242,7 @@ func (c *conn) runPublish() error { Author: c, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, AccessRequest: defs.PathAccessRequest{ Name: pathName, Query: c.rconn.URL.RawQuery, diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 47b2f83a..7e4100ea 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -311,6 +311,7 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons Author: s, Desc: s.rsession.AnnouncedDescription(), GenerateRTPPackets: false, + FillNTP: !s.pathConf.UseAbsoluteTimestamp, ConfToCompare: s.pathConf, AccessRequest: defs.PathAccessRequest{ Name: s.rsession.Path()[1:], diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index 50f36a5f..9ae931da 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -225,6 +225,7 @@ func (c *conn) runPublishReader(sconn srt.Conn, streamID *streamID, pathConf *co Author: c, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, ConfToCompare: pathConf, AccessRequest: defs.PathAccessRequest{ Name: streamID.path, diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index 05119140..aae15d2c 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -567,7 +567,7 @@ func TestServerRead(t *testing.T) { done := make(chan struct{}) - wc.IncomingTracks()[0].OnPacketRTP = func(pkt *rtp.Packet, _ time.Time) { + wc.IncomingTracks()[0].OnPacketRTP = func(pkt *rtp.Packet) { select { case <-done: default: diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 2d8ada23..07c3210d 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -168,7 +168,6 @@ func (s *session) runPublish() (int, error) { TrackGatherTimeout: s.trackGatherTimeout, STUNGatherTimeout: s.stunGatherTimeout, Publish: false, - UseAbsoluteTimestamp: pathConf.UseAbsoluteTimestamp, Log: s, } err = pc.Start() @@ -234,7 +233,7 @@ func (s *session) runPublish() (int, error) { var stream *stream.Stream - medias, err := webrtc.ToStream(pc, &stream) + medias, err := webrtc.ToStream(pc, pathConf, &stream, s) if err != nil { return 0, err } @@ -244,6 +243,7 @@ func (s *session) runPublish() (int, error) { Author: s, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: false, + FillNTP: !pathConf.UseAbsoluteTimestamp, ConfToCompare: pathConf, AccessRequest: defs.PathAccessRequest{ Name: s.req.pathName, @@ -312,7 +312,6 @@ func (s *session) runRead() (int, error) { TrackGatherTimeout: s.trackGatherTimeout, STUNGatherTimeout: s.stunGatherTimeout, Publish: true, - UseAbsoluteTimestamp: path.SafeConf().UseAbsoluteTimestamp, Log: s, } diff --git a/internal/staticsources/mpegts/source.go b/internal/staticsources/mpegts/source.go index b2912bf8..95b4995b 100644 --- a/internal/staticsources/mpegts/source.go +++ b/internal/staticsources/mpegts/source.go @@ -120,6 +120,7 @@ func (s *Source) runReader(nc net.Conn) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, }) if res.Err != nil { return res.Err diff --git a/internal/staticsources/rtmp/source.go b/internal/staticsources/rtmp/source.go index 731deb48..90bd03fe 100644 --- a/internal/staticsources/rtmp/source.go +++ b/internal/staticsources/rtmp/source.go @@ -117,6 +117,7 @@ func (s *Source) runReader(ctx context.Context, u *url.URL, fingerprint string) res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, }) if res.Err != nil { conn.Close() diff --git a/internal/staticsources/rtp/source.go b/internal/staticsources/rtp/source.go index 6c08aabd..fada03c8 100644 --- a/internal/staticsources/rtp/source.go +++ b/internal/staticsources/rtp/source.go @@ -149,6 +149,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: desc, GenerateRTPPackets: false, + FillNTP: true, }) if res.Err != nil { return res.Err @@ -171,7 +172,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { continue } - stream.WriteRTPPacket(media, forma, &pkt, time.Now(), pts) + stream.WriteRTPPacket(media, forma, &pkt, time.Time{}, pts) } } diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index 36036f08..4e0091a3 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -195,6 +195,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: desc, GenerateRTPPackets: false, + FillNTP: !params.Conf.UseAbsoluteTimestamp, }) if res.Err != nil { return res.Err diff --git a/internal/staticsources/srt/source.go b/internal/staticsources/srt/source.go index c8bdb031..12d1435b 100644 --- a/internal/staticsources/srt/source.go +++ b/internal/staticsources/srt/source.go @@ -111,6 +111,7 @@ func (s *Source) runReader(sconn srt.Conn) error { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: true, }) if res.Err != nil { return res.Err diff --git a/internal/staticsources/webrtc/source.go b/internal/staticsources/webrtc/source.go index dfa3211b..3182005c 100644 --- a/internal/staticsources/webrtc/source.go +++ b/internal/staticsources/webrtc/source.go @@ -58,8 +58,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { Timeout: time.Duration(s.ReadTimeout), Transport: tr, }, - UseAbsoluteTimestamp: params.Conf.UseAbsoluteTimestamp, - Log: s, + Log: s, } err = client.Initialize(params.Context) if err != nil { @@ -68,7 +67,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { var stream *stream.Stream - medias, err := webrtc.ToStream(client.PeerConnection(), &stream) + medias, err := webrtc.ToStream(client.PeerConnection(), params.Conf, &stream, s) if err != nil { client.Close() //nolint:errcheck return err @@ -77,6 +76,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { rres := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, + FillNTP: !params.Conf.UseAbsoluteTimestamp, }) if rres.Err != nil { client.Close() //nolint:errcheck diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 34a0e3be..d066cfea 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -23,6 +23,7 @@ type Stream struct { RTPMaxPayloadSize int Desc *description.Session GenerateRTPPackets bool + FillNTP bool Parent logger.Writer bytesReceived *uint64 @@ -61,6 +62,7 @@ func (s *Stream) Initialize() error { rtpMaxPayloadSize: s.RTPMaxPayloadSize, media: media, generateRTPPackets: s.GenerateRTPPackets, + fillNTP: s.FillNTP, processingErrors: s.processingErrors, parent: s.Parent, } diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index bf41dae8..30ad5296 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -11,6 +11,7 @@ import ( "github.com/bluenviron/mediamtx/internal/codecprocessor" "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/ntpestimator" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -26,11 +27,13 @@ type streamFormat struct { rtpMaxPayloadSize int format format.Format generateRTPPackets bool + fillNTP bool processingErrors *counterdumper.CounterDumper parent logger.Writer - proc codecprocessor.Processor - onDatas map[*Reader]OnDataFunc + proc codecprocessor.Processor + ntpEstimator *ntpestimator.Estimator + onDatas map[*Reader]OnDataFunc } func (sf *streamFormat) initialize() error { @@ -42,6 +45,10 @@ func (sf *streamFormat) initialize() error { return err } + sf.ntpEstimator = &ntpestimator.Estimator{ + ClockRate: sf.format.ClockRate(), + } + return nil } @@ -80,6 +87,10 @@ func (sf *streamFormat) writeRTPPacket( } func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u *unit.Unit) { + if sf.fillNTP { + u.NTP = sf.ntpEstimator.Estimate(u.PTS) + } + size := unitSize(u) atomic.AddUint64(s.bytesReceived, size) diff --git a/internal/stream/stream_media.go b/internal/stream/stream_media.go index b1d1e183..a0037c05 100644 --- a/internal/stream/stream_media.go +++ b/internal/stream/stream_media.go @@ -11,6 +11,7 @@ type streamMedia struct { rtpMaxPayloadSize int media *description.Media generateRTPPackets bool + fillNTP bool processingErrors *counterdumper.CounterDumper parent logger.Writer @@ -25,6 +26,7 @@ func (sm *streamMedia) initialize() error { rtpMaxPayloadSize: sm.rtpMaxPayloadSize, format: forma, generateRTPPackets: sm.generateRTPPackets, + fillNTP: sm.fillNTP, processingErrors: sm.processingErrors, parent: sm.parent, }