mediamtx/internal/recorder/format_mpegts_track.go
Alessandro Ros fb9027a334
recorder: reset when absolute time drifts from stream time (#4778) (#5239)
the server now detects when system time changes too much and restarts
recordings when that happens.
2025-12-02 18:10:23 +01:00

96 lines
2.1 KiB
Go

package recorder
import (
"fmt"
"time"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
)
// formatMPEGTSTrack is a single track of an MPEG-TS recording. It holds the
// mpegts track handed to write callbacks and the timing reference used to
// detect drift between stream time (DTS) and absolute time (NTP).
type formatMPEGTSTrack struct {
// f is the owning format; write reads and updates its shared segment state.
f *formatMPEGTS
// codec is copied into track.Codec by initialize.
codec mpegts.Codec
// track is created by initialize and passed to the callback given to write.
track *mpegts.Track
// startInitialized reports whether startDTS and startNTP have been recorded.
startInitialized bool
// startDTS is the DTS passed to the first write call.
startDTS time.Duration
// startNTP is the NTP time passed to the first write call.
startNTP time.Time
}
// initialize allocates the mpegts track for this recorder track and assigns
// it the configured codec. It must run before write, which dereferences
// t.track.
func (t *formatMPEGTSTrack) initialize() {
	trk := new(mpegts.Track)
	trk.Codec = t.codec
	t.track = trk
}
// write processes one sample of this track: it validates timing, opens or
// rotates the current segment or flushes the buffered writer as needed, and
// then invokes cb with the mpegts track so the caller can emit the payload.
//
// dts is the sample's decode timestamp in stream time, ntp is the absolute
// time associated with it, and randomAccess tells whether a new segment may
// start at this sample.
//
// An error is returned when the elapsed absolute time and the elapsed stream
// time, both measured from the first sample of this track, differ by more
// than ntpDriftTolerance, when closing the previous segment or flushing
// fails, or when cb fails.
func (t *formatMPEGTSTrack) write(
	dts time.Duration,
	ntp time.Time,
	randomAccess bool,
	cb func(track *mpegts.Track) error,
) error {
	isVideo := t.track.Codec.IsVideo()
	if isVideo {
		t.f.hasVideo = true
	}

	if !t.startInitialized {
		// The first sample becomes the reference point for drift detection.
		t.startDTS = dts
		t.startNTP = ntp
		t.startInitialized = true
	} else {
		// Compare elapsed absolute time against elapsed stream time.
		drift := ntp.Sub(t.startNTP) - (dts - t.startDTS)
		if drift < -ntpDriftTolerance || drift > ntpDriftTolerance {
			return fmt.Errorf("detected drift between recording duration and absolute time, resetting")
		}
	}

	switch {
	case t.f.currentSegment == nil:
		t.openSegment(dts, ntp)

	// Rotate on a random-access sample once the segment is long enough.
	// After any video track has written, only video samples can rotate.
	case (!t.f.hasVideo || isVideo) &&
		randomAccess &&
		(dts-t.f.currentSegment.startDTS) >= t.f.ri.segmentDuration:
		// Record the final DTS on the outgoing segment before closing it.
		t.f.currentSegment.lastDTS = dts

		if err := t.f.currentSegment.close(); err != nil {
			return err
		}

		t.openSegment(dts, ntp)

	case (dts - t.f.currentSegment.lastFlush) >= t.f.ri.partDuration:
		if err := t.f.bw.Flush(); err != nil {
			return err
		}

		t.f.currentSegment.lastFlush = dts
	}

	t.f.currentSegment.lastDTS = dts

	return cb(t.track)
}

// openSegment creates a segment starting at the given DTS and NTP time,
// initializes it, installs it as the format's current segment and points the
// format's dw writer at it.
func (t *formatMPEGTSTrack) openSegment(dts time.Duration, ntp time.Time) {
	seg := &formatMPEGTSSegment{
		pathFormat2:       t.f.ri.pathFormat2,
		flush:             t.f.bw.Flush,
		onSegmentCreate:   t.f.ri.onSegmentCreate,
		onSegmentComplete: t.f.ri.onSegmentComplete,
		startDTS:          dts,
		startNTP:          ntp,
		log:               t.f.ri,
	}
	t.f.currentSegment = seg
	seg.initialize()
	t.f.dw.setTarget(seg)
}