diff --git a/go.mod b/go.mod index 0791bf18..c1aadc00 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210318191934-8936db52e4cf + github.com/aler9/gortsplib v0.0.0-20210320202503-3defa7074839 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index d30de79d..8578715e 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210318191934-8936db52e4cf h1:5wdYKMopoCRspHReiEqx6KWM1aqmQORumi9aWLlv7hM= -github.com/aler9/gortsplib v0.0.0-20210318191934-8936db52e4cf/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0= +github.com/aler9/gortsplib v0.0.0-20210320202503-3defa7074839 h1:o9h0vJiX/65HrYoOKuEGhfY8ZWrzrkaB2Pp5KUvUNL8= +github.com/aler9/gortsplib v0.0.0-20210320202503-3defa7074839/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0= github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU= github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/internal/path/path.go b/internal/path/path.go index 1100940b..15e179c2 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -50,8 +50,9 @@ type sourceRedirect struct{} func (*sourceRedirect) IsSource() {} type extSourceSetReadyReq struct { - tracks gortsplib.Tracks - res chan struct{} + tracks gortsplib.Tracks + startingPoints []*client.TrackStartingPoint + res chan struct{} } type extSourceSetNotReadyReq struct { @@ -246,7 +247,7 @@ outer: case req := <-pa.extSourceSetReady: pa.sourceTracks = req.tracks - pa.sourceTrackStartingPoints = make([]*client.TrackStartingPoint, len(req.tracks)) + pa.sourceTrackStartingPoints = req.startingPoints pa.onSourceSetReady() close(req.res) @@ -820,9 +821,10 @@ func (pa *Path) Name() string { } // OnExtSourceSetReady is called by a external source. -func (pa *Path) OnExtSourceSetReady(tracks gortsplib.Tracks) { +func (pa *Path) OnExtSourceSetReady(tracks gortsplib.Tracks, + startingPoints []*client.TrackStartingPoint) { res := make(chan struct{}) - pa.extSourceSetReady <- extSourceSetReadyReq{tracks, res} + pa.extSourceSetReady <- extSourceSetReadyReq{tracks, startingPoints, res} <-res } diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index 2c8eb735..529b6f3f 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -14,6 +14,7 @@ import ( "github.com/notedit/rtmp/codec/h264" "github.com/notedit/rtmp/format/rtmp" + "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/rtmputils" "github.com/aler9/rtsp-simple-server/internal/stats" @@ -26,7 +27,7 @@ const ( // Parent is implemeneted by path.Path. type Parent interface { Log(logger.Level, string, ...interface{}) - OnExtSourceSetReady(gortsplib.Tracks) + OnExtSourceSetReady(gortsplib.Tracks, []*client.TrackStartingPoint) OnExtSourceSetNotReady() OnFrame(int, gortsplib.StreamType, []byte) } @@ -173,7 +174,8 @@ func (s *Source) runInner() bool { } s.log(logger.Info, "ready") - s.parent.OnExtSourceSetReady(tracks) + s.parent.OnExtSourceSetReady(tracks, + make([]*client.TrackStartingPoint, len(tracks))) defer s.parent.OnExtSourceSetNotReady() readerDone := make(chan error) diff --git a/internal/sourcertsp/source.go b/internal/sourcertsp/source.go index 7ce7afd5..b81819a8 100644 --- a/internal/sourcertsp/source.go +++ b/internal/sourcertsp/source.go @@ -8,6 +8,7 @@ import ( "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/base" + "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/stats" ) @@ -19,7 +20,7 @@ const ( // Parent is implemented by path.Path. type Parent interface { Log(logger.Level, string, ...interface{}) - OnExtSourceSetReady(gortsplib.Tracks) + OnExtSourceSetReady(gortsplib.Tracks, []*client.TrackStartingPoint) OnExtSourceSetNotReady() OnFrame(int, gortsplib.StreamType, []byte) } @@ -147,8 +148,47 @@ func (s *Source) runInner() bool { return true } + startingPoints := make([]*client.TrackStartingPoint, len(conn.Tracks())) + + if conn.RTPInfo() != nil { + for _, info := range *conn.RTPInfo() { + ipath, ok := info.URL.RTSPPath() + if !ok { + continue + } + + trackID := func() int { + for _, tr := range conn.Tracks() { + u, err := tr.URL() + if err != nil { + continue + } + + tpath, ok := u.RTSPPath() + if !ok { + continue + } + + if tpath == ipath { + return tr.ID + } + } + return -1 + }() + if trackID < 0 { + continue + } + + startingPoints[trackID] = &client.TrackStartingPoint{ + Filled: true, + SequenceNumber: info.SequenceNumber, + Timestamp: info.Timestamp, + } + } + } + s.log(logger.Info, "ready") - s.parent.OnExtSourceSetReady(conn.Tracks()) + s.parent.OnExtSourceSetReady(conn.Tracks(), startingPoints) defer s.parent.OnExtSourceSetNotReady() done := conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { diff --git a/main_sourcertsp_test.go b/main_sourcertsp_test.go index 8077e7b8..c47ec040 100644 --- a/main_sourcertsp_test.go +++ b/main_sourcertsp_test.go @@ -1,10 +1,16 @@ package main import ( + "bufio" + "net" "os" + "strings" "testing" "time" + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/base" + "github.com/aler9/gortsplib/pkg/headers" "github.com/stretchr/testify/require" ) @@ -111,3 +117,164 @@ func TestSourceRTSP(t *testing.T) { }) } } + +func TestSourceRTSPRTPInfo(t *testing.T) { + l, err := net.Listen("tcp", "localhost:8555") + require.NoError(t, err) + defer l.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := l.Accept() + require.NoError(t, err) + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + var req base.Request + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Describe, req.Method) + + track1, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + track2, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: gortsplib.Tracks{track1, track2}.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + + var th headers.Transport + err = th.Read(req.Header["Transport"]) + require.NoError(t, err) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": headers.Transport{ + Protocol: gortsplib.StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + ClientPorts: th.ClientPorts, + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + + err = th.Read(req.Header["Transport"]) + require.NoError(t, err) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": headers.Transport{ + Protocol: gortsplib.StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + ClientPorts: th.ClientPorts, + InterleavedIds: &[2]int{2, 3}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Play, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "RTP-Info": headers.RTPInfo{ + { + URL: base.MustParseURL("rtsp://127.0.0.1/stream/trackID=1"), + SequenceNumber: 34254, + Timestamp: 156457686, + }, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Teardown, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + + conn.Close() + }() + + p1, ok := testProgram("rtmpDisable: yes\n" + + "paths:\n" + + " proxied:\n" + + " source: rtsp://localhost:8555/stream\n" + + " sourceProtocol: tcp\n") + require.Equal(t, true, ok) + defer p1.close() + + time.Sleep(1 * time.Second) + + conf := gortsplib.ClientConf{ + StreamProtocol: func() *gortsplib.StreamProtocol { + v := gortsplib.StreamProtocolTCP + return &v + }(), + } + + dest, err := conf.DialRead("rtsp://" + ownDockerIP + ":8554/proxied") + require.NoError(t, err) + defer dest.Close() + + require.Equal(t, &headers.RTPInfo{ + &headers.RTPInfoEntry{ + URL: &base.URL{ + Scheme: "rtsp", + Host: ownDockerIP + ":8554", + Path: "/proxied/trackID=1", + }, + SequenceNumber: 34254, + Timestamp: 156457686, + }, + }, dest.RTPInfo()) +}