1
0
Fork 0
forked from External/mediamtx

expose MTX_SEGMENT_DURATION to runOnRecordSegmentComplete (#3440) (#2983) (#3456)

* improve tests

* add duration to OnSegmentComplete

* expose MTX_SEGMENT_DURATION to runOnRecordSegmentComplete

* add tests
This commit is contained in:
Alessandro Ros 2024-06-11 18:30:40 +02:00 committed by GitHub
parent 3a2594d610
commit 3eabe6ac54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 334 additions and 190 deletions

View file

@ -1669,6 +1669,7 @@ pathDefaults:
# * G1, G2, ...: regular expression groups, if path name is # * G1, G2, ...: regular expression groups, if path name is
# a regular expression. # a regular expression.
# * MTX_SEGMENT_PATH: segment file path # * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
``` ```

View file

@ -806,10 +806,11 @@ func (pa *path) startRecording() {
nil) nil)
} }
}, },
OnSegmentComplete: func(segmentPath string) { OnSegmentComplete: func(segmentPath string, segmentDuration time.Duration) {
if pa.conf.RunOnRecordSegmentComplete != "" { if pa.conf.RunOnRecordSegmentComplete != "" {
env := pa.ExternalCmdEnv() env := pa.ExternalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath env["MTX_SEGMENT_PATH"] = segmentPath
env["MTX_SEGMENT_DURATION"] = strconv.FormatFloat(segmentDuration.Seconds(), 'f', -1, 64)
pa.Log(logger.Info, "runOnRecordSegmentComplete command launched") pa.Log(logger.Info, "runOnRecordSegmentComplete command launched")
externalcmd.NewCmd( externalcmd.NewCmd(

View file

@ -105,12 +105,12 @@ func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
var _ defs.Path = &path{} var _ defs.Path = &path{}
func TestPathRunOnDemand(t *testing.T) { func TestPathRunOnDemand(t *testing.T) {
onDemandFile := filepath.Join(os.TempDir(), "ondemand") onDemand := filepath.Join(os.TempDir(), "on_demand")
onUnDemandFile := filepath.Join(os.TempDir(), "onundemand") onUnDemand := filepath.Join(os.TempDir(), "on_undemand")
srcFile := filepath.Join(os.TempDir(), "ondemand.go") srcFile := filepath.Join(os.TempDir(), "ondemand.go")
err := os.WriteFile(srcFile, err := os.WriteFile(srcFile,
[]byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemandFile)), 0o644) []byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemand)), 0o644)
require.NoError(t, err) require.NoError(t, err)
execFile := filepath.Join(os.TempDir(), "ondemand_cmd") execFile := filepath.Join(os.TempDir(), "ondemand_cmd")
@ -125,8 +125,8 @@ func TestPathRunOnDemand(t *testing.T) {
for _, ca := range []string{"describe", "setup", "describe and setup"} { for _, ca := range []string{"describe", "setup", "describe and setup"} {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
defer os.Remove(onDemandFile) defer os.Remove(onDemand)
defer os.Remove(onUnDemandFile) defer os.Remove(onUnDemand)
p1, ok := newInstance(fmt.Sprintf("rtmp: no\n"+ p1, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
"hls: no\n"+ "hls: no\n"+
@ -135,7 +135,7 @@ func TestPathRunOnDemand(t *testing.T) {
" '~^(on)demand$':\n"+ " '~^(on)demand$':\n"+
" runOnDemand: %s\n"+ " runOnDemand: %s\n"+
" runOnDemandCloseAfter: 1s\n"+ " runOnDemandCloseAfter: 1s\n"+
" runOnUnDemand: touch %s\n", execFile, onUnDemandFile)) " runOnUnDemand: touch %s\n", execFile, onUnDemand))
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p1.Close() defer p1.Close()
@ -204,14 +204,14 @@ func TestPathRunOnDemand(t *testing.T) {
}() }()
for { for {
_, err := os.Stat(onUnDemandFile) _, err := os.Stat(onUnDemand)
if err == nil { if err == nil {
break break
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
_, err := os.Stat(onDemandFile) _, err := os.Stat(onDemand)
require.NoError(t, err) require.NoError(t, err)
}) })
} }
@ -220,11 +220,11 @@ func TestPathRunOnDemand(t *testing.T) {
func TestPathRunOnConnect(t *testing.T) { func TestPathRunOnConnect(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt"} { for _, ca := range []string{"rtsp", "rtmp", "srt"} {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
onConnectFile := filepath.Join(os.TempDir(), "onconnect") onConnect := filepath.Join(os.TempDir(), "on_connect")
defer os.Remove(onConnectFile) defer os.Remove(onConnect)
onDisconnectFile := filepath.Join(os.TempDir(), "ondisconnect") onDisconnect := filepath.Join(os.TempDir(), "on_disconnect")
defer os.Remove(onDisconnectFile) defer os.Remove(onDisconnect)
func() { func() {
p, ok := newInstance(fmt.Sprintf( p, ok := newInstance(fmt.Sprintf(
@ -232,7 +232,7 @@ func TestPathRunOnConnect(t *testing.T) {
" test:\n"+ " test:\n"+
"runOnConnect: touch %s\n"+ "runOnConnect: touch %s\n"+
"runOnDisconnect: touch %s\n", "runOnDisconnect: touch %s\n",
onConnectFile, onDisconnectFile)) onConnect, onDisconnect))
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.Close() defer p.Close()
@ -273,21 +273,21 @@ func TestPathRunOnConnect(t *testing.T) {
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
}() }()
_, err := os.Stat(onConnectFile) _, err := os.Stat(onConnect)
require.NoError(t, err) require.NoError(t, err)
_, err = os.Stat(onDisconnectFile) _, err = os.Stat(onDisconnect)
require.NoError(t, err) require.NoError(t, err)
}) })
} }
} }
func TestPathRunOnReady(t *testing.T) { func TestPathRunOnReady(t *testing.T) {
onReadyFile := filepath.Join(os.TempDir(), "onready") onReady := filepath.Join(os.TempDir(), "on_ready")
defer os.Remove(onReadyFile) defer os.Remove(onReady)
onNotReadyFile := filepath.Join(os.TempDir(), "onunready") onNotReady := filepath.Join(os.TempDir(), "on_unready")
defer os.Remove(onNotReadyFile) defer os.Remove(onNotReady)
func() { func() {
p, ok := newInstance(fmt.Sprintf("rtmp: no\n"+ p, ok := newInstance(fmt.Sprintf("rtmp: no\n"+
@ -297,7 +297,7 @@ func TestPathRunOnReady(t *testing.T) {
" test:\n"+ " test:\n"+
" runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ " runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", " runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
onReadyFile, onNotReadyFile)) onReady, onNotReady))
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.Close() defer p.Close()
@ -312,11 +312,11 @@ func TestPathRunOnReady(t *testing.T) {
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
}() }()
byts, err := os.ReadFile(onReadyFile) byts, err := os.ReadFile(onReady)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts)) require.Equal(t, "test query=value\n", string(byts))
byts, err = os.ReadFile(onNotReadyFile) byts, err = os.ReadFile(onNotReady)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts)) require.Equal(t, "test query=value\n", string(byts))
} }
@ -324,11 +324,11 @@ func TestPathRunOnReady(t *testing.T) {
func TestPathRunOnRead(t *testing.T) { func TestPathRunOnRead(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt", "webrtc"} { for _, ca := range []string{"rtsp", "rtmp", "srt", "webrtc"} {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
onReadFile := filepath.Join(os.TempDir(), "onread") onRead := filepath.Join(os.TempDir(), "on_read")
defer os.Remove(onReadFile) defer os.Remove(onRead)
onUnreadFile := filepath.Join(os.TempDir(), "onunread") onUnread := filepath.Join(os.TempDir(), "on_unread")
defer os.Remove(onUnreadFile) defer os.Remove(onUnread)
func() { func() {
p, ok := newInstance(fmt.Sprintf( p, ok := newInstance(fmt.Sprintf(
@ -336,7 +336,7 @@ func TestPathRunOnRead(t *testing.T) {
" test:\n"+ " test:\n"+
" runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ " runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", " runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
onReadFile, onUnreadFile)) onRead, onUnread))
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.Close() defer p.Close()
@ -449,17 +449,79 @@ func TestPathRunOnRead(t *testing.T) {
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
}() }()
byts, err := os.ReadFile(onReadFile) byts, err := os.ReadFile(onRead)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts)) require.Equal(t, "test query=value\n", string(byts))
byts, err = os.ReadFile(onUnreadFile) byts, err = os.ReadFile(onUnread)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts)) require.Equal(t, "test query=value\n", string(byts))
}) })
} }
} }
func TestPathRunOnRecordSegment(t *testing.T) {
onRecordSegmentCreate := filepath.Join(os.TempDir(), "on_record_segment_create")
defer os.Remove(onRecordSegmentCreate)
onRecordSegmentComplete := filepath.Join(os.TempDir(), "on_record_segment_complete")
defer os.Remove(onRecordSegmentComplete)
recordDir, err := os.MkdirTemp("", "rtsp-path-record")
require.NoError(t, err)
defer os.RemoveAll(recordDir)
func() {
p, ok := newInstance("record: yes\n" +
"recordPath: " + filepath.Join(recordDir, "%path/%Y-%m-%d_%H-%M-%S-%f") + "\n" +
"paths:\n" +
" test:\n" +
" runOnRecordSegmentCreate: " +
"sh -c 'echo \"$MTX_SEGMENT_PATH\" > " + onRecordSegmentCreate + "'\n" +
" runOnRecordSegmentComplete: " +
"sh -c 'echo \"$MTX_SEGMENT_PATH $MTX_SEGMENT_DURATION\" > " + onRecordSegmentComplete + "'\n")
require.Equal(t, true, ok)
defer p.Close()
media0 := test.UniqueMediaH264()
source := gortsplib.Client{}
err = source.StartRecording(
"rtsp://localhost:8554/test",
&description.Session{Medias: []*description.Media{media0}})
require.NoError(t, err)
defer source.Close()
for i := 0; i < 4; i++ {
err = source.WritePacketRTP(media0, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 1123 + uint16(i),
Timestamp: 45343 + 90000*uint32(i),
SSRC: 563423,
},
Payload: []byte{5},
})
require.NoError(t, err)
}
time.Sleep(500 * time.Millisecond)
}()
byts, err := os.ReadFile(onRecordSegmentCreate)
require.NoError(t, err)
require.Equal(t, true, strings.HasPrefix(string(byts), recordDir))
byts, err = os.ReadFile(onRecordSegmentComplete)
require.NoError(t, err)
parts := strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, true, strings.HasPrefix(parts[0], recordDir))
require.Equal(t, "3", parts[1])
}
func TestPathMaxReaders(t *testing.T) { func TestPathMaxReaders(t *testing.T) {
p, ok := newInstance("paths:\n" + p, ok := newInstance("paths:\n" +
" all_others:\n" + " all_others:\n" +

View file

@ -8,6 +8,12 @@ import (
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
) )
// OnSegmentCreateFunc is the prototype of the function passed as OnSegmentCreate
type OnSegmentCreateFunc = func(path string)
// OnSegmentCompleteFunc is the prototype of the function passed as OnSegmentComplete
type OnSegmentCompleteFunc = func(path string, duration time.Duration)
// Agent writes recordings to disk. // Agent writes recordings to disk.
type Agent struct { type Agent struct {
WriteQueueSize int WriteQueueSize int
@ -17,8 +23,8 @@ type Agent struct {
SegmentDuration time.Duration SegmentDuration time.Duration
PathName string PathName string
Stream *stream.Stream Stream *stream.Stream
OnSegmentCreate OnSegmentFunc OnSegmentCreate OnSegmentCreateFunc
OnSegmentComplete OnSegmentFunc OnSegmentComplete OnSegmentCompleteFunc
Parent logger.Writer Parent logger.Writer
restartPause time.Duration restartPause time.Duration
@ -36,7 +42,7 @@ func (w *Agent) Initialize() {
} }
} }
if w.OnSegmentComplete == nil { if w.OnSegmentComplete == nil {
w.OnSegmentComplete = func(string) { w.OnSegmentComplete = func(string, time.Duration) {
} }
} }
if w.restartPause == 0 { if w.restartPause == 0 {

View file

@ -11,9 +11,6 @@ import (
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)
type sample struct { type sample struct {
*fmp4.PartSample *fmp4.PartSample
dts time.Duration dts time.Duration

View file

@ -68,12 +68,15 @@ func TestAgent(t *testing.T) {
}, },
}} }}
writeToStream := func(stream *stream.Stream, ntp time.Time) { writeToStream := func(stream *stream.Stream, startDTS time.Duration, startNTP time.Time) {
for i := 0; i < 3; i++ { for i := 0; i < 2; i++ {
pts := startDTS + time.Duration(i)*100*time.Millisecond
ntp := startNTP.Add(time.Duration(i*60) * time.Second)
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second, PTS: pts,
NTP: ntp.Add(time.Duration(i) * 60 * time.Second), NTP: ntp,
}, },
AU: [][]byte{ AU: [][]byte{
test.FormatH264.SPS, test.FormatH264.SPS,
@ -84,7 +87,7 @@ func TestAgent(t *testing.T) {
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H265{ stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H265{
Base: unit.Base{ Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second, PTS: pts,
}, },
AU: [][]byte{ AU: [][]byte{
test.FormatH265.VPS, test.FormatH265.VPS,
@ -96,21 +99,21 @@ func TestAgent(t *testing.T) {
stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{ stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{
Base: unit.Base{ Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second, PTS: pts,
}, },
AUs: [][]byte{{1, 2, 3, 4}}, AUs: [][]byte{{1, 2, 3, 4}},
}) })
stream.WriteUnit(desc.Medias[3], desc.Medias[3].Formats[0], &unit.G711{ stream.WriteUnit(desc.Medias[3], desc.Medias[3].Formats[0], &unit.G711{
Base: unit.Base{ Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second, PTS: pts,
}, },
Samples: []byte{1, 2, 3, 4}, Samples: []byte{1, 2, 3, 4},
}) })
stream.WriteUnit(desc.Medias[4], desc.Medias[4].Formats[0], &unit.LPCM{ stream.WriteUnit(desc.Medias[4], desc.Medias[4].Formats[0], &unit.LPCM{
Base: unit.Base{ Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second, PTS: pts,
}, },
Samples: []byte{1, 2, 3, 4}, Samples: []byte{1, 2, 3, 4},
}) })
@ -144,6 +147,15 @@ func TestAgent(t *testing.T) {
f = conf.RecordFormatMPEGTS f = conf.RecordFormatMPEGTS
} }
var ext string
if ca == "fmp4" {
ext = "mp4"
} else {
ext = "ts"
}
n := 0
w := &Agent{ w := &Agent{
WriteQueueSize: 1024, WriteQueueSize: 1024,
PathFormat: recordPath, PathFormat: recordPath,
@ -152,10 +164,30 @@ func TestAgent(t *testing.T) {
SegmentDuration: 1 * time.Second, SegmentDuration: 1 * time.Second,
PathName: "mypath", PathName: "mypath",
Stream: stream, Stream: stream,
OnSegmentCreate: func(_ string) { OnSegmentCreate: func(segPath string) {
switch n {
case 0:
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath)
case 1:
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath)
default:
require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath)
}
segCreated <- struct{}{} segCreated <- struct{}{}
}, },
OnSegmentComplete: func(_ string) { OnSegmentComplete: func(segPath string, du time.Duration) {
switch n {
case 0:
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext), segPath)
require.Equal(t, 2*time.Second, du)
case 1:
require.Equal(t, filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext), segPath)
require.Equal(t, 100*time.Millisecond, du)
default:
require.Equal(t, filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext), segPath)
require.Equal(t, 100*time.Millisecond, du)
}
n++
segDone <- struct{}{} segDone <- struct{}{}
}, },
Parent: test.NilLogger, Parent: test.NilLogger,
@ -163,7 +195,13 @@ func TestAgent(t *testing.T) {
} }
w.Initialize() w.Initialize()
writeToStream(stream, time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC)) writeToStream(stream,
50*time.Second,
time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC))
writeToStream(stream,
52*time.Second,
time.Date(2008, 0o5, 20, 22, 16, 25, 0, time.UTC))
// simulate a write error // simulate a write error
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
@ -180,74 +218,68 @@ func TestAgent(t *testing.T) {
<-segDone <-segDone
} }
var ext string
if ca == "fmp4" { if ca == "fmp4" {
ext = "mp4" var init fmp4.Init
} else {
ext = "ts"
}
if ca == "fmp4" {
func() { func() {
f, err2 := os.Open(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext)) f, err2 := os.Open(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext))
require.NoError(t, err2) require.NoError(t, err2)
defer f.Close() defer f.Close()
var init fmp4.Init
err2 = init.Unmarshal(f) err2 = init.Unmarshal(f)
require.NoError(t, err2) require.NoError(t, err2)
}()
require.Equal(t, fmp4.Init{ require.Equal(t, fmp4.Init{
Tracks: []*fmp4.InitTrack{ Tracks: []*fmp4.InitTrack{
{ {
ID: 1, ID: 1,
TimeScale: 90000, TimeScale: 90000,
Codec: &fmp4.CodecH264{ Codec: &fmp4.CodecH264{
SPS: test.FormatH264.SPS, SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS, PPS: test.FormatH264.PPS,
},
}, },
{ },
ID: 2, {
TimeScale: 90000, ID: 2,
Codec: &fmp4.CodecH265{ TimeScale: 90000,
VPS: test.FormatH265.VPS, Codec: &fmp4.CodecH265{
SPS: test.FormatH265.SPS, VPS: test.FormatH265.VPS,
PPS: test.FormatH265.PPS, SPS: test.FormatH265.SPS,
}, PPS: test.FormatH265.PPS,
}, },
{ },
ID: 3, {
TimeScale: 44100, ID: 3,
Codec: &fmp4.CodecMPEG4Audio{ TimeScale: 44100,
Config: mpeg4audio.Config{ Codec: &fmp4.CodecMPEG4Audio{
Type: 2, Config: mpeg4audio.Config{
SampleRate: 44100, Type: 2,
ChannelCount: 2,
},
},
},
{
ID: 4,
TimeScale: 8000,
Codec: &fmp4.CodecLPCM{
BitDepth: 16,
SampleRate: 8000,
ChannelCount: 1,
},
},
{
ID: 5,
TimeScale: 44100,
Codec: &fmp4.CodecLPCM{
BitDepth: 16,
SampleRate: 44100, SampleRate: 44100,
ChannelCount: 2, ChannelCount: 2,
}, },
}, },
}, },
}, init) {
}() ID: 4,
TimeScale: 8000,
Codec: &fmp4.CodecLPCM{
BitDepth: 16,
SampleRate: 8000,
ChannelCount: 1,
},
},
{
ID: 5,
TimeScale: 44100,
Codec: &fmp4.CodecLPCM{
BitDepth: 16,
SampleRate: 44100,
ChannelCount: 2,
},
},
},
}, init)
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
@ -261,16 +293,18 @@ func TestAgent(t *testing.T) {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
writeToStream(stream, time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC)) writeToStream(stream,
300*time.Second,
time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC))
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
w.Close() w.Close()
_, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext)) <-segCreated
require.NoError(t, err) <-segDone
_, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-16-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
}) })
} }

