rtsp: call rtsp.ToStream before advertising the stream (#5288)

This commit is contained in:
Alessandro Ros 2025-12-24 20:37:01 +01:00 committed by GitHub
parent 1ad48c73dc
commit 55f93eddde
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 97 additions and 90 deletions

View file

@ -842,7 +842,7 @@ func TestPathFallback(t *testing.T) {
}
func TestPathResolveSource(t *testing.T) {
var stream *gortsplib.ServerStream
var strm *gortsplib.ServerStream
s := gortsplib.Server{
Handler: &testServer{
@ -852,12 +852,12 @@ func TestPathResolveSource(t *testing.T) {
require.Equal(t, "/a", ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
}, strm, nil
},
onSetup: func(_ *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
}, strm, nil
},
onPlay: func(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
@ -872,13 +872,13 @@ func TestPathResolveSource(t *testing.T) {
require.NoError(t, err)
defer s.Close()
stream = &gortsplib.ServerStream{
strm = &gortsplib.ServerStream{
Server: &s,
Desc: &description.Session{Medias: []*description.Media{test.MediaH264}},
}
err = stream.Initialize()
err = strm.Initialize()
require.NoError(t, err)
defer stream.Close()
defer strm.Close()
p, ok := newInstance(
"paths:\n" +

View file

@ -32,7 +32,7 @@ func ToStream(
source rtspSource,
medias []*description.Media,
pathConf *conf.Path,
strm *stream.Stream,
strm **stream.Stream,
log logger.Writer,
) {
for _, medi := range medias {
@ -82,7 +82,7 @@ func ToStream(
return
}
strm.WriteRTPPacket(cmedi, cforma, pkt, ntp, pts)
(*strm).WriteRTPPacket(cmedi, cforma, pkt, ntp, pts)
})
}
}

View file

@ -405,8 +405,8 @@ func TestToStream(t *testing.T) {
err = pc2.GatherIncomingTracks()
require.NoError(t, err)
var stream *stream.Stream
medias, err := ToStream(pc2, &conf.Path{}, &stream, nil)
var strm *stream.Stream
medias, err := ToStream(pc2, &conf.Path{}, &strm, nil)
require.NoError(t, err)
require.Equal(t, ca.out, medias[0].Formats[0])
})

View file

@ -229,15 +229,15 @@ func (c *conn) runPublish() error {
return err
}
var stream *stream.Stream
var strm *stream.Stream
medias, err := rtmp.ToStream(r, &stream)
medias, err := rtmp.ToStream(r, &strm)
if err != nil {
return err
}
var path defs.Path
path, stream, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{
path, strm, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: c,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,

View file

@ -193,16 +193,16 @@ func (c *conn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
}, nil, nil
}
var stream *gortsplib.ServerStream
var strm *gortsplib.ServerStream
if !c.isTLS {
stream = res.Stream.RTSPStream(c.rserver)
strm = res.Stream.RTSPStream(c.rserver)
} else {
stream = res.Stream.RTSPSStream(c.rserver)
strm = res.Stream.RTSPSStream(c.rserver)
}
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
}, strm, nil
}
func (c *conn) handleAuthError(err *auth.Error) (*base.Response, error) {

View file

@ -331,16 +331,16 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons
}, err
}
s.path = path
s.stream = stream
rtsp.ToStream(
s.rsession,
s.rsession.AnnouncedDescription().Medias,
s.path.SafeConf(),
stream,
path.SafeConf(),
&s.stream,
s)
s.path = path
s.stream = stream
return &base.Response{
StatusCode: base.StatusOK,
}, nil

View file

@ -213,15 +213,15 @@ func (c *conn) runPublishReader(sconn srt.Conn, streamID *streamID, pathConf *co
decodeErrors.Increase()
})
var stream *stream.Stream
var strm *stream.Stream
medias, err := mpegts.ToStream(r, &stream, c)
medias, err := mpegts.ToStream(r, &strm, c)
if err != nil {
return err
}
var path defs.Path
path, stream, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{
path, strm, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: c,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,

View file

@ -233,15 +233,15 @@ func (s *session) runPublish() (int, error) {
return 0, err
}
var stream *stream.Stream
var strm *stream.Stream
medias, err := webrtc.ToStream(pc, pathConf, &stream, s)
medias, err := webrtc.ToStream(pc, pathConf, &strm, s)
if err != nil {
return 0, err
}
var path defs.Path
path, stream, err = s.pathManager.AddPublisher(defs.PathAddPublisherReq{
path, strm, err = s.pathManager.AddPublisher(defs.PathAddPublisherReq{
Author: s,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: false,

View file

@ -37,10 +37,10 @@ func (s *Source) Log(level logger.Level, format string, args ...any) {
// Run implements StaticSource.
func (s *Source) Run(params defs.StaticSourceRunParams) error {
var stream *stream.Stream
var strm *stream.Stream
defer func() {
if stream != nil {
if strm != nil {
s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
}
}()
@ -94,7 +94,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
decodeErrors.Increase()
},
OnTracks: func(tracks []*gohlslib.Track) error {
medias, err2 := hls.ToStream(c, tracks, params.Conf, &stream)
medias, err2 := hls.ToStream(c, tracks, params.Conf, &strm)
if err2 != nil {
return err2
}
@ -107,7 +107,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
return res.Err
}
stream = res.Stream
strm = res.Stream
return nil
},

View file

@ -116,9 +116,9 @@ func (s *Source) runReader(nc net.Conn) error {
decodeErrors.Increase()
})
var stream *stream.Stream
var strm *stream.Stream
medias, err := mpegts.ToStream(mr, &stream, s)
medias, err := mpegts.ToStream(mr, &strm, s)
if err != nil {
return err
}
@ -134,7 +134,7 @@ func (s *Source) runReader(nc net.Conn) error {
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
stream = res.Stream
strm = res.Stream
for {
nc.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))

View file

@ -101,9 +101,9 @@ func (s *Source) runReader(conn *gortmplib.Client) error {
return err
}
var stream *stream.Stream
var strm *stream.Stream
medias, err := rtmp.ToStream(r, &stream)
medias, err := rtmp.ToStream(r, &strm)
if err != nil {
return err
}
@ -123,7 +123,7 @@ func (s *Source) runReader(conn *gortmplib.Client) error {
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
stream = res.Stream
strm = res.Stream
conn.NetConn().SetWriteDeadline(time.Time{})

View file

@ -133,7 +133,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
decodeErrors.Start()
defer decodeErrors.Stop()
var stream *stream.Stream
var strm *stream.Stream
timeDecoder := &rtptime.GlobalDecoder{}
timeDecoder.Initialize()
@ -168,14 +168,14 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
var pkt rtp.Packet
err = pkt.Unmarshal(buf[:n])
if err != nil {
if stream != nil {
if strm != nil {
decodeErrors.Increase()
continue
}
return err
}
if stream == nil {
if strm == nil {
res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
Desc: desc,
GenerateRTPPackets: false,
@ -187,7 +187,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
stream = res.Stream
strm = res.Stream
}
media, ok := mediasByPayloadType[pkt.PayloadType]
@ -209,7 +209,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
continue
}
stream.WriteRTPPacket(media.desc, forma.desc, pkt, time.Time{}, pts)
strm.WriteRTPPacket(media.desc, forma.desc, pkt, time.Time{}, pts)
}
}
}

View file

@ -16,6 +16,7 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/rtsp"
"github.com/bluenviron/mediamtx/internal/protocols/tls"
"github.com/bluenviron/mediamtx/internal/stream"
)
func createRangeHeader(cnf *conf.Path) (*headers.Range, error) {
@ -187,47 +188,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
readErr := make(chan error)
go func() {
readErr <- func() error {
desc, _, err2 := c.Describe(u)
if err2 != nil {
return err2
}
err2 = c.SetupAll(desc.BaseURL, desc.Medias)
if err2 != nil {
return err2
}
res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
Desc: desc,
GenerateRTPPackets: false,
FillNTP: !params.Conf.UseAbsoluteTimestamp,
})
if res.Err != nil {
return res.Err
}
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
rtsp.ToStream(
c,
desc.Medias,
params.Conf,
res.Stream,
s)
rangeHeader, err2 := createRangeHeader(params.Conf)
if err2 != nil {
return err2
}
_, err2 = c.Play(rangeHeader)
if err2 != nil {
return err2
}
return c.Wait()
}()
readErr <- s.runInner(c, u, params.Conf)
}()
for {
@ -245,6 +206,52 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}
}
func (s *Source) runInner(c *gortsplib.Client, u *base.URL, pathConf *conf.Path) error {
desc, _, err := c.Describe(u)
if err != nil {
return err
}
err = c.SetupAll(desc.BaseURL, desc.Medias)
if err != nil {
return err
}
var strm *stream.Stream
rtsp.ToStream(
c,
desc.Medias,
pathConf,
&strm,
s)
res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
Desc: desc,
GenerateRTPPackets: false,
FillNTP: !pathConf.UseAbsoluteTimestamp,
})
if res.Err != nil {
return res.Err
}
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
strm = res.Stream
rangeHeader, err := createRangeHeader(pathConf)
if err != nil {
return err
}
_, err = c.Play(rangeHeader)
if err != nil {
return err
}
return c.Wait()
}
// APISourceDescribe implements StaticSource.
func (*Source) APISourceDescribe() defs.APIPathSourceOrReader {
return defs.APIPathSourceOrReader{

View file

@ -101,9 +101,9 @@ func (s *Source) runReader(sconn srt.Conn) error {
decodeErrors.Increase()
})
var stream *stream.Stream
var strm *stream.Stream
medias, err := mpegts.ToStream(r, &stream, s)
medias, err := mpegts.ToStream(r, &strm, s)
if err != nil {
return err
}
@ -119,7 +119,7 @@ func (s *Source) runReader(sconn srt.Conn) error {
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
stream = res.Stream
strm = res.Stream
for {
sconn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))

View file

@ -67,9 +67,9 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
return err
}
var stream *stream.Stream
var strm *stream.Stream
medias, err := webrtc.ToStream(client.PeerConnection(), params.Conf, &stream, s)
medias, err := webrtc.ToStream(client.PeerConnection(), params.Conf, &strm, s)
if err != nil {
client.Close() //nolint:errcheck
return err
@ -87,7 +87,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
stream = rres.Stream
strm = rres.Stream
client.StartReading()