recorder: write additional infos inside segments (#5083)

write stream ID, segment number, DTS, NTP in a dedicated box. This
allows to improve the merge algorithm in the playback server.
This commit is contained in:
Alessandro Ros 2025-10-13 12:34:44 +02:00 committed by GitHub
parent ccaccc51b4
commit 02e2b9d640
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 166 additions and 80 deletions

2
go.mod
View file

@ -12,7 +12,7 @@ require (
github.com/bluenviron/gohlslib/v2 v2.2.3 github.com/bluenviron/gohlslib/v2 v2.2.3
github.com/bluenviron/gortmplib v0.1.0 github.com/bluenviron/gortmplib v0.1.0
github.com/bluenviron/gortsplib/v5 v5.0.1 github.com/bluenviron/gortsplib/v5 v5.0.1
github.com/bluenviron/mediacommon/v2 v2.4.3 github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199
github.com/datarhei/gosrt v0.9.0 github.com/datarhei/gosrt v0.9.0
github.com/fsnotify/fsnotify v1.9.0 github.com/fsnotify/fsnotify v1.9.0
github.com/gin-contrib/pprof v1.5.3 github.com/gin-contrib/pprof v1.5.3

4
go.sum
View file

@ -37,8 +37,8 @@ github.com/bluenviron/gortmplib v0.1.0 h1:VtcmOrSsNNmjF5/hFVk/bvNFdairX2+ejhRIZ9
github.com/bluenviron/gortmplib v0.1.0/go.mod h1:FtII41guzpc9wMhdmZFsIKuC2hXN3yCMRsFlHYp1qQA= github.com/bluenviron/gortmplib v0.1.0/go.mod h1:FtII41guzpc9wMhdmZFsIKuC2hXN3yCMRsFlHYp1qQA=
github.com/bluenviron/gortsplib/v5 v5.0.1 h1:mM3zK1T0WojnAwQTQ/IK3mDnOpy8Yvm11QK7UmB8grk= github.com/bluenviron/gortsplib/v5 v5.0.1 h1:mM3zK1T0WojnAwQTQ/IK3mDnOpy8Yvm11QK7UmB8grk=
github.com/bluenviron/gortsplib/v5 v5.0.1/go.mod h1:jc0WG8LUa6Kb8lkDbLCJzAFdc/Ek7vM2rlO4VCHhbEQ= github.com/bluenviron/gortsplib/v5 v5.0.1/go.mod h1:jc0WG8LUa6Kb8lkDbLCJzAFdc/Ek7vM2rlO4VCHhbEQ=
github.com/bluenviron/mediacommon/v2 v2.4.3 h1:GFXKaMFgnQqbKv+uaAEfmgBZXBBRTtwabVepDowVDtM= github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199 h1:Ib+Azwbjy02RKurl+QHeKn+ZkLBdOzOFrxkUs425KHU=
github.com/bluenviron/mediacommon/v2 v2.4.3/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE= github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE=
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=

View file

@ -104,6 +104,7 @@ type formatFMP4 struct {
tracks []*formatFMP4Track tracks []*formatFMP4Track
hasVideo bool hasVideo bool
currentSegment *formatFMP4Segment currentSegment *formatFMP4Segment
nextSegmentNumber uint64
} }
func (f *formatFMP4) initialize() bool { func (f *formatFMP4) initialize() bool {

View file

@ -3,15 +3,11 @@ package recorder
import ( import (
"fmt" "fmt"
"io" "io"
"os"
"path/filepath"
"time" "time"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/recordstore"
) )
func writePart( func writePart(
@ -42,8 +38,9 @@ func writePart(
} }
type formatFMP4Part struct { type formatFMP4Part struct {
s *formatFMP4Segment maxPartSize conf.StringSize
sequenceNumber uint32 segmentStartDTS time.Duration
number uint32
startDTS time.Duration startDTS time.Duration
partTracks map[*formatFMP4Track]*fmp4.PartTrack partTracks map[*formatFMP4Track]*fmp4.PartTrack
@ -55,38 +52,13 @@ func (p *formatFMP4Part) initialize() {
p.partTracks = make(map[*formatFMP4Track]*fmp4.PartTrack) p.partTracks = make(map[*formatFMP4Track]*fmp4.PartTrack)
} }
func (p *formatFMP4Part) close() error { func (p *formatFMP4Part) close(w io.Writer) error {
if p.s.fi == nil { return writePart(w, p.number, p.partTracks)
p.s.path = recordstore.Path{Start: p.s.startNTP}.Encode(p.s.f.ri.pathFormat2)
p.s.f.ri.Log(logger.Debug, "creating segment %s", p.s.path)
err := os.MkdirAll(filepath.Dir(p.s.path), 0o755)
if err != nil {
return err
}
fi, err := os.Create(p.s.path)
if err != nil {
return err
}
p.s.f.ri.onSegmentCreate(p.s.path)
err = writeInit(fi, p.s.f.tracks)
if err != nil {
fi.Close()
return err
}
p.s.fi = fi
}
return writePart(p.s.fi, p.sequenceNumber, p.partTracks)
} }
func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.Duration) error {
size := uint64(len(sample.Payload)) size := uint64(len(sample.Payload))
if (p.size + size) > uint64(p.s.f.ri.maxPartSize) { if (p.size + size) > uint64(p.maxPartSize) {
return fmt.Errorf("reached maximum part size") return fmt.Errorf("reached maximum part size")
} }
p.size += size p.size += size
@ -95,7 +67,7 @@ func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.
if !ok { if !ok {
partTrack = &fmp4.PartTrack{ partTrack = &fmp4.PartTrack{
ID: track.initTrack.ID, ID: track.initTrack.ID,
BaseTime: uint64(multiplyAndDivide(int64(dts-p.s.startDTS), BaseTime: uint64(multiplyAndDivide(int64(dts-p.segmentStartDTS),
int64(track.initTrack.TimeScale), int64(time.Second))), int64(track.initTrack.TimeScale), int64(time.Second))),
} }
p.partTracks[track] = partTrack p.partTracks[track] = partTrack

View file

@ -5,16 +5,26 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path/filepath"
"time" "time"
"github.com/abema/go-mp4" amp4 "github.com/abema/go-mp4"
"github.com/google/uuid"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/recordstore"
) )
func writeInit(f io.Writer, tracks []*formatFMP4Track) error { func writeInit(
f io.Writer,
streamID uuid.UUID,
segmentNumber uint64,
dts time.Duration,
ntp time.Time,
tracks []*formatFMP4Track,
) error {
fmp4Tracks := make([]*fmp4.InitTrack, len(tracks)) fmp4Tracks := make([]*fmp4.InitTrack, len(tracks))
for i, track := range tracks { for i, track := range tracks {
fmp4Tracks[i] = track.initTrack fmp4Tracks[i] = track.initTrack
@ -22,6 +32,17 @@ func writeInit(f io.Writer, tracks []*formatFMP4Track) error {
init := fmp4.Init{ init := fmp4.Init{
Tracks: fmp4Tracks, Tracks: fmp4Tracks,
UserData: []amp4.IBox{
&recordstore.Mtxi{
FullBox: amp4.FullBox{
Version: 0,
},
StreamID: streamID,
SegmentNumber: segmentNumber,
DTS: int64(dts),
NTP: ntp.UnixNano(),
},
},
} }
var buf seekablebuffer.Buffer var buf seekablebuffer.Buffer
@ -77,8 +98,8 @@ func writeDuration(f io.ReadWriteSeeker, d time.Duration) error {
return err return err
} }
var mvhd mp4.Mvhd var mvhd amp4.Mvhd
_, err = mp4.Unmarshal(f, uint64(moovSize-8), &mvhd, mp4.Context{}) _, err = amp4.Unmarshal(f, uint64(moovSize-8), &mvhd, amp4.Context{})
if err != nil { if err != nil {
return err return err
} }
@ -90,7 +111,7 @@ func writeDuration(f io.ReadWriteSeeker, d time.Duration) error {
return err return err
} }
_, err = mp4.Marshal(f, &mvhd, mp4.Context{}) _, err = amp4.Marshal(f, &mvhd, amp4.Context{})
if err != nil { if err != nil {
return err return err
} }
@ -102,12 +123,13 @@ type formatFMP4Segment struct {
f *formatFMP4 f *formatFMP4
startDTS time.Duration startDTS time.Duration
startNTP time.Time startNTP time.Time
number uint64
path string path string
fi *os.File fi *os.File
curPart *formatFMP4Part curPart *formatFMP4Part
endDTS time.Duration endDTS time.Duration
nextSequenceNumber uint32 nextPartNumber uint32
} }
func (s *formatFMP4Segment) initialize() { func (s *formatFMP4Segment) initialize() {
@ -118,7 +140,7 @@ func (s *formatFMP4Segment) close() error {
var err error var err error
if s.curPart != nil { if s.curPart != nil {
err = s.curPart.close() err = s.closeCurPart()
} }
if s.fi != nil { if s.fi != nil {
@ -144,6 +166,41 @@ func (s *formatFMP4Segment) close() error {
return err return err
} }
func (s *formatFMP4Segment) closeCurPart() error {
if s.fi == nil {
s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ri.pathFormat2)
s.f.ri.Log(logger.Debug, "creating segment %s", s.path)
err := os.MkdirAll(filepath.Dir(s.path), 0o755)
if err != nil {
return err
}
fi, err := os.Create(s.path)
if err != nil {
return err
}
s.f.ri.onSegmentCreate(s.path)
err = writeInit(
fi,
s.f.ri.streamID,
s.number,
s.startDTS,
s.startNTP,
s.f.tracks)
if err != nil {
fi.Close()
return err
}
s.fi = fi
}
return s.curPart.close(s.fi)
}
func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts time.Duration) error { func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts time.Duration) error {
endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale)) endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale))
if endDTS > s.endDTS { if endDTS > s.endDTS {
@ -152,14 +209,15 @@ func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts ti
if s.curPart == nil { if s.curPart == nil {
s.curPart = &formatFMP4Part{ s.curPart = &formatFMP4Part{
s: s, maxPartSize: s.f.ri.maxPartSize,
sequenceNumber: s.nextSequenceNumber, segmentStartDTS: s.startDTS,
number: s.nextPartNumber,
startDTS: dts, startDTS: dts,
} }
s.curPart.initialize() s.curPart.initialize()
s.nextSequenceNumber++ s.nextPartNumber++
} else if s.curPart.duration() >= s.f.ri.partDuration { } else if s.curPart.duration() >= s.f.ri.partDuration {
err := s.curPart.close() err := s.closeCurPart()
s.curPart = nil s.curPart = nil
if err != nil { if err != nil {
@ -167,12 +225,13 @@ func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts ti
} }
s.curPart = &formatFMP4Part{ s.curPart = &formatFMP4Part{
s: s, maxPartSize: s.f.ri.maxPartSize,
sequenceNumber: s.nextSequenceNumber, segmentStartDTS: s.startDTS,
number: s.nextPartNumber,
startDTS: dts, startDTS: dts,
} }
s.curPart.initialize() s.curPart.initialize()
s.nextSequenceNumber++ s.nextPartNumber++
} }
return s.curPart.write(track, sample, dts) return s.curPart.write(track, sample, dts)

View file

@ -67,8 +67,10 @@ func (t *formatFMP4Track) write(sample *sample) error {
f: t.f, f: t.f,
startDTS: dts, startDTS: dts,
startNTP: sample.ntp, startNTP: sample.ntp,
number: t.f.nextSegmentNumber,
} }
t.f.currentSegment.initialize() t.f.currentSegment.initialize()
t.f.nextSegmentNumber++
} else if (dts - t.f.currentSegment.startDTS) < 0 { // BaseTime is negative, this is not supported by fMP4 } else if (dts - t.f.currentSegment.startDTS) < 0 { // BaseTime is negative, this is not supported by fMP4
t.f.ri.Log(logger.Warn, "sample of track %d received too late, discarding", t.initTrack.ID) t.f.ri.Log(logger.Warn, "sample of track %d received too late, discarding", t.initTrack.ID)
return nil return nil
@ -95,8 +97,10 @@ func (t *formatFMP4Track) write(sample *sample) error {
f: t.f, f: t.f,
startDTS: oldestDTS, startDTS: oldestDTS,
startNTP: oldestNTP, startNTP: oldestNTP,
number: t.f.nextSegmentNumber,
} }
t.f.currentSegment.initialize() t.f.currentSegment.initialize()
t.f.nextSegmentNumber++
} }
return nil return nil

View file

@ -468,11 +468,16 @@ func (f *formatMPEGTS) write(
switch { switch {
case f.currentSegment == nil: case f.currentSegment == nil:
f.currentSegment = &formatMPEGTSSegment{ f.currentSegment = &formatMPEGTSSegment{
f: f, pathFormat2: f.ri.pathFormat2,
flush: f.bw.Flush,
onSegmentCreate: f.ri.onSegmentCreate,
onSegmentComplete: f.ri.onSegmentComplete,
startDTS: dts, startDTS: dts,
startNTP: ntp, startNTP: ntp,
log: f.ri,
} }
f.currentSegment.initialize() f.currentSegment.initialize()
f.dw.setTarget(f.currentSegment)
case (!f.hasVideo || isVideo) && case (!f.hasVideo || isVideo) &&
randomAccess && randomAccess &&
(dts-f.currentSegment.startDTS) >= f.ri.segmentDuration: (dts-f.currentSegment.startDTS) >= f.ri.segmentDuration:
@ -483,11 +488,16 @@ func (f *formatMPEGTS) write(
} }
f.currentSegment = &formatMPEGTSSegment{ f.currentSegment = &formatMPEGTSSegment{
f: f, pathFormat2: f.ri.pathFormat2,
flush: f.bw.Flush,
onSegmentCreate: f.ri.onSegmentCreate,
onSegmentComplete: f.ri.onSegmentComplete,
startDTS: dts, startDTS: dts,
startNTP: ntp, startNTP: ntp,
log: f.ri,
} }
f.currentSegment.initialize() f.currentSegment.initialize()
f.dw.setTarget(f.currentSegment)
case (dts - f.currentSegment.lastFlush) >= f.ri.partDuration: case (dts - f.currentSegment.lastFlush) >= f.ri.partDuration:
err := f.bw.Flush() err := f.bw.Flush()

View file

@ -10,9 +10,13 @@ import (
) )
type formatMPEGTSSegment struct { type formatMPEGTSSegment struct {
f *formatMPEGTS pathFormat2 string
flush func() error
onSegmentCreate OnSegmentCreateFunc
onSegmentComplete OnSegmentCompleteFunc
startDTS time.Duration startDTS time.Duration
startNTP time.Time startNTP time.Time
log logger.Writer
path string path string
fi *os.File fi *os.File
@ -23,14 +27,13 @@ type formatMPEGTSSegment struct {
func (s *formatMPEGTSSegment) initialize() { func (s *formatMPEGTSSegment) initialize() {
s.lastFlush = s.startDTS s.lastFlush = s.startDTS
s.lastDTS = s.startDTS s.lastDTS = s.startDTS
s.f.dw.setTarget(s)
} }
func (s *formatMPEGTSSegment) close() error { func (s *formatMPEGTSSegment) close() error {
err := s.f.bw.Flush() err := s.flush()
if s.fi != nil { if s.fi != nil {
s.f.ri.Log(logger.Debug, "closing segment %s", s.path) s.log.Log(logger.Debug, "closing segment %s", s.path)
err2 := s.fi.Close() err2 := s.fi.Close()
if err == nil { if err == nil {
err = err2 err = err2
@ -38,7 +41,7 @@ func (s *formatMPEGTSSegment) close() error {
if err2 == nil { if err2 == nil {
duration := s.lastDTS - s.startDTS duration := s.lastDTS - s.startDTS
s.f.ri.onSegmentComplete(s.path, duration) s.onSegmentComplete(s.path, duration)
} }
} }
@ -47,8 +50,8 @@ func (s *formatMPEGTSSegment) close() error {
func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { func (s *formatMPEGTSSegment) Write(p []byte) (int, error) {
if s.fi == nil { if s.fi == nil {
s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ri.pathFormat2) s.path = recordstore.Path{Start: s.startNTP}.Encode(s.pathFormat2)
s.f.ri.Log(logger.Debug, "creating segment %s", s.path) s.log.Log(logger.Debug, "creating segment %s", s.path)
err := os.MkdirAll(filepath.Dir(s.path), 0o755) err := os.MkdirAll(filepath.Dir(s.path), 0o755)
if err != nil { if err != nil {
@ -60,7 +63,7 @@ func (s *formatMPEGTSSegment) Write(p []byte) (int, error) {
return 0, err return 0, err
} }
s.f.ri.onSegmentCreate(s.path) s.onSegmentCreate(s.path)
s.fi = fi s.fi = fi
} }

View file

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
@ -30,6 +31,7 @@ type recorderInstance struct {
onSegmentComplete OnSegmentCompleteFunc onSegmentComplete OnSegmentCompleteFunc
parent logger.Writer parent logger.Writer
streamID uuid.UUID
pathFormat2 string pathFormat2 string
format2 format format2 format
skip bool skip bool
@ -45,6 +47,7 @@ func (ri *recorderInstance) Log(level logger.Level, format string, args ...inter
} }
func (ri *recorderInstance) initialize() { func (ri *recorderInstance) initialize() {
ri.streamID = uuid.New()
ri.pathFormat2 = ri.pathFormat ri.pathFormat2 = ri.pathFormat
ri.pathFormat2 = recordstore.PathAddExtension( ri.pathFormat2 = recordstore.PathAddExtension(
strings.ReplaceAll(ri.pathFormat2, "%path", ri.pathName), strings.ReplaceAll(ri.pathFormat2, "%path", ri.pathName),

View file

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
amp4 "github.com/abema/go-mp4"
"github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/description"
rtspformat "github.com/bluenviron/gortsplib/v5/pkg/format" rtspformat "github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio"
@ -16,6 +17,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/recordstore"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test" "github.com/bluenviron/mediamtx/internal/test"
"github.com/bluenviron/mediamtx/internal/unit" "github.com/bluenviron/mediamtx/internal/unit"
@ -271,6 +273,13 @@ func TestRecorder(t *testing.T) {
}, },
}, },
}, },
UserData: []amp4.IBox{
&recordstore.Mtxi{
StreamID: init.UserData[0].(*recordstore.Mtxi).StreamID,
DTS: 50000000000,
NTP: 1211321725000000000,
},
},
}, init) }, init)
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext))

View file

@ -0,0 +1,25 @@
package recordstore
import (
amp4 "github.com/abema/go-mp4"
)
func boxTypeMtxi() amp4.BoxType { return amp4.StrToBoxType("mtxi") }
func init() { //nolint:gochecknoinits
amp4.AddBoxDef(&Mtxi{}, 0)
}
// Mtxi is a MediaMTX segment info.
type Mtxi struct {
amp4.FullBox `mp4:"0,extend"`
StreamID [16]byte `mp4:"1,size=8"`
SegmentNumber uint64 `mp4:"2,size=64"`
DTS int64 `mp4:"3,size=64"`
NTP int64 `mp4:"4,size=64"`
}
// GetType implements amp4.IBox.
func (*Mtxi) GetType() amp4.BoxType {
return boxTypeMtxi()
}