mediamtx/internal/stream/reader.go
Alessandro Ros f5f03562d3
Some checks are pending
code_lint / go (push) Waiting to run
code_lint / go_mod (push) Waiting to run
code_lint / docs (push) Waiting to run
code_lint / api_docs (push) Waiting to run
code_test / test_64 (push) Waiting to run
code_test / test_32 (push) Waiting to run
code_test / test_e2e (push) Waiting to run
rewrite Unit definition (#5079)
Stream units now share the same struct, with a specialized payload.
2025-10-11 12:18:51 +02:00

113 lines
2.4 KiB
Go

package stream
import (
"fmt"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/bluenviron/gortsplib/v5/pkg/ringbuffer"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
// OnDataFunc is the callback passed to OnData().
type OnDataFunc func(*unit.Unit) error
// Reader is a stream reader.
type Reader struct {
SkipBytesSent bool
Parent logger.Writer
onDatas map[*description.Media]map[format.Format]OnDataFunc
queueSize int
buffer *ringbuffer.RingBuffer
discardedFrames *counterdumper.CounterDumper
// out
err chan error
}
// OnData registers a callback that is called when data from given format is available.
func (r *Reader) OnData(medi *description.Media, forma format.Format, cb OnDataFunc) {
if r.onDatas == nil {
r.onDatas = make(map[*description.Media]map[format.Format]OnDataFunc)
}
if r.onDatas[medi] == nil {
r.onDatas[medi] = make(map[format.Format]OnDataFunc)
}
r.onDatas[medi][forma] = cb
}
// Formats returns all formats for which the reader has registered a OnData callback.
func (r *Reader) Formats() []format.Format {
var out []format.Format
for _, formats := range r.onDatas {
for forma := range formats {
out = append(out, forma)
}
}
return out
}
// error returns whenever there's an error.
// It can be called only after stream.AddReader().
func (r *Reader) Error() chan error {
return r.err
}
func (r *Reader) start() {
buffer, _ := ringbuffer.New(uint64(r.queueSize))
r.buffer = buffer
r.err = make(chan error)
r.discardedFrames = &counterdumper.CounterDumper{
OnReport: func(val uint64) {
r.Parent.Log(logger.Warn, "reader is too slow, discarding %d %s",
val,
func() string {
if val == 1 {
return "frame"
}
return "frames"
}())
},
}
r.discardedFrames.Start()
go r.run()
}
func (r *Reader) stop() {
r.buffer.Close()
r.discardedFrames.Stop()
<-r.err
}
func (r *Reader) run() {
r.err <- r.runInner()
close(r.err)
}
func (r *Reader) runInner() error {
for {
cb, ok := r.buffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
err := cb.(func() error)()
if err != nil {
return err
}
}
}
func (r *Reader) push(cb func() error) {
ok := r.buffer.Push(cb)
if !ok {
r.discardedFrames.Increase()
}
}