diff --git a/internal/counterdumper/counterdumper.go b/internal/counterdumper/dumper.go similarity index 74% rename from internal/counterdumper/counterdumper.go rename to internal/counterdumper/dumper.go index 63ddafd9..eb58ca6c 100644 --- a/internal/counterdumper/counterdumper.go +++ b/internal/counterdumper/dumper.go @@ -10,8 +10,8 @@ const ( callbackPeriod = 1 * time.Second ) -// CounterDumper is a counter that periodically invokes a callback if the counter is not zero. -type CounterDumper struct { +// Dumper is a counter that periodically invokes a callback if the counter is not zero. +type Dumper struct { OnReport func(v uint64) counter *uint64 @@ -21,7 +21,7 @@ type CounterDumper struct { } // Start starts the counter. -func (c *CounterDumper) Start() { +func (c *Dumper) Start() { c.counter = new(uint64) c.terminate = make(chan struct{}) c.done = make(chan struct{}) @@ -30,22 +30,22 @@ func (c *CounterDumper) Start() { } // Stop stops the counter. -func (c *CounterDumper) Stop() { +func (c *Dumper) Stop() { close(c.terminate) <-c.done } // Increase increases the counter value by 1. -func (c *CounterDumper) Increase() { +func (c *Dumper) Increase() { atomic.AddUint64(c.counter, 1) } // Add adds value to the counter. -func (c *CounterDumper) Add(v uint64) { +func (c *Dumper) Add(v uint64) { atomic.AddUint64(c.counter, v) } -func (c *CounterDumper) run() { +func (c *Dumper) run() { defer close(c.done) t := time.NewTicker(callbackPeriod) diff --git a/internal/counterdumper/counterdumper_test.go b/internal/counterdumper/dumper_test.go similarity index 78% rename from internal/counterdumper/counterdumper_test.go rename to internal/counterdumper/dumper_test.go index 1323ea4e..ba8654a4 100644 --- a/internal/counterdumper/counterdumper_test.go +++ b/internal/counterdumper/dumper_test.go @@ -7,10 +7,10 @@ import ( "github.com/stretchr/testify/require" ) -func TestCounterDumperReport(t *testing.T) { +func TestDumperReport(t *testing.T) { done := make(chan struct{}) - c := &CounterDumper{ + c := &Dumper{ OnReport: func(v uint64) { require.Equal(t, uint64(3), v) close(done) @@ -29,8 +29,8 @@ func TestCounterDumperReport(t *testing.T) { } } -func TestCounterDumperDoNotReport(t *testing.T) { - c := &CounterDumper{ +func TestDumperDoNotReport(t *testing.T) { + c := &Dumper{ OnReport: func(_ uint64) { t.Errorf("should not happen") }, diff --git a/internal/errordumper/dumper.go b/internal/errordumper/dumper.go new file mode 100644 index 00000000..0e73944f --- /dev/null +++ b/internal/errordumper/dumper.go @@ -0,0 +1,69 @@ +// Package errordumper contains a counter that that periodically invokes a callback if the counter is not zero. +package errordumper + +import ( + "sync" + "time" +) + +const ( + callbackPeriod = 1 * time.Second +) + +// Dumper is a counter that periodically invokes a callback if errors were added. +type Dumper struct { + OnReport func(v uint64, last error) + + mutex sync.Mutex + counter uint64 + last error + + terminate chan struct{} + done chan struct{} +} + +// Start starts the counter. +func (c *Dumper) Start() { + c.terminate = make(chan struct{}) + c.done = make(chan struct{}) + + go c.run() +} + +// Stop stops the counter. +func (c *Dumper) Stop() { + close(c.terminate) + <-c.done +} + +// Add adds an error to the counter. +func (c *Dumper) Add(err error) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.counter++ + c.last = err +} + +func (c *Dumper) run() { + defer close(c.done) + + t := time.NewTicker(callbackPeriod) + defer t.Stop() + + for { + select { + case <-c.terminate: + return + + case <-t.C: + c.mutex.Lock() + counter := c.counter + last := c.last + c.mutex.Unlock() + + if counter != 0 { + c.OnReport(counter, last) + } + } + } +} diff --git a/internal/errordumper/dumper_test.go b/internal/errordumper/dumper_test.go new file mode 100644 index 00000000..c0c87a81 --- /dev/null +++ b/internal/errordumper/dumper_test.go @@ -0,0 +1,43 @@ +package errordumper + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDumperReport(t *testing.T) { + done := make(chan struct{}) + + c := &Dumper{ + OnReport: func(v uint64, last error) { + require.Equal(t, uint64(1), v) + require.EqualError(t, last, "test error") + close(done) + }, + } + c.Start() + defer c.Stop() + + c.Add(fmt.Errorf("test error")) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Errorf("should not happen") + } +} + +func TestDumperDoNotReport(t *testing.T) { + c := &Dumper{ + OnReport: func(_ uint64, _ error) { + t.Errorf("should not happen") + }, + } + c.Start() + defer c.Stop() + + <-time.After(2 * time.Second) +} diff --git a/internal/protocols/webrtc/incoming_track.go b/internal/protocols/webrtc/incoming_track.go index 6cdef34c..88bb2abf 100644 --- a/internal/protocols/webrtc/incoming_track.go +++ b/internal/protocols/webrtc/incoming_track.go @@ -243,7 +243,7 @@ type IncomingTrack struct { writeRTCP func([]rtcp.Packet) error log logger.Writer - packetsLost *counterdumper.CounterDumper + packetsLost *counterdumper.Dumper rtpReceiver *rtpreceiver.Receiver } @@ -267,7 +267,7 @@ func (*IncomingTrack) PTSEqualsDTS(*rtp.Packet) bool { } func (t *IncomingTrack) start() { - t.packetsLost = &counterdumper.CounterDumper{ + t.packetsLost = &counterdumper.Dumper{ OnReport: func(val uint64) { t.log.Log(logger.Warn, "%d RTP %s lost", val, diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 4e91293c..565f0a02 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -18,6 +18,7 @@ import ( "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/errordumper" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/hooks" "github.com/bluenviron/mediamtx/internal/logger" @@ -51,16 +52,16 @@ type session struct { path defs.Path stream *stream.Stream onUnreadHook func() - packetsLost *counterdumper.CounterDumper - decodeErrors *counterdumper.CounterDumper - discardedFrames *counterdumper.CounterDumper + packetsLost *counterdumper.Dumper + decodeErrors *errordumper.Dumper + discardedFrames *counterdumper.Dumper } func (s *session) initialize() { s.uuid = uuid.New() s.created = time.Now() - s.packetsLost = &counterdumper.CounterDumper{ + s.packetsLost = &counterdumper.Dumper{ OnReport: func(val uint64) { s.Log(logger.Warn, "%d RTP %s lost", val, @@ -74,21 +75,18 @@ func (s *session) initialize() { } s.packetsLost.Start() - s.decodeErrors = &counterdumper.CounterDumper{ - OnReport: func(val uint64) { - s.Log(logger.Warn, "%d decode %s", - val, - func() string { - if val == 1 { - return "error" - } - return "errors" - }()) + s.decodeErrors = &errordumper.Dumper{ + OnReport: func(val uint64, last error) { + if val == 1 { + s.Log(logger.Warn, "decode error: %v", last) + } else { + s.Log(logger.Warn, "%d decode errors, last was: %v", val, last) + } }, } s.decodeErrors.Start() - s.discardedFrames = &counterdumper.CounterDumper{ + s.discardedFrames = &counterdumper.Dumper{ OnReport: func(val uint64) { s.Log(logger.Warn, "reader is too slow, discarding %d %s", val, @@ -385,8 +383,8 @@ func (s *session) onPacketsLost(ctx *gortsplib.ServerHandlerOnPacketsLostCtx) { } // onDecodeError is called by rtspServer. -func (s *session) onDecodeError(_ *gortsplib.ServerHandlerOnDecodeErrorCtx) { - s.decodeErrors.Increase() +func (s *session) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) { + s.decodeErrors.Add(ctx.Error) } // onStreamWriteError is called by rtspServer. diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index 5684d24e..b52e0e7f 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -15,8 +15,8 @@ import ( "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" - "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/errordumper" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/hooks" "github.com/bluenviron/mediamtx/internal/logger" @@ -193,24 +193,21 @@ func (c *conn) runPublishReader(sconn srt.Conn, streamID *streamID, pathConf *co return err } - decodeErrors := &counterdumper.CounterDumper{ - OnReport: func(val uint64) { - c.Log(logger.Warn, "%d decode %s", - val, - func() string { - if val == 1 { - return "error" - } - return "errors" - }()) + decodeErrors := &errordumper.Dumper{ + OnReport: func(val uint64, last error) { + if val == 1 { + c.Log(logger.Warn, "decode error: %v", last) + } else { + c.Log(logger.Warn, "%d decode errors, last was: %v", val, last) + } }, } decodeErrors.Start() defer decodeErrors.Stop() - r.OnDecodeError(func(_ error) { - decodeErrors.Increase() + r.OnDecodeError(func(err error) { + decodeErrors.Add(err) }) var strm *stream.Stream diff --git a/internal/staticsources/hls/source.go b/internal/staticsources/hls/source.go index bb96ae87..b3096b80 100644 --- a/internal/staticsources/hls/source.go +++ b/internal/staticsources/hls/source.go @@ -10,8 +10,8 @@ import ( "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/mediamtx/internal/conf" - "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/errordumper" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/hls" "github.com/bluenviron/mediamtx/internal/protocols/tls" @@ -45,16 +45,13 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { } }() - decodeErrors := &counterdumper.CounterDumper{ - OnReport: func(val uint64) { - s.Log(logger.Warn, "%d decode %s", - val, - func() string { - if val == 1 { - return "error" - } - return "errors" - }()) + decodeErrors := &errordumper.Dumper{ + OnReport: func(val uint64, last error) { + if val == 1 { + s.Log(logger.Warn, "decode error: %v", last) + } else { + s.Log(logger.Warn, "%d decode errors, last was: %v", val, last) + } }, } @@ -90,8 +87,8 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { OnDownloadPart: func(u string) { s.Log(logger.Debug, "downloading part %v", u) }, - OnDecodeError: func(_ error) { - decodeErrors.Increase() + OnDecodeError: func(err error) { + decodeErrors.Add(err) }, OnTracks: func(tracks []*gohlslib.Track) error { medias, err2 := hls.ToStream(c, tracks, params.Conf, &strm) diff --git a/internal/staticsources/mpegts/source.go b/internal/staticsources/mpegts/source.go index 590576a2..e377b133 100644 --- a/internal/staticsources/mpegts/source.go +++ b/internal/staticsources/mpegts/source.go @@ -10,8 +10,8 @@ import ( "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/mediamtx/internal/conf" - "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/errordumper" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/mpegts" "github.com/bluenviron/mediamtx/internal/protocols/udp" @@ -96,24 +96,21 @@ func (s *Source) runReader(nc net.Conn) error { return err } - decodeErrors := &counterdumper.CounterDumper{ - OnReport: func(val uint64) { - s.Log(logger.Warn, "%d decode %s", - val, - func() string { - if val == 1 { - return "error" - } - return "errors" - }()) + decodeErrors := &errordumper.Dumper{ + OnReport: func(val uint64, last error) { + if val == 1 { + s.Log(logger.Warn, "decode error: %v", last) + } else { + s.Log(logger.Warn, "%d decode errors, last was: %v", val, last) + } }, } decodeErrors.Start() defer decodeErrors.Stop() - mr.OnDecodeError(func(_ error) { - decodeErrors.Increase() + mr.OnDecodeError(func(err error) { + decodeErrors.Add(err) }) var strm *stream.Stream diff --git a/internal/staticsources/rtp/source.go b/internal/staticsources/rtp/source.go index 735eba28..74602eca 100644 --- a/internal/staticsources/rtp/source.go +++ b/internal/staticsources/rtp/source.go @@ -13,6 +13,7 @@ import ( "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/errordumper" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/udp" "github.com/bluenviron/mediamtx/internal/protocols/unix" @@ -102,7 +103,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { } func (s *Source) runReader(desc *description.Session, nc net.Conn) error { - packetsLost := &counterdumper.CounterDumper{ + packetsLost := &counterdumper.Dumper{ OnReport: func(val uint64) { s.Log(logger.Warn, "%d RTP %s lost", val, @@ -118,16 +119,13 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { packetsLost.Start() defer packetsLost.Stop() - decodeErrors := &counterdumper.CounterDumper{ - OnReport: func(val uint64) { - s.Log(logger.Warn, "%d decode %s", - val, - func() string { - if val == 1 { - return "error" - } - return "errors" - }()) + decodeErrors := &errordumper.Dumper{ + OnReport: func(val uint64, last error) { + if val == 1 { + s.Log(logger.Warn, "decode error: %v", last) + } else { + s.Log(logger.Warn, "%d decode errors, last was: %v", val, last) + } }, } decodeErrors.Start() @@ -169,7 +167,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { err = pkt.Unmarshal(buf[:n]) if err != nil { if strm != nil { - decodeErrors.Increase() + decodeErrors.Add(err) continue } return err diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index 0a21f506..766840e6 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -15,6 +15,7 @@ import ( "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/errordumper" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/rtsp" "github.com/bluenviron/mediamtx/internal/protocols/tls" @@ -90,7 +91,7 @@ func (s *Source) Log(level logger.Level, format string, args ...any) { func (s *Source) Run(params defs.StaticSourceRunParams) error { s.Log(logger.Debug, "connecting") - packetsLost := &counterdumper.CounterDumper{ + packetsLost := &counterdumper.Dumper{ OnReport: func(val uint64) { s.Log(logger.Warn, "%d RTP %s lost", val, @@ -106,16 +107,13 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { packetsLost.Start() defer packetsLost.Stop() - decodeErrors := &counterdumper.CounterDumper{ - OnReport: func(val uint64) { - s.Log(logger.Warn, "%d decode %s", - val, - func() string { - if val == 1 { - return "error" - } - return "errors" - }()) + decodeErrors := &errordumper.Dumper{ + OnReport: func(val uint64, last error) { + if val == 1 { + s.Log(logger.Warn, "decode error: %v", last) + } else { + s.Log(logger.Warn, "%d decode errors, last was: %v", val, last) + } }, } @@ -177,8 +175,8 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { OnPacketsLost: func(lost uint64) { packetsLost.Add(lost) }, - OnDecodeError: func(_ error) { - decodeErrors.Increase() + OnDecodeError: func(err error) { + decodeErrors.Add(err) }, } diff --git a/internal/staticsources/srt/source.go b/internal/staticsources/srt/source.go index 48acd8a9..a370af8c 100644 --- a/internal/staticsources/srt/source.go +++ b/internal/staticsources/srt/source.go @@ -8,8 +8,8 @@ import ( srt "github.com/datarhei/gosrt" "github.com/bluenviron/mediamtx/internal/conf" - "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/errordumper" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/mpegts" "github.com/bluenviron/mediamtx/internal/stream" @@ -81,24 +81,21 @@ func (s *Source) runReader(sconn srt.Conn) error { return err } - decodeErrors := &counterdumper.CounterDumper{ - OnReport: func(val uint64) { - s.Log(logger.Warn, "%d decode %s", - val, - func() string { - if val == 1 { - return "error" - } - return "errors" - }()) + decodeErrors := &errordumper.Dumper{ + OnReport: func(val uint64, last error) { + if val == 1 { + s.Log(logger.Warn, "decode error: %v", last) + } else { + s.Log(logger.Warn, "%d decode errors, last was: %v", val, last) + } }, } decodeErrors.Start() defer decodeErrors.Stop() - r.OnDecodeError(func(_ error) { - decodeErrors.Increase() + r.OnDecodeError(func(err error) { + decodeErrors.Add(err) }) var strm *stream.Stream diff --git a/internal/stream/reader.go b/internal/stream/reader.go index 9353496f..359d1426 100644 --- a/internal/stream/reader.go +++ b/internal/stream/reader.go @@ -22,7 +22,7 @@ type Reader struct { onDatas map[*description.Media]map[format.Format]OnDataFunc queueSize int buffer *ringbuffer.RingBuffer - discardedFrames *counterdumper.CounterDumper + discardedFrames *counterdumper.Dumper // out err chan error @@ -63,7 +63,7 @@ func (r *Reader) start() { r.buffer = buffer r.err = make(chan error) - r.discardedFrames = &counterdumper.CounterDumper{ + r.discardedFrames = &counterdumper.Dumper{ OnReport: func(val uint64) { r.Parent.Log(logger.Warn, "reader is too slow, discarding %d %s", val, diff --git a/internal/stream/stream.go b/internal/stream/stream.go index d066cfea..a0dc3f1c 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -33,7 +33,7 @@ type Stream struct { rtspStream *gortsplib.ServerStream rtspsStream *gortsplib.ServerStream readers map[*Reader]struct{} - processingErrors *counterdumper.CounterDumper + processingErrors *counterdumper.Dumper } // Initialize initializes a Stream. @@ -43,7 +43,7 @@ func (s *Stream) Initialize() error { s.medias = make(map[*description.Media]*streamMedia) s.readers = make(map[*Reader]struct{}) - s.processingErrors = &counterdumper.CounterDumper{ + s.processingErrors = &counterdumper.Dumper{ OnReport: func(val uint64) { s.Parent.Log(logger.Warn, "%d processing %s", val, diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 30ad5296..5c625d38 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -28,7 +28,7 @@ type streamFormat struct { format format.Format generateRTPPackets bool fillNTP bool - processingErrors *counterdumper.CounterDumper + processingErrors *counterdumper.Dumper parent logger.Writer proc codecprocessor.Processor diff --git a/internal/stream/stream_media.go b/internal/stream/stream_media.go index a0037c05..3469f78a 100644 --- a/internal/stream/stream_media.go +++ b/internal/stream/stream_media.go @@ -12,7 +12,7 @@ type streamMedia struct { media *description.Media generateRTPPackets bool fillNTP bool - processingErrors *counterdumper.CounterDumper + processingErrors *counterdumper.Dumper parent logger.Writer formats map[format.Format]*streamFormat