rename stream decode errors into processing errors (#4370)
Some checks failed
code_lint / golangci_lint (push) Has been cancelled
code_lint / mod_tidy (push) Has been cancelled
code_lint / api_docs (push) Has been cancelled
code_test / test_64 (push) Has been cancelled
code_test / test_32 (push) Has been cancelled
code_test / test_highlevel (push) Has been cancelled

Stream errors include both errors from decoding RTP packets into
frames, and errors from encoding frames into RTP packets. "processing
errors" is more fit.
This commit is contained in:
Alessandro Ros 2025-03-26 15:14:08 +01:00 committed by GitHub
parent f851cb6961
commit 0d46cf3f74
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 32 additions and 32 deletions

View file

@ -33,14 +33,14 @@ type Stream struct {
GenerateRTPPackets bool
Parent logger.Writer
bytesReceived *uint64
bytesSent *uint64
streamMedias map[*description.Media]*streamMedia
mutex sync.RWMutex
rtspStream *gortsplib.ServerStream
rtspsStream *gortsplib.ServerStream
streamReaders map[Reader]*streamReader
decodeErrors *counterdumper.CounterDumper
bytesReceived *uint64
bytesSent *uint64
streamMedias map[*description.Media]*streamMedia
mutex sync.RWMutex
rtspStream *gortsplib.ServerStream
rtspsStream *gortsplib.ServerStream
streamReaders map[Reader]*streamReader
processingErrors *counterdumper.CounterDumper
readerRunning chan struct{}
}
@ -53,9 +53,9 @@ func (s *Stream) Initialize() error {
s.streamReaders = make(map[Reader]*streamReader)
s.readerRunning = make(chan struct{})
s.decodeErrors = &counterdumper.CounterDumper{
s.processingErrors = &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Parent.Log(logger.Warn, "%d decode %s",
s.Parent.Log(logger.Warn, "%d processing %s",
val,
func() string {
if val == 1 {
@ -65,14 +65,14 @@ func (s *Stream) Initialize() error {
}())
},
}
s.decodeErrors.Start()
s.processingErrors.Start()
for _, media := range s.Desc.Medias {
s.streamMedias[media] = &streamMedia{
UDPMaxPayloadSize: s.UDPMaxPayloadSize,
Media: media,
GenerateRTPPackets: s.GenerateRTPPackets,
DecodeErrors: s.decodeErrors,
udpMaxPayloadSize: s.UDPMaxPayloadSize,
media: media,
generateRTPPackets: s.GenerateRTPPackets,
processingErrors: s.processingErrors,
}
err := s.streamMedias[media].initialize()
if err != nil {
@ -85,7 +85,7 @@ func (s *Stream) Initialize() error {
// Close closes all resources of the stream.
func (s *Stream) Close() {
s.decodeErrors.Stop()
s.processingErrors.Stop()
if s.rtspStream != nil {
s.rtspStream.Close()

View file

@ -22,10 +22,10 @@ func unitSize(u unit.Unit) uint64 {
}
type streamFormat struct {
UDPMaxPayloadSize int
Format format.Format
GenerateRTPPackets bool
DecodeErrors *counterdumper.CounterDumper
udpMaxPayloadSize int
format format.Format
generateRTPPackets bool
processingErrors *counterdumper.CounterDumper
proc formatprocessor.Processor
pausedReaders map[*streamReader]ReadFunc
@ -37,7 +37,7 @@ func (sf *streamFormat) initialize() error {
sf.runningReaders = make(map[*streamReader]ReadFunc)
var err error
sf.proc, err = formatprocessor.New(sf.UDPMaxPayloadSize, sf.Format, sf.GenerateRTPPackets)
sf.proc, err = formatprocessor.New(sf.udpMaxPayloadSize, sf.format, sf.generateRTPPackets)
if err != nil {
return err
}
@ -64,7 +64,7 @@ func (sf *streamFormat) startReader(sr *streamReader) {
func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) {
err := sf.proc.ProcessUnit(u)
if err != nil {
sf.DecodeErrors.Increase()
sf.processingErrors.Increase()
return
}
@ -82,7 +82,7 @@ func (sf *streamFormat) writeRTPPacket(
u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders)
if err != nil {
sf.DecodeErrors.Increase()
sf.processingErrors.Increase()
return
}

View file

@ -7,10 +7,10 @@ import (
)
type streamMedia struct {
UDPMaxPayloadSize int
Media *description.Media
GenerateRTPPackets bool
DecodeErrors *counterdumper.CounterDumper
udpMaxPayloadSize int
media *description.Media
generateRTPPackets bool
processingErrors *counterdumper.CounterDumper
formats map[format.Format]*streamFormat
}
@ -18,12 +18,12 @@ type streamMedia struct {
func (sm *streamMedia) initialize() error {
sm.formats = make(map[format.Format]*streamFormat)
for _, forma := range sm.Media.Formats {
for _, forma := range sm.media.Formats {
sf := &streamFormat{
UDPMaxPayloadSize: sm.UDPMaxPayloadSize,
Format: forma,
GenerateRTPPackets: sm.GenerateRTPPackets,
DecodeErrors: sm.DecodeErrors,
udpMaxPayloadSize: sm.udpMaxPayloadSize,
format: forma,
generateRTPPackets: sm.generateRTPPackets,
processingErrors: sm.processingErrors,
}
err := sf.initialize()
if err != nil {