recorder: reset when absolute time drifts from stream time (#4778) (#5239)

the server now detects when system time changes too much and restarts
recordings when that happens.
This commit is contained in:
Alessandro Ros 2025-12-02 18:10:23 +01:00 committed by GitHub
parent 9c5930464f
commit fb9027a334
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 432 additions and 147 deletions

View file

@ -13,7 +13,7 @@ and supports sending absolute timestamps with the following protocols:
- RTSP - RTSP
- WebRTC - WebRTC
By default, absolute timestamps of incoming frames are not used, instead they are replaced with the current timestamp. This prevents users from arbitrarily changing recording dates, and also allows to support sources that do not send absolute timestamps. It is possible to preserve original absolute timestamps by toggling the `useAbsoluteTimestamp` parameter: By default, absolute timestamps of incoming frames are not used, instead they are replaced with the system timestamp. This prevents users from arbitrarily changing recording dates, and also allows to support sources that do not send absolute timestamps. It is possible to preserve original absolute timestamps by toggling the `useAbsoluteTimestamp` parameter:
```yml ```yml
pathDefaults: pathDefaults:

2
go.mod
View file

@ -13,7 +13,7 @@ require (
github.com/bluenviron/gohlslib/v2 v2.2.4 github.com/bluenviron/gohlslib/v2 v2.2.4
github.com/bluenviron/gortmplib v0.1.1 github.com/bluenviron/gortmplib v0.1.1
github.com/bluenviron/gortsplib/v5 v5.2.0 github.com/bluenviron/gortsplib/v5 v5.2.0
github.com/bluenviron/mediacommon/v2 v2.5.1 github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb
github.com/datarhei/gosrt v0.9.0 github.com/datarhei/gosrt v0.9.0
github.com/fsnotify/fsnotify v1.9.0 github.com/fsnotify/fsnotify v1.9.0
github.com/gin-contrib/pprof v1.5.3 github.com/gin-contrib/pprof v1.5.3

4
go.sum
View file

@ -39,8 +39,8 @@ github.com/bluenviron/gortmplib v0.1.1 h1:pmR6qfPcJJmE17lWQ/bpuBFZtgGnMrN8KdFj1G
github.com/bluenviron/gortmplib v0.1.1/go.mod h1:XWy2YzbTP1XEEZ8232OG7I1MSwubsbDRKDNhXGgS2kg= github.com/bluenviron/gortmplib v0.1.1/go.mod h1:XWy2YzbTP1XEEZ8232OG7I1MSwubsbDRKDNhXGgS2kg=
github.com/bluenviron/gortsplib/v5 v5.2.0 h1:yk0H9Z1Z+H41/x5hDt84rKm6+MNA483NsRXPYe+or/A= github.com/bluenviron/gortsplib/v5 v5.2.0 h1:yk0H9Z1Z+H41/x5hDt84rKm6+MNA483NsRXPYe+or/A=
github.com/bluenviron/gortsplib/v5 v5.2.0/go.mod h1:UYCbHEb0T49kBDgIlTJaZOchD2f5g1JigFmmxQfW7vY= github.com/bluenviron/gortsplib/v5 v5.2.0/go.mod h1:UYCbHEb0T49kBDgIlTJaZOchD2f5g1JigFmmxQfW7vY=
github.com/bluenviron/mediacommon/v2 v2.5.1 h1:qB2fb5c0xyl5OB2gfSfulpEJn7Cdm3vI2n8wjiLMxKI= github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb h1:42lRaSsrPXvwB9kLGIujU9yONrSPPp0j4Ohwg6zp/yw=
github.com/bluenviron/mediacommon/v2 v2.5.1/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE= github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb/go.mod h1:5V15TiOfeaNVmZPVuOqAwqQSWyvMV86/dijDKu5q9Zs=
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=

View file

@ -31,6 +31,10 @@ var zero = time.Time{}
func (e *Estimator) Estimate(pts int64) time.Time { func (e *Estimator) Estimate(pts int64) time.Time {
now := timeNow() now := timeNow()
// do not store monotonic clock, in order to include
// system clock changes into time differences
now = now.Round(0)
if e.refNTP.Equal(zero) { if e.refNTP.Equal(zero) {
e.refNTP = now e.refNTP = now
e.refPTS = pts e.refPTS = pts

View file

@ -98,6 +98,12 @@ func jpegExtractSize(image []byte) (int, int, error) {
} }
} }
type formatFMP4Sample struct {
*fmp4.Sample
dts int64
ntp time.Time
}
type formatFMP4 struct { type formatFMP4 struct {
ri *recorderInstance ri *recorderInstance
@ -111,18 +117,15 @@ func (f *formatFMP4) initialize() bool {
nextID := 1 nextID := 1
addTrack := func(format rtspformat.Format, codec mp4.Codec) *formatFMP4Track { addTrack := func(format rtspformat.Format, codec mp4.Codec) *formatFMP4Track {
initTrack := &fmp4.InitTrack{
TimeScale: uint32(format.ClockRate()),
Codec: codec,
}
initTrack.ID = nextID
nextID++
track := &formatFMP4Track{ track := &formatFMP4Track{
f: f, f: f,
initTrack: initTrack, id: nextID,
clockRate: uint32(format.ClockRate()),
codec: codec,
} }
track.initialize()
nextID++
f.tracks = append(f.tracks, track) f.tracks = append(f.tracks, track)
return track return track
} }
@ -180,7 +183,7 @@ func (f *formatFMP4) initialize() bool {
return err return err
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &sampl, Sample: &sampl,
dts: u.PTS, dts: u.PTS,
ntp: u.NTP, ntp: u.NTP,
@ -257,7 +260,7 @@ func (f *formatFMP4) initialize() bool {
firstReceived = true firstReceived = true
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
Payload: u.Payload.(unit.PayloadVP9), Payload: u.Payload.(unit.PayloadVP9),
@ -348,7 +351,7 @@ func (f *formatFMP4) initialize() bool {
return err return err
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &sampl, Sample: &sampl,
dts: dts, dts: dts,
ntp: u.NTP, ntp: u.NTP,
@ -424,7 +427,7 @@ func (f *formatFMP4) initialize() bool {
return err return err
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &sampl, Sample: &sampl,
dts: dts, dts: dts,
ntp: u.NTP, ntp: u.NTP,
@ -481,7 +484,7 @@ func (f *formatFMP4) initialize() bool {
} }
lastPTS = u.PTS lastPTS = u.PTS
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: u.Payload.(unit.PayloadMPEG4Video), Payload: u.Payload.(unit.PayloadMPEG4Video),
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
@ -532,7 +535,7 @@ func (f *formatFMP4) initialize() bool {
} }
lastPTS = u.PTS lastPTS = u.PTS
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: u.Payload.(unit.PayloadMPEG1Video), Payload: u.Payload.(unit.PayloadMPEG1Video),
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
@ -570,7 +573,7 @@ func (f *formatFMP4) initialize() bool {
f.updateCodecParams() f.updateCodecParams()
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: u.Payload.(unit.PayloadMJPEG), Payload: u.Payload.(unit.PayloadMJPEG),
}, },
@ -596,7 +599,7 @@ func (f *formatFMP4) initialize() bool {
pts := u.PTS pts := u.PTS
for _, packet := range u.Payload.(unit.PayloadOpus) { for _, packet := range u.Payload.(unit.PayloadOpus) {
err := track.write(&sample{ err := track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: packet, Payload: packet,
}, },
@ -630,7 +633,7 @@ func (f *formatFMP4) initialize() bool {
for i, au := range u.Payload.(unit.PayloadMPEG4Audio) { for i, au := range u.Payload.(unit.PayloadMPEG4Audio) {
pts := u.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit pts := u.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit
err := track.write(&sample{ err := track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: au, Payload: au,
}, },
@ -667,7 +670,7 @@ func (f *formatFMP4) initialize() bool {
return err return err
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: ame.Payloads[0][0][0], Payload: ame.Payloads[0][0][0],
}, },
@ -710,7 +713,7 @@ func (f *formatFMP4) initialize() bool {
f.updateCodecParams() f.updateCodecParams()
} }
err = track.write(&sample{ err = track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: frame, Payload: frame,
}, },
@ -779,7 +782,7 @@ func (f *formatFMP4) initialize() bool {
pts := u.PTS + int64(i)*ac3.SamplesPerFrame pts := u.PTS + int64(i)*ac3.SamplesPerFrame
err = track.write(&sample{ err = track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: frame, Payload: frame,
}, },
@ -825,7 +828,7 @@ func (f *formatFMP4) initialize() bool {
lpcm = al lpcm = al
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: lpcm, Payload: lpcm,
}, },
@ -851,7 +854,7 @@ func (f *formatFMP4) initialize() bool {
return nil return nil
} }
return track.write(&sample{ return track.write(&formatFMP4Sample{
Sample: &fmp4.Sample{ Sample: &fmp4.Sample{
Payload: u.Payload.(unit.PayloadLPCM), Payload: u.Payload.(unit.PayloadLPCM),
}, },

View file

@ -56,7 +56,7 @@ func (p *formatFMP4Part) close(w io.Writer) error {
return writePart(w, p.number, p.partTracks) return writePart(w, p.number, p.partTracks)
} }
func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { func (p *formatFMP4Part) write(track *formatFMP4Track, sample *formatFMP4Sample, dts time.Duration) error {
size := uint64(len(sample.Payload)) size := uint64(len(sample.Payload))
if (p.size + size) > uint64(p.maxPartSize) { if (p.size + size) > uint64(p.maxPartSize) {
return fmt.Errorf("reached maximum part size") return fmt.Errorf("reached maximum part size")

View file

@ -201,7 +201,7 @@ func (s *formatFMP4Segment) closeCurPart() error {
return s.curPart.close(s.fi) return s.curPart.close(s.fi)
} }
func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *formatFMP4Sample, dts time.Duration) error {
endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale)) endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale))
if endDTS > s.endDTS { if endDTS > s.endDTS {
s.endDTS = endDTS s.endDTS = endDTS

View file

@ -1,9 +1,11 @@
package recorder package recorder
import ( import (
"fmt"
"time" "time"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mp4"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -43,12 +45,26 @@ func nextSegmentStartingPos(tracks []*formatFMP4Track) (time.Time, time.Duration
type formatFMP4Track struct { type formatFMP4Track struct {
f *formatFMP4 f *formatFMP4
initTrack *fmp4.InitTrack id int
clockRate uint32
codec mp4.Codec
nextSample *sample initTrack *fmp4.InitTrack
nextSample *formatFMP4Sample
startInitialized bool
startDTS time.Duration
startNTP time.Time
} }
func (t *formatFMP4Track) write(sample *sample) error { func (t *formatFMP4Track) initialize() {
t.initTrack = &fmp4.InitTrack{
ID: t.id,
TimeScale: t.clockRate,
Codec: t.codec,
}
}
func (t *formatFMP4Track) write(sample *formatFMP4Sample) error {
// wait the first video sample before setting hasVideo // wait the first video sample before setting hasVideo
if t.initTrack.Codec.IsVideo() { if t.initTrack.Codec.IsVideo() {
t.f.hasVideo = true t.f.hasVideo = true
@ -69,6 +85,17 @@ func (t *formatFMP4Track) write(sample *sample) error {
dts := timestampToDuration(sample.dts, int(t.initTrack.TimeScale)) dts := timestampToDuration(sample.dts, int(t.initTrack.TimeScale))
if !t.startInitialized {
t.startDTS = dts
t.startNTP = sample.ntp
t.startInitialized = true
} else {
drift := sample.ntp.Sub(t.startNTP) - (dts - t.startDTS)
if drift < -ntpDriftTolerance || drift > ntpDriftTolerance {
return fmt.Errorf("detected drift between recording duration and absolute time, resetting")
}
}
if t.f.currentSegment == nil { if t.f.currentSegment == nil {
t.f.currentSegment = &formatFMP4Segment{ t.f.currentSegment = &formatFMP4Segment{
f: t.f, f: t.f,

View file

@ -66,11 +66,14 @@ type formatMPEGTS struct {
func (f *formatMPEGTS) initialize() bool { func (f *formatMPEGTS) initialize() bool {
var tracks []*mpegts.Track var tracks []*mpegts.Track
addTrack := func(codec mpegts.Codec) *mpegts.Track { addTrack := func(codec mpegts.Codec) *formatMPEGTSTrack {
track := &mpegts.Track{ track := &formatMPEGTSTrack{
Codec: codec, f: f,
codec: codec,
} }
tracks = append(tracks, track) track.initialize()
tracks = append(tracks, track.track)
return track return track
} }
@ -107,14 +110,13 @@ func (f *formatMPEGTS) initialize() bool {
return err return err
} }
return f.write( return track.write(
timestampToDuration(dts, clockRate), timestampToDuration(dts, clockRate),
u.NTP, u.NTP,
true,
randomAccess, randomAccess,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteH265( return f.mw.WriteH265(
track, mtrack,
u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
dts, dts,
u.Payload.(unit.PayloadH265)) u.Payload.(unit.PayloadH265))
@ -150,14 +152,13 @@ func (f *formatMPEGTS) initialize() bool {
return err return err
} }
return f.write( return track.write(
timestampToDuration(dts, clockRate), timestampToDuration(dts, clockRate),
u.NTP, u.NTP,
true,
randomAccess, randomAccess,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteH264( return f.mw.WriteH264(
track, mtrack,
u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
dts, dts,
u.Payload.(unit.PayloadH264)) u.Payload.(unit.PayloadH264))
@ -189,14 +190,13 @@ func (f *formatMPEGTS) initialize() bool {
randomAccess := bytes.Contains(u.Payload.(unit.PayloadMPEG4Video), randomAccess := bytes.Contains(u.Payload.(unit.PayloadMPEG4Video),
[]byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
return f.write( return track.write(
timestampToDuration(u.PTS, clockRate), timestampToDuration(u.PTS, clockRate),
u.NTP, u.NTP,
true,
randomAccess, randomAccess,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteMPEG4Video( return f.mw.WriteMPEG4Video(
track, mtrack,
u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
u.Payload.(unit.PayloadMPEG4Video)) u.Payload.(unit.PayloadMPEG4Video))
}, },
@ -226,14 +226,13 @@ func (f *formatMPEGTS) initialize() bool {
randomAccess := bytes.Contains(u.Payload.(unit.PayloadMPEG1Video), []byte{0, 0, 1, 0xB8}) randomAccess := bytes.Contains(u.Payload.(unit.PayloadMPEG1Video), []byte{0, 0, 1, 0xB8})
return f.write( return track.write(
timestampToDuration(u.PTS, clockRate), timestampToDuration(u.PTS, clockRate),
u.NTP, u.NTP,
true,
randomAccess, randomAccess,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteMPEG1Video( return f.mw.WriteMPEG1Video(
track, mtrack,
u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
u.Payload.(unit.PayloadMPEG1Video)) u.Payload.(unit.PayloadMPEG1Video))
}, },
@ -253,14 +252,13 @@ func (f *formatMPEGTS) initialize() bool {
return nil return nil
} }
return f.write( return track.write(
timestampToDuration(u.PTS, clockRate), timestampToDuration(u.PTS, clockRate),
u.NTP, u.NTP,
false,
true, true,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteOpus( return f.mw.WriteOpus(
track, mtrack,
multiplyAndDivide(u.PTS, 90000, int64(clockRate)), multiplyAndDivide(u.PTS, 90000, int64(clockRate)),
u.Payload.(unit.PayloadOpus)) u.Payload.(unit.PayloadOpus))
}, },
@ -280,13 +278,15 @@ func (f *formatMPEGTS) initialize() bool {
return nil return nil
} }
return f.write( return track.write(
timestampToDuration(u.PTS, 90000), timestampToDuration(u.PTS, 90000),
u.NTP, u.NTP,
false,
true, true,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteKLV(track, multiplyAndDivide(u.PTS, 90000, 90000), u.Payload.(unit.PayloadKLV)) return f.mw.WriteKLV(
mtrack,
multiplyAndDivide(u.PTS, 90000, 90000),
u.Payload.(unit.PayloadKLV))
}, },
) )
}) })
@ -304,14 +304,13 @@ func (f *formatMPEGTS) initialize() bool {
return nil return nil
} }
return f.write( return track.write(
timestampToDuration(u.PTS, clockRate), timestampToDuration(u.PTS, clockRate),
u.NTP, u.NTP,
false,
true, true,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteMPEG4Audio( return f.mw.WriteMPEG4Audio(
track, mtrack,
multiplyAndDivide(u.PTS, 90000, int64(clockRate)), multiplyAndDivide(u.PTS, 90000, int64(clockRate)),
u.Payload.(unit.PayloadMPEG4Audio)) u.Payload.(unit.PayloadMPEG4Audio))
}, },
@ -339,14 +338,13 @@ func (f *formatMPEGTS) initialize() bool {
return err return err
} }
return f.write( return track.write(
timestampToDuration(u.PTS, clockRate), timestampToDuration(u.PTS, clockRate),
u.NTP, u.NTP,
false,
true, true,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteMPEG4Audio( return f.mw.WriteMPEG4Audio(
track, mtrack,
multiplyAndDivide(u.PTS, 90000, int64(clockRate)), multiplyAndDivide(u.PTS, 90000, int64(clockRate)),
[][]byte{ame.Payloads[0][0][0]}) [][]byte{ame.Payloads[0][0][0]})
}, },
@ -365,14 +363,13 @@ func (f *formatMPEGTS) initialize() bool {
return nil return nil
} }
return f.write( return track.write(
timestampToDuration(u.PTS, clockRate), timestampToDuration(u.PTS, clockRate),
u.NTP, u.NTP,
false,
true, true,
func() error { func(mtrack *mpegts.Track) error {
return f.mw.WriteMPEG1Audio( return f.mw.WriteMPEG1Audio(
track, mtrack,
u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP u.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
u.Payload.(unit.PayloadMPEG1Audio)) u.Payload.(unit.PayloadMPEG1Audio))
}, },
@ -390,17 +387,16 @@ func (f *formatMPEGTS) initialize() bool {
return nil return nil
} }
return f.write( return track.write(
timestampToDuration(u.PTS, clockRate), timestampToDuration(u.PTS, clockRate),
u.NTP, u.NTP,
false,
true, true,
func() error { func(mtrack *mpegts.Track) error {
for i, frame := range u.Payload.(unit.PayloadAC3) { for i, frame := range u.Payload.(unit.PayloadAC3) {
framePTS := u.PTS + int64(i)*ac3.SamplesPerFrame framePTS := u.PTS + int64(i)*ac3.SamplesPerFrame
err := f.mw.WriteAC3( err := f.mw.WriteAC3(
track, mtrack,
multiplyAndDivide(framePTS, 90000, int64(clockRate)), multiplyAndDivide(framePTS, 90000, int64(clockRate)),
frame) frame)
if err != nil { if err != nil {
@ -453,62 +449,3 @@ func (f *formatMPEGTS) close() {
f.currentSegment.close() //nolint:errcheck f.currentSegment.close() //nolint:errcheck
} }
} }
func (f *formatMPEGTS) write(
dts time.Duration,
ntp time.Time,
isVideo bool,
randomAccess bool,
writeCB func() error,
) error {
if isVideo {
f.hasVideo = true
}
switch {
case f.currentSegment == nil:
f.currentSegment = &formatMPEGTSSegment{
pathFormat2: f.ri.pathFormat2,
flush: f.bw.Flush,
onSegmentCreate: f.ri.onSegmentCreate,
onSegmentComplete: f.ri.onSegmentComplete,
startDTS: dts,
startNTP: ntp,
log: f.ri,
}
f.currentSegment.initialize()
f.dw.setTarget(f.currentSegment)
case (!f.hasVideo || isVideo) &&
randomAccess &&
(dts-f.currentSegment.startDTS) >= f.ri.segmentDuration:
f.currentSegment.lastDTS = dts
err := f.currentSegment.close()
if err != nil {
return err
}
f.currentSegment = &formatMPEGTSSegment{
pathFormat2: f.ri.pathFormat2,
flush: f.bw.Flush,
onSegmentCreate: f.ri.onSegmentCreate,
onSegmentComplete: f.ri.onSegmentComplete,
startDTS: dts,
startNTP: ntp,
log: f.ri,
}
f.currentSegment.initialize()
f.dw.setTarget(f.currentSegment)
case (dts - f.currentSegment.lastFlush) >= f.ri.partDuration:
err := f.bw.Flush()
if err != nil {
return err
}
f.currentSegment.lastFlush = dts
}
f.currentSegment.lastDTS = dts
return writeCB()
}

View file

@ -0,0 +1,96 @@
package recorder
import (
"fmt"
"time"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
)
type formatMPEGTSTrack struct {
f *formatMPEGTS
codec mpegts.Codec
track *mpegts.Track
startInitialized bool
startDTS time.Duration
startNTP time.Time
}
func (t *formatMPEGTSTrack) initialize() {
t.track = &mpegts.Track{
Codec: t.codec,
}
}
func (t *formatMPEGTSTrack) write(
dts time.Duration,
ntp time.Time,
randomAccess bool,
cb func(track *mpegts.Track) error,
) error {
isVideo := t.track.Codec.IsVideo()
if isVideo {
t.f.hasVideo = true
}
if !t.startInitialized {
t.startDTS = dts
t.startNTP = ntp
t.startInitialized = true
} else {
drift := ntp.Sub(t.startNTP) - (dts - t.startDTS)
if drift < -ntpDriftTolerance || drift > ntpDriftTolerance {
return fmt.Errorf("detected drift between recording duration and absolute time, resetting")
}
}
switch {
case t.f.currentSegment == nil:
t.f.currentSegment = &formatMPEGTSSegment{
pathFormat2: t.f.ri.pathFormat2,
flush: t.f.bw.Flush,
onSegmentCreate: t.f.ri.onSegmentCreate,
onSegmentComplete: t.f.ri.onSegmentComplete,
startDTS: dts,
startNTP: ntp,
log: t.f.ri,
}
t.f.currentSegment.initialize()
t.f.dw.setTarget(t.f.currentSegment)
case (!t.f.hasVideo || isVideo) &&
randomAccess &&
(dts-t.f.currentSegment.startDTS) >= t.f.ri.segmentDuration:
t.f.currentSegment.lastDTS = dts
err := t.f.currentSegment.close()
if err != nil {
return err
}
t.f.currentSegment = &formatMPEGTSSegment{
pathFormat2: t.f.ri.pathFormat2,
flush: t.f.bw.Flush,
onSegmentCreate: t.f.ri.onSegmentCreate,
onSegmentComplete: t.f.ri.onSegmentComplete,
startDTS: dts,
startNTP: ntp,
log: t.f.ri,
}
t.f.currentSegment.initialize()
t.f.dw.setTarget(t.f.currentSegment)
case (dts - t.f.currentSegment.lastFlush) >= t.f.ri.partDuration:
err := t.f.bw.Flush()
if err != nil {
return err
}
t.f.currentSegment.lastFlush = dts
}
t.f.currentSegment.lastDTS = dts
return cb(t.track)
}

View file

@ -9,6 +9,10 @@ import (
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
) )
const (
ntpDriftTolerance = 5 * time.Second
)
// OnSegmentCreateFunc is the prototype of the function passed as OnSegmentCreate // OnSegmentCreateFunc is the prototype of the function passed as OnSegmentCreate
type OnSegmentCreateFunc = func(path string) type OnSegmentCreateFunc = func(path string)

View file

@ -4,7 +4,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
@ -13,12 +12,6 @@ import (
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
) )
type sample struct {
*fmp4.Sample
dts int64
ntp time.Time
}
type recorderInstance struct { type recorderInstance struct {
pathFormat string pathFormat string
format conf.RecordFormat format conf.RecordFormat

View file

@ -75,7 +75,7 @@ func TestRecorder(t *testing.T) {
writeToStream := func(strm *stream.Stream, startDTS int64, startNTP time.Time) { writeToStream := func(strm *stream.Stream, startDTS int64, startNTP time.Time) {
for i := range 2 { for i := range 2 {
pts := startDTS + int64(i)*100*90000/1000 pts := startDTS + int64(i)*100*90000/1000
ntp := startNTP.Add(time.Duration(i*60) * time.Second) ntp := startNTP.Add(time.Duration(i*100) * time.Millisecond)
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
PTS: pts, PTS: pts,
@ -178,7 +178,7 @@ func TestRecorder(t *testing.T) {
case 0: case 0:
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath) require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath)
case 1: case 1:
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath) require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext), segPath)
default: default:
require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath) require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath)
} }
@ -190,7 +190,7 @@ func TestRecorder(t *testing.T) {
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath) require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath)
require.Equal(t, 2*time.Second, du) require.Equal(t, 2*time.Second, du)
case 1: case 1:
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath) require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext), segPath)
require.Equal(t, 100*time.Millisecond, du) require.Equal(t, 100*time.Millisecond, du)
default: default:
require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath) require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath)
@ -210,7 +210,7 @@ func TestRecorder(t *testing.T) {
writeToStream(strm, writeToStream(strm,
52*90000, 52*90000,
time.Date(2008, 5, 20, 22, 16, 25, 0, time.UTC)) time.Date(2008, 5, 20, 22, 15, 27, 0, time.UTC))
// simulate a write error // simulate a write error
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
@ -308,13 +308,13 @@ func TestRecorder(t *testing.T) {
}, },
}, init) }, init)
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
} else { } else {
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
} }
@ -821,3 +821,224 @@ func TestRecorderFMP4SegmentSwitch(t *testing.T) {
require.Equal(t, 2, n) require.Equal(t, 2, n)
} }
func TestRecorderTimeDriftDetector(t *testing.T) {
for _, ca := range []string{"fmp4", "mpegts"} {
t.Run(ca, func(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []rtspformat.Format{&rtspformat.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
},
{
Type: description.MediaTypeAudio,
Formats: []rtspformat.Format{&rtspformat.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.AudioSpecificConfig{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}},
},
}}
strm := &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := strm.Initialize()
require.NoError(t, err)
defer strm.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err)
defer os.RemoveAll(dir)
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
var ext string
if ca == "fmp4" {
ext = "mp4"
} else {
ext = "ts"
}
segCreated := make(chan struct{}, 10)
segDone := make(chan struct{}, 10)
var f conf.RecordFormat
if ca == "fmp4" {
f = conf.RecordFormatFMP4
} else {
f = conf.RecordFormatMPEGTS
}
w := &Recorder{
PathFormat: recordPath,
Format: f,
PartDuration: 100 * time.Millisecond,
MaxPartSize: 50 * 1024 * 1024,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: strm,
OnSegmentCreate: func(_ string) {
select {
case segCreated <- struct{}{}:
default:
}
},
OnSegmentComplete: func(_ string, _ time.Duration) {
select {
case segDone <- struct{}{}:
default:
}
},
Parent: test.NilLogger,
restartPause: 10 * time.Millisecond,
}
w.Initialize()
// Write initial samples with correct timing
startDTS := int64(50 * 90000)
startNTP := time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC)
for i := range 3 {
pts := startDTS + int64(i)*100*90000/1000
ntp := startNTP.Add(time.Duration(i*100) * time.Millisecond)
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
PTS: pts,
NTP: ntp,
Payload: unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5}, // IDR
},
})
strm.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.Unit{
PTS: pts * int64(desc.Medias[1].Formats[0].ClockRate()) / 90000,
NTP: ntp,
Payload: unit.PayloadMPEG4Audio{{1, 2, 3, 4}},
})
}
// Wait for first segment to be created
select {
case <-segCreated:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for first segment")
}
// Write more samples to ensure segment has data
for i := 3; i < 15; i++ {
pts := startDTS + int64(i)*100*90000/1000
ntp := startNTP.Add(time.Duration(i*100) * time.Millisecond)
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
PTS: pts,
NTP: ntp,
Payload: unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5}, // IDR
},
})
strm.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.Unit{
PTS: pts * int64(desc.Medias[1].Formats[0].ClockRate()) / 90000,
NTP: ntp,
Payload: unit.PayloadMPEG4Audio{{1, 2, 3, 4}},
})
}
// Simulate a time drift by advancing NTP time by more than 5 seconds
// while keeping DTS progression normal (only 100ms forward)
driftedPTS := startDTS + 15*100*90000/1000
driftedNTP := startNTP.Add(15*100*time.Millisecond + 6*time.Second) // 6 second drift
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
PTS: driftedPTS,
NTP: driftedNTP,
Payload: unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5}, // IDR
},
})
// Wait for the recorder to detect the drift, complete the segment, and restart
select {
case <-segDone:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for segment completion after drift")
}
// Give the recorder time to restart
time.Sleep(100 * time.Millisecond)
// Write samples after restart with corrected timing
restartDTS := int64(60 * 90000)
restartNTP := time.Date(2008, 5, 20, 22, 15, 35, 0, time.UTC)
for i := range 3 {
pts := restartDTS + int64(i)*100*90000/1000
ntp := restartNTP.Add(time.Duration(i*100) * time.Millisecond)
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
PTS: pts,
NTP: ntp,
Payload: unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5}, // IDR
},
})
strm.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.Unit{
PTS: pts * int64(desc.Medias[1].Formats[0].ClockRate()) / 90000,
NTP: ntp,
Payload: unit.PayloadMPEG4Audio{{1, 2, 3, 4}},
})
}
// Wait for second segment to be created after restart
select {
case <-segCreated:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for segment after restart")
}
time.Sleep(50 * time.Millisecond)
w.Close()
// Wait for final segment to complete
select {
case <-segDone:
case <-time.After(2 * time.Second):
// This is not fatal as the final segment may complete during Close()
}
// Verify that files were created
entries, err := os.ReadDir(filepath.Join(dir, "mypath"))
require.NoError(t, err)
require.GreaterOrEqual(t, len(entries), 2, "expected at least 2 segments (before and after drift)")
// Verify files have the expected extension
for _, entry := range entries {
require.Equal(t, "."+ext, filepath.Ext(entry.Name()))
}
})
}
}