diff --git a/README.md b/README.md index f6fb91f9..72f67964 100644 --- a/README.md +++ b/README.md @@ -1669,6 +1669,7 @@ pathDefaults: # * G1, G2, ...: regular expression groups, if path name is # a regular expression. # * MTX_SEGMENT_PATH: segment file path + # * MTX_SEGMENT_DURATION: segment duration runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH ``` diff --git a/internal/core/path.go b/internal/core/path.go index 226b7c3d..70ab2d54 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -806,10 +806,11 @@ func (pa *path) startRecording() { nil) } }, - OnSegmentComplete: func(segmentPath string) { + OnSegmentComplete: func(segmentPath string, segmentDuration time.Duration) { if pa.conf.RunOnRecordSegmentComplete != "" { env := pa.ExternalCmdEnv() env["MTX_SEGMENT_PATH"] = segmentPath + env["MTX_SEGMENT_DURATION"] = strconv.FormatFloat(segmentDuration.Seconds(), 'f', -1, 64) pa.Log(logger.Info, "runOnRecordSegmentComplete command launched") externalcmd.NewCmd( diff --git a/internal/core/path_test.go b/internal/core/path_test.go index 90e31900..145c00c7 100644 --- a/internal/core/path_test.go +++ b/internal/core/path_test.go @@ -105,12 +105,12 @@ func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo var _ defs.Path = &path{} func TestPathRunOnDemand(t *testing.T) { - onDemandFile := filepath.Join(os.TempDir(), "ondemand") - onUnDemandFile := filepath.Join(os.TempDir(), "onundemand") + onDemand := filepath.Join(os.TempDir(), "on_demand") + onUnDemand := filepath.Join(os.TempDir(), "on_undemand") srcFile := filepath.Join(os.TempDir(), "ondemand.go") err := os.WriteFile(srcFile, - []byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemandFile)), 0o644) + []byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemand)), 0o644) require.NoError(t, err) execFile := filepath.Join(os.TempDir(), "ondemand_cmd") @@ -125,8 +125,8 @@ func TestPathRunOnDemand(t *testing.T) { for _, ca := range []string{"describe", "setup", "describe and setup"} { t.Run(ca, func(t *testing.T) { - defer os.Remove(onDemandFile) - defer os.Remove(onUnDemandFile) + defer os.Remove(onDemand) + defer os.Remove(onUnDemand) p1, ok := newInstance(fmt.Sprintf("rtmp: no\n"+ "hls: no\n"+ @@ -135,7 +135,7 @@ func TestPathRunOnDemand(t *testing.T) { " '~^(on)demand$':\n"+ " runOnDemand: %s\n"+ " runOnDemandCloseAfter: 1s\n"+ - " runOnUnDemand: touch %s\n", execFile, onUnDemandFile)) + " runOnUnDemand: touch %s\n", execFile, onUnDemand)) require.Equal(t, true, ok) defer p1.Close() @@ -204,14 +204,14 @@ func TestPathRunOnDemand(t *testing.T) { }() for { - _, err := os.Stat(onUnDemandFile) + _, err := os.Stat(onUnDemand) if err == nil { break } time.Sleep(100 * time.Millisecond) } - _, err := os.Stat(onDemandFile) + _, err := os.Stat(onDemand) require.NoError(t, err) }) } @@ -220,11 +220,11 @@ func TestPathRunOnDemand(t *testing.T) { func TestPathRunOnConnect(t *testing.T) { for _, ca := range []string{"rtsp", "rtmp", "srt"} { t.Run(ca, func(t *testing.T) { - onConnectFile := filepath.Join(os.TempDir(), "onconnect") - defer os.Remove(onConnectFile) + onConnect := filepath.Join(os.TempDir(), "on_connect") + defer os.Remove(onConnect) - onDisconnectFile := filepath.Join(os.TempDir(), "ondisconnect") - defer os.Remove(onDisconnectFile) + onDisconnect := filepath.Join(os.TempDir(), "on_disconnect") + defer os.Remove(onDisconnect) func() { p, ok := newInstance(fmt.Sprintf( @@ -232,7 +232,7 @@ func TestPathRunOnConnect(t *testing.T) { " test:\n"+ "runOnConnect: touch %s\n"+ "runOnDisconnect: touch %s\n", - onConnectFile, onDisconnectFile)) + onConnect, onDisconnect)) require.Equal(t, true, ok) defer p.Close() @@ -273,21 +273,21 @@ func TestPathRunOnConnect(t *testing.T) { time.Sleep(500 * time.Millisecond) }() - _, err := os.Stat(onConnectFile) + _, err := os.Stat(onConnect) require.NoError(t, err) - _, err = os.Stat(onDisconnectFile) + _, err = os.Stat(onDisconnect) require.NoError(t, err) }) } } func TestPathRunOnReady(t *testing.T) { - onReadyFile := filepath.Join(os.TempDir(), "onready") - defer os.Remove(onReadyFile) + onReady := filepath.Join(os.TempDir(), "on_ready") + defer os.Remove(onReady) - onNotReadyFile := filepath.Join(os.TempDir(), "onunready") - defer os.Remove(onNotReadyFile) + onNotReady := filepath.Join(os.TempDir(), "on_unready") + defer os.Remove(onNotReady) func() { p, ok := newInstance(fmt.Sprintf("rtmp: no\n"+ @@ -297,7 +297,7 @@ func TestPathRunOnReady(t *testing.T) { " test:\n"+ " runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ " runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", - onReadyFile, onNotReadyFile)) + onReady, onNotReady)) require.Equal(t, true, ok) defer p.Close() @@ -312,11 +312,11 @@ func TestPathRunOnReady(t *testing.T) { time.Sleep(500 * time.Millisecond) }() - byts, err := os.ReadFile(onReadyFile) + byts, err := os.ReadFile(onReady) require.NoError(t, err) require.Equal(t, "test query=value\n", string(byts)) - byts, err = os.ReadFile(onNotReadyFile) + byts, err = os.ReadFile(onNotReady) require.NoError(t, err) require.Equal(t, "test query=value\n", string(byts)) } @@ -324,11 +324,11 @@ func TestPathRunOnReady(t *testing.T) { func TestPathRunOnRead(t *testing.T) { for _, ca := range []string{"rtsp", "rtmp", "srt", "webrtc"} { t.Run(ca, func(t *testing.T) { - onReadFile := filepath.Join(os.TempDir(), "onread") - defer os.Remove(onReadFile) + onRead := filepath.Join(os.TempDir(), "on_read") + defer os.Remove(onRead) - onUnreadFile := filepath.Join(os.TempDir(), "onunread") - defer os.Remove(onUnreadFile) + onUnread := filepath.Join(os.TempDir(), "on_unread") + defer os.Remove(onUnread) func() { p, ok := newInstance(fmt.Sprintf( @@ -336,7 +336,7 @@ func TestPathRunOnRead(t *testing.T) { " test:\n"+ " runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ " runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", - onReadFile, onUnreadFile)) + onRead, onUnread)) require.Equal(t, true, ok) defer p.Close() @@ -449,17 +449,79 @@ func TestPathRunOnRead(t *testing.T) { time.Sleep(500 * time.Millisecond) }() - byts, err := os.ReadFile(onReadFile) + byts, err := os.ReadFile(onRead) require.NoError(t, err) require.Equal(t, "test query=value\n", string(byts)) - byts, err = os.ReadFile(onUnreadFile) + byts, err = os.ReadFile(onUnread) require.NoError(t, err) require.Equal(t, "test query=value\n", string(byts)) }) } } +func TestPathRunOnRecordSegment(t *testing.T) { + onRecordSegmentCreate := filepath.Join(os.TempDir(), "on_record_segment_create") + defer os.Remove(onRecordSegmentCreate) + + onRecordSegmentComplete := filepath.Join(os.TempDir(), "on_record_segment_complete") + defer os.Remove(onRecordSegmentComplete) + + recordDir, err := os.MkdirTemp("", "rtsp-path-record") + require.NoError(t, err) + defer os.RemoveAll(recordDir) + + func() { + p, ok := newInstance("record: yes\n" + + "recordPath: " + filepath.Join(recordDir, "%path/%Y-%m-%d_%H-%M-%S-%f") + "\n" + + "paths:\n" + + " test:\n" + + " runOnRecordSegmentCreate: " + + "sh -c 'echo \"$MTX_SEGMENT_PATH\" > " + onRecordSegmentCreate + "'\n" + + " runOnRecordSegmentComplete: " + + "sh -c 'echo \"$MTX_SEGMENT_PATH $MTX_SEGMENT_DURATION\" > " + onRecordSegmentComplete + "'\n") + require.Equal(t, true, ok) + defer p.Close() + + media0 := test.UniqueMediaH264() + + source := gortsplib.Client{} + + err = source.StartRecording( + "rtsp://localhost:8554/test", + &description.Session{Medias: []*description.Media{media0}}) + require.NoError(t, err) + defer source.Close() + + for i := 0; i < 4; i++ { + err = source.WritePacketRTP(media0, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 1123 + uint16(i), + Timestamp: 45343 + 90000*uint32(i), + SSRC: 563423, + }, + Payload: []byte{5}, + }) + require.NoError(t, err) + } + + time.Sleep(500 * time.Millisecond) + }() + + byts, err := os.ReadFile(onRecordSegmentCreate) + require.NoError(t, err) + require.Equal(t, true, strings.HasPrefix(string(byts), recordDir)) + + byts, err = os.ReadFile(onRecordSegmentComplete) + require.NoError(t, err) + parts := strings.Split(string(byts[:len(byts)-1]), " ") + require.Equal(t, true, strings.HasPrefix(parts[0], recordDir)) + require.Equal(t, "3", parts[1]) +} + func TestPathMaxReaders(t *testing.T) { p, ok := newInstance("paths:\n" + " all_others:\n" + diff --git a/internal/record/agent.go b/internal/record/agent.go index 7ecde7db..f7189f67 100644 --- a/internal/record/agent.go +++ b/internal/record/agent.go @@ -8,6 +8,12 @@ import ( "github.com/bluenviron/mediamtx/internal/stream" ) +// OnSegmentCreateFunc is the prototype of the function passed as OnSegmentCreate +type OnSegmentCreateFunc = func(path string) + +// OnSegmentCompleteFunc is the prototype of the function passed as OnSegmentComplete +type OnSegmentCompleteFunc = func(path string, duration time.Duration) + // Agent writes recordings to disk. type Agent struct { WriteQueueSize int @@ -17,8 +23,8 @@ type Agent struct { SegmentDuration time.Duration PathName string Stream *stream.Stream - OnSegmentCreate OnSegmentFunc - OnSegmentComplete OnSegmentFunc + OnSegmentCreate OnSegmentCreateFunc + OnSegmentComplete OnSegmentCompleteFunc Parent logger.Writer restartPause time.Duration @@ -36,7 +42,7 @@ func (w *Agent) Initialize() { } } if w.OnSegmentComplete == nil { - w.OnSegmentComplete = func(string) { + w.OnSegmentComplete = func(string, time.Duration) { } } if w.restartPause == 0 { diff --git a/internal/record/agent_instance.go b/internal/record/agent_instance.go index 8e722a8f..bce6e3d1 100644 --- a/internal/record/agent_instance.go +++ b/internal/record/agent_instance.go @@ -11,9 +11,6 @@ import ( "github.com/bluenviron/mediamtx/internal/logger" ) -// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete -type OnSegmentFunc = func(string) - type sample struct { *fmp4.PartSample dts time.Duration diff --git a/internal/record/agent_test.go b/internal/record/agent_test.go index 0f2a18c6..fb342e9e 100644 --- a/internal/record/agent_test.go +++ b/internal/record/agent_test.go @@ -68,12 +68,15 @@ func TestAgent(t *testing.T) { }, }} - writeToStream := func(stream *stream.Stream, ntp time.Time) { - for i := 0; i < 3; i++ { + writeToStream := func(stream *stream.Stream, startDTS time.Duration, startNTP time.Time) { + for i := 0; i < 2; i++ { + pts := startDTS + time.Duration(i)*100*time.Millisecond + ntp := startNTP.Add(time.Duration(i*60) * time.Second) + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, - NTP: ntp.Add(time.Duration(i) * 60 * time.Second), + PTS: pts, + NTP: ntp, }, AU: [][]byte{ test.FormatH264.SPS, @@ -84,7 +87,7 @@ func TestAgent(t *testing.T) { stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H265{ Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, + PTS: pts, }, AU: [][]byte{ test.FormatH265.VPS, @@ -96,21 +99,21 @@ func TestAgent(t *testing.T) { stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{ Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, + PTS: pts, }, AUs: [][]byte{{1, 2, 3, 4}}, }) stream.WriteUnit(desc.Medias[3], desc.Medias[3].Formats[0], &unit.G711{ Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, + PTS: pts, }, Samples: []byte{1, 2, 3, 4}, }) stream.WriteUnit(desc.Medias[4], desc.Medias[4].Formats[0], &unit.LPCM{ Base: unit.Base{ - PTS: (50 + time.Duration(i)) * time.Second, + PTS: pts, }, Samples: []byte{1, 2, 3, 4}, }) @@ -144,6 +147,15 @@ func TestAgent(t *testing.T) { f = conf.RecordFormatMPEGTS } + var ext string + if ca == "fmp4" { + ext = "mp4" + } else { + ext = "ts" + } + + n := 0 + w := &Agent{ WriteQueueSize: 1024, PathFormat: recordPath, @@ -152,10 +164,30 @@ func TestAgent(t *testing.T) { SegmentDuration: 1 * time.Second, PathName: "mypath", Stream: stream, - OnSegmentCreate: func(_ string) { + OnSegmentCreate: func(segPath string) { + switch n { + case 0: + require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath) + case 1: + require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath) + default: + require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath) + } segCreated <- struct{}{} }, - OnSegmentComplete: func(_ string) { + OnSegmentComplete: func(segPath string, du time.Duration) { + switch n { + case 0: + require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath) + require.Equal(t, 2*time.Second, du) + case 1: + require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath) + require.Equal(t, 100*time.Millisecond, du) + default: + require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath) + require.Equal(t, 100*time.Millisecond, du) + } + n++ segDone <- struct{}{} }, Parent: test.NilLogger, @@ -163,7 +195,13 @@ func TestAgent(t *testing.T) { } w.Initialize() - writeToStream(stream, time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC)) + writeToStream(stream, + 50*time.Second, + time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC)) + + writeToStream(stream, + 52*time.Second, + time.Date(2008, 0o5, 20, 22, 16, 25, 0, time.UTC)) // simulate a write error stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ @@ -180,74 +218,68 @@ func TestAgent(t *testing.T) { <-segDone } - var ext string if ca == "fmp4" { - ext = "mp4" - } else { - ext = "ts" - } + var init fmp4.Init - if ca == "fmp4" { func() { f, err2 := os.Open(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext)) require.NoError(t, err2) defer f.Close() - var init fmp4.Init err2 = init.Unmarshal(f) require.NoError(t, err2) + }() - require.Equal(t, fmp4.Init{ - Tracks: []*fmp4.InitTrack{ - { - ID: 1, - TimeScale: 90000, - Codec: &fmp4.CodecH264{ - SPS: test.FormatH264.SPS, - PPS: test.FormatH264.PPS, - }, + require.Equal(t, fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, }, - { - ID: 2, - TimeScale: 90000, - Codec: &fmp4.CodecH265{ - VPS: test.FormatH265.VPS, - SPS: test.FormatH265.SPS, - PPS: test.FormatH265.PPS, - }, + }, + { + ID: 2, + TimeScale: 90000, + Codec: &fmp4.CodecH265{ + VPS: test.FormatH265.VPS, + SPS: test.FormatH265.SPS, + PPS: test.FormatH265.PPS, }, - { - ID: 3, - TimeScale: 44100, - Codec: &fmp4.CodecMPEG4Audio{ - Config: mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - }, - }, - }, - { - ID: 4, - TimeScale: 8000, - Codec: &fmp4.CodecLPCM{ - BitDepth: 16, - SampleRate: 8000, - ChannelCount: 1, - }, - }, - { - ID: 5, - TimeScale: 44100, - Codec: &fmp4.CodecLPCM{ - BitDepth: 16, + }, + { + ID: 3, + TimeScale: 44100, + Codec: &fmp4.CodecMPEG4Audio{ + Config: mpeg4audio.Config{ + Type: 2, SampleRate: 44100, ChannelCount: 2, }, }, }, - }, init) - }() + { + ID: 4, + TimeScale: 8000, + Codec: &fmp4.CodecLPCM{ + BitDepth: 16, + SampleRate: 8000, + ChannelCount: 1, + }, + }, + { + ID: 5, + TimeScale: 44100, + Codec: &fmp4.CodecLPCM{ + BitDepth: 16, + SampleRate: 44100, + ChannelCount: 2, + }, + }, + }, + }, init) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) require.NoError(t, err) @@ -261,16 +293,18 @@ func TestAgent(t *testing.T) { time.Sleep(50 * time.Millisecond) - writeToStream(stream, time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC)) + writeToStream(stream, + 300*time.Second, + time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC)) time.Sleep(50 * time.Millisecond) w.Close() - _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext)) - require.NoError(t, err) + <-segCreated + <-segDone - _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-16-25-000000."+ext)) + _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext)) require.NoError(t, err) }) } diff --git a/internal/record/format_fmp4.go b/internal/record/format_fmp4.go index a8a0f8cb..5d67b065 100644 --- a/internal/record/format_fmp4.go +++ b/internal/record/format_fmp4.go @@ -191,7 +191,7 @@ func (f *formatFMP4) initialize() { return err } - return track.record(&sample{ + return track.write(&sample{ PartSample: sampl, dts: tunit.PTS, ntp: tunit.NTP, @@ -261,7 +261,7 @@ func (f *formatFMP4) initialize() { firstReceived = true } - return track.record(&sample{ + return track.write(&sample{ PartSample: &fmp4.PartSample{ IsNonSyncSample: !randomAccess, Payload: tunit.Frame, @@ -364,7 +364,7 @@ func (f *formatFMP4) initialize() { return err } - return track.record(&sample{ + return track.write(&sample{ PartSample: sampl, dts: dts, ntp: tunit.NTP, @@ -435,7 +435,7 @@ func (f *formatFMP4) initialize() { return err } - return track.record(&sample{ + return track.write(&sample{ PartSample: sampl, dts: dts, ntp: tunit.NTP, @@ -494,7 +494,7 @@ func (f *formatFMP4) initialize() { } lastPTS = tunit.PTS - return track.record(&sample{ + return track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: tunit.Frame, IsNonSyncSample: !randomAccess, @@ -547,7 +547,7 @@ func (f *formatFMP4) initialize() { } lastPTS = tunit.PTS - return track.record(&sample{ + return track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: tunit.Frame, IsNonSyncSample: !randomAccess, @@ -583,7 +583,7 @@ func (f *formatFMP4) initialize() { updateCodecs() } - return track.record(&sample{ + return track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: tunit.Frame, }, @@ -607,7 +607,7 @@ func (f *formatFMP4) initialize() { var dt time.Duration for _, packet := range tunit.Packets { - err := track.record(&sample{ + err := track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: packet, }, @@ -642,7 +642,7 @@ func (f *formatFMP4) initialize() { dt := time.Duration(i) * mpeg4audio.SamplesPerAccessUnit * time.Second / sampleRate - err := track.record(&sample{ + err := track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: au, }, @@ -688,7 +688,7 @@ func (f *formatFMP4) initialize() { updateCodecs() } - err = track.record(&sample{ + err = track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: frame, }, @@ -756,7 +756,7 @@ func (f *formatFMP4) initialize() { dt := time.Duration(i) * time.Duration(ac3.SamplesPerFrame) * time.Second / time.Duration(codec.SampleRate) - err = track.record(&sample{ + err = track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: frame, }, @@ -796,7 +796,7 @@ func (f *formatFMP4) initialize() { out = g711.DecodeAlaw(tunit.Samples) } - return track.record(&sample{ + return track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: out, }, @@ -820,7 +820,7 @@ func (f *formatFMP4) initialize() { return nil } - return track.record(&sample{ + return track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: tunit.Samples, }, @@ -838,6 +838,12 @@ func (f *formatFMP4) initialize() { func (f *formatFMP4) close() { if f.currentSegment != nil { + for _, track := range f.tracks { + if track.nextSample != nil && track.nextSample.dts > f.currentSegment.lastDTS { + f.currentSegment.lastDTS = track.nextSample.dts + } + } + f.currentSegment.close() //nolint:errcheck } } diff --git a/internal/record/format_fmp4_part.go b/internal/record/format_fmp4_part.go index 1f1968fe..a960c9a7 100644 --- a/internal/record/format_fmp4_part.go +++ b/internal/record/format_fmp4_part.go @@ -81,7 +81,7 @@ func (p *formatFMP4Part) close() error { return writePart(p.s.fi, p.sequenceNumber, p.partTracks) } -func (p *formatFMP4Part) record(track *formatFMP4Track, sample *sample) error { +func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample) error { partTrack, ok := p.partTracks[track] if !ok { partTrack = &fmp4.PartTrack{ diff --git a/internal/record/format_fmp4_segment.go b/internal/record/format_fmp4_segment.go index f0695c00..7920fcb3 100644 --- a/internal/record/format_fmp4_segment.go +++ b/internal/record/format_fmp4_segment.go @@ -39,9 +39,11 @@ type formatFMP4Segment struct { path string fi *os.File curPart *formatFMP4Part + lastDTS time.Duration } func (s *formatFMP4Segment) initialize() { + s.lastDTS = s.startDTS } func (s *formatFMP4Segment) close() error { @@ -59,14 +61,17 @@ func (s *formatFMP4Segment) close() error { } if err2 == nil { - s.f.a.agent.OnSegmentComplete(s.path) + duration := s.lastDTS - s.startDTS + s.f.a.agent.OnSegmentComplete(s.path, duration) } } return err } -func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error { +func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample) error { + s.lastDTS = sample.dts + if s.curPart == nil { s.curPart = &formatFMP4Part{ s: s, @@ -92,5 +97,5 @@ func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error s.f.nextSequenceNumber++ } - return s.curPart.record(track, sample) + return s.curPart.write(track, sample) } diff --git a/internal/record/format_fmp4_track.go b/internal/record/format_fmp4_track.go index 1ff214f2..fd7a075e 100644 --- a/internal/record/format_fmp4_track.go +++ b/internal/record/format_fmp4_track.go @@ -11,7 +11,7 @@ type formatFMP4Track struct { nextSample *sample } -func (t *formatFMP4Track) record(sample *sample) error { +func (t *formatFMP4Track) write(sample *sample) error { // wait the first video sample before setting hasVideo if t.initTrack.Codec.IsVideo() { t.f.hasVideo = true @@ -35,7 +35,7 @@ func (t *formatFMP4Track) record(sample *sample) error { return nil } - err := t.f.currentSegment.record(t, sample) + err := t.f.currentSegment.write(t, sample) if err != nil { return err } @@ -43,6 +43,7 @@ func (t *formatFMP4Track) record(sample *sample) error { if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) && !t.nextSample.IsNonSyncSample && (t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.agent.SegmentDuration { + t.f.currentSegment.lastDTS = t.nextSample.dts err := t.f.currentSegment.close() if err != nil { return err diff --git a/internal/record/format_mpegts.go b/internal/record/format_mpegts.go index 60f778cf..d7d4e652 100644 --- a/internal/record/format_mpegts.go +++ b/internal/record/format_mpegts.go @@ -66,7 +66,7 @@ func (f *formatMPEGTS) initialize() { for _, media := range f.a.agent.Stream.Desc().Medias { for _, forma := range media.Formats { switch forma := forma.(type) { - case *rtspformat.H265: + case *rtspformat.H265: //nolint:dupl track := addTrack(forma, &mpegts.CodecH265{}) var dtsExtractor *h265.DTSExtractor @@ -91,10 +91,18 @@ func (f *formatMPEGTS) initialize() { return err } - return f.recordH26x(track, tunit.PTS, dts, tunit.NTP, randomAccess, tunit.AU) + return f.write( + dts, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + }, + ) }) - case *rtspformat.H264: + case *rtspformat.H264: //nolint:dupl track := addTrack(forma, &mpegts.CodecH264{}) var dtsExtractor *h264.DTSExtractor @@ -105,10 +113,10 @@ func (f *formatMPEGTS) initialize() { return nil } - idrPresent := h264.IDRPresent(tunit.AU) + randomAccess := h264.IDRPresent(tunit.AU) if dtsExtractor == nil { - if !idrPresent { + if !randomAccess { return nil } dtsExtractor = h264.NewDTSExtractor() @@ -119,7 +127,15 @@ func (f *formatMPEGTS) initialize() { return err } - return f.recordH26x(track, tunit.PTS, dts, tunit.NTP, idrPresent, tunit.AU) + return f.write( + dts, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + }, + ) }) case *rtspformat.MPEG4Video: @@ -141,15 +157,17 @@ func (f *formatMPEGTS) initialize() { } lastPTS = tunit.PTS - f.hasVideo = true randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) - err := f.setupSegment(tunit.PTS, tunit.NTP, true, randomAccess) - if err != nil { - return err - } - - return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + return f.write( + tunit.PTS, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + }, + ) }) case *rtspformat.MPEG1Video: @@ -171,15 +189,17 @@ func (f *formatMPEGTS) initialize() { } lastPTS = tunit.PTS - f.hasVideo = true randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) - err := f.setupSegment(tunit.PTS, tunit.NTP, true, randomAccess) - if err != nil { - return err - } - - return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + return f.write( + tunit.PTS, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + }, + ) }) case *rtspformat.Opus: @@ -193,12 +213,15 @@ func (f *formatMPEGTS) initialize() { return nil } - err := f.setupSegment(tunit.PTS, tunit.NTP, false, true) - if err != nil { - return err - } - - return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + return f.write( + tunit.PTS, + tunit.NTP, + false, + true, + func() error { + return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + }, + ) }) case *rtspformat.MPEG4Audio: @@ -212,12 +235,15 @@ func (f *formatMPEGTS) initialize() { return nil } - err := f.setupSegment(tunit.PTS, tunit.NTP, false, true) - if err != nil { - return err - } - - return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + return f.write( + tunit.PTS, + tunit.NTP, + false, + true, + func() error { + return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + }, + ) }) case *rtspformat.MPEG1Audio: @@ -229,12 +255,15 @@ func (f *formatMPEGTS) initialize() { return nil } - err := f.setupSegment(tunit.PTS, tunit.NTP, false, true) - if err != nil { - return err - } - - return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + return f.write( + tunit.PTS, + tunit.NTP, + false, + true, + func() error { + return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + }, + ) }) case *rtspformat.AC3: @@ -248,17 +277,25 @@ func (f *formatMPEGTS) initialize() { return nil } - for i, frame := range tunit.Frames { - framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* - time.Second/sampleRate + return f.write( + tunit.PTS, + tunit.NTP, + false, + true, + func() error { + for i, frame := range tunit.Frames { + framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* + time.Second/sampleRate - err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) - if err != nil { - return err - } - } + err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + if err != nil { + return err + } + } - return nil + return nil + }, + ) }) } } @@ -278,12 +315,17 @@ func (f *formatMPEGTS) close() { } } -func (f *formatMPEGTS) setupSegment( +func (f *formatMPEGTS) write( dts time.Duration, ntp time.Time, isVideo bool, randomAccess bool, + writeCB func() error, ) error { + if isVideo { + f.hasVideo = true + } + switch { case f.currentSegment == nil: f.currentSegment = &formatMPEGTSSegment{ @@ -295,6 +337,7 @@ func (f *formatMPEGTS) setupSegment( case (!f.hasVideo || isVideo) && randomAccess && (dts-f.currentSegment.startDTS) >= f.a.agent.SegmentDuration: + f.currentSegment.lastDTS = dts err := f.currentSegment.close() if err != nil { return err @@ -316,23 +359,7 @@ func (f *formatMPEGTS) setupSegment( f.currentSegment.lastFlush = dts } - return nil -} - -func (f *formatMPEGTS) recordH26x( - track *mpegts.Track, - pts time.Duration, - dts time.Duration, - ntp time.Time, - randomAccess bool, - au [][]byte, -) error { - f.hasVideo = true - - err := f.setupSegment(dts, ntp, true, randomAccess) - if err != nil { - return err - } - - return f.mw.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), randomAccess, au) + f.currentSegment.lastDTS = dts + + return writeCB() } diff --git a/internal/record/format_mpegts_segment.go b/internal/record/format_mpegts_segment.go index 06e16883..754c93aa 100644 --- a/internal/record/format_mpegts_segment.go +++ b/internal/record/format_mpegts_segment.go @@ -13,13 +13,15 @@ type formatMPEGTSSegment struct { startDTS time.Duration startNTP time.Time - lastFlush time.Duration path string fi *os.File + lastFlush time.Duration + lastDTS time.Duration } func (s *formatMPEGTSSegment) initialize() { s.lastFlush = s.startDTS + s.lastDTS = s.startDTS s.f.dw.setTarget(s) } @@ -34,7 +36,8 @@ func (s *formatMPEGTSSegment) close() error { } if err2 == nil { - s.f.a.agent.OnSegmentComplete(s.path) + duration := s.lastDTS - s.startDTS + s.f.a.agent.OnSegmentComplete(s.path, duration) } } diff --git a/mediamtx.yml b/mediamtx.yml index 29468381..e3929d9e 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -675,6 +675,7 @@ pathDefaults: # * G1, G2, ...: regular expression groups, if path name is # a regular expression. # * MTX_SEGMENT_PATH: segment file path + # * MTX_SEGMENT_DURATION: segment duration runOnRecordSegmentComplete: ###############################################