From 02e2b9d64046338c81a8b0d4b3305440f16c485b Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 13 Oct 2025 12:34:44 +0200 Subject: [PATCH] recorder: write additional infos inside segments (#5083) write stream ID, segment number, DTS, NTP in a dedicated box. This allows to improve the merge algorithm in the playback server. --- go.mod | 2 +- go.sum | 4 +- internal/recorder/format_fmp4.go | 7 +- internal/recorder/format_fmp4_part.go | 46 ++-------- internal/recorder/format_fmp4_segment.go | 101 ++++++++++++++++----- internal/recorder/format_fmp4_track.go | 4 + internal/recorder/format_mpegts.go | 22 +++-- internal/recorder/format_mpegts_segment.go | 23 +++-- internal/recorder/recorder_instance.go | 3 + internal/recorder/recorder_test.go | 9 ++ internal/recordstore/mp4_boxes.go | 25 +++++ 11 files changed, 166 insertions(+), 80 deletions(-) create mode 100644 internal/recordstore/mp4_boxes.go diff --git a/go.mod b/go.mod index f1e19506..15992a3b 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/bluenviron/gohlslib/v2 v2.2.3 github.com/bluenviron/gortmplib v0.1.0 github.com/bluenviron/gortsplib/v5 v5.0.1 - github.com/bluenviron/mediacommon/v2 v2.4.3 + github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199 github.com/datarhei/gosrt v0.9.0 github.com/fsnotify/fsnotify v1.9.0 github.com/gin-contrib/pprof v1.5.3 diff --git a/go.sum b/go.sum index c7b4619d..ebf200dd 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/bluenviron/gortmplib v0.1.0 h1:VtcmOrSsNNmjF5/hFVk/bvNFdairX2+ejhRIZ9 github.com/bluenviron/gortmplib v0.1.0/go.mod h1:FtII41guzpc9wMhdmZFsIKuC2hXN3yCMRsFlHYp1qQA= github.com/bluenviron/gortsplib/v5 v5.0.1 h1:mM3zK1T0WojnAwQTQ/IK3mDnOpy8Yvm11QK7UmB8grk= github.com/bluenviron/gortsplib/v5 v5.0.1/go.mod h1:jc0WG8LUa6Kb8lkDbLCJzAFdc/Ek7vM2rlO4VCHhbEQ= -github.com/bluenviron/mediacommon/v2 v2.4.3 h1:GFXKaMFgnQqbKv+uaAEfmgBZXBBRTtwabVepDowVDtM= -github.com/bluenviron/mediacommon/v2 v2.4.3/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE= +github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199 h1:Ib+Azwbjy02RKurl+QHeKn+ZkLBdOzOFrxkUs425KHU= +github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE= github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= diff --git a/internal/recorder/format_fmp4.go b/internal/recorder/format_fmp4.go index 437346c1..2d954287 100644 --- a/internal/recorder/format_fmp4.go +++ b/internal/recorder/format_fmp4.go @@ -101,9 +101,10 @@ func jpegExtractSize(image []byte) (int, int, error) { type formatFMP4 struct { ri *recorderInstance - tracks []*formatFMP4Track - hasVideo bool - currentSegment *formatFMP4Segment + tracks []*formatFMP4Track + hasVideo bool + currentSegment *formatFMP4Segment + nextSegmentNumber uint64 } func (f *formatFMP4) initialize() bool { diff --git a/internal/recorder/format_fmp4_part.go b/internal/recorder/format_fmp4_part.go index d5462794..f0ca6d70 100644 --- a/internal/recorder/format_fmp4_part.go +++ b/internal/recorder/format_fmp4_part.go @@ -3,15 +3,11 @@ package recorder import ( "fmt" "io" - "os" - "path/filepath" "time" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer" - - "github.com/bluenviron/mediamtx/internal/logger" - "github.com/bluenviron/mediamtx/internal/recordstore" + "github.com/bluenviron/mediamtx/internal/conf" ) func writePart( @@ -42,9 +38,10 @@ func writePart( } type formatFMP4Part struct { - s *formatFMP4Segment - sequenceNumber uint32 - startDTS time.Duration + maxPartSize conf.StringSize + segmentStartDTS time.Duration + number uint32 + startDTS time.Duration partTracks map[*formatFMP4Track]*fmp4.PartTrack size uint64 @@ -55,38 +52,13 @@ func (p *formatFMP4Part) initialize() { p.partTracks = make(map[*formatFMP4Track]*fmp4.PartTrack) } -func (p *formatFMP4Part) close() error { - if p.s.fi == nil { - p.s.path = recordstore.Path{Start: p.s.startNTP}.Encode(p.s.f.ri.pathFormat2) - p.s.f.ri.Log(logger.Debug, "creating segment %s", p.s.path) - - err := os.MkdirAll(filepath.Dir(p.s.path), 0o755) - if err != nil { - return err - } - - fi, err := os.Create(p.s.path) - if err != nil { - return err - } - - p.s.f.ri.onSegmentCreate(p.s.path) - - err = writeInit(fi, p.s.f.tracks) - if err != nil { - fi.Close() - return err - } - - p.s.fi = fi - } - - return writePart(p.s.fi, p.sequenceNumber, p.partTracks) +func (p *formatFMP4Part) close(w io.Writer) error { + return writePart(w, p.number, p.partTracks) } func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { size := uint64(len(sample.Payload)) - if (p.size + size) > uint64(p.s.f.ri.maxPartSize) { + if (p.size + size) > uint64(p.maxPartSize) { return fmt.Errorf("reached maximum part size") } p.size += size @@ -95,7 +67,7 @@ func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time. if !ok { partTrack = &fmp4.PartTrack{ ID: track.initTrack.ID, - BaseTime: uint64(multiplyAndDivide(int64(dts-p.s.startDTS), + BaseTime: uint64(multiplyAndDivide(int64(dts-p.segmentStartDTS), int64(track.initTrack.TimeScale), int64(time.Second))), } p.partTracks[track] = partTrack diff --git a/internal/recorder/format_fmp4_segment.go b/internal/recorder/format_fmp4_segment.go index 1dae8b66..eb9bf1f8 100644 --- a/internal/recorder/format_fmp4_segment.go +++ b/internal/recorder/format_fmp4_segment.go @@ -5,16 +5,26 @@ import ( "fmt" "io" "os" + "path/filepath" "time" - "github.com/abema/go-mp4" + amp4 "github.com/abema/go-mp4" + "github.com/google/uuid" + "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer" - "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/recordstore" ) -func writeInit(f io.Writer, tracks []*formatFMP4Track) error { +func writeInit( + f io.Writer, + streamID uuid.UUID, + segmentNumber uint64, + dts time.Duration, + ntp time.Time, + tracks []*formatFMP4Track, +) error { fmp4Tracks := make([]*fmp4.InitTrack, len(tracks)) for i, track := range tracks { fmp4Tracks[i] = track.initTrack @@ -22,6 +32,17 @@ func writeInit(f io.Writer, tracks []*formatFMP4Track) error { init := fmp4.Init{ Tracks: fmp4Tracks, + UserData: []amp4.IBox{ + &recordstore.Mtxi{ + FullBox: amp4.FullBox{ + Version: 0, + }, + StreamID: streamID, + SegmentNumber: segmentNumber, + DTS: int64(dts), + NTP: ntp.UnixNano(), + }, + }, } var buf seekablebuffer.Buffer @@ -77,8 +98,8 @@ func writeDuration(f io.ReadWriteSeeker, d time.Duration) error { return err } - var mvhd mp4.Mvhd - _, err = mp4.Unmarshal(f, uint64(moovSize-8), &mvhd, mp4.Context{}) + var mvhd amp4.Mvhd + _, err = amp4.Unmarshal(f, uint64(moovSize-8), &mvhd, amp4.Context{}) if err != nil { return err } @@ -90,7 +111,7 @@ func writeDuration(f io.ReadWriteSeeker, d time.Duration) error { return err } - _, err = mp4.Marshal(f, &mvhd, mp4.Context{}) + _, err = amp4.Marshal(f, &mvhd, amp4.Context{}) if err != nil { return err } @@ -102,12 +123,13 @@ type formatFMP4Segment struct { f *formatFMP4 startDTS time.Duration startNTP time.Time + number uint64 - path string - fi *os.File - curPart *formatFMP4Part - endDTS time.Duration - nextSequenceNumber uint32 + path string + fi *os.File + curPart *formatFMP4Part + endDTS time.Duration + nextPartNumber uint32 } func (s *formatFMP4Segment) initialize() { @@ -118,7 +140,7 @@ func (s *formatFMP4Segment) close() error { var err error if s.curPart != nil { - err = s.curPart.close() + err = s.closeCurPart() } if s.fi != nil { @@ -144,6 +166,41 @@ func (s *formatFMP4Segment) close() error { return err } +func (s *formatFMP4Segment) closeCurPart() error { + if s.fi == nil { + s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ri.pathFormat2) + s.f.ri.Log(logger.Debug, "creating segment %s", s.path) + + err := os.MkdirAll(filepath.Dir(s.path), 0o755) + if err != nil { + return err + } + + fi, err := os.Create(s.path) + if err != nil { + return err + } + + s.f.ri.onSegmentCreate(s.path) + + err = writeInit( + fi, + s.f.ri.streamID, + s.number, + s.startDTS, + s.startNTP, + s.f.tracks) + if err != nil { + fi.Close() + return err + } + + s.fi = fi + } + + return s.curPart.close(s.fi) +} + func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale)) if endDTS > s.endDTS { @@ -152,14 +209,15 @@ func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts ti if s.curPart == nil { s.curPart = &formatFMP4Part{ - s: s, - sequenceNumber: s.nextSequenceNumber, - startDTS: dts, + maxPartSize: s.f.ri.maxPartSize, + segmentStartDTS: s.startDTS, + number: s.nextPartNumber, + startDTS: dts, } s.curPart.initialize() - s.nextSequenceNumber++ + s.nextPartNumber++ } else if s.curPart.duration() >= s.f.ri.partDuration { - err := s.curPart.close() + err := s.closeCurPart() s.curPart = nil if err != nil { @@ -167,12 +225,13 @@ func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts ti } s.curPart = &formatFMP4Part{ - s: s, - sequenceNumber: s.nextSequenceNumber, - startDTS: dts, + maxPartSize: s.f.ri.maxPartSize, + segmentStartDTS: s.startDTS, + number: s.nextPartNumber, + startDTS: dts, } s.curPart.initialize() - s.nextSequenceNumber++ + s.nextPartNumber++ } return s.curPart.write(track, sample, dts) diff --git a/internal/recorder/format_fmp4_track.go b/internal/recorder/format_fmp4_track.go index fe3b5f0f..c8ab82f5 100644 --- a/internal/recorder/format_fmp4_track.go +++ b/internal/recorder/format_fmp4_track.go @@ -67,8 +67,10 @@ func (t *formatFMP4Track) write(sample *sample) error { f: t.f, startDTS: dts, startNTP: sample.ntp, + number: t.f.nextSegmentNumber, } t.f.currentSegment.initialize() + t.f.nextSegmentNumber++ } else if (dts - t.f.currentSegment.startDTS) < 0 { // BaseTime is negative, this is not supported by fMP4 t.f.ri.Log(logger.Warn, "sample of track %d received too late, discarding", t.initTrack.ID) return nil @@ -95,8 +97,10 @@ func (t *formatFMP4Track) write(sample *sample) error { f: t.f, startDTS: oldestDTS, startNTP: oldestNTP, + number: t.f.nextSegmentNumber, } t.f.currentSegment.initialize() + t.f.nextSegmentNumber++ } return nil diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index 553a34f5..c2b36dc0 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -468,11 +468,16 @@ func (f *formatMPEGTS) write( switch { case f.currentSegment == nil: f.currentSegment = &formatMPEGTSSegment{ - f: f, - startDTS: dts, - startNTP: ntp, + pathFormat2: f.ri.pathFormat2, + flush: f.bw.Flush, + onSegmentCreate: f.ri.onSegmentCreate, + onSegmentComplete: f.ri.onSegmentComplete, + startDTS: dts, + startNTP: ntp, + log: f.ri, } f.currentSegment.initialize() + f.dw.setTarget(f.currentSegment) case (!f.hasVideo || isVideo) && randomAccess && (dts-f.currentSegment.startDTS) >= f.ri.segmentDuration: @@ -483,11 +488,16 @@ func (f *formatMPEGTS) write( } f.currentSegment = &formatMPEGTSSegment{ - f: f, - startDTS: dts, - startNTP: ntp, + pathFormat2: f.ri.pathFormat2, + flush: f.bw.Flush, + onSegmentCreate: f.ri.onSegmentCreate, + onSegmentComplete: f.ri.onSegmentComplete, + startDTS: dts, + startNTP: ntp, + log: f.ri, } f.currentSegment.initialize() + f.dw.setTarget(f.currentSegment) case (dts - f.currentSegment.lastFlush) >= f.ri.partDuration: err := f.bw.Flush() diff --git a/internal/recorder/format_mpegts_segment.go b/internal/recorder/format_mpegts_segment.go index 73f0e76b..facc05e2 100644 --- a/internal/recorder/format_mpegts_segment.go +++ b/internal/recorder/format_mpegts_segment.go @@ -10,9 +10,13 @@ import ( ) type formatMPEGTSSegment struct { - f *formatMPEGTS - startDTS time.Duration - startNTP time.Time + pathFormat2 string + flush func() error + onSegmentCreate OnSegmentCreateFunc + onSegmentComplete OnSegmentCompleteFunc + startDTS time.Duration + startNTP time.Time + log logger.Writer path string fi *os.File @@ -23,14 +27,13 @@ type formatMPEGTSSegment struct { func (s *formatMPEGTSSegment) initialize() { s.lastFlush = s.startDTS s.lastDTS = s.startDTS - s.f.dw.setTarget(s) } func (s *formatMPEGTSSegment) close() error { - err := s.f.bw.Flush() + err := s.flush() if s.fi != nil { - s.f.ri.Log(logger.Debug, "closing segment %s", s.path) + s.log.Log(logger.Debug, "closing segment %s", s.path) err2 := s.fi.Close() if err == nil { err = err2 @@ -38,7 +41,7 @@ func (s *formatMPEGTSSegment) close() error { if err2 == nil { duration := s.lastDTS - s.startDTS - s.f.ri.onSegmentComplete(s.path, duration) + s.onSegmentComplete(s.path, duration) } } @@ -47,8 +50,8 @@ func (s *formatMPEGTSSegment) close() error { func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { if s.fi == nil { - s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ri.pathFormat2) - s.f.ri.Log(logger.Debug, "creating segment %s", s.path) + s.path = recordstore.Path{Start: s.startNTP}.Encode(s.pathFormat2) + s.log.Log(logger.Debug, "creating segment %s", s.path) err := os.MkdirAll(filepath.Dir(s.path), 0o755) if err != nil { @@ -60,7 +63,7 @@ func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { return 0, err } - s.f.ri.onSegmentCreate(s.path) + s.onSegmentCreate(s.path) s.fi = fi } diff --git a/internal/recorder/recorder_instance.go b/internal/recorder/recorder_instance.go index 7d9ba8e4..899ae6ee 100644 --- a/internal/recorder/recorder_instance.go +++ b/internal/recorder/recorder_instance.go @@ -5,6 +5,7 @@ import ( "time" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" + "github.com/google/uuid" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" @@ -30,6 +31,7 @@ type recorderInstance struct { onSegmentComplete OnSegmentCompleteFunc parent logger.Writer + streamID uuid.UUID pathFormat2 string format2 format skip bool @@ -45,6 +47,7 @@ func (ri *recorderInstance) Log(level logger.Level, format string, args ...inter } func (ri *recorderInstance) initialize() { + ri.streamID = uuid.New() ri.pathFormat2 = ri.pathFormat ri.pathFormat2 = recordstore.PathAddExtension( strings.ReplaceAll(ri.pathFormat2, "%path", ri.pathName), diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index cf2b8164..02873bb3 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + amp4 "github.com/abema/go-mp4" "github.com/bluenviron/gortsplib/v5/pkg/description" rtspformat "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" @@ -16,6 +17,7 @@ import ( "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/recordstore" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/test" "github.com/bluenviron/mediamtx/internal/unit" @@ -271,6 +273,13 @@ func TestRecorder(t *testing.T) { }, }, }, + UserData: []amp4.IBox{ + &recordstore.Mtxi{ + StreamID: init.UserData[0].(*recordstore.Mtxi).StreamID, + DTS: 50000000000, + NTP: 1211321725000000000, + }, + }, }, init) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) diff --git a/internal/recordstore/mp4_boxes.go b/internal/recordstore/mp4_boxes.go new file mode 100644 index 00000000..3a48ef3f --- /dev/null +++ b/internal/recordstore/mp4_boxes.go @@ -0,0 +1,25 @@ +package recordstore + +import ( + amp4 "github.com/abema/go-mp4" +) + +func boxTypeMtxi() amp4.BoxType { return amp4.StrToBoxType("mtxi") } + +func init() { //nolint:gochecknoinits + amp4.AddBoxDef(&Mtxi{}, 0) +} + +// Mtxi is a MediaMTX segment info. +type Mtxi struct { + amp4.FullBox `mp4:"0,extend"` + StreamID [16]byte `mp4:"1,size=8"` + SegmentNumber uint64 `mp4:"2,size=64"` + DTS int64 `mp4:"3,size=64"` + NTP int64 `mp4:"4,size=64"` +} + +// GetType implements amp4.IBox. +func (*Mtxi) GetType() amp4.BoxType { + return boxTypeMtxi() +}