1
0
Fork 0
forked from External/mediamtx

add runOnRecordSegmentCreate hook (#2451) (#2503)

This commit is contained in:
Alessandro Ros 2023-10-14 16:48:02 +02:00 committed by GitHub
parent 38a2a127f5
commit 4ec12a6c8b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 103 additions and 56 deletions

View file

@ -1321,71 +1321,80 @@ paths:
`runOnReady` allows to run a command when a stream is ready to be read: `runOnReady` allows to run a command when a stream is ready to be read:
```yml ```yml
paths: pathDefaults:
mypath: # This is terminated with SIGINT when the stream is not ready anymore.
# This is terminated with SIGINT when the stream is not ready anymore. # The following environment variables are available:
# The following environment variables are available: # * MTX_PATH: path name
# * MTX_PATH: path name # * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_TYPE: source type # * MTX_SOURCE_ID: source ID
# * MTX_SOURCE_ID: source ID # * RTSP_PORT: RTSP server port
# * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is
# * G1, G2, ...: regular expression groups, if path name is # a regular expression.
# a regular expression. runOnReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
runOnReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID # Restart the command if it exits.
# Restart the command if it exits. runOnReadyRestart: no
runOnReadyRestart: no
``` ```
`runOnNotReady` allows to run a command when a stream is not available anymore: `runOnNotReady` allows to run a command when a stream is not available anymore:
```yml ```yml
paths: pathDefaults:
mypath: # Environment variables are the same of runOnReady.
# Environment variables are the same of runOnReady. runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
``` ```
`runOnRead` allows to run a command when a client starts reading: `runOnRead` allows to run a command when a client starts reading:
```yml ```yml
paths: pathDefaults:
mypath: # This is terminated with SIGINT when a client stops reading.
# This is terminated with SIGINT when a client stops reading. # The following environment variables are available:
# The following environment variables are available: # * MTX_PATH: path name
# * MTX_PATH: path name # * MTX_READER_TYPE: reader type
# * MTX_READER_TYPE: reader type # * MTX_READER_ID: reader ID
# * MTX_READER_ID: reader ID # * RTSP_PORT: RTSP server port
# * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is
# * G1, G2, ...: regular expression groups, if path name is # a regular expression.
# a regular expression. runOnRead: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
runOnRead: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID # Restart the command if it exits.
# Restart the command if it exits. runOnReadRestart: no
runOnReadRestart: no
``` ```
`runOnUnread` allows to run a command when a client stops reading: `runOnUnread` allows to run a command when a client stops reading:
```yml ```yml
paths: pathDefaults:
mypath: # Command to run when a client stops reading.
# Command to run when a client stops reading. # Environment variables are the same of runOnRead.
# Environment variables are the same of runOnRead. runOnUnread: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID
runOnUnread: curl http://my-custom-server/webhook?path=$MTX_PATH&reader_type=$MTX_READER_TYPE&reader_id=$MTX_READER_ID ```
`runOnRecordSegmentCreate` allows to run a command when a recording segment is created:
```yml
pathDefaults:
# Command to run when a recording segment is created.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentCreate: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
``` ```
`runOnRecordSegmentComplete` allows to run a command when a recording segment is complete: `runOnRecordSegmentComplete` allows to run a command when a recording segment is complete:
```yml ```yml
paths: pathDefaults:
mypath: # Command to run when a recording segment is complete.
# Command to run when a recording segment is complete. # The following environment variables are available:
# The following environment variables are available: # * MTX_PATH: path name
# * MTX_PATH: path name # * RTSP_PORT: RTSP server port
# * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is
# * G1, G2, ...: regular expression groups, if path name is # a regular expression.
# a regular expression. # * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_PATH: segment file path runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
``` ```
### API ### API

View file

@ -347,6 +347,8 @@ components:
type: boolean type: boolean
runOnUnread: runOnUnread:
type: string type: string
runOnRecordSegmentCreate:
type: string
runOnRecordSegmentComplete: runOnRecordSegmentComplete:
type: string type: string

View file

@ -138,6 +138,7 @@ type Path struct {
RunOnRead string `json:"runOnRead"` RunOnRead string `json:"runOnRead"`
RunOnReadRestart bool `json:"runOnReadRestart"` RunOnReadRestart bool `json:"runOnReadRestart"`
RunOnUnread string `json:"runOnUnread"` RunOnUnread string `json:"runOnUnread"`
RunOnRecordSegmentCreate string `json:"runOnRecordSegmentCreate"`
RunOnRecordSegmentComplete string `json:"runOnRecordSegmentComplete"` RunOnRecordSegmentComplete string `json:"runOnRecordSegmentComplete"`
} }

View file

@ -957,6 +957,20 @@ func (pa *path) startRecording() {
time.Duration(pa.conf.RecordSegmentDuration), time.Duration(pa.conf.RecordSegmentDuration),
pa.name, pa.name,
pa.stream, pa.stream,
func(segmentPath string) {
if pa.conf.RunOnRecordSegmentCreate != "" {
env := pa.externalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath
pa.Log(logger.Info, "runOnRecordSegmentCreate command launched")
externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnRecordSegmentCreate,
false,
env,
nil)
}
},
func(segmentPath string) { func(segmentPath string) {
if pa.conf.RunOnRecordSegmentComplete != "" { if pa.conf.RunOnRecordSegmentComplete != "" {
env := pa.externalCmdEnv() env := pa.externalCmdEnv()

View file

@ -26,6 +26,9 @@ import (
"github.com/bluenviron/mediamtx/internal/unit" "github.com/bluenviron/mediamtx/internal/unit"
) )
// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)
func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { func durationGoToMp4(v time.Duration, timeScale uint32) uint64 {
timeScale64 := uint64(timeScale) timeScale64 := uint64(timeScale)
secs := v / time.Second secs := v / time.Second
@ -111,7 +114,8 @@ type Agent struct {
partDuration time.Duration partDuration time.Duration
segmentDuration time.Duration segmentDuration time.Duration
stream *stream.Stream stream *stream.Stream
onSegmentComplete func(string) onSegmentCreate OnSegmentFunc
onSegmentComplete OnSegmentFunc
parent logger.Writer parent logger.Writer
ctx context.Context ctx context.Context
@ -125,7 +129,7 @@ type Agent struct {
done chan struct{} done chan struct{}
} }
// NewAgent allocates a nAgent. // NewAgent allocates an Agent.
func NewAgent( func NewAgent(
writeQueueSize int, writeQueueSize int,
recordPath string, recordPath string,
@ -133,16 +137,13 @@ func NewAgent(
segmentDuration time.Duration, segmentDuration time.Duration,
pathName string, pathName string,
stream *stream.Stream, stream *stream.Stream,
onSegmentComplete func(string), onSegmentCreate OnSegmentFunc,
onSegmentComplete OnSegmentFunc,
parent logger.Writer, parent logger.Writer,
) *Agent { ) *Agent {
recordPath = strings.ReplaceAll(recordPath, "%path", pathName) recordPath = strings.ReplaceAll(recordPath, "%path", pathName)
recordPath += ".mp4" recordPath += ".mp4"
if onSegmentComplete == nil {
onSegmentComplete = func(_ string) {}
}
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
r := &Agent{ r := &Agent{
@ -150,6 +151,7 @@ func NewAgent(
partDuration: partDuration, partDuration: partDuration,
segmentDuration: segmentDuration, segmentDuration: segmentDuration,
stream: stream, stream: stream,
onSegmentCreate: onSegmentCreate,
onSegmentComplete: onSegmentComplete, onSegmentComplete: onSegmentComplete,
parent: parent, parent: parent,
ctx: ctx, ctx: ctx,

View file

@ -77,6 +77,7 @@ func TestAgent(t *testing.T) {
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
segCreated := make(chan struct{}, 2)
segDone := make(chan struct{}, 2) segDone := make(chan struct{}, 2)
a := NewAgent( a := NewAgent(
@ -86,6 +87,9 @@ func TestAgent(t *testing.T) {
1*time.Second, 1*time.Second,
"mypath", "mypath",
stream, stream,
func(fpath string) {
segCreated <- struct{}{}
},
func(fpath string) { func(fpath string) {
segDone <- struct{}{} segDone <- struct{}{}
}, },
@ -145,8 +149,11 @@ func TestAgent(t *testing.T) {
}) })
} }
<-segDone for i := 0; i < 2; i++ {
<-segDone <-segCreated
<-segDone
}
a.Close() a.Close()
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4")) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"))

View file

@ -60,7 +60,7 @@ func newPart(
func (p *part) close() error { func (p *part) close() error {
if p.s.f == nil { if p.s.f == nil {
p.s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, p.s.r.path) p.s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, p.s.r.path)
p.s.r.Log(logger.Debug, "opening segment %s", p.s.fpath) p.s.r.Log(logger.Debug, "creating segment %s", p.s.fpath)
err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755) err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755)
if err != nil { if err != nil {
@ -72,6 +72,8 @@ func (p *part) close() error {
return err return err
} }
p.s.r.onSegmentCreate(p.s.fpath)
err = writeInit(f, p.s.r.tracks) err = writeInit(f, p.s.r.tracks)
if err != nil { if err != nil {
f.Close() f.Close()

View file

@ -518,6 +518,15 @@ pathDefaults:
# Environment variables are the same of runOnRead. # Environment variables are the same of runOnRead.
runOnUnread: runOnUnread:
# Command to run when a recording segment is created.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentCreate:
# Command to run when a recording segment is complete. # Command to run when a recording segment is complete.
# The following environment variables are available: # The following environment variables are available:
# * MTX_PATH: path name # * MTX_PATH: path name
@ -531,7 +540,8 @@ pathDefaults:
# Path settings # Path settings
# Settings in "paths" are applied to specific paths, and the map key # Settings in "paths" are applied to specific paths, and the map key
# is the name of the path. Any setting in "pathDefaults" can be overridden. # is the name of the path.
# Any setting in "pathDefaults" can be overridden here.
# It's possible to use regular expressions by using a tilde as prefix, # It's possible to use regular expressions by using a tilde as prefix,
# for example "~^(test1|test2)$" will match both "test1" and "test2", # for example "~^(test1|test2)$" will match both "test1" and "test2",
# for example "~^prefix" will match all paths that start with "prefix". # for example "~^prefix" will match all paths that start with "prefix".