diff --git a/docs/2-usage/14-route-absolute-timestamps.md b/docs/2-usage/14-route-absolute-timestamps.md index adaca45f..1021b87b 100644 --- a/docs/2-usage/14-route-absolute-timestamps.md +++ b/docs/2-usage/14-route-absolute-timestamps.md @@ -13,7 +13,7 @@ and supports sending absolute timestamps with the following protocols: - RTSP - WebRTC -By default, absolute timestamps of incoming frames are not used, instead they are replaced with the current timestamp. This prevents users from arbitrarily changing recording dates, and also allows to support sources that do not send absolute timestamps. It is possible to preserve original absolute timestamps by toggling the `useAbsoluteTimestamp` parameter: +By default, absolute timestamps of incoming frames are not used, instead they are replaced with the system timestamp. This prevents users from arbitrarily changing recording dates, and also allows to support sources that do not send absolute timestamps. It is possible to preserve original absolute timestamps by toggling the `useAbsoluteTimestamp` parameter: ```yml pathDefaults: diff --git a/go.mod b/go.mod index 2966fd5f..ccadaaa1 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/bluenviron/gohlslib/v2 v2.2.4 github.com/bluenviron/gortmplib v0.1.1 github.com/bluenviron/gortsplib/v5 v5.2.0 - github.com/bluenviron/mediacommon/v2 v2.5.1 + github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb github.com/datarhei/gosrt v0.9.0 github.com/fsnotify/fsnotify v1.9.0 github.com/gin-contrib/pprof v1.5.3 diff --git a/go.sum b/go.sum index 7b47f3e8..b37b2dbd 100644 --- a/go.sum +++ b/go.sum @@ -39,8 +39,8 @@ github.com/bluenviron/gortmplib v0.1.1 h1:pmR6qfPcJJmE17lWQ/bpuBFZtgGnMrN8KdFj1G github.com/bluenviron/gortmplib v0.1.1/go.mod h1:XWy2YzbTP1XEEZ8232OG7I1MSwubsbDRKDNhXGgS2kg= github.com/bluenviron/gortsplib/v5 v5.2.0 h1:yk0H9Z1Z+H41/x5hDt84rKm6+MNA483NsRXPYe+or/A= github.com/bluenviron/gortsplib/v5 v5.2.0/go.mod h1:UYCbHEb0T49kBDgIlTJaZOchD2f5g1JigFmmxQfW7vY= -github.com/bluenviron/mediacommon/v2 v2.5.1 h1:qB2fb5c0xyl5OB2gfSfulpEJn7Cdm3vI2n8wjiLMxKI= -github.com/bluenviron/mediacommon/v2 v2.5.1/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE= +github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb h1:42lRaSsrPXvwB9kLGIujU9yONrSPPp0j4Ohwg6zp/yw= +github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb/go.mod h1:5V15TiOfeaNVmZPVuOqAwqQSWyvMV86/dijDKu5q9Zs= github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= diff --git a/internal/ntpestimator/estimator.go b/internal/ntpestimator/estimator.go index ea4fb82f..6a6c5c5c 100644 --- a/internal/ntpestimator/estimator.go +++ b/internal/ntpestimator/estimator.go @@ -31,6 +31,10 @@ var zero = time.Time{} func (e *Estimator) Estimate(pts int64) time.Time { now := timeNow() + // do not store monotonic clock, in order to include + // system clock changes into time differences + now = now.Round(0) + if e.refNTP.Equal(zero) { e.refNTP = now e.refPTS = pts diff --git a/internal/recorder/format_fmp4.go b/internal/recorder/format_fmp4.go index 2d954287..b21c5bda 100644 --- a/internal/recorder/format_fmp4.go +++ b/internal/recorder/format_fmp4.go @@ -98,6 +98,12 @@ func jpegExtractSize(image []byte) (int, int, error) { } } +type formatFMP4Sample struct { + *fmp4.Sample + dts int64 + ntp time.Time +} + type formatFMP4 struct { ri *recorderInstance @@ -111,18 +117,15 @@ func (f *formatFMP4) initialize() bool { nextID := 1 addTrack := func(format rtspformat.Format, codec mp4.Codec) *formatFMP4Track { - initTrack := &fmp4.InitTrack{ - TimeScale: uint32(format.ClockRate()), - Codec: codec, - } - initTrack.ID = nextID - nextID++ - track := &formatFMP4Track{ f: f, - initTrack: initTrack, + id: nextID, + clockRate: uint32(format.ClockRate()), + codec: codec, } + track.initialize() + nextID++ f.tracks = append(f.tracks, track) return track } @@ -180,7 +183,7 @@ func (f *formatFMP4) initialize() bool { return err } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &sampl, dts: u.PTS, ntp: u.NTP, @@ -257,7 +260,7 @@ func (f *formatFMP4) initialize() bool { firstReceived = true } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ IsNonSyncSample: !randomAccess, Payload: u.Payload.(unit.PayloadVP9), @@ -348,7 +351,7 @@ func (f *formatFMP4) initialize() bool { return err } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &sampl, dts: dts, ntp: u.NTP, @@ -424,7 +427,7 @@ func (f *formatFMP4) initialize() bool { return err } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &sampl, dts: dts, ntp: u.NTP, @@ -481,7 +484,7 @@ func (f *formatFMP4) initialize() bool { } lastPTS = u.PTS - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: u.Payload.(unit.PayloadMPEG4Video), IsNonSyncSample: !randomAccess, @@ -532,7 +535,7 @@ func (f *formatFMP4) initialize() bool { } lastPTS = u.PTS - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: u.Payload.(unit.PayloadMPEG1Video), IsNonSyncSample: !randomAccess, @@ -570,7 +573,7 @@ func (f *formatFMP4) initialize() bool { f.updateCodecParams() } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: u.Payload.(unit.PayloadMJPEG), }, @@ -596,7 +599,7 @@ func (f *formatFMP4) initialize() bool { pts := u.PTS for _, packet := range u.Payload.(unit.PayloadOpus) { - err := track.write(&sample{ + err := track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: packet, }, @@ -630,7 +633,7 @@ func (f *formatFMP4) initialize() bool { for i, au := range u.Payload.(unit.PayloadMPEG4Audio) { pts := u.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit - err := track.write(&sample{ + err := track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: au, }, @@ -667,7 +670,7 @@ func (f *formatFMP4) initialize() bool { return err } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: ame.Payloads[0][0][0], }, @@ -710,7 +713,7 @@ func (f *formatFMP4) initialize() bool { f.updateCodecParams() } - err = track.write(&sample{ + err = track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: frame, }, @@ -779,7 +782,7 @@ func (f *formatFMP4) initialize() bool { pts := u.PTS + int64(i)*ac3.SamplesPerFrame - err = track.write(&sample{ + err = track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: frame, }, @@ -825,7 +828,7 @@ func (f *formatFMP4) initialize() bool { lpcm = al } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: lpcm, }, @@ -851,7 +854,7 @@ func (f *formatFMP4) initialize() bool { return nil } - return track.write(&sample{ + return track.write(&formatFMP4Sample{ Sample: &fmp4.Sample{ Payload: u.Payload.(unit.PayloadLPCM), }, diff --git a/internal/recorder/format_fmp4_part.go b/internal/recorder/format_fmp4_part.go index f0ca6d70..1746f0f2 100644 --- a/internal/recorder/format_fmp4_part.go +++ b/internal/recorder/format_fmp4_part.go @@ -56,7 +56,7 @@ func (p *formatFMP4Part) close(w io.Writer) error { return writePart(w, p.number, p.partTracks) } -func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { +func (p *formatFMP4Part) write(track *formatFMP4Track, sample *formatFMP4Sample, dts time.Duration) error { size := uint64(len(sample.Payload)) if (p.size + size) > uint64(p.maxPartSize) { return fmt.Errorf("reached maximum part size") diff --git a/internal/recorder/format_fmp4_segment.go b/internal/recorder/format_fmp4_segment.go index eb9bf1f8..b255f305 100644 --- a/internal/recorder/format_fmp4_segment.go +++ b/internal/recorder/format_fmp4_segment.go @@ -201,7 +201,7 @@ func (s *formatFMP4Segment) closeCurPart() error { return s.curPart.close(s.fi) } -func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { +func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *formatFMP4Sample, dts time.Duration) error { endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale)) if endDTS > s.endDTS { s.endDTS = endDTS diff --git a/internal/recorder/format_fmp4_track.go b/internal/recorder/format_fmp4_track.go index b266c6ee..3100a29f 100644 --- a/internal/recorder/format_fmp4_track.go +++ b/internal/recorder/format_fmp4_track.go @@ -1,9 +1,11 @@ package recorder import ( + "fmt" "time" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" + "github.com/bluenviron/mediacommon/v2/pkg/formats/mp4" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -43,12 +45,26 @@ func nextSegmentStartingPos(tracks []*formatFMP4Track) (time.Time, time.Duration type formatFMP4Track struct { f *formatFMP4 - initTrack *fmp4.InitTrack + id int + clockRate uint32 + codec mp4.Codec - nextSample *sample + initTrack *fmp4.InitTrack + nextSample *formatFMP4Sample + startInitialized bool + startDTS time.Duration + startNTP time.Time } -func (t *formatFMP4Track) write(sample *sample) error { +func (t *formatFMP4Track) initialize() { + t.initTrack = &fmp4.InitTrack{ + ID: t.id, + TimeScale: t.clockRate, + Codec: t.codec, + } +} + +func (t *formatFMP4Track) write(sample *formatFMP4Sample) error { // wait the first video sample before setting hasVideo if t.initTrack.Codec.IsVideo() { t.f.hasVideo = true @@ -69,6 +85,17 @@ func (t *formatFMP4Track) write(sample *sample) error { dts := timestampToDuration(sample.dts, int(t.initTrack.TimeScale)) + if !t.startInitialized { + t.startDTS = dts + t.startNTP = sample.ntp + t.startInitialized = true + } else { + drift := sample.ntp.Sub(t.startNTP) - (dts - t.startDTS) + if drift < -ntpDriftTolerance || drift > ntpDriftTolerance { + return fmt.Errorf("detected drift between recording duration and absolute time, resetting") + } + } + if t.f.currentSegment == nil { t.f.currentSegment = &formatFMP4Segment{ f: t.f, diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index c2b36dc0..12521c4f 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -66,11 +66,14 @@ type formatMPEGTS struct { func (f *formatMPEGTS) initialize() bool { var tracks []*mpegts.Track - addTrack := func(codec mpegts.Codec) *mpegts.Track { - track := &mpegts.Track{ - Codec: codec, + addTrack := func(codec mpegts.Codec) *formatMPEGTSTrack { + track := &formatMPEGTSTrack{ + f: f, + codec: codec, } - tracks = append(tracks, track) + track.initialize() + + tracks = append(tracks, track.track) return track } @@ -107,14 +110,13 @@ func (f *formatMPEGTS) initialize() bool { return err } - return f.write( + return track.write( timestampToDuration(dts, clockRate), u.NTP, - true, randomAccess, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteH265( - track, + mtrack, u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP dts, u.Payload.(unit.PayloadH265)) @@ -150,14 +152,13 @@ func (f *formatMPEGTS) initialize() bool { return err } - return f.write( + return track.write( timestampToDuration(dts, clockRate), u.NTP, - true, randomAccess, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteH264( - track, + mtrack, u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP dts, u.Payload.(unit.PayloadH264)) @@ -189,14 +190,13 @@ func (f *formatMPEGTS) initialize() bool { randomAccess := bytes.Contains(u.Payload.(unit.PayloadMPEG4Video), []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) - return f.write( + return track.write( timestampToDuration(u.PTS, clockRate), u.NTP, - true, randomAccess, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteMPEG4Video( - track, + mtrack, u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.Payload.(unit.PayloadMPEG4Video)) }, @@ -226,14 +226,13 @@ func (f *formatMPEGTS) initialize() bool { randomAccess := bytes.Contains(u.Payload.(unit.PayloadMPEG1Video), []byte{0, 0, 1, 0xB8}) - return f.write( + return track.write( timestampToDuration(u.PTS, clockRate), u.NTP, - true, randomAccess, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteMPEG1Video( - track, + mtrack, u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.Payload.(unit.PayloadMPEG1Video)) }, @@ -253,14 +252,13 @@ func (f *formatMPEGTS) initialize() bool { return nil } - return f.write( + return track.write( timestampToDuration(u.PTS, clockRate), u.NTP, - false, true, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteOpus( - track, + mtrack, multiplyAndDivide(u.PTS, 90000, int64(clockRate)), u.Payload.(unit.PayloadOpus)) }, @@ -280,13 +278,15 @@ func (f *formatMPEGTS) initialize() bool { return nil } - return f.write( + return track.write( timestampToDuration(u.PTS, 90000), u.NTP, - false, true, - func() error { - return f.mw.WriteKLV(track, multiplyAndDivide(u.PTS, 90000, 90000), u.Payload.(unit.PayloadKLV)) + func(mtrack *mpegts.Track) error { + return f.mw.WriteKLV( + mtrack, + multiplyAndDivide(u.PTS, 90000, 90000), + u.Payload.(unit.PayloadKLV)) }, ) }) @@ -304,14 +304,13 @@ func (f *formatMPEGTS) initialize() bool { return nil } - return f.write( + return track.write( timestampToDuration(u.PTS, clockRate), u.NTP, - false, true, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteMPEG4Audio( - track, + mtrack, multiplyAndDivide(u.PTS, 90000, int64(clockRate)), u.Payload.(unit.PayloadMPEG4Audio)) }, @@ -339,14 +338,13 @@ func (f *formatMPEGTS) initialize() bool { return err } - return f.write( + return track.write( timestampToDuration(u.PTS, clockRate), u.NTP, - false, true, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteMPEG4Audio( - track, + mtrack, multiplyAndDivide(u.PTS, 90000, int64(clockRate)), [][]byte{ame.Payloads[0][0][0]}) }, @@ -365,14 +363,13 @@ func (f *formatMPEGTS) initialize() bool { return nil } - return f.write( + return track.write( timestampToDuration(u.PTS, clockRate), u.NTP, - false, true, - func() error { + func(mtrack *mpegts.Track) error { return f.mw.WriteMPEG1Audio( - track, + mtrack, u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.Payload.(unit.PayloadMPEG1Audio)) }, @@ -390,17 +387,16 @@ func (f *formatMPEGTS) initialize() bool { return nil } - return f.write( + return track.write( timestampToDuration(u.PTS, clockRate), u.NTP, - false, true, - func() error { + func(mtrack *mpegts.Track) error { for i, frame := range u.Payload.(unit.PayloadAC3) { framePTS := u.PTS + int64(i)*ac3.SamplesPerFrame err := f.mw.WriteAC3( - track, + mtrack, multiplyAndDivide(framePTS, 90000, int64(clockRate)), frame) if err != nil { @@ -453,62 +449,3 @@ func (f *formatMPEGTS) close() { f.currentSegment.close() //nolint:errcheck } } - -func (f *formatMPEGTS) write( - dts time.Duration, - ntp time.Time, - isVideo bool, - randomAccess bool, - writeCB func() error, -) error { - if isVideo { - f.hasVideo = true - } - - switch { - case f.currentSegment == nil: - f.currentSegment = &formatMPEGTSSegment{ - pathFormat2: f.ri.pathFormat2, - flush: f.bw.Flush, - onSegmentCreate: f.ri.onSegmentCreate, - onSegmentComplete: f.ri.onSegmentComplete, - startDTS: dts, - startNTP: ntp, - log: f.ri, - } - f.currentSegment.initialize() - f.dw.setTarget(f.currentSegment) - case (!f.hasVideo || isVideo) && - randomAccess && - (dts-f.currentSegment.startDTS) >= f.ri.segmentDuration: - f.currentSegment.lastDTS = dts - err := f.currentSegment.close() - if err != nil { - return err - } - - f.currentSegment = &formatMPEGTSSegment{ - pathFormat2: f.ri.pathFormat2, - flush: f.bw.Flush, - onSegmentCreate: f.ri.onSegmentCreate, - onSegmentComplete: f.ri.onSegmentComplete, - startDTS: dts, - startNTP: ntp, - log: f.ri, - } - f.currentSegment.initialize() - f.dw.setTarget(f.currentSegment) - - case (dts - f.currentSegment.lastFlush) >= f.ri.partDuration: - err := f.bw.Flush() - if err != nil { - return err - } - - f.currentSegment.lastFlush = dts - } - - f.currentSegment.lastDTS = dts - - return writeCB() -} diff --git a/internal/recorder/format_mpegts_track.go b/internal/recorder/format_mpegts_track.go new file mode 100644 index 00000000..641ea3dc --- /dev/null +++ b/internal/recorder/format_mpegts_track.go @@ -0,0 +1,96 @@ +package recorder + +import ( + "fmt" + "time" + + "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" +) + +type formatMPEGTSTrack struct { + f *formatMPEGTS + codec mpegts.Codec + + track *mpegts.Track + startInitialized bool + startDTS time.Duration + startNTP time.Time +} + +func (t *formatMPEGTSTrack) initialize() { + t.track = &mpegts.Track{ + Codec: t.codec, + } +} + +func (t *formatMPEGTSTrack) write( + dts time.Duration, + ntp time.Time, + randomAccess bool, + cb func(track *mpegts.Track) error, +) error { + isVideo := t.track.Codec.IsVideo() + + if isVideo { + t.f.hasVideo = true + } + + if !t.startInitialized { + t.startDTS = dts + t.startNTP = ntp + t.startInitialized = true + } else { + drift := ntp.Sub(t.startNTP) - (dts - t.startDTS) + if drift < -ntpDriftTolerance || drift > ntpDriftTolerance { + return fmt.Errorf("detected drift between recording duration and absolute time, resetting") + } + } + + switch { + case t.f.currentSegment == nil: + t.f.currentSegment = &formatMPEGTSSegment{ + pathFormat2: t.f.ri.pathFormat2, + flush: t.f.bw.Flush, + onSegmentCreate: t.f.ri.onSegmentCreate, + onSegmentComplete: t.f.ri.onSegmentComplete, + startDTS: dts, + startNTP: ntp, + log: t.f.ri, + } + t.f.currentSegment.initialize() + t.f.dw.setTarget(t.f.currentSegment) + + case (!t.f.hasVideo || isVideo) && + randomAccess && + (dts-t.f.currentSegment.startDTS) >= t.f.ri.segmentDuration: + t.f.currentSegment.lastDTS = dts + err := t.f.currentSegment.close() + if err != nil { + return err + } + + t.f.currentSegment = &formatMPEGTSSegment{ + pathFormat2: t.f.ri.pathFormat2, + flush: t.f.bw.Flush, + onSegmentCreate: t.f.ri.onSegmentCreate, + onSegmentComplete: t.f.ri.onSegmentComplete, + startDTS: dts, + startNTP: ntp, + log: t.f.ri, + } + t.f.currentSegment.initialize() + t.f.dw.setTarget(t.f.currentSegment) + + case (dts - t.f.currentSegment.lastFlush) >= t.f.ri.partDuration: + err := t.f.bw.Flush() + if err != nil { + return err + } + + t.f.currentSegment.lastFlush = dts + } + + t.f.currentSegment.lastDTS = dts + + return cb(t.track) +} diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index 4c93b3e1..336d8c86 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -9,6 +9,10 @@ import ( "github.com/bluenviron/mediamtx/internal/stream" ) +const ( + ntpDriftTolerance = 5 * time.Second +) + // OnSegmentCreateFunc is the prototype of the function passed as OnSegmentCreate type OnSegmentCreateFunc = func(path string) diff --git a/internal/recorder/recorder_instance.go b/internal/recorder/recorder_instance.go index c0b3e06f..2c44ec70 100644 --- a/internal/recorder/recorder_instance.go +++ b/internal/recorder/recorder_instance.go @@ -4,7 +4,6 @@ import ( "strings" "time" - "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" "github.com/google/uuid" "github.com/bluenviron/mediamtx/internal/conf" @@ -13,12 +12,6 @@ import ( "github.com/bluenviron/mediamtx/internal/stream" ) -type sample struct { - *fmp4.Sample - dts int64 - ntp time.Time -} - type recorderInstance struct { pathFormat string format conf.RecordFormat diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index 6e1b8a3f..2a278542 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -75,7 +75,7 @@ func TestRecorder(t *testing.T) { writeToStream := func(strm *stream.Stream, startDTS int64, startNTP time.Time) { for i := range 2 { pts := startDTS + int64(i)*100*90000/1000 - ntp := startNTP.Add(time.Duration(i*60) * time.Second) + ntp := startNTP.Add(time.Duration(i*100) * time.Millisecond) strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ PTS: pts, @@ -178,7 +178,7 @@ func TestRecorder(t *testing.T) { case 0: require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath) case 1: - require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath) + require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext), segPath) default: require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath) } @@ -190,7 +190,7 @@ func TestRecorder(t *testing.T) { require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath) require.Equal(t, 2*time.Second, du) case 1: - require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath) + require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext), segPath) require.Equal(t, 100*time.Millisecond, du) default: require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath) @@ -210,7 +210,7 @@ func TestRecorder(t *testing.T) { writeToStream(strm, 52*90000, - time.Date(2008, 5, 20, 22, 16, 25, 0, time.UTC)) + time.Date(2008, 5, 20, 22, 15, 27, 0, time.UTC)) // simulate a write error strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ @@ -308,13 +308,13 @@ func TestRecorder(t *testing.T) { }, }, init) - _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) + _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext)) require.NoError(t, err) } else { _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext)) require.NoError(t, err) - _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) + _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext)) require.NoError(t, err) } @@ -821,3 +821,224 @@ func TestRecorderFMP4SegmentSwitch(t *testing.T) { require.Equal(t, 2, n) } + +func TestRecorderTimeDriftDetector(t *testing.T) { + for _, ca := range []string{"fmp4", "mpegts"} { + t.Run(ca, func(t *testing.T) { + desc := &description.Session{Medias: []*description.Media{ + { + Type: description.MediaTypeVideo, + Formats: []rtspformat.Format{&rtspformat.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + }, + { + Type: description.MediaTypeAudio, + Formats: []rtspformat.Format{&rtspformat.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.AudioSpecificConfig{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }}, + }, + }} + + strm := &stream.Stream{ + WriteQueueSize: 512, + RTPMaxPayloadSize: 1450, + Desc: desc, + GenerateRTPPackets: true, + Parent: test.NilLogger, + } + err := strm.Initialize() + require.NoError(t, err) + defer strm.Close() + + dir, err := os.MkdirTemp("", "mediamtx-agent") + require.NoError(t, err) + defer os.RemoveAll(dir) + + recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") + + var ext string + if ca == "fmp4" { + ext = "mp4" + } else { + ext = "ts" + } + + segCreated := make(chan struct{}, 10) + segDone := make(chan struct{}, 10) + + var f conf.RecordFormat + if ca == "fmp4" { + f = conf.RecordFormatFMP4 + } else { + f = conf.RecordFormatMPEGTS + } + + w := &Recorder{ + PathFormat: recordPath, + Format: f, + PartDuration: 100 * time.Millisecond, + MaxPartSize: 50 * 1024 * 1024, + SegmentDuration: 1 * time.Second, + PathName: "mypath", + Stream: strm, + OnSegmentCreate: func(_ string) { + select { + case segCreated <- struct{}{}: + default: + } + }, + OnSegmentComplete: func(_ string, _ time.Duration) { + select { + case segDone <- struct{}{}: + default: + } + }, + Parent: test.NilLogger, + restartPause: 10 * time.Millisecond, + } + w.Initialize() + + // Write initial samples with correct timing + startDTS := int64(50 * 90000) + startNTP := time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC) + + for i := range 3 { + pts := startDTS + int64(i)*100*90000/1000 + ntp := startNTP.Add(time.Duration(i*100) * time.Millisecond) + + strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ + PTS: pts, + NTP: ntp, + Payload: unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5}, // IDR + }, + }) + + strm.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.Unit{ + PTS: pts * int64(desc.Medias[1].Formats[0].ClockRate()) / 90000, + NTP: ntp, + Payload: unit.PayloadMPEG4Audio{{1, 2, 3, 4}}, + }) + } + + // Wait for first segment to be created + select { + case <-segCreated: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for first segment") + } + + // Write more samples to ensure segment has data + for i := 3; i < 15; i++ { + pts := startDTS + int64(i)*100*90000/1000 + ntp := startNTP.Add(time.Duration(i*100) * time.Millisecond) + + strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ + PTS: pts, + NTP: ntp, + Payload: unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5}, // IDR + }, + }) + + strm.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.Unit{ + PTS: pts * int64(desc.Medias[1].Formats[0].ClockRate()) / 90000, + NTP: ntp, + Payload: unit.PayloadMPEG4Audio{{1, 2, 3, 4}}, + }) + } + + // Simulate a time drift by advancing NTP time by more than 5 seconds + // while keeping DTS progression normal (only 100ms forward) + driftedPTS := startDTS + 15*100*90000/1000 + driftedNTP := startNTP.Add(15*100*time.Millisecond + 6*time.Second) // 6 second drift + + strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ + PTS: driftedPTS, + NTP: driftedNTP, + Payload: unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5}, // IDR + }, + }) + + // Wait for the recorder to detect the drift, complete the segment, and restart + select { + case <-segDone: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for segment completion after drift") + } + + // Give the recorder time to restart + time.Sleep(100 * time.Millisecond) + + // Write samples after restart with corrected timing + restartDTS := int64(60 * 90000) + restartNTP := time.Date(2008, 5, 20, 22, 15, 35, 0, time.UTC) + + for i := range 3 { + pts := restartDTS + int64(i)*100*90000/1000 + ntp := restartNTP.Add(time.Duration(i*100) * time.Millisecond) + + strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ + PTS: pts, + NTP: ntp, + Payload: unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5}, // IDR + }, + }) + + strm.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.Unit{ + PTS: pts * int64(desc.Medias[1].Formats[0].ClockRate()) / 90000, + NTP: ntp, + Payload: unit.PayloadMPEG4Audio{{1, 2, 3, 4}}, + }) + } + + // Wait for second segment to be created after restart + select { + case <-segCreated: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for segment after restart") + } + + time.Sleep(50 * time.Millisecond) + + w.Close() + + // Wait for final segment to complete + select { + case <-segDone: + case <-time.After(2 * time.Second): + // This is not fatal as the final segment may complete during Close() + } + + // Verify that files were created + entries, err := os.ReadDir(filepath.Join(dir, "mypath")) + require.NoError(t, err) + require.GreaterOrEqual(t, len(entries), 2, "expected at least 2 segments (before and after drift)") + + // Verify files have the expected extension + for _, entry := range entries { + require.Equal(t, "."+ext, filepath.Ext(entry.Name())) + } + }) + } +}