From 1704fba5e9bee6a1d632efe11c7ca0d8735b7535 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Wed, 24 Dec 2025 20:44:56 +0100 Subject: [PATCH] rtsp: avoid setupping back channels (#5074) (#5289) --- internal/staticsources/rtsp/source.go | 29 +++- internal/staticsources/rtsp/source_test.go | 179 ++++++++++++++++++++- 2 files changed, 201 insertions(+), 7 deletions(-) diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index f512be57..0a21f506 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -2,12 +2,14 @@ package rtsp import ( + "fmt" "net/url" "regexp" "time" "github.com/bluenviron/gortsplib/v5" "github.com/bluenviron/gortsplib/v5/pkg/base" + "github.com/bluenviron/gortsplib/v5/pkg/description" "github.com/bluenviron/gortsplib/v5/pkg/headers" "github.com/bluenviron/mediamtx/internal/conf" @@ -212,22 +214,39 @@ func (s *Source) runInner(c *gortsplib.Client, u *base.URL, pathConf *conf.Path) return err } - err = c.SetupAll(desc.BaseURL, desc.Medias) - if err != nil { - return err + var medias []*description.Media + + for _, m := range desc.Medias { + if !m.IsBackChannel { + _, err = c.Setup(desc.BaseURL, m, 0, 0) + if err != nil { + return err + } + + medias = append(medias, m) + } + } + + if medias == nil { + return fmt.Errorf("no medias have been setupped") + } + + desc2 := &description.Session{ + Title: desc.Title, + Medias: medias, } var strm *stream.Stream rtsp.ToStream( c, - desc.Medias, + desc2.Medias, pathConf, &strm, s) res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{ - Desc: desc, + Desc: desc2, GenerateRTPPackets: false, FillNTP: !pathConf.UseAbsoluteTimestamp, }) diff --git a/internal/staticsources/rtsp/source_test.go b/internal/staticsources/rtsp/source_test.go index 7f15ece9..d7242f74 100644 --- a/internal/staticsources/rtsp/source_test.go +++ b/internal/staticsources/rtsp/source_test.go @@ -11,6 +11,7 @@ import ( "github.com/bluenviron/gortsplib/v5/pkg/auth" "github.com/bluenviron/gortsplib/v5/pkg/base" "github.com/bluenviron/gortsplib/v5/pkg/description" + "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/pion/rtp" "github.com/stretchr/testify/require" @@ -19,6 +20,10 @@ import ( "github.com/bluenviron/mediamtx/internal/test" ) +func ptrOf[T any](v T) *T { + return &v +} + type testServer struct { onDescribe func(*gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) onSetup func(*gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) @@ -189,7 +194,7 @@ func TestSource(t *testing.T) { } } -func TestSourceNoPassword(t *testing.T) { +func TestNoPassword(t *testing.T) { var strm *gortsplib.ServerStream nonce, err := auth.GenerateNonce() @@ -290,7 +295,7 @@ func TestSourceNoPassword(t *testing.T) { <-p.Unit } -func TestSourceRange(t *testing.T) { +func TestRange(t *testing.T) { for _, ca := range []string{"clock", "npt", "smpte"} { t.Run(ca, func(t *testing.T) { var strm *gortsplib.ServerStream @@ -403,3 +408,173 @@ func TestSourceRange(t *testing.T) { }) } } + +func TestSkipBackChannel(t *testing.T) { + media0 := test.UniqueMediaH264() + media1 := test.UniqueMediaMPEG4Audio() + backChannelMedia := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.Opus{PayloadTyp: 96, ChannelCount: 2}}, + IsBackChannel: true, + } + + var strm *gortsplib.ServerStream + setupCount := 0 + + s := gortsplib.Server{ + Handler: &testServer{ + onDescribe: func(_ *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, strm, nil + }, + onSetup: func(_ *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + setupCount++ + return &base.Response{ + StatusCode: base.StatusOK, + }, strm, nil + }, + onPlay: func(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + go func() { + time.Sleep(100 * time.Millisecond) + err := strm.WritePacketRTP(media0, &rtp.Packet{ + Header: rtp.Header{ + Version: 0x02, + PayloadType: 96, + SequenceNumber: 57899, + Timestamp: 345234345, + SSRC: 978651231, + Marker: true, + }, + Payload: []byte{5, 1, 2, 3, 4}, + }) + require.NoError(t, err) + }() + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + RTSPAddress: "127.0.0.1:8555", + } + + err := s.Start() + require.NoError(t, err) + defer s.Close() + + strm = &gortsplib.ServerStream{ + Server: &s, + Desc: &description.Session{Medias: []*description.Media{media0, media1, backChannelMedia}}, + } + err = strm.Initialize() + require.NoError(t, err) + defer strm.Close() + + var sp conf.RTSPTransport + sp.UnmarshalJSON([]byte(`"tcp"`)) //nolint:errcheck + + p := &test.StaticSourceParent{} + p.Initialize() + defer p.Close() + + so := &Source{ + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + WriteQueueSize: 2048, + Parent: p, + } + + done := make(chan struct{}) + defer func() { <-done }() + + ctx, ctxCancel := context.WithCancel(context.Background()) + defer ctxCancel() + + go func() { + so.Run(defs.StaticSourceRunParams{ //nolint:errcheck + Context: ctx, + ResolvedSource: "rtsp://127.0.0.1:8555/teststream", + Conf: &conf.Path{ + RTSPTransport: conf.RTSPTransport{Protocol: ptrOf(gortsplib.ProtocolTCP)}, + }, + }) + close(done) + }() + + <-p.Unit + + require.Equal(t, 2, setupCount) +} + +func TestOnlyBackChannelsError(t *testing.T) { + backChannelMedia1 := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.Opus{PayloadTyp: 96, ChannelCount: 2}}, + IsBackChannel: true, + } + backChannelMedia2 := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.G711{PayloadTyp: 8, SampleRate: 8000, ChannelCount: 1}}, + IsBackChannel: true, + } + + var strm *gortsplib.ServerStream + + s := gortsplib.Server{ + Handler: &testServer{ + onDescribe: func(_ *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, strm, nil + }, + onSetup: func(_ *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, strm, nil + }, + onPlay: func(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + RTSPAddress: "127.0.0.1:8555", + } + + err := s.Start() + require.NoError(t, err) + defer s.Close() + + strm = &gortsplib.ServerStream{ + Server: &s, + Desc: &description.Session{Medias: []*description.Media{backChannelMedia1, backChannelMedia2}}, + } + err = strm.Initialize() + require.NoError(t, err) + defer strm.Close() + + p := &test.StaticSourceParent{} + p.Initialize() + + so := &Source{ + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + WriteQueueSize: 2048, + Parent: p, + } + + ctx, ctxCancel := context.WithCancel(context.Background()) + defer ctxCancel() + + err = so.Run(defs.StaticSourceRunParams{ + Context: ctx, + ResolvedSource: "rtsp://127.0.0.1:8555/teststream", + Conf: &conf.Path{ + RTSPTransport: conf.RTSPTransport{Protocol: ptrOf(gortsplib.ProtocolTCP)}, + }, + }) + + require.Error(t, err) + require.Contains(t, err.Error(), "no media") +}