fix crash when recording a stream with unsupported tracks (#3978) (#3996)

* normalize variable names

* fix file name

* fix crash when recording a stream with unsupported tracks (#3978)
This commit is contained in:
Alessandro Ros 2024-11-30 11:23:41 +01:00 committed by GitHub
parent bdc051c6b7
commit b77df43536
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 261 additions and 188 deletions

View file

@ -1,6 +1,6 @@
package recorder package recorder
type format interface { type format interface {
initialize() initialize() bool
close() close()
} }

View file

@ -93,7 +93,7 @@ func jpegExtractSize(image []byte) (int, int, error) {
} }
type formatFMP4 struct { type formatFMP4 struct {
ai *recorderInstance ri *recorderInstance
tracks []*formatFMP4Track tracks []*formatFMP4Track
hasVideo bool hasVideo bool
@ -101,7 +101,7 @@ type formatFMP4 struct {
nextSequenceNumber uint32 nextSequenceNumber uint32
} }
func (f *formatFMP4) initialize() { func (f *formatFMP4) initialize() bool {
nextID := 1 nextID := 1
var setuppedFormats []rtspformat.Format var setuppedFormats []rtspformat.Format
setuppedFormatsMap := make(map[rtspformat.Format]struct{}) setuppedFormatsMap := make(map[rtspformat.Format]struct{})
@ -135,7 +135,7 @@ func (f *formatFMP4) initialize() {
} }
} }
for _, media := range f.ai.agent.Stream.Desc().Medias { for _, media := range f.ri.rec.Stream.Desc().Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
clockRate := forma.ClockRate() clockRate := forma.ClockRate()
@ -148,8 +148,8 @@ func (f *formatFMP4) initialize() {
firstReceived := false firstReceived := false
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -210,8 +210,8 @@ func (f *formatFMP4) initialize() {
firstReceived := false firstReceived := false
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -295,8 +295,8 @@ func (f *formatFMP4) initialize() {
var dtsExtractor *h265.DTSExtractor2 var dtsExtractor *h265.DTSExtractor2
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -377,8 +377,8 @@ func (f *formatFMP4) initialize() {
var dtsExtractor *h264.DTSExtractor2 var dtsExtractor *h264.DTSExtractor2
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -451,8 +451,8 @@ func (f *formatFMP4) initialize() {
firstReceived := false firstReceived := false
var lastPTS int64 var lastPTS int64
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -504,8 +504,8 @@ func (f *formatFMP4) initialize() {
firstReceived := false firstReceived := false
var lastPTS int64 var lastPTS int64
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -557,8 +557,8 @@ func (f *formatFMP4) initialize() {
parsed := false parsed := false
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -593,8 +593,8 @@ func (f *formatFMP4) initialize() {
} }
track := addTrack(forma, codec) track := addTrack(forma, codec)
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -631,8 +631,8 @@ func (f *formatFMP4) initialize() {
} }
track := addTrack(forma, codec) track := addTrack(forma, codec)
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -669,8 +669,8 @@ func (f *formatFMP4) initialize() {
parsed := false parsed := false
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -728,8 +728,8 @@ func (f *formatFMP4) initialize() {
parsed := false parsed := false
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -793,8 +793,8 @@ func (f *formatFMP4) initialize() {
} }
track := addTrack(forma, codec) track := addTrack(forma, codec)
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -828,8 +828,8 @@ func (f *formatFMP4) initialize() {
} }
track := addTrack(forma, codec) track := addTrack(forma, codec)
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -851,22 +851,24 @@ func (f *formatFMP4) initialize() {
} }
if len(setuppedFormats) == 0 { if len(setuppedFormats) == 0 {
f.ai.Log(logger.Warn, "no supported tracks found, skipping recording") f.ri.Log(logger.Warn, "no supported tracks found, skipping recording")
return return false
} }
n := 1 n := 1
for _, medi := range f.ai.agent.Stream.Desc().Medias { for _, medi := range f.ri.rec.Stream.Desc().Medias {
for _, forma := range medi.Formats { for _, forma := range medi.Formats {
if _, ok := setuppedFormatsMap[forma]; !ok { if _, ok := setuppedFormatsMap[forma]; !ok {
f.ai.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) f.ri.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())
} }
n++ n++
} }
} }
f.ai.Log(logger.Info, "recording %s", f.ri.Log(logger.Info, "recording %s",
defs.FormatsInfo(setuppedFormats)) defs.FormatsInfo(setuppedFormats))
return true
} }
func (f *formatFMP4) close() { func (f *formatFMP4) close() {

View file

@ -55,8 +55,8 @@ func (p *formatFMP4Part) initialize() {
func (p *formatFMP4Part) close() error { func (p *formatFMP4Part) close() error {
if p.s.fi == nil { if p.s.fi == nil {
p.s.path = recordstore.Path{Start: p.s.startNTP}.Encode(p.s.f.ai.pathFormat) p.s.path = recordstore.Path{Start: p.s.startNTP}.Encode(p.s.f.ri.pathFormat)
p.s.f.ai.Log(logger.Debug, "creating segment %s", p.s.path) p.s.f.ri.Log(logger.Debug, "creating segment %s", p.s.path)
err := os.MkdirAll(filepath.Dir(p.s.path), 0o755) err := os.MkdirAll(filepath.Dir(p.s.path), 0o755)
if err != nil { if err != nil {
@ -68,7 +68,7 @@ func (p *formatFMP4Part) close() error {
return err return err
} }
p.s.f.ai.agent.OnSegmentCreate(p.s.path) p.s.f.ri.rec.OnSegmentCreate(p.s.path)
err = writeInit(fi, p.s.f.tracks) err = writeInit(fi, p.s.f.tracks)
if err != nil { if err != nil {

View file

@ -54,7 +54,7 @@ func (s *formatFMP4Segment) close() error {
} }
if s.fi != nil { if s.fi != nil {
s.f.ai.Log(logger.Debug, "closing segment %s", s.path) s.f.ri.Log(logger.Debug, "closing segment %s", s.path)
err2 := s.fi.Close() err2 := s.fi.Close()
if err == nil { if err == nil {
err = err2 err = err2
@ -62,7 +62,7 @@ func (s *formatFMP4Segment) close() error {
if err2 == nil { if err2 == nil {
duration := s.lastDTS - s.startDTS duration := s.lastDTS - s.startDTS
s.f.ai.agent.OnSegmentComplete(s.path, duration) s.f.ri.rec.OnSegmentComplete(s.path, duration)
} }
} }
@ -80,7 +80,7 @@ func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dtsDur
} }
s.curPart.initialize() s.curPart.initialize()
s.f.nextSequenceNumber++ s.f.nextSequenceNumber++
} else if s.curPart.duration() >= s.f.ai.agent.PartDuration { } else if s.curPart.duration() >= s.f.ri.rec.PartDuration {
err := s.curPart.close() err := s.curPart.close()
s.curPart = nil s.curPart = nil

View file

@ -46,7 +46,7 @@ func (t *formatFMP4Track) write(sample *sample) error {
if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) && if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) &&
!t.nextSample.IsNonSyncSample && !t.nextSample.IsNonSyncSample &&
(nextDTSDuration-t.f.currentSegment.startDTS) >= t.f.ai.agent.SegmentDuration { (nextDTSDuration-t.f.currentSegment.startDTS) >= t.f.ri.rec.SegmentDuration {
t.f.currentSegment.lastDTS = nextDTSDuration t.f.currentSegment.lastDTS = nextDTSDuration
err := t.f.currentSegment.close() err := t.f.currentSegment.close()
if err != nil { if err != nil {

View file

@ -52,7 +52,7 @@ func (d *dynamicWriter) setTarget(w io.Writer) {
} }
type formatMPEGTS struct { type formatMPEGTS struct {
ai *recorderInstance ri *recorderInstance
dw *dynamicWriter dw *dynamicWriter
bw *bufio.Writer bw *bufio.Writer
@ -61,7 +61,7 @@ type formatMPEGTS struct {
currentSegment *formatMPEGTSSegment currentSegment *formatMPEGTSSegment
} }
func (f *formatMPEGTS) initialize() { func (f *formatMPEGTS) initialize() bool {
var tracks []*mpegts.Track var tracks []*mpegts.Track
var setuppedFormats []rtspformat.Format var setuppedFormats []rtspformat.Format
setuppedFormatsMap := make(map[rtspformat.Format]struct{}) setuppedFormatsMap := make(map[rtspformat.Format]struct{})
@ -77,7 +77,7 @@ func (f *formatMPEGTS) initialize() {
return track return track
} }
for _, media := range f.ai.agent.Stream.Desc().Medias { for _, media := range f.ri.rec.Stream.Desc().Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
clockRate := forma.ClockRate() clockRate := forma.ClockRate()
@ -87,8 +87,8 @@ func (f *formatMPEGTS) initialize() {
var dtsExtractor *h265.DTSExtractor2 var dtsExtractor *h265.DTSExtractor2
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -132,8 +132,8 @@ func (f *formatMPEGTS) initialize() {
var dtsExtractor *h264.DTSExtractor2 var dtsExtractor *h264.DTSExtractor2
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -178,8 +178,8 @@ func (f *formatMPEGTS) initialize() {
firstReceived := false firstReceived := false
var lastPTS int64 var lastPTS int64
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -217,8 +217,8 @@ func (f *formatMPEGTS) initialize() {
firstReceived := false firstReceived := false
var lastPTS int64 var lastPTS int64
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -255,8 +255,8 @@ func (f *formatMPEGTS) initialize() {
ChannelCount: forma.ChannelCount, ChannelCount: forma.ChannelCount,
}) })
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -282,14 +282,14 @@ func (f *formatMPEGTS) initialize() {
case *rtspformat.MPEG4Audio: case *rtspformat.MPEG4Audio:
co := forma.GetConfig() co := forma.GetConfig()
if co == nil { if co == nil {
f.ai.Log(logger.Warn, "skipping MPEG-4 audio track: tracks without explicit configuration are not supported") f.ri.Log(logger.Warn, "skipping MPEG-4 audio track: tracks without explicit configuration are not supported")
} else { } else {
track := addTrack(forma, &mpegts.CodecMPEG4Audio{ track := addTrack(forma, &mpegts.CodecMPEG4Audio{
Config: *co, Config: *co,
}) })
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -316,8 +316,8 @@ func (f *formatMPEGTS) initialize() {
case *rtspformat.MPEG1Audio: case *rtspformat.MPEG1Audio:
track := addTrack(forma, &mpegts.CodecMPEG1Audio{}) track := addTrack(forma, &mpegts.CodecMPEG1Audio{})
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -343,8 +343,8 @@ func (f *formatMPEGTS) initialize() {
case *rtspformat.AC3: case *rtspformat.AC3:
track := addTrack(forma, &mpegts.CodecAC3{}) track := addTrack(forma, &mpegts.CodecAC3{})
f.ai.agent.Stream.AddReader( f.ri.rec.Stream.AddReader(
f.ai, f.ri,
media, media,
forma, forma,
func(u unit.Unit) error { func(u unit.Unit) error {
@ -380,15 +380,15 @@ func (f *formatMPEGTS) initialize() {
} }
if len(setuppedFormats) == 0 { if len(setuppedFormats) == 0 {
f.ai.Log(logger.Warn, "no supported tracks found, skipping recording") f.ri.Log(logger.Warn, "no supported tracks found, skipping recording")
return return false
} }
n := 1 n := 1
for _, medi := range f.ai.agent.Stream.Desc().Medias { for _, medi := range f.ri.rec.Stream.Desc().Medias {
for _, forma := range medi.Formats { for _, forma := range medi.Formats {
if _, ok := setuppedFormatsMap[forma]; !ok { if _, ok := setuppedFormatsMap[forma]; !ok {
f.ai.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) f.ri.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())
} }
n++ n++
} }
@ -398,8 +398,10 @@ func (f *formatMPEGTS) initialize() {
f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize) f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize)
f.mw = mpegts.NewWriter(f.bw, tracks) f.mw = mpegts.NewWriter(f.bw, tracks)
f.ai.Log(logger.Info, "recording %s", f.ri.Log(logger.Info, "recording %s",
defs.FormatsInfo(setuppedFormats)) defs.FormatsInfo(setuppedFormats))
return true
} }
func (f *formatMPEGTS) close() { func (f *formatMPEGTS) close() {
@ -429,7 +431,7 @@ func (f *formatMPEGTS) write(
f.currentSegment.initialize() f.currentSegment.initialize()
case (!f.hasVideo || isVideo) && case (!f.hasVideo || isVideo) &&
randomAccess && randomAccess &&
(dtsDuration-f.currentSegment.startDTS) >= f.ai.agent.SegmentDuration: (dtsDuration-f.currentSegment.startDTS) >= f.ri.rec.SegmentDuration:
f.currentSegment.lastDTS = dtsDuration f.currentSegment.lastDTS = dtsDuration
err := f.currentSegment.close() err := f.currentSegment.close()
if err != nil { if err != nil {
@ -443,7 +445,7 @@ func (f *formatMPEGTS) write(
} }
f.currentSegment.initialize() f.currentSegment.initialize()
case (dtsDuration - f.currentSegment.lastFlush) >= f.ai.agent.PartDuration: case (dtsDuration - f.currentSegment.lastFlush) >= f.ri.rec.PartDuration:
err := f.bw.Flush() err := f.bw.Flush()
if err != nil { if err != nil {
return err return err

View file

@ -30,7 +30,7 @@ func (s *formatMPEGTSSegment) close() error {
err := s.f.bw.Flush() err := s.f.bw.Flush()
if s.fi != nil { if s.fi != nil {
s.f.ai.Log(logger.Debug, "closing segment %s", s.path) s.f.ri.Log(logger.Debug, "closing segment %s", s.path)
err2 := s.fi.Close() err2 := s.fi.Close()
if err == nil { if err == nil {
err = err2 err = err2
@ -38,7 +38,7 @@ func (s *formatMPEGTSSegment) close() error {
if err2 == nil { if err2 == nil {
duration := s.lastDTS - s.startDTS duration := s.lastDTS - s.startDTS
s.f.ai.agent.OnSegmentComplete(s.path, duration) s.f.ri.rec.OnSegmentComplete(s.path, duration)
} }
} }
@ -47,8 +47,8 @@ func (s *formatMPEGTSSegment) close() error {
func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { func (s *formatMPEGTSSegment) Write(p []byte) (int, error) {
if s.fi == nil { if s.fi == nil {
s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ai.pathFormat) s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ri.pathFormat)
s.f.ai.Log(logger.Debug, "creating segment %s", s.path) s.f.ri.Log(logger.Debug, "creating segment %s", s.path)
err := os.MkdirAll(filepath.Dir(s.path), 0o755) err := os.MkdirAll(filepath.Dir(s.path), 0o755)
if err != nil { if err != nil {
@ -60,7 +60,7 @@ func (s *formatMPEGTSSegment) Write(p []byte) (int, error) {
return 0, err return 0, err
} }
s.f.ai.agent.OnSegmentCreate(s.path) s.f.ri.rec.OnSegmentCreate(s.path)
s.fi = fi s.fi = fi
} }

View file

@ -1,83 +0,0 @@
package recorder
import (
"strings"
"time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/recordstore"
)
type sample struct {
*fmp4.PartSample
dts int64
ntp time.Time
}
type recorderInstance struct {
agent *Recorder
pathFormat string
format format
terminate chan struct{}
done chan struct{}
}
// Log implements logger.Writer.
func (ai *recorderInstance) Log(level logger.Level, format string, args ...interface{}) {
ai.agent.Log(level, format, args...)
}
func (ai *recorderInstance) initialize() {
ai.pathFormat = ai.agent.PathFormat
ai.pathFormat = recordstore.PathAddExtension(
strings.ReplaceAll(ai.pathFormat, "%path", ai.agent.PathName),
ai.agent.Format,
)
ai.terminate = make(chan struct{})
ai.done = make(chan struct{})
switch ai.agent.Format {
case conf.RecordFormatMPEGTS:
ai.format = &formatMPEGTS{
ai: ai,
}
ai.format.initialize()
default:
ai.format = &formatFMP4{
ai: ai,
}
ai.format.initialize()
}
ai.agent.Stream.StartReader(ai)
go ai.run()
}
func (ai *recorderInstance) close() {
close(ai.terminate)
<-ai.done
}
func (ai *recorderInstance) run() {
defer close(ai.done)
select {
case err := <-ai.agent.Stream.ReaderError(ai):
ai.Log(logger.Error, err.Error())
case <-ai.terminate:
}
ai.agent.Stream.RemoveReader(ai)
ai.format.close()
}

View file

@ -36,63 +36,63 @@ type Recorder struct {
} }
// Initialize initializes Recorder. // Initialize initializes Recorder.
func (w *Recorder) Initialize() { func (r *Recorder) Initialize() {
if w.OnSegmentCreate == nil { if r.OnSegmentCreate == nil {
w.OnSegmentCreate = func(string) { r.OnSegmentCreate = func(string) {
} }
} }
if w.OnSegmentComplete == nil { if r.OnSegmentComplete == nil {
w.OnSegmentComplete = func(string, time.Duration) { r.OnSegmentComplete = func(string, time.Duration) {
} }
} }
if w.restartPause == 0 { if r.restartPause == 0 {
w.restartPause = 2 * time.Second r.restartPause = 2 * time.Second
} }
w.terminate = make(chan struct{}) r.terminate = make(chan struct{})
w.done = make(chan struct{}) r.done = make(chan struct{})
w.currentInstance = &recorderInstance{ r.currentInstance = &recorderInstance{
agent: w, rec: r,
} }
w.currentInstance.initialize() r.currentInstance.initialize()
go w.run() go r.run()
} }
// Log implements logger.Writer. // Log implements logger.Writer.
func (w *Recorder) Log(level logger.Level, format string, args ...interface{}) { func (r *Recorder) Log(level logger.Level, format string, args ...interface{}) {
w.Parent.Log(level, "[recorder] "+format, args...) r.Parent.Log(level, "[recorder] "+format, args...)
} }
// Close closes the agent. // Close closes the agent.
func (w *Recorder) Close() { func (r *Recorder) Close() {
w.Log(logger.Info, "recording stopped") r.Log(logger.Info, "recording stopped")
close(w.terminate) close(r.terminate)
<-w.done <-r.done
} }
func (w *Recorder) run() { func (r *Recorder) run() {
defer close(w.done) defer close(r.done)
for { for {
select { select {
case <-w.currentInstance.done: case <-r.currentInstance.done:
w.currentInstance.close() r.currentInstance.close()
case <-w.terminate: case <-r.terminate:
w.currentInstance.close() r.currentInstance.close()
return return
} }
select { select {
case <-time.After(w.restartPause): case <-time.After(r.restartPause):
case <-w.terminate: case <-r.terminate:
return return
} }
w.currentInstance = &recorderInstance{ r.currentInstance = &recorderInstance{
agent: w, rec: r,
} }
w.currentInstance.initialize() r.currentInstance.initialize()
} }
} }

View file

@ -0,0 +1,92 @@
package recorder
import (
"strings"
"time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/recordstore"
)
type sample struct {
*fmp4.PartSample
dts int64
ntp time.Time
}
type recorderInstance struct {
rec *Recorder
pathFormat string
format format
skip bool
terminate chan struct{}
done chan struct{}
}
// Log implements logger.Writer.
func (ri *recorderInstance) Log(level logger.Level, format string, args ...interface{}) {
ri.rec.Log(level, format, args...)
}
func (ri *recorderInstance) initialize() {
ri.pathFormat = ri.rec.PathFormat
ri.pathFormat = recordstore.PathAddExtension(
strings.ReplaceAll(ri.pathFormat, "%path", ri.rec.PathName),
ri.rec.Format,
)
ri.terminate = make(chan struct{})
ri.done = make(chan struct{})
switch ri.rec.Format {
case conf.RecordFormatMPEGTS:
ri.format = &formatMPEGTS{
ri: ri,
}
ok := ri.format.initialize()
ri.skip = !ok
default:
ri.format = &formatFMP4{
ri: ri,
}
ok := ri.format.initialize()
ri.skip = !ok
}
if !ri.skip {
ri.rec.Stream.StartReader(ri)
}
go ri.run()
}
func (ri *recorderInstance) close() {
close(ri.terminate)
<-ri.done
}
func (ri *recorderInstance) run() {
defer close(ri.done)
if !ri.skip {
select {
case err := <-ri.rec.Stream.ReaderError(ri):
ri.Log(logger.Error, err.Error())
case <-ri.terminate:
}
ri.rec.Stream.RemoveReader(ri)
} else {
<-ri.terminate
}
ri.format.close()
}

View file

@ -410,7 +410,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) {
require.Equal(t, true, found) require.Equal(t, true, found)
} }
func TestRecorderSkipTracks(t *testing.T) { func TestRecorderSkipTracksPartial(t *testing.T) {
for _, ca := range []string{"fmp4", "mpegts"} { for _, ca := range []string{"fmp4", "mpegts"} {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{ desc := &description.Session{Medias: []*description.Media{
@ -473,3 +473,63 @@ func TestRecorderSkipTracks(t *testing.T) {
}) })
} }
} }
func TestRecorderSkipTracksFull(t *testing.T) {
for _, ca := range []string{"fmp4", "mpegts"} {
t.Run(ca, func(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []rtspformat.Format{&rtspformat.VP8{}},
},
}}
stream, err := stream.New(
512,
1460,
desc,
true,
test.NilLogger,
)
require.NoError(t, err)
defer stream.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err)
defer os.RemoveAll(dir)
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
n := 0
l := test.Logger(func(l logger.Level, format string, args ...interface{}) {
if n == 0 {
require.Equal(t, logger.Warn, l)
require.Equal(t, "[recorder] no supported tracks found, skipping recording", fmt.Sprintf(format, args...))
}
n++
})
var fo conf.RecordFormat
if ca == "fmp4" {
fo = conf.RecordFormatFMP4
} else {
fo = conf.RecordFormatMPEGTS
}
w := &Recorder{
PathFormat: recordPath,
Format: fo,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
Parent: l,
}
w.Initialize()
defer w.Close()
require.Equal(t, 1, n)
})
}
}