View file

@ -191,7 +191,7 @@ func (f *formatFMP4) initialize() {
return err return err
} }
return track.record(&sample{ return track.write(&sample{
PartSample: sampl, PartSample: sampl,
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP, ntp: tunit.NTP,
@ -261,7 +261,7 @@ func (f *formatFMP4) initialize() {
firstReceived = true firstReceived = true
} }
return track.record(&sample{ return track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
Payload: tunit.Frame, Payload: tunit.Frame,
@ -364,7 +364,7 @@ func (f *formatFMP4) initialize() {
return err return err
} }
return track.record(&sample{ return track.write(&sample{
PartSample: sampl, PartSample: sampl,
dts: dts, dts: dts,
ntp: tunit.NTP, ntp: tunit.NTP,
@ -435,7 +435,7 @@ func (f *formatFMP4) initialize() {
return err return err
} }
return track.record(&sample{ return track.write(&sample{
PartSample: sampl, PartSample: sampl,
dts: dts, dts: dts,
ntp: tunit.NTP, ntp: tunit.NTP,
@ -494,7 +494,7 @@ func (f *formatFMP4) initialize() {
} }
lastPTS = tunit.PTS lastPTS = tunit.PTS
return track.record(&sample{ return track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: tunit.Frame, Payload: tunit.Frame,
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
@ -547,7 +547,7 @@ func (f *formatFMP4) initialize() {
} }
lastPTS = tunit.PTS lastPTS = tunit.PTS
return track.record(&sample{ return track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: tunit.Frame, Payload: tunit.Frame,
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
@ -583,7 +583,7 @@ func (f *formatFMP4) initialize() {
updateCodecs() updateCodecs()
} }
return track.record(&sample{ return track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: tunit.Frame, Payload: tunit.Frame,
}, },
@ -607,7 +607,7 @@ func (f *formatFMP4) initialize() {
var dt time.Duration var dt time.Duration
for _, packet := range tunit.Packets { for _, packet := range tunit.Packets {
err := track.record(&sample{ err := track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: packet, Payload: packet,
}, },
@ -642,7 +642,7 @@ func (f *formatFMP4) initialize() {
dt := time.Duration(i) * mpeg4audio.SamplesPerAccessUnit * dt := time.Duration(i) * mpeg4audio.SamplesPerAccessUnit *
time.Second / sampleRate time.Second / sampleRate
err := track.record(&sample{ err := track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: au, Payload: au,
}, },
@ -688,7 +688,7 @@ func (f *formatFMP4) initialize() {
updateCodecs() updateCodecs()
} }
err = track.record(&sample{ err = track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: frame, Payload: frame,
}, },
@ -756,7 +756,7 @@ func (f *formatFMP4) initialize() {
dt := time.Duration(i) * time.Duration(ac3.SamplesPerFrame) * dt := time.Duration(i) * time.Duration(ac3.SamplesPerFrame) *
time.Second / time.Duration(codec.SampleRate) time.Second / time.Duration(codec.SampleRate)
err = track.record(&sample{ err = track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: frame, Payload: frame,
}, },
@ -796,7 +796,7 @@ func (f *formatFMP4) initialize() {
out = g711.DecodeAlaw(tunit.Samples) out = g711.DecodeAlaw(tunit.Samples)
} }
return track.record(&sample{ return track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: out, Payload: out,
}, },
@ -820,7 +820,7 @@ func (f *formatFMP4) initialize() {
return nil return nil
} }
return track.record(&sample{ return track.write(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: tunit.Samples, Payload: tunit.Samples,
}, },
@ -838,6 +838,12 @@ func (f *formatFMP4) initialize() {
func (f *formatFMP4) close() { func (f *formatFMP4) close() {
if f.currentSegment != nil { if f.currentSegment != nil {
for _, track := range f.tracks {
if track.nextSample != nil && track.nextSample.dts > f.currentSegment.lastDTS {
f.currentSegment.lastDTS = track.nextSample.dts
}
}
f.currentSegment.close() //nolint:errcheck f.currentSegment.close() //nolint:errcheck
} }
} }

View file

@ -81,7 +81,7 @@ func (p *formatFMP4Part) close() error {
return writePart(p.s.fi, p.sequenceNumber, p.partTracks) return writePart(p.s.fi, p.sequenceNumber, p.partTracks)
} }
func (p *formatFMP4Part) record(track *formatFMP4Track, sample *sample) error { func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample) error {
partTrack, ok := p.partTracks[track] partTrack, ok := p.partTracks[track]
if !ok { if !ok {
partTrack = &fmp4.PartTrack{ partTrack = &fmp4.PartTrack{

View file

@ -39,9 +39,11 @@ type formatFMP4Segment struct {
path string path string
fi *os.File fi *os.File
curPart *formatFMP4Part curPart *formatFMP4Part
lastDTS time.Duration
} }
func (s *formatFMP4Segment) initialize() { func (s *formatFMP4Segment) initialize() {
s.lastDTS = s.startDTS
} }
func (s *formatFMP4Segment) close() error { func (s *formatFMP4Segment) close() error {
@ -59,14 +61,17 @@ func (s *formatFMP4Segment) close() error {
} }
if err2 == nil { if err2 == nil {
s.f.a.agent.OnSegmentComplete(s.path) duration := s.lastDTS - s.startDTS
s.f.a.agent.OnSegmentComplete(s.path, duration)
} }
} }
return err return err
} }
func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error { func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample) error {
s.lastDTS = sample.dts
if s.curPart == nil { if s.curPart == nil {
s.curPart = &formatFMP4Part{ s.curPart = &formatFMP4Part{
s: s, s: s,
@ -92,5 +97,5 @@ func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error
s.f.nextSequenceNumber++ s.f.nextSequenceNumber++
} }
return s.curPart.record(track, sample) return s.curPart.write(track, sample)
} }

View file

@ -11,7 +11,7 @@ type formatFMP4Track struct {
nextSample *sample nextSample *sample
} }
func (t *formatFMP4Track) record(sample *sample) error { func (t *formatFMP4Track) write(sample *sample) error {
// wait the first video sample before setting hasVideo // wait the first video sample before setting hasVideo
if t.initTrack.Codec.IsVideo() { if t.initTrack.Codec.IsVideo() {
t.f.hasVideo = true t.f.hasVideo = true
@ -35,7 +35,7 @@ func (t *formatFMP4Track) record(sample *sample) error {
return nil return nil
} }
err := t.f.currentSegment.record(t, sample) err := t.f.currentSegment.write(t, sample)
if err != nil { if err != nil {
return err return err
} }
@ -43,6 +43,7 @@ func (t *formatFMP4Track) record(sample *sample) error {
if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) && if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) &&
!t.nextSample.IsNonSyncSample && !t.nextSample.IsNonSyncSample &&
(t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.agent.SegmentDuration { (t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.agent.SegmentDuration {
t.f.currentSegment.lastDTS = t.nextSample.dts
err := t.f.currentSegment.close() err := t.f.currentSegment.close()
if err != nil { if err != nil {
return err return err

View file

@ -66,7 +66,7 @@ func (f *formatMPEGTS) initialize() {
for _, media := range f.a.agent.Stream.Desc().Medias { for _, media := range f.a.agent.Stream.Desc().Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
switch forma := forma.(type) { switch forma := forma.(type) {
case *rtspformat.H265: case *rtspformat.H265: //nolint:dupl
track := addTrack(forma, &mpegts.CodecH265{}) track := addTrack(forma, &mpegts.CodecH265{})
var dtsExtractor *h265.DTSExtractor var dtsExtractor *h265.DTSExtractor
@ -91,10 +91,18 @@ func (f *formatMPEGTS) initialize() {
return err return err
} }
return f.recordH26x(track, tunit.PTS, dts, tunit.NTP, randomAccess, tunit.AU) return f.write(
dts,
tunit.NTP,
true,
randomAccess,
func() error {
return f.mw.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU)
},
)
}) })
case *rtspformat.H264: case *rtspformat.H264: //nolint:dupl
track := addTrack(forma, &mpegts.CodecH264{}) track := addTrack(forma, &mpegts.CodecH264{})
var dtsExtractor *h264.DTSExtractor var dtsExtractor *h264.DTSExtractor
@ -105,10 +113,10 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
idrPresent := h264.IDRPresent(tunit.AU) randomAccess := h264.IDRPresent(tunit.AU)
if dtsExtractor == nil { if dtsExtractor == nil {
if !idrPresent { if !randomAccess {
return nil return nil
} }
dtsExtractor = h264.NewDTSExtractor() dtsExtractor = h264.NewDTSExtractor()
@ -119,7 +127,15 @@ func (f *formatMPEGTS) initialize() {
return err return err
} }
return f.recordH26x(track, tunit.PTS, dts, tunit.NTP, idrPresent, tunit.AU) return f.write(
dts,
tunit.NTP,
true,
randomAccess,
func() error {
return f.mw.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU)
},
)
}) })
case *rtspformat.MPEG4Video: case *rtspformat.MPEG4Video:
@ -141,15 +157,17 @@ func (f *formatMPEGTS) initialize() {
} }
lastPTS = tunit.PTS lastPTS = tunit.PTS
f.hasVideo = true
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
err := f.setupSegment(tunit.PTS, tunit.NTP, true, randomAccess) return f.write(
if err != nil { tunit.PTS,
return err tunit.NTP,
} true,
randomAccess,
return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) func() error {
return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
},
)
}) })
case *rtspformat.MPEG1Video: case *rtspformat.MPEG1Video:
@ -171,15 +189,17 @@ func (f *formatMPEGTS) initialize() {
} }
lastPTS = tunit.PTS lastPTS = tunit.PTS
f.hasVideo = true
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8})
err := f.setupSegment(tunit.PTS, tunit.NTP, true, randomAccess) return f.write(
if err != nil { tunit.PTS,
return err tunit.NTP,
} true,
randomAccess,
return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) func() error {
return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
},
)
}) })
case *rtspformat.Opus: case *rtspformat.Opus:
@ -193,12 +213,15 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
err := f.setupSegment(tunit.PTS, tunit.NTP, false, true) return f.write(
if err != nil { tunit.PTS,
return err tunit.NTP,
} false,
true,
return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) func() error {
return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets)
},
)
}) })
case *rtspformat.MPEG4Audio: case *rtspformat.MPEG4Audio:
@ -212,12 +235,15 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
err := f.setupSegment(tunit.PTS, tunit.NTP, false, true) return f.write(
if err != nil { tunit.PTS,
return err tunit.NTP,
} false,
true,
return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) func() error {
return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs)
},
)
}) })
case *rtspformat.MPEG1Audio: case *rtspformat.MPEG1Audio:
@ -229,12 +255,15 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
err := f.setupSegment(tunit.PTS, tunit.NTP, false, true) return f.write(
if err != nil { tunit.PTS,
return err tunit.NTP,
} false,
true,
return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) func() error {
return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames)
},
)
}) })
case *rtspformat.AC3: case *rtspformat.AC3:
@ -248,17 +277,25 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
for i, frame := range tunit.Frames { return f.write(
framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* tunit.PTS,
time.Second/sampleRate tunit.NTP,
false,
true,
func() error {
for i, frame := range tunit.Frames {
framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame*
time.Second/sampleRate
err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame)
if err != nil { if err != nil {
return err return err
} }
} }
return nil return nil
},
)
}) })
} }
} }
@ -278,12 +315,17 @@ func (f *formatMPEGTS) close() {
} }
} }
func (f *formatMPEGTS) setupSegment( func (f *formatMPEGTS) write(
dts time.Duration, dts time.Duration,
ntp time.Time, ntp time.Time,
isVideo bool, isVideo bool,
randomAccess bool, randomAccess bool,
writeCB func() error,
) error { ) error {
if isVideo {
f.hasVideo = true
}
switch { switch {
case f.currentSegment == nil: case f.currentSegment == nil:
f.currentSegment = &formatMPEGTSSegment{ f.currentSegment = &formatMPEGTSSegment{
@ -295,6 +337,7 @@ func (f *formatMPEGTS) setupSegment(
case (!f.hasVideo || isVideo) && case (!f.hasVideo || isVideo) &&
randomAccess && randomAccess &&
(dts-f.currentSegment.startDTS) >= f.a.agent.SegmentDuration: (dts-f.currentSegment.startDTS) >= f.a.agent.SegmentDuration:
f.currentSegment.lastDTS = dts
err := f.currentSegment.close() err := f.currentSegment.close()
if err != nil { if err != nil {
return err return err
@ -316,23 +359,7 @@ func (f *formatMPEGTS) setupSegment(
f.currentSegment.lastFlush = dts f.currentSegment.lastFlush = dts
} }
return nil f.currentSegment.lastDTS = dts
}
return writeCB()
func (f *formatMPEGTS) recordH26x(
track *mpegts.Track,
pts time.Duration,
dts time.Duration,
ntp time.Time,
randomAccess bool,
au [][]byte,
) error {
f.hasVideo = true
err := f.setupSegment(dts, ntp, true, randomAccess)
if err != nil {
return err
}
return f.mw.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), randomAccess, au)
} }

View file

@ -13,13 +13,15 @@ type formatMPEGTSSegment struct {
startDTS time.Duration startDTS time.Duration
startNTP time.Time startNTP time.Time
lastFlush time.Duration
path string path string
fi *os.File fi *os.File
lastFlush time.Duration
lastDTS time.Duration
} }
func (s *formatMPEGTSSegment) initialize() { func (s *formatMPEGTSSegment) initialize() {
s.lastFlush = s.startDTS s.lastFlush = s.startDTS
s.lastDTS = s.startDTS
s.f.dw.setTarget(s) s.f.dw.setTarget(s)
} }
@ -34,7 +36,8 @@ func (s *formatMPEGTSSegment) close() error {
} }
if err2 == nil { if err2 == nil {
s.f.a.agent.OnSegmentComplete(s.path) duration := s.lastDTS - s.startDTS
s.f.a.agent.OnSegmentComplete(s.path, duration)
} }
} }

View file

@ -675,6 +675,7 @@ pathDefaults:
# * G1, G2, ...: regular expression groups, if path name is # * G1, G2, ...: regular expression groups, if path name is
# a regular expression. # a regular expression.
# * MTX_SEGMENT_PATH: segment file path # * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
runOnRecordSegmentComplete: runOnRecordSegmentComplete:
############################################### ###############################################