print last decode error (#5319)
Some checks failed
code_lint / go (push) Has been cancelled
code_lint / go_mod (push) Has been cancelled
code_lint / docs (push) Has been cancelled
code_lint / api_docs (push) Has been cancelled
code_test / test_64 (push) Has been cancelled
code_test / test_32 (push) Has been cancelled
code_test / test_e2e (push) Has been cancelled

decode errors are still grouped together, but the last one is now
printed.
This commit is contained in:
Alessandro Ros 2026-01-05 15:28:24 +01:00 committed by GitHub
parent 1d9b2ff7ea
commit cd34de8770
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 207 additions and 113 deletions

View file

@ -10,8 +10,8 @@ const (
callbackPeriod = 1 * time.Second
)
// CounterDumper is a counter that periodically invokes a callback if the counter is not zero.
type CounterDumper struct {
// Dumper is a counter that periodically invokes a callback if the counter is not zero.
type Dumper struct {
OnReport func(v uint64)
counter *uint64
@ -21,7 +21,7 @@ type CounterDumper struct {
}
// Start starts the counter.
func (c *CounterDumper) Start() {
func (c *Dumper) Start() {
c.counter = new(uint64)
c.terminate = make(chan struct{})
c.done = make(chan struct{})
@ -30,22 +30,22 @@ func (c *CounterDumper) Start() {
}
// Stop stops the counter.
func (c *CounterDumper) Stop() {
func (c *Dumper) Stop() {
close(c.terminate)
<-c.done
}
// Increase increases the counter value by 1.
func (c *CounterDumper) Increase() {
func (c *Dumper) Increase() {
atomic.AddUint64(c.counter, 1)
}
// Add adds value to the counter.
func (c *CounterDumper) Add(v uint64) {
func (c *Dumper) Add(v uint64) {
atomic.AddUint64(c.counter, v)
}
func (c *CounterDumper) run() {
func (c *Dumper) run() {
defer close(c.done)
t := time.NewTicker(callbackPeriod)

View file

@ -7,10 +7,10 @@ import (
"github.com/stretchr/testify/require"
)
func TestCounterDumperReport(t *testing.T) {
func TestDumperReport(t *testing.T) {
done := make(chan struct{})
c := &CounterDumper{
c := &Dumper{
OnReport: func(v uint64) {
require.Equal(t, uint64(3), v)
close(done)
@ -29,8 +29,8 @@ func TestCounterDumperReport(t *testing.T) {
}
}
func TestCounterDumperDoNotReport(t *testing.T) {
c := &CounterDumper{
func TestDumperDoNotReport(t *testing.T) {
c := &Dumper{
OnReport: func(_ uint64) {
t.Errorf("should not happen")
},

View file

@ -0,0 +1,69 @@
// Package errordumper contains a counter that that periodically invokes a callback if the counter is not zero.
package errordumper
import (
"sync"
"time"
)
const (
callbackPeriod = 1 * time.Second
)
// Dumper is a counter that periodically invokes a callback if errors were added.
type Dumper struct {
OnReport func(v uint64, last error)
mutex sync.Mutex
counter uint64
last error
terminate chan struct{}
done chan struct{}
}
// Start starts the counter.
func (c *Dumper) Start() {
c.terminate = make(chan struct{})
c.done = make(chan struct{})
go c.run()
}
// Stop stops the counter.
func (c *Dumper) Stop() {
close(c.terminate)
<-c.done
}
// Add adds an error to the counter.
func (c *Dumper) Add(err error) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.counter++
c.last = err
}
func (c *Dumper) run() {
defer close(c.done)
t := time.NewTicker(callbackPeriod)
defer t.Stop()
for {
select {
case <-c.terminate:
return
case <-t.C:
c.mutex.Lock()
counter := c.counter
last := c.last
c.mutex.Unlock()
if counter != 0 {
c.OnReport(counter, last)
}
}
}
}

View file

@ -0,0 +1,43 @@
package errordumper
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestDumperReport(t *testing.T) {
done := make(chan struct{})
c := &Dumper{
OnReport: func(v uint64, last error) {
require.Equal(t, uint64(1), v)
require.EqualError(t, last, "test error")
close(done)
},
}
c.Start()
defer c.Stop()
c.Add(fmt.Errorf("test error"))
select {
case <-done:
case <-time.After(2 * time.Second):
t.Errorf("should not happen")
}
}
func TestDumperDoNotReport(t *testing.T) {
c := &Dumper{
OnReport: func(_ uint64, _ error) {
t.Errorf("should not happen")
},
}
c.Start()
defer c.Stop()
<-time.After(2 * time.Second)
}

View file

@ -243,7 +243,7 @@ type IncomingTrack struct {
writeRTCP func([]rtcp.Packet) error
log logger.Writer
packetsLost *counterdumper.CounterDumper
packetsLost *counterdumper.Dumper
rtpReceiver *rtpreceiver.Receiver
}
@ -267,7 +267,7 @@ func (*IncomingTrack) PTSEqualsDTS(*rtp.Packet) bool {
}
func (t *IncomingTrack) start() {
t.packetsLost = &counterdumper.CounterDumper{
t.packetsLost = &counterdumper.Dumper{
OnReport: func(val uint64) {
t.log.Log(logger.Warn, "%d RTP %s lost",
val,

View file

@ -18,6 +18,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/errordumper"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger"
@ -51,16 +52,16 @@ type session struct {
path defs.Path
stream *stream.Stream
onUnreadHook func()
packetsLost *counterdumper.CounterDumper
decodeErrors *counterdumper.CounterDumper
discardedFrames *counterdumper.CounterDumper
packetsLost *counterdumper.Dumper
decodeErrors *errordumper.Dumper
discardedFrames *counterdumper.Dumper
}
func (s *session) initialize() {
s.uuid = uuid.New()
s.created = time.Now()
s.packetsLost = &counterdumper.CounterDumper{
s.packetsLost = &counterdumper.Dumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d RTP %s lost",
val,
@ -74,21 +75,18 @@ func (s *session) initialize() {
}
s.packetsLost.Start()
s.decodeErrors = &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
s.decodeErrors = &errordumper.Dumper{
OnReport: func(val uint64, last error) {
if val == 1 {
s.Log(logger.Warn, "decode error: %v", last)
} else {
s.Log(logger.Warn, "%d decode errors, last was: %v", val, last)
}
},
}
s.decodeErrors.Start()
s.discardedFrames = &counterdumper.CounterDumper{
s.discardedFrames = &counterdumper.Dumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "reader is too slow, discarding %d %s",
val,
@ -385,8 +383,8 @@ func (s *session) onPacketsLost(ctx *gortsplib.ServerHandlerOnPacketsLostCtx) {
}
// onDecodeError is called by rtspServer.
func (s *session) onDecodeError(_ *gortsplib.ServerHandlerOnDecodeErrorCtx) {
s.decodeErrors.Increase()
func (s *session) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) {
s.decodeErrors.Add(ctx.Error)
}
// onStreamWriteError is called by rtspServer.

View file

@ -15,8 +15,8 @@ import (
"github.com/bluenviron/mediamtx/internal/auth"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/errordumper"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger"
@ -193,24 +193,21 @@ func (c *conn) runPublishReader(sconn srt.Conn, streamID *streamID, pathConf *co
return err
}
decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
c.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
decodeErrors := &errordumper.Dumper{
OnReport: func(val uint64, last error) {
if val == 1 {
c.Log(logger.Warn, "decode error: %v", last)
} else {
c.Log(logger.Warn, "%d decode errors, last was: %v", val, last)
}
},
}
decodeErrors.Start()
defer decodeErrors.Stop()
r.OnDecodeError(func(_ error) {
decodeErrors.Increase()
r.OnDecodeError(func(err error) {
decodeErrors.Add(err)
})
var strm *stream.Stream

View file

@ -10,8 +10,8 @@ import (
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/errordumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/hls"
"github.com/bluenviron/mediamtx/internal/protocols/tls"
@ -45,16 +45,13 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}
}()
decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
decodeErrors := &errordumper.Dumper{
OnReport: func(val uint64, last error) {
if val == 1 {
s.Log(logger.Warn, "decode error: %v", last)
} else {
s.Log(logger.Warn, "%d decode errors, last was: %v", val, last)
}
},
}
@ -90,8 +87,8 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
OnDownloadPart: func(u string) {
s.Log(logger.Debug, "downloading part %v", u)
},
OnDecodeError: func(_ error) {
decodeErrors.Increase()
OnDecodeError: func(err error) {
decodeErrors.Add(err)
},
OnTracks: func(tracks []*gohlslib.Track) error {
medias, err2 := hls.ToStream(c, tracks, params.Conf, &strm)

View file

@ -10,8 +10,8 @@ import (
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/errordumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/mpegts"
"github.com/bluenviron/mediamtx/internal/protocols/udp"
@ -96,24 +96,21 @@ func (s *Source) runReader(nc net.Conn) error {
return err
}
decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
decodeErrors := &errordumper.Dumper{
OnReport: func(val uint64, last error) {
if val == 1 {
s.Log(logger.Warn, "decode error: %v", last)
} else {
s.Log(logger.Warn, "%d decode errors, last was: %v", val, last)
}
},
}
decodeErrors.Start()
defer decodeErrors.Stop()
mr.OnDecodeError(func(_ error) {
decodeErrors.Increase()
mr.OnDecodeError(func(err error) {
decodeErrors.Add(err)
})
var strm *stream.Stream

View file

@ -13,6 +13,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/errordumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/udp"
"github.com/bluenviron/mediamtx/internal/protocols/unix"
@ -102,7 +103,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}
func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
packetsLost := &counterdumper.CounterDumper{
packetsLost := &counterdumper.Dumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d RTP %s lost",
val,
@ -118,16 +119,13 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
packetsLost.Start()
defer packetsLost.Stop()
decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
decodeErrors := &errordumper.Dumper{
OnReport: func(val uint64, last error) {
if val == 1 {
s.Log(logger.Warn, "decode error: %v", last)
} else {
s.Log(logger.Warn, "%d decode errors, last was: %v", val, last)
}
},
}
decodeErrors.Start()
@ -169,7 +167,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
err = pkt.Unmarshal(buf[:n])
if err != nil {
if strm != nil {
decodeErrors.Increase()
decodeErrors.Add(err)
continue
}
return err

View file

@ -15,6 +15,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/errordumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/rtsp"
"github.com/bluenviron/mediamtx/internal/protocols/tls"
@ -90,7 +91,7 @@ func (s *Source) Log(level logger.Level, format string, args ...any) {
func (s *Source) Run(params defs.StaticSourceRunParams) error {
s.Log(logger.Debug, "connecting")
packetsLost := &counterdumper.CounterDumper{
packetsLost := &counterdumper.Dumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d RTP %s lost",
val,
@ -106,16 +107,13 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
packetsLost.Start()
defer packetsLost.Stop()
decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
decodeErrors := &errordumper.Dumper{
OnReport: func(val uint64, last error) {
if val == 1 {
s.Log(logger.Warn, "decode error: %v", last)
} else {
s.Log(logger.Warn, "%d decode errors, last was: %v", val, last)
}
},
}
@ -177,8 +175,8 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
OnPacketsLost: func(lost uint64) {
packetsLost.Add(lost)
},
OnDecodeError: func(_ error) {
decodeErrors.Increase()
OnDecodeError: func(err error) {
decodeErrors.Add(err)
},
}

View file

@ -8,8 +8,8 @@ import (
srt "github.com/datarhei/gosrt"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/errordumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/mpegts"
"github.com/bluenviron/mediamtx/internal/stream"
@ -81,24 +81,21 @@ func (s *Source) runReader(sconn srt.Conn) error {
return err
}
decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
val,
func() string {
if val == 1 {
return "error"
}
return "errors"
}())
decodeErrors := &errordumper.Dumper{
OnReport: func(val uint64, last error) {
if val == 1 {
s.Log(logger.Warn, "decode error: %v", last)
} else {
s.Log(logger.Warn, "%d decode errors, last was: %v", val, last)
}
},
}
decodeErrors.Start()
defer decodeErrors.Stop()
r.OnDecodeError(func(_ error) {
decodeErrors.Increase()
r.OnDecodeError(func(err error) {
decodeErrors.Add(err)
})
var strm *stream.Stream

View file

@ -22,7 +22,7 @@ type Reader struct {
onDatas map[*description.Media]map[format.Format]OnDataFunc
queueSize int
buffer *ringbuffer.RingBuffer
discardedFrames *counterdumper.CounterDumper
discardedFrames *counterdumper.Dumper
// out
err chan error
@ -63,7 +63,7 @@ func (r *Reader) start() {
r.buffer = buffer
r.err = make(chan error)
r.discardedFrames = &counterdumper.CounterDumper{
r.discardedFrames = &counterdumper.Dumper{
OnReport: func(val uint64) {
r.Parent.Log(logger.Warn, "reader is too slow, discarding %d %s",
val,

View file

@ -33,7 +33,7 @@ type Stream struct {
rtspStream *gortsplib.ServerStream
rtspsStream *gortsplib.ServerStream
readers map[*Reader]struct{}
processingErrors *counterdumper.CounterDumper
processingErrors *counterdumper.Dumper
}
// Initialize initializes a Stream.
@ -43,7 +43,7 @@ func (s *Stream) Initialize() error {
s.medias = make(map[*description.Media]*streamMedia)
s.readers = make(map[*Reader]struct{})
s.processingErrors = &counterdumper.CounterDumper{
s.processingErrors = &counterdumper.Dumper{
OnReport: func(val uint64) {
s.Parent.Log(logger.Warn, "%d processing %s",
val,

View file

@ -28,7 +28,7 @@ type streamFormat struct {
format format.Format
generateRTPPackets bool
fillNTP bool
processingErrors *counterdumper.CounterDumper
processingErrors *counterdumper.Dumper
parent logger.Writer
proc codecprocessor.Processor

View file

@ -12,7 +12,7 @@ type streamMedia struct {
media *description.Media
generateRTPPackets bool
fillNTP bool
processingErrors *counterdumper.CounterDumper
processingErrors *counterdumper.Dumper
parent logger.Writer
formats map[format.Format]*streamFormat