mediamtx/internal/stream/stream_reader.go
Alessandro Ros 4c3ac34425
Some checks are pending
code_lint / golangci_lint (push) Waiting to run
code_lint / mod_tidy (push) Waiting to run
code_lint / api_docs (push) Waiting to run
code_test / test_64 (push) Waiting to run
code_test / test_32 (push) Waiting to run
code_test / test_highlevel (push) Waiting to run
fix memory leak in case of errors during initialization of a reader (#3831)
2024-10-05 00:49:44 +02:00

69 lines
1.1 KiB
Go

package stream
import (
"fmt"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/bluenviron/mediamtx/internal/logger"
)
type streamReader struct {
queueSize int
parent logger.Writer
writeErrLogger logger.Writer
buffer *ringbuffer.RingBuffer
started bool
// out
err chan error
}
func (w *streamReader) initialize() {
w.writeErrLogger = logger.NewLimitedLogger(w.parent)
buffer, _ := ringbuffer.New(uint64(w.queueSize))
w.buffer = buffer
w.err = make(chan error)
}
func (w *streamReader) start() {
w.started = true
go w.run()
}
func (w *streamReader) stop() {
w.buffer.Close()
if w.started {
<-w.err
}
}
func (w *streamReader) error() chan error {
return w.err
}
func (w *streamReader) run() {
w.err <- w.runInner()
close(w.err)
}
func (w *streamReader) runInner() error {
for {
cb, ok := w.buffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
err := cb.(func() error)()
if err != nil {
return err
}
}
}
func (w *streamReader) push(cb func() error) {
ok := w.buffer.Push(cb)
if !ok {
w.writeErrLogger.Log(logger.Warn, "write queue is full")
}
}