diff --git a/go.mod b/go.mod index 341306d4..a2d70982 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/alecthomas/kong v1.9.0 github.com/asticode/go-astits v1.13.0 github.com/bluenviron/gohlslib/v2 v2.1.4-0.20250210133907-d3dddacbb9fc - github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250323180412-1b127d70bb33 + github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800 github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d github.com/datarhei/gosrt v0.9.0 github.com/fsnotify/fsnotify v1.8.0 diff --git a/go.sum b/go.sum index dd39d6cd..cff989ba 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib/v2 v2.1.4-0.20250210133907-d3dddacbb9fc h1:t1i9foTQ+RfFT5Ke9HV845zWtz2vtWQCWV8ZXvpzM4g= github.com/bluenviron/gohlslib/v2 v2.1.4-0.20250210133907-d3dddacbb9fc/go.mod h1:soTVqoidOT+L08hUSDreM7DebNyjjViUiEvpWlr7EIs= -github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250323180412-1b127d70bb33 h1:6IJM70YqgIi/txfr2+9r7RHfLOXUhpIFueruqaIsZ64= -github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250323180412-1b127d70bb33/go.mod h1:rEwUB2wda1rjnStH/mMu4SVHTLAAkZBalBp/zDlUbPc= +github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800 h1:WK8ynLNe5UNxAkB5je95vhwifCWe/GK+ZjW3ybO7rAY= +github.com/bluenviron/gortsplib/v4 v4.12.4-0.20250324174248-61372cfa6800/go.mod h1:rEwUB2wda1rjnStH/mMu4SVHTLAAkZBalBp/zDlUbPc= github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d h1:AlIFt4i8ex3cGfoxLS3JoYVzSP4MgL9aMH/rp6kiYN4= github.com/bluenviron/mediacommon/v2 v2.0.1-0.20250324151931-b8ce69d15d3d/go.mod h1:iHEz1SFIet6zBwAQoh1a92vTQ3dV3LpVFbom6/SLz3k= github.com/bytedance/sonic v1.12.6 h1:/isNmCUF2x3Sh8RAp/4mh4ZGkcFAX/hLrzrK3AvpRzk= diff --git a/internal/core/path.go b/internal/core/path.go index f261cd10..a74c81bc 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -700,7 +700,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error UDPMaxPayloadSize: pa.udpMaxPayloadSize, Desc: desc, GenerateRTPPackets: allocateEncoder, - DecodeErrLogger: logger.NewLimitedLogger(pa.source), + Parent: pa.source, } err := pa.stream.Initialize() if err != nil { diff --git a/internal/counterdumper/counterdumper.go b/internal/counterdumper/counterdumper.go new file mode 100644 index 00000000..63ddafd9 --- /dev/null +++ b/internal/counterdumper/counterdumper.go @@ -0,0 +1,66 @@ +// Package counterdumper contains a counter that that periodically invokes a callback if the counter is not zero. +package counterdumper + +import ( + "sync/atomic" + "time" +) + +const ( + callbackPeriod = 1 * time.Second +) + +// CounterDumper is a counter that periodically invokes a callback if the counter is not zero. +type CounterDumper struct { + OnReport func(v uint64) + + counter *uint64 + + terminate chan struct{} + done chan struct{} +} + +// Start starts the counter. +func (c *CounterDumper) Start() { + c.counter = new(uint64) + c.terminate = make(chan struct{}) + c.done = make(chan struct{}) + + go c.run() +} + +// Stop stops the counter. +func (c *CounterDumper) Stop() { + close(c.terminate) + <-c.done +} + +// Increase increases the counter value by 1. +func (c *CounterDumper) Increase() { + atomic.AddUint64(c.counter, 1) +} + +// Add adds value to the counter. +func (c *CounterDumper) Add(v uint64) { + atomic.AddUint64(c.counter, v) +} + +func (c *CounterDumper) run() { + defer close(c.done) + + t := time.NewTicker(callbackPeriod) + defer t.Stop() + + for { + select { + case <-c.terminate: + return + + case <-t.C: + v := atomic.SwapUint64(c.counter, 0) + if v != 0 { + c.OnReport(v) + } + } + } +} diff --git a/internal/counterdumper/counterdumper_test.go b/internal/counterdumper/counterdumper_test.go new file mode 100644 index 00000000..1323ea4e --- /dev/null +++ b/internal/counterdumper/counterdumper_test.go @@ -0,0 +1,42 @@ +package counterdumper + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCounterDumperReport(t *testing.T) { + done := make(chan struct{}) + + c := &CounterDumper{ + OnReport: func(v uint64) { + require.Equal(t, uint64(3), v) + close(done) + }, + } + c.Start() + defer c.Stop() + + c.Add(2) + c.Increase() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Errorf("should not happen") + } +} + +func TestCounterDumperDoNotReport(t *testing.T) { + c := &CounterDumper{ + OnReport: func(_ uint64) { + t.Errorf("should not happen") + }, + } + c.Start() + defer c.Stop() + + <-time.After(2 * time.Second) +} diff --git a/internal/logger/limited_logger.go b/internal/logger/limited_logger.go deleted file mode 100644 index 445bef23..00000000 --- a/internal/logger/limited_logger.go +++ /dev/null @@ -1,34 +0,0 @@ -package logger - -import ( - "sync" - "time" -) - -const ( - minIntervalBetweenWarnings = 1 * time.Second -) - -type limitedLogger struct { - w Writer - mutex sync.Mutex - lastPrinted time.Time -} - -// NewLimitedLogger is a wrapper around a Writer that limits printed messages. -func NewLimitedLogger(w Writer) Writer { - return &limitedLogger{ - w: w, - } -} - -// Log is the main logging function. -func (l *limitedLogger) Log(level Level, format string, args ...interface{}) { - now := time.Now() - l.mutex.Lock() - if now.Sub(l.lastPrinted) >= minIntervalBetweenWarnings { - l.lastPrinted = now - l.w.Log(level, format, args...) - } - l.mutex.Unlock() -} diff --git a/internal/protocols/hls/from_stream_test.go b/internal/protocols/hls/from_stream_test.go index 6de6866e..e048ce9f 100644 --- a/internal/protocols/hls/from_stream_test.go +++ b/internal/protocols/hls/from_stream_test.go @@ -22,7 +22,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { Formats: []format.Format{&format.VP8{}}, }}}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -56,7 +56,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }, }}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/protocols/mpegts/from_stream_test.go b/internal/protocols/mpegts/from_stream_test.go index 45dd26d6..76dc3fc5 100644 --- a/internal/protocols/mpegts/from_stream_test.go +++ b/internal/protocols/mpegts/from_stream_test.go @@ -21,7 +21,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { Formats: []format.Format{&format.VP8{}}, }}}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -49,7 +49,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }, }}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/protocols/rtmp/from_stream_test.go b/internal/protocols/rtmp/from_stream_test.go index b5c6bab5..ca440f58 100644 --- a/internal/protocols/rtmp/from_stream_test.go +++ b/internal/protocols/rtmp/from_stream_test.go @@ -24,7 +24,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { Formats: []format.Format{&format.VP8{}}, }}}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -56,7 +56,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }, }}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/protocols/webrtc/from_stream_test.go b/internal/protocols/webrtc/from_stream_test.go index 530ae733..15386b3e 100644 --- a/internal/protocols/webrtc/from_stream_test.go +++ b/internal/protocols/webrtc/from_stream_test.go @@ -21,7 +21,7 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { Formats: []format.Format{&format.MJPEG{}}, }}}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -49,7 +49,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { }, }}, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -85,7 +85,7 @@ func TestFromStream(t *testing.T) { }}, }, GenerateRTPPackets: false, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/protocols/webrtc/incoming_track.go b/internal/protocols/webrtc/incoming_track.go index 0074ef7e..9280a6a5 100644 --- a/internal/protocols/webrtc/incoming_track.go +++ b/internal/protocols/webrtc/incoming_track.go @@ -3,12 +3,12 @@ package webrtc import ( "time" - "github.com/bluenviron/gortsplib/v4/pkg/liberrors" "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/webrtc/v4" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -238,10 +238,11 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{ type IncomingTrack struct { OnPacketRTP func(*rtp.Packet) - track *webrtc.TrackRemote - receiver *webrtc.RTPReceiver - writeRTCP func([]rtcp.Packet) error - log logger.Writer + track *webrtc.TrackRemote + receiver *webrtc.RTPReceiver + writeRTCP func([]rtcp.Packet) error + log logger.Writer + packetsLost *counterdumper.CounterDumper } func (t *IncomingTrack) initialize() { @@ -259,6 +260,20 @@ func (*IncomingTrack) PTSEqualsDTS(*rtp.Packet) bool { } func (t *IncomingTrack) start() { + t.packetsLost = &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + t.log.Log(logger.Warn, "%d RTP %s lost", + val, + func() string { + if val == 1 { + return "packet" + } + return "packets" + }()) + }, + } + t.packetsLost.Start() + // read incoming RTCP packets to make interceptors work go func() { buf := make([]byte, 1500) @@ -301,7 +316,7 @@ func (t *IncomingTrack) start() { packets, lost := reorderer.Process(pkt) if lost != 0 { - t.log.Log(logger.Warn, (liberrors.ErrClientRTPPacketsLost{Lost: lost}).Error()) + t.packetsLost.Add(uint64(lost)) // do not return } @@ -316,3 +331,9 @@ func (t *IncomingTrack) start() { } }() } + +func (t *IncomingTrack) stop() { + if t.packetsLost != nil { + t.packetsLost.Stop() + } +} diff --git a/internal/protocols/webrtc/peer_connection.go b/internal/protocols/webrtc/peer_connection.go index ecc86986..7b76fa49 100644 --- a/internal/protocols/webrtc/peer_connection.go +++ b/internal/protocols/webrtc/peer_connection.go @@ -325,8 +325,13 @@ func (co *PeerConnection) Start() error { // Close closes the connection. func (co *PeerConnection) Close() { + for _, track := range co.incomingTracks { + track.stop() + } + co.ctxCancel() co.wr.Close() //nolint:errcheck + <-co.done } diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index 3ac2a0b5..64951d08 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -129,7 +129,7 @@ func TestRecorder(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -343,7 +343,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -431,7 +431,7 @@ func TestRecorderSkipTracksPartial(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -492,7 +492,7 @@ func TestRecorderSkipTracksFull(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/servers/hls/server_test.go b/internal/servers/hls/server_test.go index cff75c9b..120f777a 100644 --- a/internal/servers/hls/server_test.go +++ b/internal/servers/hls/server_test.go @@ -159,7 +159,7 @@ func TestServerRead(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -259,7 +259,7 @@ func TestServerRead(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -364,7 +364,7 @@ func TestDirectory(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err = strm.Initialize() require.NoError(t, err) diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go index a4ae2f59..25ce0c71 100644 --- a/internal/servers/rtmp/server_test.go +++ b/internal/servers/rtmp/server_test.go @@ -43,7 +43,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre UDPMaxPayloadSize: 1472, Desc: req.Desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := p.stream.Initialize() if err != nil { @@ -200,7 +200,7 @@ func TestServerRead(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/servers/rtsp/server.go b/internal/servers/rtsp/server.go index ffa7eb63..7a0b4f84 100644 --- a/internal/servers/rtsp/server.go +++ b/internal/servers/rtsp/server.go @@ -306,10 +306,10 @@ func (s *Server) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response return se.onPause(ctx) } -// OnPacketLost implements gortsplib.ServerHandlerOnDecodeError. -func (s *Server) OnPacketLost(ctx *gortsplib.ServerHandlerOnPacketLostCtx) { +// OnPacketsLost implements gortsplib.ServerHandlerOnPacketsLost. +func (s *Server) OnPacketsLost(ctx *gortsplib.ServerHandlerOnPacketsLostCtx) { se := ctx.Session.UserData().(*session) - se.onPacketLost(ctx) + se.onPacketsLost(ctx) } // OnDecodeError implements gortsplib.ServerHandlerOnDecodeError. diff --git a/internal/servers/rtsp/server_test.go b/internal/servers/rtsp/server_test.go index 060d11a8..2c6ee05a 100644 --- a/internal/servers/rtsp/server_test.go +++ b/internal/servers/rtsp/server_test.go @@ -43,7 +43,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre UDPMaxPayloadSize: 1472, Desc: req.Desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := p.stream.Initialize() if err != nil { @@ -152,7 +152,7 @@ func TestServerRead(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) @@ -262,7 +262,7 @@ func TestServerRedirect(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 8895ae2b..8fa00e14 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -15,6 +15,7 @@ import ( "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/hooks" @@ -42,22 +43,65 @@ type session struct { transport *gortsplib.Transport pathName string query string - decodeErrLogger logger.Writer - writeErrLogger logger.Writer + packetsLost *counterdumper.CounterDumper + decodeErrors *counterdumper.CounterDumper + discardedFrames *counterdumper.CounterDumper } func (s *session) initialize() { s.uuid = uuid.New() s.created = time.Now() - s.decodeErrLogger = logger.NewLimitedLogger(s) - s.writeErrLogger = logger.NewLimitedLogger(s) + s.packetsLost = &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%d RTP %s lost", + val, + func() string { + if val == 1 { + return "packet" + } + return "packets" + }()) + }, + } + s.packetsLost.Start() + + s.decodeErrors = &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%s decode %s", + val, + func() string { + if val == 1 { + return "error" + } + return "errors" + }()) + }, + } + s.decodeErrors.Start() + + s.discardedFrames = &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "connection is too slow, discarding %d %s", + val, + func() string { + if val == 1 { + return "frame" + } + return "frames" + }()) + }, + } + s.discardedFrames.Start() s.Log(logger.Info, "created by %v", s.rconn.NetConn().RemoteAddr()) } // Close closes a Session. func (s *session) Close() { + s.discardedFrames.Stop() + s.decodeErrors.Stop() + s.packetsLost.Stop() s.rsession.Close() } @@ -341,18 +385,19 @@ func (s *session) APISourceDescribe() defs.APIPathSourceOrReader { } // onPacketLost is called by rtspServer. -func (s *session) onPacketLost(ctx *gortsplib.ServerHandlerOnPacketLostCtx) { - s.decodeErrLogger.Log(logger.Warn, ctx.Error.Error()) +func (s *session) onPacketsLost(ctx *gortsplib.ServerHandlerOnPacketsLostCtx) { + s.packetsLost.Add(ctx.Lost) } // onDecodeError is called by rtspServer. -func (s *session) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) { - s.decodeErrLogger.Log(logger.Warn, ctx.Error.Error()) +func (s *session) onDecodeError(_ *gortsplib.ServerHandlerOnDecodeErrorCtx) { + s.decodeErrors.Increase() } // onStreamWriteError is called by rtspServer. -func (s *session) onStreamWriteError(ctx *gortsplib.ServerHandlerOnStreamWriteErrorCtx) { - s.writeErrLogger.Log(logger.Warn, ctx.Error.Error()) +func (s *session) onStreamWriteError(_ *gortsplib.ServerHandlerOnStreamWriteErrorCtx) { + // currently the only error returned by OnStreamWriteError is ErrServerWriteQueueFull + s.discardedFrames.Increase() } func (s *session) apiItem() *defs.APIRTSPSession { diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index 1d948b90..ebab068a 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -16,6 +16,7 @@ import ( "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/hooks" @@ -207,10 +208,24 @@ func (c *conn) runPublishReader(sconn srt.Conn, path defs.Path) error { return err } - decodeErrLogger := logger.NewLimitedLogger(c) + decodeErrors := &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + c.Log(logger.Warn, "%s decode %s", + val, + func() string { + if val == 1 { + return "error" + } + return "errors" + }()) + }, + } - r.OnDecodeError(func(err error) { - decodeErrLogger.Log(logger.Warn, err.Error()) + decodeErrors.Start() + defer decodeErrors.Stop() + + r.OnDecodeError(func(_ error) { + decodeErrors.Increase() }) var stream *stream.Stream diff --git a/internal/servers/srt/server_test.go b/internal/servers/srt/server_test.go index 0b46c23c..6efb9a29 100644 --- a/internal/servers/srt/server_test.go +++ b/internal/servers/srt/server_test.go @@ -40,7 +40,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre UDPMaxPayloadSize: 1472, Desc: req.Desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := p.stream.Initialize() if err != nil { @@ -176,7 +176,7 @@ func TestServerRead(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err = strm.Initialize() require.NoError(t, err) diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index a17f87e3..cdfb234f 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -57,7 +57,7 @@ func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stre UDPMaxPayloadSize: 1472, Desc: req.Desc, GenerateRTPPackets: true, - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := p.stream.Initialize() if err != nil { @@ -511,7 +511,7 @@ func TestServerRead(t *testing.T) { UDPMaxPayloadSize: 1472, Desc: desc, GenerateRTPPackets: reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}), - DecodeErrLogger: test.NilLogger, + Parent: test.NilLogger, } err := strm.Initialize() require.NoError(t, err) diff --git a/internal/staticsources/hls/source.go b/internal/staticsources/hls/source.go index 54da37a6..a6a44543 100644 --- a/internal/staticsources/hls/source.go +++ b/internal/staticsources/hls/source.go @@ -9,6 +9,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/hls" @@ -37,7 +38,21 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { } }() - decodeErrLogger := logger.NewLimitedLogger(s) + decodeErrors := &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%s decode %s", + val, + func() string { + if val == 1 { + return "error" + } + return "errors" + }()) + }, + } + + decodeErrors.Start() + defer decodeErrors.Stop() tr := &http.Transport{ TLSClientConfig: tls.ConfigForFingerprint(params.Conf.SourceFingerprint), @@ -63,8 +78,8 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { OnDownloadPart: func(u string) { s.Log(logger.Debug, "downloading part %v", u) }, - OnDecodeError: func(err error) { - decodeErrLogger.Log(logger.Warn, err.Error()) + OnDecodeError: func(_ error) { + decodeErrors.Increase() }, OnTracks: func(tracks []*gohlslib.Track) error { medias, err := hls.ToStream(c, tracks, &stream) diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index 545343a0..121323de 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -10,6 +10,7 @@ import ( "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/tls" @@ -77,7 +78,37 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) { func (s *Source) Run(params defs.StaticSourceRunParams) error { s.Log(logger.Debug, "connecting") - decodeErrLogger := logger.NewLimitedLogger(s) + packetsLost := &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%d RTP %s lost", + val, + func() string { + if val == 1 { + return "packet" + } + return "packets" + }()) + }, + } + + packetsLost.Start() + defer packetsLost.Stop() + + decodeErrors := &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%s decode %s", + val, + func() string { + if val == 1 { + return "error" + } + return "errors" + }()) + }, + } + + decodeErrors.Start() + defer decodeErrors.Stop() c := &gortsplib.Client{ Transport: params.Conf.RTSPTransport.Transport, @@ -95,11 +126,11 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { OnTransportSwitch: func(err error) { s.Log(logger.Warn, err.Error()) }, - OnPacketLost: func(err error) { - decodeErrLogger.Log(logger.Warn, err.Error()) + OnPacketsLost: func(lost uint64) { + packetsLost.Add(lost) }, - OnDecodeError: func(err error) { - decodeErrLogger.Log(logger.Warn, err.Error()) + OnDecodeError: func(_ error) { + decodeErrors.Increase() }, } diff --git a/internal/staticsources/srt/source.go b/internal/staticsources/srt/source.go index 50cec79f..7aef1d6a 100644 --- a/internal/staticsources/srt/source.go +++ b/internal/staticsources/srt/source.go @@ -9,6 +9,7 @@ import ( srt "github.com/datarhei/gosrt" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/mpegts" @@ -75,10 +76,24 @@ func (s *Source) runReader(sconn srt.Conn) error { return err } - decodeErrLogger := logger.NewLimitedLogger(s) + decodeErrors := &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%s decode %s", + val, + func() string { + if val == 1 { + return "error" + } + return "errors" + }()) + }, + } - r.OnDecodeError(func(err error) { - decodeErrLogger.Log(logger.Warn, err.Error()) + decodeErrors.Start() + defer decodeErrors.Stop() + + r.OnDecodeError(func(_ error) { + decodeErrors.Increase() }) var stream *stream.Stream diff --git a/internal/staticsources/udp/source.go b/internal/staticsources/udp/source.go index b5060de0..9ab88440 100644 --- a/internal/staticsources/udp/source.go +++ b/internal/staticsources/udp/source.go @@ -11,6 +11,7 @@ import ( mcmpegts "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/mpegts" @@ -112,10 +113,24 @@ func (s *Source) runReader(pc net.PacketConn) error { return err } - decodeErrLogger := logger.NewLimitedLogger(s) + decodeErrors := &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%s decode %s", + val, + func() string { + if val == 1 { + return "error" + } + return "errors" + }()) + }, + } - r.OnDecodeError(func(err error) { - decodeErrLogger.Log(logger.Warn, err.Error()) + decodeErrors.Start() + defer decodeErrors.Stop() + + r.OnDecodeError(func(_ error) { + decodeErrors.Increase() }) var stream *stream.Stream diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 992797fd..acf75971 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -11,6 +11,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -30,7 +31,7 @@ type Stream struct { UDPMaxPayloadSize int Desc *description.Session GenerateRTPPackets bool - DecodeErrLogger logger.Writer + Parent logger.Writer bytesReceived *uint64 bytesSent *uint64 @@ -39,6 +40,7 @@ type Stream struct { rtspStream *gortsplib.ServerStream rtspsStream *gortsplib.ServerStream streamReaders map[Reader]*streamReader + decodeErrors *counterdumper.CounterDumper readerRunning chan struct{} } @@ -51,12 +53,25 @@ func (s *Stream) Initialize() error { s.streamReaders = make(map[Reader]*streamReader) s.readerRunning = make(chan struct{}) + s.decodeErrors = &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Parent.Log(logger.Warn, "%s decode %s", + val, + func() string { + if val == 1 { + return "error" + } + return "errors" + }()) + }, + } + for _, media := range s.Desc.Medias { s.streamMedias[media] = &streamMedia{ UDPMaxPayloadSize: s.UDPMaxPayloadSize, Media: media, GenerateRTPPackets: s.GenerateRTPPackets, - DecodeErrLogger: s.DecodeErrLogger, + DecodeErrors: s.decodeErrors, } err := s.streamMedias[media].initialize() if err != nil { diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index cea65f6b..010b910e 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -8,8 +8,8 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/formatprocessor" - "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -22,10 +22,10 @@ func unitSize(u unit.Unit) uint64 { } type streamFormat struct { - udpMaxPayloadSize int - format format.Format - generateRTPPackets bool - decodeErrLogger logger.Writer + UDPMaxPayloadSize int + Format format.Format + GenerateRTPPackets bool + DecodeErrors *counterdumper.CounterDumper proc formatprocessor.Processor pausedReaders map[*streamReader]ReadFunc @@ -37,7 +37,7 @@ func (sf *streamFormat) initialize() error { sf.runningReaders = make(map[*streamReader]ReadFunc) var err error - sf.proc, err = formatprocessor.New(sf.udpMaxPayloadSize, sf.format, sf.generateRTPPackets) + sf.proc, err = formatprocessor.New(sf.UDPMaxPayloadSize, sf.Format, sf.GenerateRTPPackets) if err != nil { return err } @@ -64,7 +64,7 @@ func (sf *streamFormat) startReader(sr *streamReader) { func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) { err := sf.proc.ProcessUnit(u) if err != nil { - sf.decodeErrLogger.Log(logger.Warn, err.Error()) + sf.DecodeErrors.Increase() return } @@ -82,7 +82,7 @@ func (sf *streamFormat) writeRTPPacket( u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders) if err != nil { - sf.decodeErrLogger.Log(logger.Warn, err.Error()) + sf.DecodeErrors.Increase() return } diff --git a/internal/stream/stream_media.go b/internal/stream/stream_media.go index 1ba5b3a0..ced2edba 100644 --- a/internal/stream/stream_media.go +++ b/internal/stream/stream_media.go @@ -3,15 +3,14 @@ package stream import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - - "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/counterdumper" ) type streamMedia struct { UDPMaxPayloadSize int Media *description.Media GenerateRTPPackets bool - DecodeErrLogger logger.Writer + DecodeErrors *counterdumper.CounterDumper formats map[format.Format]*streamFormat } @@ -21,10 +20,10 @@ func (sm *streamMedia) initialize() error { for _, forma := range sm.Media.Formats { sf := &streamFormat{ - udpMaxPayloadSize: sm.UDPMaxPayloadSize, - format: forma, - generateRTPPackets: sm.GenerateRTPPackets, - decodeErrLogger: sm.DecodeErrLogger, + UDPMaxPayloadSize: sm.UDPMaxPayloadSize, + Format: forma, + GenerateRTPPackets: sm.GenerateRTPPackets, + DecodeErrors: sm.DecodeErrors, } err := sf.initialize() if err != nil { diff --git a/internal/stream/stream_reader.go b/internal/stream/stream_reader.go index a2e896f1..8bf6a85b 100644 --- a/internal/stream/stream_reader.go +++ b/internal/stream/stream_reader.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" + "github.com/bluenviron/mediamtx/internal/counterdumper" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -11,16 +12,15 @@ type streamReader struct { queueSize int parent logger.Writer - writeErrLogger logger.Writer - buffer *ringbuffer.RingBuffer - started bool + buffer *ringbuffer.RingBuffer + started bool + discardedFrames *counterdumper.CounterDumper // out err chan error } func (w *streamReader) initialize() { - w.writeErrLogger = logger.NewLimitedLogger(w.parent) buffer, _ := ringbuffer.New(uint64(w.queueSize)) w.buffer = buffer w.err = make(chan error) @@ -28,12 +28,29 @@ func (w *streamReader) initialize() { func (w *streamReader) start() { w.started = true + + w.discardedFrames = &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + w.parent.Log(logger.Warn, "connection is too slow, discarding %d %s", + val, + func() string { + if val == 1 { + return "frame" + } + return "frames" + }()) + }, + } + w.discardedFrames.Start() + go w.run() } func (w *streamReader) stop() { w.buffer.Close() + if w.started { + w.discardedFrames.Stop() <-w.err } } @@ -64,6 +81,6 @@ func (w *streamReader) runInner() error { func (w *streamReader) push(cb func() error) { ok := w.buffer.Push(cb) if !ok { - w.writeErrLogger.Log(logger.Warn, "write queue is full") + w.discardedFrames.Increase() } } diff --git a/internal/test/source_tester.go b/internal/test/source_tester.go index f12cad48..be5aa139 100644 --- a/internal/test/source_tester.go +++ b/internal/test/source_tester.go @@ -69,7 +69,7 @@ func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathS UDPMaxPayloadSize: 1472, Desc: req.Desc, GenerateRTPPackets: req.GenerateRTPPackets, - DecodeErrLogger: t, + Parent: t, } err := t.stream.Initialize() if err != nil {