mirror of
https://github.com/bluenviron/mediamtx.git
synced 2026-01-20 10:29:49 -08:00
Some checks failed
code_lint / golangci_lint (push) Has been cancelled
code_lint / mod_tidy (push) Has been cancelled
code_lint / api_docs (push) Has been cancelled
code_test / test_64 (push) Has been cancelled
code_test / test_32 (push) Has been cancelled
code_test / test_highlevel (push) Has been cancelled
Stream errors include both errors from decoding RTP packets into frames, and errors from encoding frames into RTP packets. "processing errors" is more fit.
278 lines
5.9 KiB
Go
278 lines
5.9 KiB
Go
// Package stream contains the Stream object.
|
|
package stream
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/bluenviron/gortsplib/v4"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
|
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
|
"github.com/pion/rtp"
|
|
|
|
"github.com/bluenviron/mediamtx/internal/counterdumper"
|
|
"github.com/bluenviron/mediamtx/internal/logger"
|
|
"github.com/bluenviron/mediamtx/internal/unit"
|
|
)
|
|
|
|
// Reader is a stream reader.
|
|
type Reader interface {
|
|
logger.Writer
|
|
}
|
|
|
|
// ReadFunc is the callback passed to AddReader().
|
|
type ReadFunc func(unit.Unit) error
|
|
|
|
// Stream is a media stream.
|
|
// It stores tracks, readers and allows to write data to readers.
|
|
type Stream struct {
|
|
WriteQueueSize int
|
|
UDPMaxPayloadSize int
|
|
Desc *description.Session
|
|
GenerateRTPPackets bool
|
|
Parent logger.Writer
|
|
|
|
bytesReceived *uint64
|
|
bytesSent *uint64
|
|
streamMedias map[*description.Media]*streamMedia
|
|
mutex sync.RWMutex
|
|
rtspStream *gortsplib.ServerStream
|
|
rtspsStream *gortsplib.ServerStream
|
|
streamReaders map[Reader]*streamReader
|
|
processingErrors *counterdumper.CounterDumper
|
|
|
|
readerRunning chan struct{}
|
|
}
|
|
|
|
// Initialize initializes a Stream.
|
|
func (s *Stream) Initialize() error {
|
|
s.bytesReceived = new(uint64)
|
|
s.bytesSent = new(uint64)
|
|
s.streamMedias = make(map[*description.Media]*streamMedia)
|
|
s.streamReaders = make(map[Reader]*streamReader)
|
|
s.readerRunning = make(chan struct{})
|
|
|
|
s.processingErrors = &counterdumper.CounterDumper{
|
|
OnReport: func(val uint64) {
|
|
s.Parent.Log(logger.Warn, "%d processing %s",
|
|
val,
|
|
func() string {
|
|
if val == 1 {
|
|
return "error"
|
|
}
|
|
return "errors"
|
|
}())
|
|
},
|
|
}
|
|
s.processingErrors.Start()
|
|
|
|
for _, media := range s.Desc.Medias {
|
|
s.streamMedias[media] = &streamMedia{
|
|
udpMaxPayloadSize: s.UDPMaxPayloadSize,
|
|
media: media,
|
|
generateRTPPackets: s.GenerateRTPPackets,
|
|
processingErrors: s.processingErrors,
|
|
}
|
|
err := s.streamMedias[media].initialize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes all resources of the stream.
|
|
func (s *Stream) Close() {
|
|
s.processingErrors.Stop()
|
|
|
|
if s.rtspStream != nil {
|
|
s.rtspStream.Close()
|
|
}
|
|
if s.rtspsStream != nil {
|
|
s.rtspsStream.Close()
|
|
}
|
|
}
|
|
|
|
// BytesReceived returns received bytes.
|
|
func (s *Stream) BytesReceived() uint64 {
|
|
return atomic.LoadUint64(s.bytesReceived)
|
|
}
|
|
|
|
// BytesSent returns sent bytes.
|
|
func (s *Stream) BytesSent() uint64 {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
bytesSent := atomic.LoadUint64(s.bytesSent)
|
|
if s.rtspStream != nil {
|
|
stats := s.rtspStream.Stats()
|
|
bytesSent += stats.BytesSent
|
|
}
|
|
if s.rtspsStream != nil {
|
|
stats := s.rtspsStream.Stats()
|
|
bytesSent += stats.BytesSent
|
|
}
|
|
return bytesSent
|
|
}
|
|
|
|
// RTSPStream returns the RTSP stream.
|
|
func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
if s.rtspStream == nil {
|
|
s.rtspStream = &gortsplib.ServerStream{
|
|
Server: server,
|
|
Desc: s.Desc,
|
|
}
|
|
err := s.rtspStream.Initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
return s.rtspStream
|
|
}
|
|
|
|
// RTSPSStream returns the RTSPS stream.
|
|
func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
if s.rtspsStream == nil {
|
|
s.rtspsStream = &gortsplib.ServerStream{
|
|
Server: server,
|
|
Desc: s.Desc,
|
|
}
|
|
err := s.rtspsStream.Initialize()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
return s.rtspsStream
|
|
}
|
|
|
|
// AddReader adds a reader.
|
|
// Used by all protocols except RTSP.
|
|
func (s *Stream) AddReader(reader Reader, medi *description.Media, forma format.Format, cb ReadFunc) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
sr, ok := s.streamReaders[reader]
|
|
if !ok {
|
|
sr = &streamReader{
|
|
queueSize: s.WriteQueueSize,
|
|
parent: reader,
|
|
}
|
|
sr.initialize()
|
|
|
|
s.streamReaders[reader] = sr
|
|
}
|
|
|
|
sm := s.streamMedias[medi]
|
|
sf := sm.formats[forma]
|
|
sf.addReader(sr, cb)
|
|
}
|
|
|
|
// RemoveReader removes a reader.
|
|
// Used by all protocols except RTSP.
|
|
func (s *Stream) RemoveReader(reader Reader) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
sr := s.streamReaders[reader]
|
|
|
|
for _, sm := range s.streamMedias {
|
|
for _, sf := range sm.formats {
|
|
sf.removeReader(sr)
|
|
}
|
|
}
|
|
|
|
delete(s.streamReaders, reader)
|
|
|
|
sr.stop()
|
|
}
|
|
|
|
// StartReader starts a reader.
|
|
// Used by all protocols except RTSP.
|
|
func (s *Stream) StartReader(reader Reader) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
sr := s.streamReaders[reader]
|
|
|
|
sr.start()
|
|
|
|
for _, sm := range s.streamMedias {
|
|
for _, sf := range sm.formats {
|
|
sf.startReader(sr)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-s.readerRunning:
|
|
default:
|
|
close(s.readerRunning)
|
|
}
|
|
}
|
|
|
|
// ReaderError returns whenever there's an error.
|
|
func (s *Stream) ReaderError(reader Reader) chan error {
|
|
sr := s.streamReaders[reader]
|
|
return sr.error()
|
|
}
|
|
|
|
// ReaderFormats returns all formats that a reader is reading.
|
|
func (s *Stream) ReaderFormats(reader Reader) []format.Format {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
sr := s.streamReaders[reader]
|
|
var formats []format.Format
|
|
|
|
for _, sm := range s.streamMedias {
|
|
for forma, sf := range sm.formats {
|
|
if _, ok := sf.pausedReaders[sr]; ok {
|
|
formats = append(formats, forma)
|
|
} else if _, ok := sf.runningReaders[sr]; ok {
|
|
formats = append(formats, forma)
|
|
}
|
|
}
|
|
}
|
|
|
|
return formats
|
|
}
|
|
|
|
// WaitRunningReader waits for a running reader.
|
|
func (s *Stream) WaitRunningReader() {
|
|
<-s.readerRunning
|
|
}
|
|
|
|
// WriteUnit writes a Unit.
|
|
func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) {
|
|
sm := s.streamMedias[medi]
|
|
sf := sm.formats[forma]
|
|
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
sf.writeUnit(s, medi, u)
|
|
}
|
|
|
|
// WriteRTPPacket writes a RTP packet.
|
|
func (s *Stream) WriteRTPPacket(
|
|
medi *description.Media,
|
|
forma format.Format,
|
|
pkt *rtp.Packet,
|
|
ntp time.Time,
|
|
pts int64,
|
|
) {
|
|
sm := s.streamMedias[medi]
|
|
sf := sm.formats[forma]
|
|
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
|
|
sf.writeRTPPacket(s, medi, pkt, ntp, pts)
|
|
}
|