diff --git a/internal/core/path_test.go b/internal/core/path_test.go index 42deecc1..93278efb 100644 --- a/internal/core/path_test.go +++ b/internal/core/path_test.go @@ -842,7 +842,7 @@ func TestPathFallback(t *testing.T) { } func TestPathResolveSource(t *testing.T) { - var stream *gortsplib.ServerStream + var strm *gortsplib.ServerStream s := gortsplib.Server{ Handler: &testServer{ @@ -852,12 +852,12 @@ func TestPathResolveSource(t *testing.T) { require.Equal(t, "/a", ctx.Path) return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil + }, strm, nil }, onSetup: func(_ *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil + }, strm, nil }, onPlay: func(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { return &base.Response{ @@ -872,13 +872,13 @@ func TestPathResolveSource(t *testing.T) { require.NoError(t, err) defer s.Close() - stream = &gortsplib.ServerStream{ + strm = &gortsplib.ServerStream{ Server: &s, Desc: &description.Session{Medias: []*description.Media{test.MediaH264}}, } - err = stream.Initialize() + err = strm.Initialize() require.NoError(t, err) - defer stream.Close() + defer strm.Close() p, ok := newInstance( "paths:\n" + diff --git a/internal/protocols/rtsp/to_stream.go b/internal/protocols/rtsp/to_stream.go index bbf277e8..658a14a8 100644 --- a/internal/protocols/rtsp/to_stream.go +++ b/internal/protocols/rtsp/to_stream.go @@ -32,7 +32,7 @@ func ToStream( source rtspSource, medias []*description.Media, pathConf *conf.Path, - strm *stream.Stream, + strm **stream.Stream, log logger.Writer, ) { for _, medi := range medias { @@ -82,7 +82,7 @@ func ToStream( return } - strm.WriteRTPPacket(cmedi, cforma, pkt, ntp, pts) + (*strm).WriteRTPPacket(cmedi, cforma, pkt, ntp, pts) }) } } diff --git a/internal/protocols/webrtc/to_stream_test.go b/internal/protocols/webrtc/to_stream_test.go index afa5437d..b259e54c 100644 --- a/internal/protocols/webrtc/to_stream_test.go +++ b/internal/protocols/webrtc/to_stream_test.go @@ -405,8 +405,8 @@ func TestToStream(t *testing.T) { err = pc2.GatherIncomingTracks() require.NoError(t, err) - var stream *stream.Stream - medias, err := ToStream(pc2, &conf.Path{}, &stream, nil) + var strm *stream.Stream + medias, err := ToStream(pc2, &conf.Path{}, &strm, nil) require.NoError(t, err) require.Equal(t, ca.out, medias[0].Formats[0]) }) diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index 14b738dd..04209f21 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -229,15 +229,15 @@ func (c *conn) runPublish() error { return err } - var stream *stream.Stream + var strm *stream.Stream - medias, err := rtmp.ToStream(r, &stream) + medias, err := rtmp.ToStream(r, &strm) if err != nil { return err } var path defs.Path - path, stream, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{ + path, strm, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{ Author: c, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, diff --git a/internal/servers/rtsp/conn.go b/internal/servers/rtsp/conn.go index 9966c6ea..b6b85b6e 100644 --- a/internal/servers/rtsp/conn.go +++ b/internal/servers/rtsp/conn.go @@ -193,16 +193,16 @@ func (c *conn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, }, nil, nil } - var stream *gortsplib.ServerStream + var strm *gortsplib.ServerStream if !c.isTLS { - stream = res.Stream.RTSPStream(c.rserver) + strm = res.Stream.RTSPStream(c.rserver) } else { - stream = res.Stream.RTSPSStream(c.rserver) + strm = res.Stream.RTSPSStream(c.rserver) } return &base.Response{ StatusCode: base.StatusOK, - }, stream, nil + }, strm, nil } func (c *conn) handleAuthError(err *auth.Error) (*base.Response, error) { diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index fd9e1f03..4e91293c 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -331,16 +331,16 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons }, err } - s.path = path - s.stream = stream - rtsp.ToStream( s.rsession, s.rsession.AnnouncedDescription().Medias, - s.path.SafeConf(), - stream, + path.SafeConf(), + &s.stream, s) + s.path = path + s.stream = stream + return &base.Response{ StatusCode: base.StatusOK, }, nil diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index a15cd317..5684d24e 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -213,15 +213,15 @@ func (c *conn) runPublishReader(sconn srt.Conn, streamID *streamID, pathConf *co decodeErrors.Increase() }) - var stream *stream.Stream + var strm *stream.Stream - medias, err := mpegts.ToStream(r, &stream, c) + medias, err := mpegts.ToStream(r, &strm, c) if err != nil { return err } var path defs.Path - path, stream, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{ + path, strm, err = c.pathManager.AddPublisher(defs.PathAddPublisherReq{ Author: c, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: true, diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 183385ee..75a869a6 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -233,15 +233,15 @@ func (s *session) runPublish() (int, error) { return 0, err } - var stream *stream.Stream + var strm *stream.Stream - medias, err := webrtc.ToStream(pc, pathConf, &stream, s) + medias, err := webrtc.ToStream(pc, pathConf, &strm, s) if err != nil { return 0, err } var path defs.Path - path, stream, err = s.pathManager.AddPublisher(defs.PathAddPublisherReq{ + path, strm, err = s.pathManager.AddPublisher(defs.PathAddPublisherReq{ Author: s, Desc: &description.Session{Medias: medias}, GenerateRTPPackets: false, diff --git a/internal/staticsources/hls/source.go b/internal/staticsources/hls/source.go index ff7ba90b..bb96ae87 100644 --- a/internal/staticsources/hls/source.go +++ b/internal/staticsources/hls/source.go @@ -37,10 +37,10 @@ func (s *Source) Log(level logger.Level, format string, args ...any) { // Run implements StaticSource. func (s *Source) Run(params defs.StaticSourceRunParams) error { - var stream *stream.Stream + var strm *stream.Stream defer func() { - if stream != nil { + if strm != nil { s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) } }() @@ -94,7 +94,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { decodeErrors.Increase() }, OnTracks: func(tracks []*gohlslib.Track) error { - medias, err2 := hls.ToStream(c, tracks, params.Conf, &stream) + medias, err2 := hls.ToStream(c, tracks, params.Conf, &strm) if err2 != nil { return err2 } @@ -107,7 +107,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { return res.Err } - stream = res.Stream + strm = res.Stream return nil }, diff --git a/internal/staticsources/mpegts/source.go b/internal/staticsources/mpegts/source.go index c13cec82..590576a2 100644 --- a/internal/staticsources/mpegts/source.go +++ b/internal/staticsources/mpegts/source.go @@ -116,9 +116,9 @@ func (s *Source) runReader(nc net.Conn) error { decodeErrors.Increase() }) - var stream *stream.Stream + var strm *stream.Stream - medias, err := mpegts.ToStream(mr, &stream, s) + medias, err := mpegts.ToStream(mr, &strm, s) if err != nil { return err } @@ -134,7 +134,7 @@ func (s *Source) runReader(nc net.Conn) error { defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) - stream = res.Stream + strm = res.Stream for { nc.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout))) diff --git a/internal/staticsources/rtmp/source.go b/internal/staticsources/rtmp/source.go index 9fff458e..772b82a4 100644 --- a/internal/staticsources/rtmp/source.go +++ b/internal/staticsources/rtmp/source.go @@ -101,9 +101,9 @@ func (s *Source) runReader(conn *gortmplib.Client) error { return err } - var stream *stream.Stream + var strm *stream.Stream - medias, err := rtmp.ToStream(r, &stream) + medias, err := rtmp.ToStream(r, &strm) if err != nil { return err } @@ -123,7 +123,7 @@ func (s *Source) runReader(conn *gortmplib.Client) error { defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) - stream = res.Stream + strm = res.Stream conn.NetConn().SetWriteDeadline(time.Time{}) diff --git a/internal/staticsources/rtp/source.go b/internal/staticsources/rtp/source.go index 8e3dc21d..735eba28 100644 --- a/internal/staticsources/rtp/source.go +++ b/internal/staticsources/rtp/source.go @@ -133,7 +133,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { decodeErrors.Start() defer decodeErrors.Stop() - var stream *stream.Stream + var strm *stream.Stream timeDecoder := &rtptime.GlobalDecoder{} timeDecoder.Initialize() @@ -168,14 +168,14 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { var pkt rtp.Packet err = pkt.Unmarshal(buf[:n]) if err != nil { - if stream != nil { + if strm != nil { decodeErrors.Increase() continue } return err } - if stream == nil { + if strm == nil { res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ Desc: desc, GenerateRTPPackets: false, @@ -187,7 +187,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) - stream = res.Stream + strm = res.Stream } media, ok := mediasByPayloadType[pkt.PayloadType] @@ -209,7 +209,7 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { continue } - stream.WriteRTPPacket(media.desc, forma.desc, pkt, time.Time{}, pts) + strm.WriteRTPPacket(media.desc, forma.desc, pkt, time.Time{}, pts) } } } diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index 23d16b10..f512be57 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -16,6 +16,7 @@ import ( "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/rtsp" "github.com/bluenviron/mediamtx/internal/protocols/tls" + "github.com/bluenviron/mediamtx/internal/stream" ) func createRangeHeader(cnf *conf.Path) (*headers.Range, error) { @@ -187,47 +188,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { readErr := make(chan error) go func() { - readErr <- func() error { - desc, _, err2 := c.Describe(u) - if err2 != nil { - return err2 - } - - err2 = c.SetupAll(desc.BaseURL, desc.Medias) - if err2 != nil { - return err2 - } - - res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ - Desc: desc, - GenerateRTPPackets: false, - FillNTP: !params.Conf.UseAbsoluteTimestamp, - }) - if res.Err != nil { - return res.Err - } - - defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) - - rtsp.ToStream( - c, - desc.Medias, - params.Conf, - res.Stream, - s) - - rangeHeader, err2 := createRangeHeader(params.Conf) - if err2 != nil { - return err2 - } - - _, err2 = c.Play(rangeHeader) - if err2 != nil { - return err2 - } - - return c.Wait() - }() + readErr <- s.runInner(c, u, params.Conf) }() for { @@ -245,6 +206,52 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { } } +func (s *Source) runInner(c *gortsplib.Client, u *base.URL, pathConf *conf.Path) error { + desc, _, err := c.Describe(u) + if err != nil { + return err + } + + err = c.SetupAll(desc.BaseURL, desc.Medias) + if err != nil { + return err + } + + var strm *stream.Stream + + rtsp.ToStream( + c, + desc.Medias, + pathConf, + &strm, + s) + + res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ + Desc: desc, + GenerateRTPPackets: false, + FillNTP: !pathConf.UseAbsoluteTimestamp, + }) + if res.Err != nil { + return res.Err + } + + defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) + + strm = res.Stream + + rangeHeader, err := createRangeHeader(pathConf) + if err != nil { + return err + } + + _, err = c.Play(rangeHeader) + if err != nil { + return err + } + + return c.Wait() +} + // APISourceDescribe implements StaticSource. func (*Source) APISourceDescribe() defs.APIPathSourceOrReader { return defs.APIPathSourceOrReader{ diff --git a/internal/staticsources/srt/source.go b/internal/staticsources/srt/source.go index 5e091750..48acd8a9 100644 --- a/internal/staticsources/srt/source.go +++ b/internal/staticsources/srt/source.go @@ -101,9 +101,9 @@ func (s *Source) runReader(sconn srt.Conn) error { decodeErrors.Increase() }) - var stream *stream.Stream + var strm *stream.Stream - medias, err := mpegts.ToStream(r, &stream, s) + medias, err := mpegts.ToStream(r, &strm, s) if err != nil { return err } @@ -119,7 +119,7 @@ func (s *Source) runReader(sconn srt.Conn) error { defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) - stream = res.Stream + strm = res.Stream for { sconn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout))) diff --git a/internal/staticsources/webrtc/source.go b/internal/staticsources/webrtc/source.go index 2ecbd9aa..848724fe 100644 --- a/internal/staticsources/webrtc/source.go +++ b/internal/staticsources/webrtc/source.go @@ -67,9 +67,9 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { return err } - var stream *stream.Stream + var strm *stream.Stream - medias, err := webrtc.ToStream(client.PeerConnection(), params.Conf, &stream, s) + medias, err := webrtc.ToStream(client.PeerConnection(), params.Conf, &strm, s) if err != nil { client.Close() //nolint:errcheck return err @@ -87,7 +87,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{}) - stream = rres.Stream + strm = rres.Stream client.StartReading()