replace New* with Initialize() (#4345)

This commit is contained in:
Alessandro Ros 2025-03-16 15:34:53 +01:00 committed by GitHub
parent b64650ca4a
commit b329c4bbe8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
45 changed files with 820 additions and 809 deletions

View file

@ -11,39 +11,37 @@ import (
// CertLoader is a certificate loader. It watches for changes to the certificate and key files. // CertLoader is a certificate loader. It watches for changes to the certificate and key files.
type CertLoader struct { type CertLoader struct {
log logger.Writer CertPath string
certWatcher, keyWatcher *confwatcher.ConfWatcher KeyPath string
certPath, keyPath string Parent logger.Writer
done chan struct{}
cert *tls.Certificate certWatcher, keyWatcher *confwatcher.ConfWatcher
certMu sync.RWMutex cert *tls.Certificate
certMu sync.RWMutex
done chan struct{}
} }
// New allocates a CertLoader. // Initialize initializes a CertLoader.
func New(certPath, keyPath string, log logger.Writer) (*CertLoader, error) { func (cl *CertLoader) Initialize() error {
cl := &CertLoader{ cl.done = make(chan struct{})
log: log,
certPath: certPath,
keyPath: keyPath,
done: make(chan struct{}),
}
var err error cl.certWatcher = &confwatcher.ConfWatcher{FilePath: cl.CertPath}
cl.certWatcher, err = confwatcher.New(certPath) err := cl.certWatcher.Initialize()
if err != nil { if err != nil {
return nil, err return err
} }
cl.keyWatcher, err = confwatcher.New(keyPath) cl.keyWatcher = &confwatcher.ConfWatcher{FilePath: cl.KeyPath}
err = cl.keyWatcher.Initialize()
if err != nil { if err != nil {
cl.certWatcher.Close() //nolint:errcheck cl.certWatcher.Close() //nolint:errcheck
return nil, err return err
} }
cert, err := tls.LoadX509KeyPair(certPath, keyPath) cert, err := tls.LoadX509KeyPair(cl.CertPath, cl.KeyPath)
if err != nil { if err != nil {
return nil, err return err
} }
cl.certMu.Lock() cl.certMu.Lock()
@ -52,7 +50,7 @@ func New(certPath, keyPath string, log logger.Writer) (*CertLoader, error) {
go cl.watch() go cl.watch()
return cl, nil return nil
} }
// Close closes a CertLoader and releases any underlying resources. // Close closes a CertLoader and releases any underlying resources.
@ -78,9 +76,9 @@ func (cl *CertLoader) watch() {
for { for {
select { select {
case <-cl.certWatcher.Watch(): case <-cl.certWatcher.Watch():
cert, err := tls.LoadX509KeyPair(cl.certPath, cl.keyPath) cert, err := tls.LoadX509KeyPair(cl.CertPath, cl.KeyPath)
if err != nil { if err != nil {
cl.log.Log(logger.Error, "certloader failed to load after change to %s: %s", cl.certPath, err.Error()) cl.Parent.Log(logger.Error, "certloader failed to load after change to %s: %s", cl.CertPath, err.Error())
continue continue
} }
@ -88,11 +86,11 @@ func (cl *CertLoader) watch() {
cl.cert = &cert cl.cert = &cert
cl.certMu.Unlock() cl.certMu.Unlock()
cl.log.Log(logger.Info, "certificate reloaded after change to %s", cl.certPath) cl.Parent.Log(logger.Info, "certificate reloaded after change to %s", cl.CertPath)
case <-cl.keyWatcher.Watch(): case <-cl.keyWatcher.Watch():
cert, err := tls.LoadX509KeyPair(cl.certPath, cl.keyPath) cert, err := tls.LoadX509KeyPair(cl.CertPath, cl.KeyPath)
if err != nil { if err != nil {
cl.log.Log(logger.Error, "certloader failed to load after change to %s: %s", cl.keyPath, err.Error()) cl.Parent.Log(logger.Error, "certloader failed to load after change to %s: %s", cl.KeyPath, err.Error())
continue continue
} }
@ -100,7 +98,7 @@ func (cl *CertLoader) watch() {
cl.cert = &cert cl.cert = &cert
cl.certMu.Unlock() cl.certMu.Unlock()
cl.log.Log(logger.Info, "certificate reloaded after change to %s", cl.keyPath) cl.Parent.Log(logger.Info, "certificate reloaded after change to %s", cl.KeyPath)
case <-cl.done: case <-cl.done:
return return
} }

View file

@ -22,7 +22,12 @@ func TestCertReload(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyPath) defer os.Remove(serverKeyPath)
loader, err := New(serverCertPath, serverKeyPath, test.NilLogger) loader := &CertLoader{
CertPath: serverCertPath,
KeyPath: serverKeyPath,
Parent: test.NilLogger,
}
err = loader.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer loader.Close() defer loader.Close()

View file

@ -16,8 +16,10 @@ const (
// ConfWatcher is a configuration file watcher. // ConfWatcher is a configuration file watcher.
type ConfWatcher struct { type ConfWatcher struct {
inner *fsnotify.Watcher FilePath string
watchedPath string
inner *fsnotify.Watcher
absolutePath string
// in // in
terminate chan struct{} terminate chan struct{}
@ -27,38 +29,35 @@ type ConfWatcher struct {
done chan struct{} done chan struct{}
} }
// New allocates a ConfWatcher. // Initialize initializes a ConfWatcher.
func New(confPath string) (*ConfWatcher, error) { func (w *ConfWatcher) Initialize() error {
if _, err := os.Stat(confPath); err != nil { if _, err := os.Stat(w.FilePath); err != nil {
return nil, err return err
} }
inner, err := fsnotify.NewWatcher() var err error
w.inner, err = fsnotify.NewWatcher()
if err != nil { if err != nil {
return nil, err return err
} }
// use absolute paths to support Darwin // use absolute paths to support Darwin
absolutePath, _ := filepath.Abs(confPath) w.absolutePath, _ = filepath.Abs(w.FilePath)
parentPath := filepath.Dir(absolutePath) parentPath := filepath.Dir(w.absolutePath)
err = inner.Add(parentPath) err = w.inner.Add(parentPath)
if err != nil { if err != nil {
inner.Close() //nolint:errcheck w.inner.Close() //nolint:errcheck
return nil, err return err
} }
w := &ConfWatcher{ w.terminate = make(chan struct{})
inner: inner, w.signal = make(chan struct{})
watchedPath: absolutePath, w.done = make(chan struct{})
terminate: make(chan struct{}),
signal: make(chan struct{}),
done: make(chan struct{}),
}
go w.run() go w.run()
return w, nil return nil
} }
// Close closes a ConfWatcher. // Close closes a ConfWatcher.
@ -71,7 +70,7 @@ func (w *ConfWatcher) run() {
defer close(w.done) defer close(w.done)
var lastCalled time.Time var lastCalled time.Time
previousWatchedPath, _ := filepath.EvalSymlinks(w.watchedPath) previousWatchedPath, _ := filepath.EvalSymlinks(w.absolutePath)
outer: outer:
for { for {
@ -81,7 +80,7 @@ outer:
continue continue
} }
currentWatchedPath, _ := filepath.EvalSymlinks(w.watchedPath) currentWatchedPath, _ := filepath.EvalSymlinks(w.absolutePath)
eventPath, _ := filepath.Abs(event.Name) eventPath, _ := filepath.Abs(event.Name)
eventPath, _ = filepath.EvalSymlinks(eventPath) eventPath, _ = filepath.EvalSymlinks(eventPath)

View file

@ -10,7 +10,8 @@ import (
) )
func TestNoFile(t *testing.T) { func TestNoFile(t *testing.T) {
_, err := New("/nonexistent") w := &ConfWatcher{FilePath: "/nonexistent"}
err := w.Initialize()
require.Error(t, err) require.Error(t, err)
} }
@ -18,7 +19,8 @@ func TestWrite(t *testing.T) {
fpath, err := test.CreateTempFile([]byte("{}")) fpath, err := test.CreateTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
w, err := New(fpath) w := &ConfWatcher{FilePath: fpath}
err = w.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer w.Close() defer w.Close()
@ -43,7 +45,8 @@ func TestWriteMultipleTimes(t *testing.T) {
fpath, err := test.CreateTempFile([]byte("{}")) fpath, err := test.CreateTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
w, err := New(fpath) w := &ConfWatcher{FilePath: fpath}
err = w.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer w.Close() defer w.Close()
@ -86,7 +89,8 @@ func TestDeleteCreate(t *testing.T) {
fpath, err := test.CreateTempFile([]byte("{}")) fpath, err := test.CreateTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
w, err := New(fpath) w := &ConfWatcher{FilePath: fpath}
err = w.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer w.Close() defer w.Close()
@ -117,7 +121,8 @@ func TestSymlinkDeleteCreate(t *testing.T) {
err = os.Symlink(fpath, fpath+"-sym") err = os.Symlink(fpath, fpath+"-sym")
require.NoError(t, err) require.NoError(t, err)
w, err := New(fpath + "-sym") w := &ConfWatcher{FilePath: fpath + "-sym"}
err = w.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer w.Close() defer w.Close()

View file

@ -259,7 +259,11 @@ func (p *Core) createResources(initial bool) error {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
p.externalCmdPool = externalcmd.NewPool() p.externalCmdPool = &externalcmd.Pool{}
err = p.externalCmdPool.Initialize()
if err != nil {
return err
}
} }
if p.authManager == nil { if p.authManager == nil {
@ -631,7 +635,8 @@ func (p *Core) createResources(initial bool) error {
} }
if initial && p.confPath != "" { if initial && p.confPath != "" {
p.confWatcher, err = confwatcher.New(p.confPath) p.confWatcher = &confwatcher.ConfWatcher{FilePath: p.confPath}
err = p.confWatcher.Initialize()
if err != nil { if err != nil {
return err return err
} }

View file

@ -577,7 +577,7 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
if pa.stream == nil { if pa.stream == nil {
return []string{} return []string{}
} }
return defs.MediasToCodecs(pa.stream.Desc().Medias) return defs.MediasToCodecs(pa.stream.Desc.Medias)
}(), }(),
BytesReceived: func() uint64 { BytesReceived: func() uint64 {
if pa.stream == nil { if pa.stream == nil {
@ -695,14 +695,14 @@ func (pa *path) onDemandPublisherStop(reason string) {
} }
func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error {
var err error pa.stream = &stream.Stream{
pa.stream, err = stream.New( WriteQueueSize: pa.writeQueueSize,
pa.writeQueueSize, UDPMaxPayloadSize: pa.udpMaxPayloadSize,
pa.udpMaxPayloadSize, Desc: desc,
desc, GenerateRTPPackets: allocateEncoder,
allocateEncoder, DecodeErrLogger: logger.NewLimitedLogger(pa.source),
logger.NewLimitedLogger(pa.source), }
) err := pa.stream.Initialize()
if err != nil { if err != nil {
return err return err
} }

View file

@ -9,9 +9,9 @@ type Pool struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
// NewPool allocates a Pool. // Initialize initializes a Pool.
func NewPool() *Pool { func (p *Pool) Initialize() error {
return &Pool{} return nil
} }
// Close waits for all external commands to exit. // Close waits for all external commands to exit.

View file

@ -23,41 +23,34 @@ func randUint32() (uint32, error) {
} }
type formatProcessorAC3 struct { type formatProcessorAC3 struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.AC3 Format *format.AC3
encoder *rtpac3.Encoder GenerateRTPPackets bool
decoder *rtpac3.Decoder
randomStart uint32 encoder *rtpac3.Encoder
decoder *rtpac3.Decoder
randomStart uint32
} }
func newAC3( func (t *formatProcessorAC3) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.AC3,
generateRTPPackets bool,
) (*formatProcessorAC3, error) {
t := &formatProcessorAC3{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorAC3) createEncoder() error { func (t *formatProcessorAC3) createEncoder() error {
t.encoder = &rtpac3.Encoder{ t.encoder = &rtpac3.Encoder{
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -96,16 +89,16 @@ func (t *formatProcessorAC3) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -20,42 +20,35 @@ var (
) )
type formatProcessorAV1 struct { type formatProcessorAV1 struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.AV1 Format *format.AV1
encoder *rtpav1.Encoder GenerateRTPPackets bool
decoder *rtpav1.Decoder
randomStart uint32 encoder *rtpav1.Encoder
decoder *rtpav1.Decoder
randomStart uint32
} }
func newAV1( func (t *formatProcessorAV1) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.AV1,
generateRTPPackets bool,
) (*formatProcessorAV1, error) {
t := &formatProcessorAV1{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorAV1) createEncoder() error { func (t *formatProcessorAV1) createEncoder() error {
t.encoder = &rtpav1.Encoder{ t.encoder = &rtpav1.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -94,16 +87,16 @@ func (t *formatProcessorAV1) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -12,44 +12,37 @@ import (
) )
type formatProcessorG711 struct { type formatProcessorG711 struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.G711 Format *format.G711
encoder *rtplpcm.Encoder GenerateRTPPackets bool
decoder *rtplpcm.Decoder
randomStart uint32 encoder *rtplpcm.Encoder
decoder *rtplpcm.Decoder
randomStart uint32
} }
func newG711( func (t *formatProcessorG711) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.G711,
generateRTPPackets bool,
) (*formatProcessorG711, error) {
t := &formatProcessorG711{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorG711) createEncoder() error { func (t *formatProcessorG711) createEncoder() error {
t.encoder = &rtplpcm.Encoder{ t.encoder = &rtplpcm.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadType(), PayloadType: t.Format.PayloadType(),
BitDepth: 8, BitDepth: 8,
ChannelCount: t.format.ChannelCount, ChannelCount: t.Format.ChannelCount,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -88,16 +81,16 @@ func (t *formatProcessorG711) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -11,21 +11,17 @@ import (
) )
type formatProcessorGeneric struct { type formatProcessorGeneric struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
Format format.Format
GenerateRTPPackets bool
} }
func newGeneric( func (t *formatProcessorGeneric) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma format.Format, return fmt.Errorf("we don't know how to generate RTP packets of format %T", t.Format)
generateRTPPackets bool,
) (*formatProcessorGeneric, error) {
if generateRTPPackets {
return nil, fmt.Errorf("we don't know how to generate RTP packets of format %T", forma)
} }
return &formatProcessorGeneric{ return nil
udpMaxPayloadSize: udpMaxPayloadSize,
}, nil
} }
func (t *formatProcessorGeneric) ProcessUnit(_ unit.Unit) error { func (t *formatProcessorGeneric) ProcessUnit(_ unit.Unit) error {
@ -50,9 +46,9 @@ func (t *formatProcessorGeneric) ProcessRTPPacket(
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
return u, nil return u, nil

View file

@ -82,36 +82,29 @@ func rtpH264ExtractParams(payload []byte) ([]byte, []byte) {
} }
type formatProcessorH264 struct { type formatProcessorH264 struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.H264 Format *format.H264
encoder *rtph264.Encoder GenerateRTPPackets bool
decoder *rtph264.Decoder
randomStart uint32 encoder *rtph264.Encoder
decoder *rtph264.Decoder
randomStart uint32
} }
func newH264( func (t *formatProcessorH264) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.H264,
generateRTPPackets bool,
) (*formatProcessorH264, error) {
t := &formatProcessorH264{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder(nil, nil) err := t.createEncoder(nil, nil)
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorH264) createEncoder( func (t *formatProcessorH264) createEncoder(
@ -119,11 +112,11 @@ func (t *formatProcessorH264) createEncoder(
initialSequenceNumber *uint16, initialSequenceNumber *uint16,
) error { ) error {
t.encoder = &rtph264.Encoder{ t.encoder = &rtph264.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
SSRC: ssrc, SSRC: ssrc,
InitialSequenceNumber: initialSequenceNumber, InitialSequenceNumber: initialSequenceNumber,
PacketizationMode: t.format.PacketizationMode, PacketizationMode: t.Format.PacketizationMode,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -131,21 +124,21 @@ func (t *formatProcessorH264) createEncoder(
func (t *formatProcessorH264) updateTrackParametersFromRTPPacket(payload []byte) { func (t *formatProcessorH264) updateTrackParametersFromRTPPacket(payload []byte) {
sps, pps := rtpH264ExtractParams(payload) sps, pps := rtpH264ExtractParams(payload)
if (sps != nil && !bytes.Equal(sps, t.format.SPS)) || if (sps != nil && !bytes.Equal(sps, t.Format.SPS)) ||
(pps != nil && !bytes.Equal(pps, t.format.PPS)) { (pps != nil && !bytes.Equal(pps, t.Format.PPS)) {
if sps == nil { if sps == nil {
sps = t.format.SPS sps = t.Format.SPS
} }
if pps == nil { if pps == nil {
pps = t.format.PPS pps = t.Format.PPS
} }
t.format.SafeSetParams(sps, pps) t.Format.SafeSetParams(sps, pps)
} }
} }
func (t *formatProcessorH264) updateTrackParametersFromAU(au [][]byte) { func (t *formatProcessorH264) updateTrackParametersFromAU(au [][]byte) {
sps := t.format.SPS sps := t.Format.SPS
pps := t.format.PPS pps := t.Format.PPS
update := false update := false
for _, nalu := range au { for _, nalu := range au {
@ -167,7 +160,7 @@ func (t *formatProcessorH264) updateTrackParametersFromAU(au [][]byte) {
} }
if update { if update {
t.format.SafeSetParams(sps, pps) t.Format.SafeSetParams(sps, pps)
} }
} }
@ -190,7 +183,7 @@ func (t *formatProcessorH264) remuxAccessUnit(au [][]byte) [][]byte {
isKeyFrame = true isKeyFrame = true
// prepend parameters // prepend parameters
if t.format.SPS != nil && t.format.PPS != nil { if t.Format.SPS != nil && t.Format.PPS != nil {
n += 2 n += 2
} }
} }
@ -205,9 +198,9 @@ func (t *formatProcessorH264) remuxAccessUnit(au [][]byte) [][]byte {
filteredNALUs := make([][]byte, n) filteredNALUs := make([][]byte, n)
i := 0 i := 0
if isKeyFrame && t.format.SPS != nil && t.format.PPS != nil { if isKeyFrame && t.Format.SPS != nil && t.Format.PPS != nil {
filteredNALUs[0] = t.format.SPS filteredNALUs[0] = t.Format.SPS
filteredNALUs[1] = t.format.PPS filteredNALUs[1] = t.Format.PPS
i = 2 i = 2
} }
@ -272,7 +265,7 @@ func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl
pkt.PaddingSize = 0 pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them // RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
v1 := pkt.SSRC v1 := pkt.SSRC
v2 := pkt.SequenceNumber v2 := pkt.SequenceNumber
err := t.createEncoder(&v1, &v2) err := t.createEncoder(&v1, &v2)
@ -286,7 +279,7 @@ func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -102,36 +102,29 @@ func rtpH265ExtractParams(payload []byte) ([]byte, []byte, []byte) {
} }
type formatProcessorH265 struct { type formatProcessorH265 struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.H265 Format *format.H265
encoder *rtph265.Encoder GenerateRTPPackets bool
decoder *rtph265.Decoder
randomStart uint32 encoder *rtph265.Encoder
decoder *rtph265.Decoder
randomStart uint32
} }
func newH265( func (t *formatProcessorH265) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.H265,
generateRTPPackets bool,
) (*formatProcessorH265, error) {
t := &formatProcessorH265{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder(nil, nil) err := t.createEncoder(nil, nil)
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorH265) createEncoder( func (t *formatProcessorH265) createEncoder(
@ -139,11 +132,11 @@ func (t *formatProcessorH265) createEncoder(
initialSequenceNumber *uint16, initialSequenceNumber *uint16,
) error { ) error {
t.encoder = &rtph265.Encoder{ t.encoder = &rtph265.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
SSRC: ssrc, SSRC: ssrc,
InitialSequenceNumber: initialSequenceNumber, InitialSequenceNumber: initialSequenceNumber,
MaxDONDiff: t.format.MaxDONDiff, MaxDONDiff: t.Format.MaxDONDiff,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -151,26 +144,26 @@ func (t *formatProcessorH265) createEncoder(
func (t *formatProcessorH265) updateTrackParametersFromRTPPacket(payload []byte) { func (t *formatProcessorH265) updateTrackParametersFromRTPPacket(payload []byte) {
vps, sps, pps := rtpH265ExtractParams(payload) vps, sps, pps := rtpH265ExtractParams(payload)
if (vps != nil && !bytes.Equal(vps, t.format.VPS)) || if (vps != nil && !bytes.Equal(vps, t.Format.VPS)) ||
(sps != nil && !bytes.Equal(sps, t.format.SPS)) || (sps != nil && !bytes.Equal(sps, t.Format.SPS)) ||
(pps != nil && !bytes.Equal(pps, t.format.PPS)) { (pps != nil && !bytes.Equal(pps, t.Format.PPS)) {
if vps == nil { if vps == nil {
vps = t.format.VPS vps = t.Format.VPS
} }
if sps == nil { if sps == nil {
sps = t.format.SPS sps = t.Format.SPS
} }
if pps == nil { if pps == nil {
pps = t.format.PPS pps = t.Format.PPS
} }
t.format.SafeSetParams(vps, sps, pps) t.Format.SafeSetParams(vps, sps, pps)
} }
} }
func (t *formatProcessorH265) updateTrackParametersFromAU(au [][]byte) { func (t *formatProcessorH265) updateTrackParametersFromAU(au [][]byte) {
vps := t.format.VPS vps := t.Format.VPS
sps := t.format.SPS sps := t.Format.SPS
pps := t.format.PPS pps := t.Format.PPS
update := false update := false
for _, nalu := range au { for _, nalu := range au {
@ -178,19 +171,19 @@ func (t *formatProcessorH265) updateTrackParametersFromAU(au [][]byte) {
switch typ { switch typ {
case h265.NALUType_VPS_NUT: case h265.NALUType_VPS_NUT:
if !bytes.Equal(nalu, t.format.VPS) { if !bytes.Equal(nalu, t.Format.VPS) {
vps = nalu vps = nalu
update = true update = true
} }
case h265.NALUType_SPS_NUT: case h265.NALUType_SPS_NUT:
if !bytes.Equal(nalu, t.format.SPS) { if !bytes.Equal(nalu, t.Format.SPS) {
sps = nalu sps = nalu
update = true update = true
} }
case h265.NALUType_PPS_NUT: case h265.NALUType_PPS_NUT:
if !bytes.Equal(nalu, t.format.PPS) { if !bytes.Equal(nalu, t.Format.PPS) {
pps = nalu pps = nalu
update = true update = true
} }
@ -198,7 +191,7 @@ func (t *formatProcessorH265) updateTrackParametersFromAU(au [][]byte) {
} }
if update { if update {
t.format.SafeSetParams(vps, sps, pps) t.Format.SafeSetParams(vps, sps, pps)
} }
} }
@ -221,7 +214,7 @@ func (t *formatProcessorH265) remuxAccessUnit(au [][]byte) [][]byte {
isKeyFrame = true isKeyFrame = true
// prepend parameters // prepend parameters
if t.format.VPS != nil && t.format.SPS != nil && t.format.PPS != nil { if t.Format.VPS != nil && t.Format.SPS != nil && t.Format.PPS != nil {
n += 3 n += 3
} }
} }
@ -236,10 +229,10 @@ func (t *formatProcessorH265) remuxAccessUnit(au [][]byte) [][]byte {
filteredNALUs := make([][]byte, n) filteredNALUs := make([][]byte, n)
i := 0 i := 0
if isKeyFrame && t.format.VPS != nil && t.format.SPS != nil && t.format.PPS != nil { if isKeyFrame && t.Format.VPS != nil && t.Format.SPS != nil && t.Format.PPS != nil {
filteredNALUs[0] = t.format.VPS filteredNALUs[0] = t.Format.VPS
filteredNALUs[1] = t.format.SPS filteredNALUs[1] = t.Format.SPS
filteredNALUs[2] = t.format.PPS filteredNALUs[2] = t.Format.PPS
i = 3 i = 3
} }
@ -304,7 +297,7 @@ func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl
pkt.PaddingSize = 0 pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them // RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
v1 := pkt.SSRC v1 := pkt.SSRC
v2 := pkt.SequenceNumber v2 := pkt.SequenceNumber
err := t.createEncoder(&v1, &v2) err := t.createEncoder(&v1, &v2)
@ -318,7 +311,7 @@ func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -12,44 +12,37 @@ import (
) )
type formatProcessorLPCM struct { type formatProcessorLPCM struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.LPCM Format *format.LPCM
encoder *rtplpcm.Encoder GenerateRTPPackets bool
decoder *rtplpcm.Decoder
randomStart uint32 encoder *rtplpcm.Encoder
decoder *rtplpcm.Decoder
randomStart uint32
} }
func newLPCM( func (t *formatProcessorLPCM) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.LPCM,
generateRTPPackets bool,
) (*formatProcessorLPCM, error) {
t := &formatProcessorLPCM{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorLPCM) createEncoder() error { func (t *formatProcessorLPCM) createEncoder() error {
t.encoder = &rtplpcm.Encoder{ t.encoder = &rtplpcm.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
BitDepth: t.format.BitDepth, BitDepth: t.Format.BitDepth,
ChannelCount: t.format.ChannelCount, ChannelCount: t.Format.ChannelCount,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -88,16 +81,16 @@ func (t *formatProcessorLPCM) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -13,41 +13,34 @@ import (
) )
type formatProcessorMJPEG struct { type formatProcessorMJPEG struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.MJPEG Format *format.MJPEG
encoder *rtpmjpeg.Encoder GenerateRTPPackets bool
decoder *rtpmjpeg.Decoder
randomStart uint32 encoder *rtpmjpeg.Encoder
decoder *rtpmjpeg.Decoder
randomStart uint32
} }
func newMJPEG( func (t *formatProcessorMJPEG) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.MJPEG,
generateRTPPackets bool,
) (*formatProcessorMJPEG, error) {
t := &formatProcessorMJPEG{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorMJPEG) createEncoder() error { func (t *formatProcessorMJPEG) createEncoder() error {
t.encoder = &rtpmjpeg.Encoder{ t.encoder = &rtpmjpeg.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -87,16 +80,16 @@ func (t *formatProcessorMJPEG) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -13,41 +13,34 @@ import (
) )
type formatProcessorMPEG1Audio struct { type formatProcessorMPEG1Audio struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.MPEG1Audio Format *format.MPEG1Audio
encoder *rtpmpeg1audio.Encoder GenerateRTPPackets bool
decoder *rtpmpeg1audio.Decoder
randomStart uint32 encoder *rtpmpeg1audio.Encoder
decoder *rtpmpeg1audio.Decoder
randomStart uint32
} }
func newMPEG1Audio( func (t *formatProcessorMPEG1Audio) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.MPEG1Audio,
generateRTPPackets bool,
) (*formatProcessorMPEG1Audio, error) {
t := &formatProcessorMPEG1Audio{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorMPEG1Audio) createEncoder() error { func (t *formatProcessorMPEG1Audio) createEncoder() error {
t.encoder = &rtpmpeg1audio.Encoder{ t.encoder = &rtpmpeg1audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -86,16 +79,16 @@ func (t *formatProcessorMPEG1Audio) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -22,41 +22,34 @@ var (
) )
type formatProcessorMPEG1Video struct { type formatProcessorMPEG1Video struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.MPEG1Video Format *format.MPEG1Video
encoder *rtpmpeg1video.Encoder GenerateRTPPackets bool
decoder *rtpmpeg1video.Decoder
randomStart uint32 encoder *rtpmpeg1video.Encoder
decoder *rtpmpeg1video.Decoder
randomStart uint32
} }
func newMPEG1Video( func (t *formatProcessorMPEG1Video) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.MPEG1Video,
generateRTPPackets bool,
) (*formatProcessorMPEG1Video, error) {
t := &formatProcessorMPEG1Video{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorMPEG1Video) createEncoder() error { func (t *formatProcessorMPEG1Video) createEncoder() error {
t.encoder = &rtpmpeg1video.Encoder{ t.encoder = &rtpmpeg1video.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -96,16 +89,16 @@ func (t *formatProcessorMPEG1Video) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -13,45 +13,38 @@ import (
) )
type formatProcessorMPEG4Audio struct { type formatProcessorMPEG4Audio struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.MPEG4Audio Format *format.MPEG4Audio
encoder *rtpmpeg4audio.Encoder GenerateRTPPackets bool
decoder *rtpmpeg4audio.Decoder
randomStart uint32 encoder *rtpmpeg4audio.Encoder
decoder *rtpmpeg4audio.Decoder
randomStart uint32
} }
func newMPEG4Audio( func (t *formatProcessorMPEG4Audio) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.MPEG4Audio,
generateRTPPackets bool,
) (*formatProcessorMPEG4Audio, error) {
t := &formatProcessorMPEG4Audio{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorMPEG4Audio) createEncoder() error { func (t *formatProcessorMPEG4Audio) createEncoder() error {
t.encoder = &rtpmpeg4audio.Encoder{ t.encoder = &rtpmpeg4audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
SizeLength: t.format.SizeLength, SizeLength: t.Format.SizeLength,
IndexLength: t.format.IndexLength, IndexLength: t.Format.IndexLength,
IndexDeltaLength: t.format.IndexDeltaLength, IndexDeltaLength: t.Format.IndexDeltaLength,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -90,16 +83,16 @@ func (t *formatProcessorMPEG4Audio) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -27,42 +27,35 @@ var (
) )
type formatProcessorMPEG4Video struct { type formatProcessorMPEG4Video struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.MPEG4Video Format *format.MPEG4Video
encoder *rtpmpeg4video.Encoder GenerateRTPPackets bool
decoder *rtpmpeg4video.Decoder
randomStart uint32 encoder *rtpmpeg4video.Encoder
decoder *rtpmpeg4video.Decoder
randomStart uint32
} }
func newMPEG4Video( func (t *formatProcessorMPEG4Video) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.MPEG4Video,
generateRTPPackets bool,
) (*formatProcessorMPEG4Video, error) {
t := &formatProcessorMPEG4Video{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorMPEG4Video) createEncoder() error { func (t *formatProcessorMPEG4Video) createEncoder() error {
t.encoder = &rtpmpeg4video.Encoder{ t.encoder = &rtpmpeg4video.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -75,8 +68,8 @@ func (t *formatProcessorMPEG4Video) updateTrackParameters(frame []byte) {
} }
conf := frame[:end+4] conf := frame[:end+4]
if !bytes.Equal(conf, t.format.Config) { if !bytes.Equal(conf, t.Format.Config) {
t.format.SafeSetParams(conf) t.Format.SafeSetParams(conf)
} }
} }
} }
@ -90,8 +83,8 @@ func (t *formatProcessorMPEG4Video) remuxFrame(frame []byte) []byte {
} }
if bytes.Contains(frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) { if bytes.Contains(frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) {
f := make([]byte, len(t.format.Config)+len(frame)) f := make([]byte, len(t.Format.Config)+len(frame))
n := copy(f, t.format.Config) n := copy(f, t.Format.Config)
copy(f[n:], frame) copy(f[n:], frame)
frame = f frame = f
} }
@ -141,16 +134,16 @@ func (t *formatProcessorMPEG4Video) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -13,42 +13,35 @@ import (
) )
type formatProcessorOpus struct { type formatProcessorOpus struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.Opus Format *format.Opus
encoder *rtpsimpleaudio.Encoder GenerateRTPPackets bool
decoder *rtpsimpleaudio.Decoder
randomStart uint32 encoder *rtpsimpleaudio.Encoder
decoder *rtpsimpleaudio.Decoder
randomStart uint32
} }
func newOpus( func (t *formatProcessorOpus) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.Opus,
generateRTPPackets bool,
) (*formatProcessorOpus, error) {
t := &formatProcessorOpus{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorOpus) createEncoder() error { func (t *formatProcessorOpus) createEncoder() error {
t.encoder = &rtpsimpleaudio.Encoder{ t.encoder = &rtpsimpleaudio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -68,7 +61,7 @@ func (t *formatProcessorOpus) ProcessUnit(uu unit.Unit) error { //nolint:dupl
pkt.Timestamp += t.randomStart + uint32(pts) pkt.Timestamp += t.randomStart + uint32(pts)
rtpPackets = append(rtpPackets, pkt) rtpPackets = append(rtpPackets, pkt)
pts += int64(opus.PacketDuration(packet)) * int64(t.format.ClockRate()) / int64(time.Second) pts += int64(opus.PacketDuration(packet)) * int64(t.Format.ClockRate()) / int64(time.Second)
} }
u.RTPPackets = rtpPackets u.RTPPackets = rtpPackets
@ -94,16 +87,16 @@ func (t *formatProcessorOpus) ProcessRTPPacket(
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -22,6 +22,8 @@ type Processor interface {
pts int64, pts int64,
hasNonRTSPReaders bool, hasNonRTSPReaders bool,
) (unit.Unit, error) ) (unit.Unit, error)
initialize() error
} }
// New allocates a Processor. // New allocates a Processor.
@ -30,50 +32,115 @@ func New(
forma format.Format, forma format.Format,
generateRTPPackets bool, generateRTPPackets bool,
) (Processor, error) { ) (Processor, error) {
var proc Processor
switch forma := forma.(type) { switch forma := forma.(type) {
case *format.AV1: case *format.AV1:
return newAV1(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorAV1{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.VP9: case *format.VP9:
return newVP9(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorVP9{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.VP8: case *format.VP8:
return newVP8(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorVP8{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.H265: case *format.H265:
return newH265(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorH265{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.H264: case *format.H264:
return newH264(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorH264{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.MPEG4Video: case *format.MPEG4Video:
return newMPEG4Video(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorMPEG4Video{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.MPEG1Video: case *format.MPEG1Video:
return newMPEG1Video(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorMPEG1Video{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.Opus: case *format.Opus:
return newOpus(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorOpus{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.MPEG4Audio: case *format.MPEG4Audio:
return newMPEG4Audio(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorMPEG4Audio{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.MPEG1Audio: case *format.MPEG1Audio:
return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorMPEG1Audio{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.MJPEG: case *format.MJPEG:
return newMJPEG(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorMJPEG{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.AC3: case *format.AC3:
return newAC3(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorAC3{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.G711: case *format.G711:
return newG711(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorG711{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
case *format.LPCM: case *format.LPCM:
return newLPCM(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorLPCM{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
default: default:
return newGeneric(udpMaxPayloadSize, forma, generateRTPPackets) proc = &formatProcessorGeneric{
UDPMaxPayloadSize: udpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
}
} }
err := proc.initialize()
return proc, err
} }

View file

@ -13,42 +13,35 @@ import (
) )
type formatProcessorVP8 struct { type formatProcessorVP8 struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.VP8 Format *format.VP8
encoder *rtpvp8.Encoder GenerateRTPPackets bool
decoder *rtpvp8.Decoder
randomStart uint32 encoder *rtpvp8.Encoder
decoder *rtpvp8.Decoder
randomStart uint32
} }
func newVP8( func (t *formatProcessorVP8) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.VP8,
generateRTPPackets bool,
) (*formatProcessorVP8, error) {
t := &formatProcessorVP8{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorVP8) createEncoder() error { func (t *formatProcessorVP8) createEncoder() error {
t.encoder = &rtpvp8.Encoder{ t.encoder = &rtpvp8.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -87,16 +80,16 @@ func (t *formatProcessorVP8) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -13,42 +13,35 @@ import (
) )
type formatProcessorVP9 struct { type formatProcessorVP9 struct {
udpMaxPayloadSize int UDPMaxPayloadSize int
format *format.VP9 Format *format.VP9
encoder *rtpvp9.Encoder GenerateRTPPackets bool
decoder *rtpvp9.Decoder
randomStart uint32 encoder *rtpvp9.Encoder
decoder *rtpvp9.Decoder
randomStart uint32
} }
func newVP9( func (t *formatProcessorVP9) initialize() error {
udpMaxPayloadSize int, if t.GenerateRTPPackets {
forma *format.VP9,
generateRTPPackets bool,
) (*formatProcessorVP9, error) {
t := &formatProcessorVP9{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder() err := t.createEncoder()
if err != nil { if err != nil {
return nil, err return err
} }
t.randomStart, err = randUint32() t.randomStart, err = randUint32()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return t, nil return nil
} }
func (t *formatProcessorVP9) createEncoder() error { func (t *formatProcessorVP9) createEncoder() error {
t.encoder = &rtpvp9.Encoder{ t.encoder = &rtpvp9.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.UDPMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp, PayloadType: t.Format.PayloadTyp,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -87,16 +80,16 @@ func (t *formatProcessorVP9) ProcessRTPPacket( //nolint:dupl
pkt.Header.Padding = false pkt.Header.Padding = false
pkt.PaddingSize = 0 pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize { if pkt.MarshalSize() > t.UDPMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize) pkt.MarshalSize(), t.UDPMaxPayloadSize)
} }
// decode from RTP // decode from RTP
if hasNonRTSPReaders || t.decoder != nil { if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil { if t.decoder == nil {
var err error var err error
t.decoder, err = t.format.CreateDecoder() t.decoder, err = t.Format.CreateDecoder()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -36,7 +36,7 @@ func setupVideoTrack(
} }
var videoFormatAV1 *format.AV1 var videoFormatAV1 *format.AV1
videoMedia := strea.Desc().FindFormat(&videoFormatAV1) videoMedia := strea.Desc.FindFormat(&videoFormatAV1)
if videoFormatAV1 != nil { if videoFormatAV1 != nil {
track := &gohlslib.Track{ track := &gohlslib.Track{
@ -71,7 +71,7 @@ func setupVideoTrack(
} }
var videoFormatVP9 *format.VP9 var videoFormatVP9 *format.VP9
videoMedia = strea.Desc().FindFormat(&videoFormatVP9) videoMedia = strea.Desc.FindFormat(&videoFormatVP9)
if videoFormatVP9 != nil { if videoFormatVP9 != nil {
track := &gohlslib.Track{ track := &gohlslib.Track{
@ -106,7 +106,7 @@ func setupVideoTrack(
} }
var videoFormatH265 *format.H265 var videoFormatH265 *format.H265
videoMedia = strea.Desc().FindFormat(&videoFormatH265) videoMedia = strea.Desc.FindFormat(&videoFormatH265)
if videoFormatH265 != nil { if videoFormatH265 != nil {
vps, sps, pps := videoFormatH265.SafeParams() vps, sps, pps := videoFormatH265.SafeParams()
@ -146,7 +146,7 @@ func setupVideoTrack(
} }
var videoFormatH264 *format.H264 var videoFormatH264 *format.H264
videoMedia = strea.Desc().FindFormat(&videoFormatH264) videoMedia = strea.Desc.FindFormat(&videoFormatH264)
if videoFormatH264 != nil { if videoFormatH264 != nil {
sps, pps := videoFormatH264.SafeParams() sps, pps := videoFormatH264.SafeParams()
@ -202,7 +202,7 @@ func setupAudioTracks(
strea.AddReader(reader, medi, forma, readFunc) strea.AddReader(reader, medi, forma, readFunc)
} }
for _, media := range strea.Desc().Medias { for _, media := range strea.Desc.Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
switch forma := forma.(type) { switch forma := forma.(type) {
case *format.Opus: case *format.Opus:
@ -297,7 +297,7 @@ func FromStream(
} }
n := 1 n := 1
for _, media := range stream.Desc().Medias { for _, media := range stream.Desc.Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
if _, ok := setuppedFormats[forma]; !ok { if _, ok := setuppedFormats[forma]; !ok {
reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())

View file

@ -14,16 +14,17 @@ import (
) )
func TestFromStreamNoSupportedCodecs(t *testing.T) { func TestFromStreamNoSupportedCodecs(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{{ Desc: &description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.VP8{}}, Formats: []format.Format{&format.VP8{}},
}}}, }}},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
l := test.Logger(func(logger.Level, string, ...interface{}) { l := test.Logger(func(logger.Level, string, ...interface{}) {
@ -32,15 +33,15 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) {
m := &gohlslib.Muxer{} m := &gohlslib.Muxer{}
err = FromStream(stream, l, m) err = FromStream(strm, l, m)
require.Equal(t, ErrNoSupportedCodecs, err) require.Equal(t, ErrNoSupportedCodecs, err)
} }
func TestFromStreamSkipUnsupportedTracks(t *testing.T) { func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{ Desc: &description.Session{Medias: []*description.Media{
{ {
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.VP9{}}, Formats: []format.Format{&format.VP9{}},
@ -54,9 +55,10 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
Formats: []format.Format{&format.MPEG1Audio{}}, Formats: []format.Format{&format.MPEG1Audio{}},
}, },
}}, }},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
m := &gohlslib.Muxer{} m := &gohlslib.Muxer{}
@ -74,9 +76,9 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
n++ n++
}) })
err = FromStream(stream, l, m) err = FromStream(strm, l, m)
require.NoError(t, err) require.NoError(t, err)
defer stream.RemoveReader(l) defer strm.RemoveReader(l)
require.Equal(t, 2, n) require.Equal(t, 2, n)
} }

View file

@ -50,8 +50,12 @@ func (s *Server) Initialize() error {
return fmt.Errorf("server cert is missing") return fmt.Errorf("server cert is missing")
} }
var err error s.loader = &certloader.CertLoader{
s.loader, err = certloader.New(s.ServerCert, s.ServerKey, s.Parent) CertPath: s.ServerCert,
KeyPath: s.ServerKey,
Parent: s.Parent,
}
err := s.loader.Initialize()
if err != nil { if err != nil {
return err return err
} }

View file

@ -47,7 +47,7 @@ func FromStream(
strea.AddReader(reader, media, forma, readFunc) strea.AddReader(reader, media, forma, readFunc)
} }
for _, media := range strea.Desc().Medias { for _, media := range strea.Desc.Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
clockRate := forma.ClockRate() clockRate := forma.ClockRate()
@ -317,7 +317,7 @@ func FromStream(
} }
n := 1 n := 1
for _, medi := range strea.Desc().Medias { for _, medi := range strea.Desc.Medias {
for _, forma := range medi.Formats { for _, forma := range medi.Formats {
if _, ok := setuppedFormats[forma]; !ok { if _, ok := setuppedFormats[forma]; !ok {
reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())

View file

@ -13,31 +13,32 @@ import (
) )
func TestFromStreamNoSupportedCodecs(t *testing.T) { func TestFromStreamNoSupportedCodecs(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{{ Desc: &description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.VP8{}}, Formats: []format.Format{&format.VP8{}},
}}}, }}},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
l := test.Logger(func(logger.Level, string, ...interface{}) { l := test.Logger(func(logger.Level, string, ...interface{}) {
t.Error("should not happen") t.Error("should not happen")
}) })
err = FromStream(stream, l, nil, nil, 0) err = FromStream(strm, l, nil, nil, 0)
require.Equal(t, errNoSupportedCodecs, err) require.Equal(t, errNoSupportedCodecs, err)
} }
func TestFromStreamSkipUnsupportedTracks(t *testing.T) { func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{ Desc: &description.Session{Medias: []*description.Media{
{ {
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{}}, Formats: []format.Format{&format.H265{}},
@ -47,9 +48,10 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
Formats: []format.Format{&format.VP8{}}, Formats: []format.Format{&format.VP8{}},
}, },
}}, }},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
n := 0 n := 0
@ -62,9 +64,9 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
n++ n++
}) })
err = FromStream(stream, l, nil, nil, 0) err = FromStream(strm, l, nil, nil, 0)
require.NoError(t, err) require.NoError(t, err)
defer stream.RemoveReader(l) defer strm.RemoveReader(l)
require.Equal(t, 1, n) require.Equal(t, 1, n)
} }

View file

@ -16,7 +16,7 @@ import (
) )
var errNoSupportedCodecsFrom = errors.New( var errNoSupportedCodecsFrom = errors.New(
"the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio") "the strm doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio")
func multiplyAndDivide2(v, m, d time.Duration) time.Duration { func multiplyAndDivide2(v, m, d time.Duration) time.Duration {
secs := v / d secs := v / d
@ -29,19 +29,19 @@ func timestampToDuration(t int64, clockRate int) time.Duration {
} }
func setupVideo( func setupVideo(
strea *stream.Stream, str *stream.Stream,
reader stream.Reader, reader stream.Reader,
w **Writer, w **Writer,
nconn net.Conn, nconn net.Conn,
writeTimeout time.Duration, writeTimeout time.Duration,
) format.Format { ) format.Format {
var videoFormatH264 *format.H264 var videoFormatH264 *format.H264
videoMedia := strea.Desc().FindFormat(&videoFormatH264) videoMedia := str.Desc.FindFormat(&videoFormatH264)
if videoFormatH264 != nil { if videoFormatH264 != nil {
var videoDTSExtractor *h264.DTSExtractor var videoDTSExtractor *h264.DTSExtractor
strea.AddReader( str.AddReader(
reader, reader,
videoMedia, videoMedia,
videoFormatH264, videoFormatH264,
@ -96,17 +96,17 @@ func setupVideo(
} }
func setupAudio( func setupAudio(
strea *stream.Stream, str *stream.Stream,
reader stream.Reader, reader stream.Reader,
w **Writer, w **Writer,
nconn net.Conn, nconn net.Conn,
writeTimeout time.Duration, writeTimeout time.Duration,
) format.Format { ) format.Format {
var audioFormatMPEG4Audio *format.MPEG4Audio var audioFormatMPEG4Audio *format.MPEG4Audio
audioMedia := strea.Desc().FindFormat(&audioFormatMPEG4Audio) audioMedia := str.Desc.FindFormat(&audioFormatMPEG4Audio)
if audioMedia != nil { if audioMedia != nil {
strea.AddReader( str.AddReader(
reader, reader,
audioMedia, audioMedia,
audioFormatMPEG4Audio, audioFormatMPEG4Audio,
@ -137,10 +137,10 @@ func setupAudio(
} }
var audioFormatMPEG1 *format.MPEG1Audio var audioFormatMPEG1 *format.MPEG1Audio
audioMedia = strea.Desc().FindFormat(&audioFormatMPEG1) audioMedia = str.Desc.FindFormat(&audioFormatMPEG1)
if audioMedia != nil { if audioMedia != nil {
strea.AddReader( str.AddReader(
reader, reader,
audioMedia, audioMedia,
audioFormatMPEG1, audioFormatMPEG1,
@ -182,9 +182,9 @@ func setupAudio(
return nil return nil
} }
// FromStream maps a MediaMTX stream to a RTMP stream. // FromStream maps a MediaMTX strm to a RTMP strm.
func FromStream( func FromStream(
stream *stream.Stream, str *stream.Stream,
reader stream.Reader, reader stream.Reader,
conn *Conn, conn *Conn,
nconn net.Conn, nconn net.Conn,
@ -193,7 +193,7 @@ func FromStream(
var w *Writer var w *Writer
videoFormat := setupVideo( videoFormat := setupVideo(
stream, str,
reader, reader,
&w, &w,
nconn, nconn,
@ -201,7 +201,7 @@ func FromStream(
) )
audioFormat := setupAudio( audioFormat := setupAudio(
stream, str,
reader, reader,
&w, &w,
nconn, nconn,
@ -219,7 +219,7 @@ func FromStream(
} }
n := 1 n := 1
for _, media := range stream.Desc().Medias { for _, media := range str.Desc.Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
if forma != videoFormat && forma != audioFormat { if forma != videoFormat && forma != audioFormat {
reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())

View file

@ -16,31 +16,32 @@ import (
) )
func TestFromStreamNoSupportedCodecs(t *testing.T) { func TestFromStreamNoSupportedCodecs(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{{ Desc: &description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.VP8{}}, Formats: []format.Format{&format.VP8{}},
}}}, }}},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
l := test.Logger(func(logger.Level, string, ...interface{}) { l := test.Logger(func(logger.Level, string, ...interface{}) {
t.Error("should not happen") t.Error("should not happen")
}) })
err = FromStream(stream, l, nil, nil, 0) err = FromStream(strm, l, nil, nil, 0)
require.Equal(t, errNoSupportedCodecsFrom, err) require.Equal(t, errNoSupportedCodecsFrom, err)
} }
func TestFromStreamSkipUnsupportedTracks(t *testing.T) { func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{ Desc: &description.Session{Medias: []*description.Media{
{ {
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.VP8{}}, Formats: []format.Format{&format.VP8{}},
@ -54,9 +55,10 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
Formats: []format.Format{&format.H264{}}, Formats: []format.Format{&format.H264{}},
}, },
}}, }},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
n := 0 n := 0
@ -76,9 +78,9 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
bc := bytecounter.NewReadWriter(&buf) bc := bytecounter.NewReadWriter(&buf)
conn := &Conn{mrw: message.NewReadWriter(&buf, bc, false)} conn := &Conn{mrw: message.NewReadWriter(&buf, bc, false)}
err = FromStream(stream, l, conn, nil, 0) err = FromStream(strm, l, conn, nil, 0)
require.NoError(t, err) require.NoError(t, err)
defer stream.RemoveReader(l) defer strm.RemoveReader(l)
require.Equal(t, 2, n) require.Equal(t, 2, n)
} }

View file

@ -46,7 +46,7 @@ func setupVideoTrack(
pc *PeerConnection, pc *PeerConnection,
) (format.Format, error) { ) (format.Format, error) {
var av1Format *format.AV1 var av1Format *format.AV1
media := stream.Desc().FindFormat(&av1Format) media := stream.Desc.FindFormat(&av1Format)
if av1Format != nil { if av1Format != nil {
track := &OutgoingTrack{ track := &OutgoingTrack{
@ -94,7 +94,7 @@ func setupVideoTrack(
} }
var vp9Format *format.VP9 var vp9Format *format.VP9
media = stream.Desc().FindFormat(&vp9Format) media = stream.Desc.FindFormat(&vp9Format)
if vp9Format != nil { if vp9Format != nil {
track := &OutgoingTrack{ track := &OutgoingTrack{
@ -144,7 +144,7 @@ func setupVideoTrack(
} }
var vp8Format *format.VP8 var vp8Format *format.VP8
media = stream.Desc().FindFormat(&vp8Format) media = stream.Desc.FindFormat(&vp8Format)
if vp8Format != nil { if vp8Format != nil {
track := &OutgoingTrack{ track := &OutgoingTrack{
@ -192,7 +192,7 @@ func setupVideoTrack(
} }
var h265Format *format.H265 var h265Format *format.H265
media = stream.Desc().FindFormat(&h265Format) media = stream.Desc.FindFormat(&h265Format)
if h265Format != nil { //nolint:dupl if h265Format != nil { //nolint:dupl
track := &OutgoingTrack{ track := &OutgoingTrack{
@ -251,7 +251,7 @@ func setupVideoTrack(
} }
var h264Format *format.H264 var h264Format *format.H264
media = stream.Desc().FindFormat(&h264Format) media = stream.Desc.FindFormat(&h264Format)
if h264Format != nil { //nolint:dupl if h264Format != nil { //nolint:dupl
track := &OutgoingTrack{ track := &OutgoingTrack{
@ -318,7 +318,7 @@ func setupAudioTrack(
pc *PeerConnection, pc *PeerConnection,
) (format.Format, error) { ) (format.Format, error) {
var opusFormat *format.Opus var opusFormat *format.Opus
media := stream.Desc().FindFormat(&opusFormat) media := stream.Desc.FindFormat(&opusFormat)
if opusFormat != nil { if opusFormat != nil {
var caps webrtc.RTPCodecCapability var caps webrtc.RTPCodecCapability
@ -371,7 +371,7 @@ func setupAudioTrack(
} }
var g722Format *format.G722 var g722Format *format.G722
media = stream.Desc().FindFormat(&g722Format) media = stream.Desc.FindFormat(&g722Format)
if g722Format != nil { if g722Format != nil {
track := &OutgoingTrack{ track := &OutgoingTrack{
@ -398,7 +398,7 @@ func setupAudioTrack(
} }
var g711Format *format.G711 var g711Format *format.G711
media = stream.Desc().FindFormat(&g711Format) media = stream.Desc.FindFormat(&g711Format)
if g711Format != nil { if g711Format != nil {
// These are the sample rates and channels supported by Chrome. // These are the sample rates and channels supported by Chrome.
@ -534,7 +534,7 @@ func setupAudioTrack(
} }
var lpcmFormat *format.LPCM var lpcmFormat *format.LPCM
media = stream.Desc().FindFormat(&lpcmFormat) media = stream.Desc.FindFormat(&lpcmFormat)
if lpcmFormat != nil { if lpcmFormat != nil {
if lpcmFormat.BitDepth != 16 { if lpcmFormat.BitDepth != 16 {
@ -632,7 +632,7 @@ func FromStream(
} }
n := 1 n := 1
for _, media := range stream.Desc().Medias { for _, media := range stream.Desc.Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
if forma != videoFormat && forma != audioFormat { if forma != videoFormat && forma != audioFormat {
reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())

View file

@ -13,31 +13,32 @@ import (
) )
func TestFromStreamNoSupportedCodecs(t *testing.T) { func TestFromStreamNoSupportedCodecs(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{{ Desc: &description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.MJPEG{}}, Formats: []format.Format{&format.MJPEG{}},
}}}, }}},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
l := test.Logger(func(logger.Level, string, ...interface{}) { l := test.Logger(func(logger.Level, string, ...interface{}) {
t.Error("should not happen") t.Error("should not happen")
}) })
err = FromStream(stream, l, nil) err = FromStream(strm, l, nil)
require.Equal(t, errNoSupportedCodecsFrom, err) require.Equal(t, errNoSupportedCodecsFrom, err)
} }
func TestFromStreamSkipUnsupportedTracks(t *testing.T) { func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{Medias: []*description.Media{ Desc: &description.Session{Medias: []*description.Media{
{ {
Type: description.MediaTypeVideo, Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{}}, Formats: []format.Format{&format.H264{}},
@ -47,9 +48,10 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
Formats: []format.Format{&format.MJPEG{}}, Formats: []format.Format{&format.MJPEG{}},
}, },
}}, }},
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
n := 0 n := 0
@ -64,9 +66,9 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
pc := &PeerConnection{} pc := &PeerConnection{}
err = FromStream(stream, l, pc) err = FromStream(strm, l, pc)
require.NoError(t, err) require.NoError(t, err)
defer stream.RemoveReader(l) defer strm.RemoveReader(l)
require.Equal(t, 1, n) require.Equal(t, 1, n)
} }
@ -74,25 +76,26 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) {
func TestFromStream(t *testing.T) { func TestFromStream(t *testing.T) {
for _, ca := range toFromStreamCases { for _, ca := range toFromStreamCases {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
&description.Session{ Desc: &description.Session{
Medias: []*description.Media{{ Medias: []*description.Media{{
Formats: []format.Format{ca.in}, Formats: []format.Format{ca.in},
}}, }},
}, },
false, GenerateRTPPackets: false,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer stream.Close() defer strm.Close()
pc := &PeerConnection{} pc := &PeerConnection{}
err = FromStream(stream, nil, pc) err = FromStream(strm, nil, pc)
require.NoError(t, err) require.NoError(t, err)
defer stream.RemoveReader(nil) defer strm.RemoveReader(nil)
require.Equal(t, ca.webrtcCaps, pc.OutgoingTracks[0].Caps) require.Equal(t, ca.webrtcCaps, pc.OutgoingTracks[0].Caps)
}) })

View file

@ -135,7 +135,7 @@ func (f *formatFMP4) initialize() bool {
} }
} }
for _, media := range f.ri.rec.Stream.Desc().Medias { for _, media := range f.ri.rec.Stream.Desc.Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
clockRate := forma.ClockRate() clockRate := forma.ClockRate()
@ -852,7 +852,7 @@ func (f *formatFMP4) initialize() bool {
} }
n := 1 n := 1
for _, medi := range f.ri.rec.Stream.Desc().Medias { for _, medi := range f.ri.rec.Stream.Desc.Medias {
for _, forma := range medi.Formats { for _, forma := range medi.Formats {
if _, ok := setuppedFormatsMap[forma]; !ok { if _, ok := setuppedFormatsMap[forma]; !ok {
f.ri.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) f.ri.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())

View file

@ -77,7 +77,7 @@ func (f *formatMPEGTS) initialize() bool {
return track return track
} }
for _, media := range f.ri.rec.Stream.Desc().Medias { for _, media := range f.ri.rec.Stream.Desc.Medias {
for _, forma := range media.Formats { for _, forma := range media.Formats {
clockRate := forma.ClockRate() clockRate := forma.ClockRate()
@ -383,7 +383,7 @@ func (f *formatMPEGTS) initialize() bool {
} }
n := 1 n := 1
for _, medi := range f.ri.rec.Stream.Desc().Medias { for _, medi := range f.ri.rec.Stream.Desc.Medias {
for _, forma := range medi.Formats { for _, forma := range medi.Formats {
if _, ok := setuppedFormatsMap[forma]; !ok { if _, ok := setuppedFormatsMap[forma]; !ok {
f.ri.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) f.ri.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec())

View file

@ -124,15 +124,16 @@ func TestRecorder(t *testing.T) {
for _, ca := range []string{"fmp4", "mpegts"} { for _, ca := range []string{"fmp4", "mpegts"} {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer stream.Close() defer strm.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent") dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err) require.NoError(t, err)
@ -165,7 +166,7 @@ func TestRecorder(t *testing.T) {
PartDuration: 100 * time.Millisecond, PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second, SegmentDuration: 1 * time.Second,
PathName: "mypath", PathName: "mypath",
Stream: stream, Stream: strm,
OnSegmentCreate: func(segPath string) { OnSegmentCreate: func(segPath string) {
switch n { switch n {
case 0: case 0:
@ -197,16 +198,16 @@ func TestRecorder(t *testing.T) {
} }
w.Initialize() w.Initialize()
writeToStream(stream, writeToStream(strm,
50*90000, 50*90000,
time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC)) time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC))
writeToStream(stream, writeToStream(strm,
52*90000, 52*90000,
time.Date(2008, 5, 20, 22, 16, 25, 0, time.UTC)) time.Date(2008, 5, 20, 22, 16, 25, 0, time.UTC))
// simulate a write error // simulate a write error
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
PTS: 0, PTS: 0,
}, },
@ -295,7 +296,7 @@ func TestRecorder(t *testing.T) {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
writeToStream(stream, writeToStream(strm,
300*90000, 300*90000,
time.Date(2010, 5, 20, 22, 15, 25, 0, time.UTC)) time.Date(2010, 5, 20, 22, 15, 25, 0, time.UTC))
@ -337,15 +338,16 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) {
}, },
}} }}
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer stream.Close() defer strm.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent") dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err) require.NoError(t, err)
@ -359,13 +361,13 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) {
PartDuration: 100 * time.Millisecond, PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second, SegmentDuration: 1 * time.Second,
PathName: "mypath", PathName: "mypath",
Stream: stream, Stream: strm,
Parent: test.NilLogger, Parent: test.NilLogger,
} }
w.Initialize() w.Initialize()
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
PTS: -50*90000/1000 + (int64(i) * 200 * 90000 / 1000), PTS: -50*90000/1000 + (int64(i) * 200 * 90000 / 1000),
NTP: time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC), NTP: time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC),
@ -377,7 +379,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) {
}, },
}) })
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.MPEG4Audio{ strm.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.MPEG4Audio{
Base: unit.Base{ Base: unit.Base{
PTS: -100*44100/1000 + (int64(i) * 200 * 44100 / 1000), PTS: -100*44100/1000 + (int64(i) * 200 * 44100 / 1000),
}, },
@ -424,15 +426,16 @@ func TestRecorderSkipTracksPartial(t *testing.T) {
}, },
}} }}
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer stream.Close() defer strm.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent") dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err) require.NoError(t, err)
@ -463,7 +466,7 @@ func TestRecorderSkipTracksPartial(t *testing.T) {
PartDuration: 100 * time.Millisecond, PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second, SegmentDuration: 1 * time.Second,
PathName: "mypath", PathName: "mypath",
Stream: stream, Stream: strm,
Parent: l, Parent: l,
} }
w.Initialize() w.Initialize()
@ -484,15 +487,16 @@ func TestRecorderSkipTracksFull(t *testing.T) {
}, },
}} }}
stream, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer stream.Close() defer strm.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent") dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err) require.NoError(t, err)
@ -523,7 +527,7 @@ func TestRecorderSkipTracksFull(t *testing.T) {
PartDuration: 100 * time.Millisecond, PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second, SegmentDuration: 1 * time.Second,
PathName: "mypath", PathName: "mypath",
Stream: stream, Stream: strm,
Parent: l, Parent: l,
} }
w.Initialize() w.Initialize()

View file

@ -154,13 +154,14 @@ func TestServerRead(t *testing.T) {
t.Run("always remux off", func(t *testing.T) { t.Run("always remux off", func(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
pm := &test.PathManager{ pm := &test.PathManager{
@ -174,7 +175,7 @@ func TestServerRead(t *testing.T) {
AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name) require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query) require.Equal(t, "param=value", req.AccessRequest.Query)
return &dummyPath{}, str, nil return &dummyPath{}, strm, nil
}, },
} }
@ -235,7 +236,7 @@ func TestServerRead(t *testing.T) {
go func() { go func() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
str.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{ strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
PTS: int64(i) * 90000, PTS: int64(i) * 90000,
@ -253,13 +254,14 @@ func TestServerRead(t *testing.T) {
t.Run("always remux on", func(t *testing.T) { t.Run("always remux on", func(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
pm := &test.PathManager{ pm := &test.PathManager{
@ -273,7 +275,7 @@ func TestServerRead(t *testing.T) {
AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name) require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "", req.AccessRequest.Query) require.Equal(t, "", req.AccessRequest.Query)
return &dummyPath{}, str, nil return &dummyPath{}, strm, nil
}, },
} }
@ -301,10 +303,10 @@ func TestServerRead(t *testing.T) {
s.PathReady(&dummyPath{}) s.PathReady(&dummyPath{})
str.WaitRunningReader() strm.WaitRunningReader()
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
str.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{ strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
PTS: int64(i) * 90000, PTS: int64(i) * 90000,
@ -357,18 +359,19 @@ func TestDirectory(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err = strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
pm := &test.PathManager{ pm := &test.PathManager{
AddReaderImpl: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { AddReaderImpl: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
return &dummyPath{}, str, nil return &dummyPath{}, strm, nil
}, },
} }

View file

@ -100,8 +100,12 @@ func (s *Server) Initialize() error {
return net.Listen(restrictnetwork.Restrict("tcp", s.Address)) return net.Listen(restrictnetwork.Restrict("tcp", s.Address))
} }
var err error s.loader = &certloader.CertLoader{
s.loader, err = certloader.New(s.ServerCert, s.ServerKey, s.Parent) CertPath: s.ServerCert,
KeyPath: s.ServerKey,
Parent: s.Parent,
}
err := s.loader.Initialize()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -38,17 +38,18 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
} }
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
var err error p.stream = &stream.Stream{
p.stream, err = stream.New( WriteQueueSize: 512,
512, UDPMaxPayloadSize: 1472,
1460, Desc: req.Desc,
req.Desc, GenerateRTPPackets: true,
true, DecodeErrLogger: test.NilLogger,
test.NilLogger, }
) err := p.stream.Initialize()
if err != nil { if err != nil {
return nil, err return nil, err
} }
close(p.streamCreated) close(p.streamCreated)
return p.stream, nil return p.stream, nil
} }
@ -147,8 +148,8 @@ func TestServerPublish(t *testing.T) {
path.stream.AddReader( path.stream.AddReader(
reader, reader,
path.stream.Desc().Medias[0], path.stream.Desc.Medias[0],
path.stream.Desc().Medias[0].Formats[0], path.stream.Desc.Medias[0].Formats[0],
func(u unit.Unit) error { func(u unit.Unit) error {
require.Equal(t, [][]byte{ require.Equal(t, [][]byte{
test.FormatH264.SPS, test.FormatH264.SPS,
@ -194,16 +195,17 @@ func TestServerRead(t *testing.T) {
} }
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
path := &dummyPath{stream: str} path := &dummyPath{stream: strm}
pathManager := &test.PathManager{ pathManager := &test.PathManager{
AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
@ -247,9 +249,9 @@ func TestServerRead(t *testing.T) {
defer nconn.Close() defer nconn.Close()
go func() { go func() {
str.WaitRunningReader() strm.WaitRunningReader()
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
}, },
@ -258,7 +260,7 @@ func TestServerRead(t *testing.T) {
}, },
}) })
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
PTS: 2 * 90000, PTS: 2 * 90000,
@ -268,7 +270,7 @@ func TestServerRead(t *testing.T) {
}, },
}) })
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
PTS: 3 * 90000, PTS: 3 * 90000,

View file

@ -121,8 +121,12 @@ func (s *Server) Initialize() error {
} }
if s.IsTLS { if s.IsTLS {
var err error s.loader = &certloader.CertLoader{
s.loader, err = certloader.New(s.ServerCert, s.ServerKey, s.Parent) CertPath: s.ServerCert,
KeyPath: s.ServerKey,
Parent: s.Parent,
}
err := s.loader.Initialize()
if err != nil { if err != nil {
return err return err
} }

View file

@ -38,20 +38,19 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
} }
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
var err error p.stream = &stream.Stream{
p.stream, err = stream.New( WriteQueueSize: 512,
512, UDPMaxPayloadSize: 1472,
1460, Desc: req.Desc,
req.Desc, GenerateRTPPackets: true,
true, DecodeErrLogger: test.NilLogger,
test.NilLogger, }
) err := p.stream.Initialize()
if err != nil { if err != nil {
return nil, err return nil, err
} }
close(p.streamCreated) close(p.streamCreated)
return p.stream, nil return p.stream, nil
} }
@ -114,8 +113,8 @@ func TestServerPublish(t *testing.T) {
path.stream.AddReader( path.stream.AddReader(
reader, reader,
path.stream.Desc().Medias[0], path.stream.Desc.Medias[0],
path.stream.Desc().Medias[0].Formats[0], path.stream.Desc.Medias[0].Formats[0],
func(u unit.Unit) error { func(u unit.Unit) error {
require.Equal(t, [][]byte{ require.Equal(t, [][]byte{
test.FormatH264.SPS, test.FormatH264.SPS,
@ -148,16 +147,17 @@ func TestServerPublish(t *testing.T) {
func TestServerRead(t *testing.T) { func TestServerRead(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
path := &dummyPath{stream: str} path := &dummyPath{stream: strm}
pathManager := &test.PathManager{ pathManager := &test.PathManager{
DescribeImpl: func(req defs.PathDescribeReq) defs.PathDescribeRes { DescribeImpl: func(req defs.PathDescribeReq) defs.PathDescribeRes {
@ -240,7 +240,7 @@ func TestServerRead(t *testing.T) {
_, err = reader.Play(nil) _, err = reader.Play(nil)
require.NoError(t, err) require.NoError(t, err)
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
}, },
@ -257,16 +257,17 @@ func TestServerRedirect(t *testing.T) {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
path := &dummyPath{stream: str} path := &dummyPath{stream: strm}
pathManager := &test.PathManager{ pathManager := &test.PathManager{
DescribeImpl: func(req defs.PathDescribeReq) defs.PathDescribeRes { DescribeImpl: func(req defs.PathDescribeReq) defs.PathDescribeRes {

View file

@ -35,17 +35,18 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
} }
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
var err error p.stream = &stream.Stream{
p.stream, err = stream.New( WriteQueueSize: 512,
512, UDPMaxPayloadSize: 1472,
1460, Desc: req.Desc,
req.Desc, GenerateRTPPackets: true,
true, DecodeErrLogger: test.NilLogger,
test.NilLogger, }
) err := p.stream.Initialize()
if err != nil { if err != nil {
return nil, err return nil, err
} }
close(p.streamCreated) close(p.streamCreated)
return p.stream, nil return p.stream, nil
} }
@ -60,7 +61,9 @@ func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) {
} }
func TestServerPublish(t *testing.T) { func TestServerPublish(t *testing.T) {
externalCmdPool := externalcmd.NewPool() externalCmdPool := &externalcmd.Pool{}
err := externalCmdPool.Initialize()
require.NoError(t, err)
defer externalCmdPool.Close() defer externalCmdPool.Close()
path := &dummyPath{ path := &dummyPath{
@ -90,7 +93,7 @@ func TestServerPublish(t *testing.T) {
PathManager: pathManager, PathManager: pathManager,
Parent: test.NilLogger, Parent: test.NilLogger,
} }
err := s.Initialize() err = s.Initialize()
require.NoError(t, err) require.NoError(t, err)
defer s.Close() defer s.Close()
@ -133,8 +136,8 @@ func TestServerPublish(t *testing.T) {
path.stream.AddReader( path.stream.AddReader(
reader, reader,
path.stream.Desc().Medias[0], path.stream.Desc.Medias[0],
path.stream.Desc().Medias[0].Formats[0], path.stream.Desc.Medias[0].Formats[0],
func(u unit.Unit) error { func(u unit.Unit) error {
require.Equal(t, [][]byte{ require.Equal(t, [][]byte{
test.FormatH264.SPS, test.FormatH264.SPS,
@ -160,21 +163,24 @@ func TestServerPublish(t *testing.T) {
} }
func TestServerRead(t *testing.T) { func TestServerRead(t *testing.T) {
externalCmdPool := externalcmd.NewPool() externalCmdPool := &externalcmd.Pool{}
err := externalCmdPool.Initialize()
require.NoError(t, err)
defer externalCmdPool.Close() defer externalCmdPool.Close()
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
true, GenerateRTPPackets: true,
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err = strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
path := &dummyPath{stream: str} path := &dummyPath{stream: strm}
pathManager := &test.PathManager{ pathManager := &test.PathManager{
AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { AddReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
@ -216,9 +222,9 @@ func TestServerRead(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer reader.Close() defer reader.Close()
str.WaitRunningReader() strm.WaitRunningReader()
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
}, },
@ -249,7 +255,7 @@ func TestServerRead(t *testing.T) {
return nil return nil
}) })
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
NTP: time.Time{}, NTP: time.Time{},
}, },

View file

@ -52,17 +52,18 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
} }
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
var err error p.stream = &stream.Stream{
p.stream, err = stream.New( WriteQueueSize: 512,
512, UDPMaxPayloadSize: 1472,
1460, Desc: req.Desc,
req.Desc, GenerateRTPPackets: true,
true, DecodeErrLogger: test.NilLogger,
test.NilLogger, }
) err := p.stream.Initialize()
if err != nil { if err != nil {
return nil, err return nil, err
} }
close(p.streamCreated) close(p.streamCreated)
return p.stream, nil return p.stream, nil
} }
@ -316,8 +317,8 @@ func TestServerPublish(t *testing.T) {
path.stream.AddReader( path.stream.AddReader(
reader, reader,
path.stream.Desc().Medias[0], path.stream.Desc.Medias[0],
path.stream.Desc().Medias[0].Formats[0], path.stream.Desc.Medias[0].Formats[0],
func(u unit.Unit) error { func(u unit.Unit) error {
select { select {
case <-recv: case <-recv:
@ -505,16 +506,17 @@ func TestServerRead(t *testing.T) {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
desc := &description.Session{Medias: ca.medias} desc := &description.Session{Medias: ca.medias}
str, err := stream.New( strm := &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
desc, Desc: desc,
reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}), GenerateRTPPackets: reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}),
test.NilLogger, DecodeErrLogger: test.NilLogger,
) }
err := strm.Initialize()
require.NoError(t, err) require.NoError(t, err)
path := &dummyPath{stream: str} path := &dummyPath{stream: strm}
pathManager := &test.PathManager{ pathManager := &test.PathManager{
FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
@ -529,7 +531,7 @@ func TestServerRead(t *testing.T) {
require.Equal(t, "param=value", req.AccessRequest.Query) require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.User) require.Equal(t, "myuser", req.AccessRequest.User)
require.Equal(t, "mypass", req.AccessRequest.Pass) require.Equal(t, "mypass", req.AccessRequest.Pass)
return path, str, nil return path, strm, nil
}, },
} }
@ -576,16 +578,16 @@ func TestServerRead(t *testing.T) {
go func() { go func() {
defer close(writerDone) defer close(writerDone)
str.WaitRunningReader() strm.WaitRunningReader()
r := reflect.New(reflect.TypeOf(ca.unit).Elem()) r := reflect.New(reflect.TypeOf(ca.unit).Elem())
r.Elem().Set(reflect.ValueOf(ca.unit).Elem()) r.Elem().Set(reflect.ValueOf(ca.unit).Elem())
if g, ok := r.Interface().(*unit.Generic); ok { if g, ok := r.Interface().(*unit.Generic); ok {
clone := *g.RTPPackets[0] clone := *g.RTPPackets[0]
str.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0) strm.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0)
} else { } else {
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit)) strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit))
} }
}() }()

View file

@ -26,8 +26,11 @@ type ReadFunc func(unit.Unit) error
// Stream is a media stream. // Stream is a media stream.
// It stores tracks, readers and allows to write data to readers. // It stores tracks, readers and allows to write data to readers.
type Stream struct { type Stream struct {
writeQueueSize int WriteQueueSize int
desc *description.Session UDPMaxPayloadSize int
Desc *description.Session
GenerateRTPPackets bool
DecodeErrLogger logger.Writer
bytesReceived *uint64 bytesReceived *uint64
bytesSent *uint64 bytesSent *uint64
@ -40,34 +43,28 @@ type Stream struct {
readerRunning chan struct{} readerRunning chan struct{}
} }
// New allocates a Stream. // Initialize initializes a Stream.
func New( func (s *Stream) Initialize() error {
writeQueueSize int, s.bytesReceived = new(uint64)
udpMaxPayloadSize int, s.bytesSent = new(uint64)
desc *description.Session,
generateRTPPackets bool,
decodeErrLogger logger.Writer,
) (*Stream, error) {
s := &Stream{
writeQueueSize: writeQueueSize,
desc: desc,
bytesReceived: new(uint64),
bytesSent: new(uint64),
}
s.streamMedias = make(map[*description.Media]*streamMedia) s.streamMedias = make(map[*description.Media]*streamMedia)
s.streamReaders = make(map[Reader]*streamReader) s.streamReaders = make(map[Reader]*streamReader)
s.readerRunning = make(chan struct{}) s.readerRunning = make(chan struct{})
for _, media := range desc.Medias { for _, media := range s.Desc.Medias {
var err error s.streamMedias[media] = &streamMedia{
s.streamMedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets, decodeErrLogger) UDPMaxPayloadSize: s.UDPMaxPayloadSize,
Media: media,
GenerateRTPPackets: s.GenerateRTPPackets,
DecodeErrLogger: s.DecodeErrLogger,
}
err := s.streamMedias[media].initialize()
if err != nil { if err != nil {
return nil, err return err
} }
} }
return s, nil return nil
} }
// Close closes all resources of the stream. // Close closes all resources of the stream.
@ -80,11 +77,6 @@ func (s *Stream) Close() {
} }
} }
// Desc returns the description of the stream.
func (s *Stream) Desc() *description.Session {
return s.desc
}
// BytesReceived returns received bytes. // BytesReceived returns received bytes.
func (s *Stream) BytesReceived() uint64 { func (s *Stream) BytesReceived() uint64 {
return atomic.LoadUint64(s.bytesReceived) return atomic.LoadUint64(s.bytesReceived)
@ -113,7 +105,7 @@ func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
defer s.mutex.Unlock() defer s.mutex.Unlock()
if s.rtspStream == nil { if s.rtspStream == nil {
s.rtspStream = gortsplib.NewServerStream(server, s.desc) s.rtspStream = gortsplib.NewServerStream(server, s.Desc)
} }
return s.rtspStream return s.rtspStream
} }
@ -124,7 +116,7 @@ func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream {
defer s.mutex.Unlock() defer s.mutex.Unlock()
if s.rtspsStream == nil { if s.rtspsStream == nil {
s.rtspsStream = gortsplib.NewServerStream(server, s.desc) s.rtspsStream = gortsplib.NewServerStream(server, s.Desc)
} }
return s.rtspsStream return s.rtspsStream
} }
@ -138,7 +130,7 @@ func (s *Stream) AddReader(reader Reader, medi *description.Media, forma format.
sr, ok := s.streamReaders[reader] sr, ok := s.streamReaders[reader]
if !ok { if !ok {
sr = &streamReader{ sr = &streamReader{
queueSize: s.writeQueueSize, queueSize: s.WriteQueueSize,
parent: reader, parent: reader,
} }
sr.initialize() sr.initialize()

View file

@ -8,31 +8,30 @@ import (
) )
type streamMedia struct { type streamMedia struct {
UDPMaxPayloadSize int
Media *description.Media
GenerateRTPPackets bool
DecodeErrLogger logger.Writer
formats map[format.Format]*streamFormat formats map[format.Format]*streamFormat
} }
func newStreamMedia(udpMaxPayloadSize int, func (sm *streamMedia) initialize() error {
medi *description.Media, sm.formats = make(map[format.Format]*streamFormat)
generateRTPPackets bool,
decodeErrLogger logger.Writer,
) (*streamMedia, error) {
sm := &streamMedia{
formats: make(map[format.Format]*streamFormat),
}
for _, forma := range medi.Formats { for _, forma := range sm.Media.Formats {
sf := &streamFormat{ sf := &streamFormat{
udpMaxPayloadSize: udpMaxPayloadSize, udpMaxPayloadSize: sm.UDPMaxPayloadSize,
format: forma, format: forma,
generateRTPPackets: generateRTPPackets, generateRTPPackets: sm.GenerateRTPPackets,
decodeErrLogger: decodeErrLogger, decodeErrLogger: sm.DecodeErrLogger,
} }
err := sf.initialize() err := sf.initialize()
if err != nil { if err != nil {
return nil, err return err
} }
sm.formats[forma] = sf sm.formats[forma] = sf
} }
return sm, nil return nil
} }

View file

@ -64,13 +64,17 @@ func (t *SourceTester) Log(_ logger.Level, _ string, _ ...interface{}) {
// SetReady implements StaticSourceParent. // SetReady implements StaticSourceParent.
func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes { func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes {
t.stream, _ = stream.New( t.stream = &stream.Stream{
512, WriteQueueSize: 512,
1460, UDPMaxPayloadSize: 1472,
req.Desc, Desc: req.Desc,
req.GenerateRTPPackets, GenerateRTPPackets: req.GenerateRTPPackets,
t, DecodeErrLogger: t,
) }
err := t.stream.Initialize()
if err != nil {
panic(err)
}
t.reader = NilLogger t.reader = NilLogger