From a6d9578d4cf8e17ecd9604b0e5d63fb52dc459d0 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 26 Jun 2021 13:46:31 +0200 Subject: [PATCH] RTSP server: support setupping tracks with arbitrary interleaved IDs (#402) --- go.mod | 2 +- go.sum | 4 ++-- internal/hlsconverter/converter.go | 10 ++++++++-- internal/rtmpconn/conn.go | 24 ++++++++++++++---------- internal/rtmpsource/source.go | 12 ++++++------ main_rtspreadpub_test.go | 8 ++++---- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 78714741..933430f7 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210622205248-a58563453211 + github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62 github.com/asticode/go-astits v0.0.0-00010101000000-000000000000 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 diff --git a/go.sum b/go.sum index d5552ca3..0b80af96 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ= -github.com/aler9/gortsplib v0.0.0-20210622205248-a58563453211 h1:x/0AMdA9hsiNsA+FwqgZwzHPhl42T66naVS4toiLg5E= -github.com/aler9/gortsplib v0.0.0-20210622205248-a58563453211/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI= +github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62 h1:PPTqxgdDmDBQcDziEuLqS4VzmMTp5NSd7b3WZqQCtR4= +github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/hlsconverter/converter.go b/internal/hlsconverter/converter.go index c6400d27..a7995a75 100644 --- a/internal/hlsconverter/converter.go +++ b/internal/hlsconverter/converter.go @@ -263,10 +263,12 @@ func (c *Converter) runInner(innerCtx context.Context) error { c.path = res.Path var videoTrack *gortsplib.Track + videoTrackID := -1 var h264SPS []byte var h264PPS []byte var h264Decoder *rtph264.Decoder var audioTrack *gortsplib.Track + audioTrackID := -1 var aacConfig rtpaac.MPEG4AudioConfig var aacDecoder *rtpaac.Decoder @@ -275,7 +277,9 @@ func (c *Converter) runInner(innerCtx context.Context) error { if videoTrack != nil { return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1) } + videoTrack = t + videoTrackID = i var err error h264SPS, h264PPS, err = t.ExtractDataH264() @@ -289,7 +293,9 @@ func (c *Converter) runInner(innerCtx context.Context) error { if audioTrack != nil { return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1) } + audioTrack = t + audioTrackID = i byts, err := t.ExtractDataAAC() if err != nil { @@ -350,7 +356,7 @@ func (c *Converter) runInner(innerCtx context.Context) error { } pair := data.(trackIDPayloadPair) - if videoTrack != nil && pair.trackID == videoTrack.ID { + if videoTrack != nil && pair.trackID == videoTrackID { var pkt rtp.Packet err := pkt.Unmarshal(pair.buf) if err != nil { @@ -454,7 +460,7 @@ func (c *Converter) runInner(innerCtx context.Context) error { } } - } else if audioTrack != nil && pair.trackID == audioTrack.ID { + } else if audioTrack != nil && pair.trackID == audioTrackID { aus, pts, err := aacDecoder.Decode(pair.buf) if err != nil { if err != rtpaac.ErrMorePacketsNeeded { diff --git a/internal/rtmpconn/conn.go b/internal/rtmpconn/conn.go index 717af4a3..9cce7d94 100644 --- a/internal/rtmpconn/conn.go +++ b/internal/rtmpconn/conn.go @@ -233,8 +233,10 @@ func (c *Conn) runRead(ctx context.Context) error { c.path = res.Path var videoTrack *gortsplib.Track + videoTrackID := -1 var h264Decoder *rtph264.Decoder var audioTrack *gortsplib.Track + audioTrackID := -1 var audioClockRate int var aacDecoder *rtpaac.Decoder @@ -245,6 +247,7 @@ func (c *Conn) runRead(ctx context.Context) error { } videoTrack = t + videoTrackID = i h264Decoder = rtph264.NewDecoder() } else if t.IsAAC() { @@ -253,6 +256,7 @@ func (c *Conn) runRead(ctx context.Context) error { } audioTrack = t + audioTrackID = i audioClockRate, _ = audioTrack.ClockRate() aacDecoder = rtpaac.NewDecoder(audioClockRate) } @@ -291,7 +295,7 @@ func (c *Conn) runRead(ctx context.Context) error { } pair := data.(trackIDPayloadPair) - if videoTrack != nil && pair.trackID == videoTrack.ID { + if videoTrack != nil && pair.trackID == videoTrackID { nalus, pts, err := h264Decoder.Decode(pair.buf) if err != nil { if err != rtph264.ErrMorePacketsNeeded { @@ -335,7 +339,7 @@ func (c *Conn) runRead(ctx context.Context) error { videoBuf = nil } - } else if audioTrack != nil && pair.trackID == audioTrack.ID { + } else if audioTrack != nil && pair.trackID == audioTrackID { aus, pts, err := aacDecoder.Decode(pair.buf) if err != nil { if err != rtpaac.ErrMorePacketsNeeded { @@ -369,24 +373,24 @@ func (c *Conn) runPublish(ctx context.Context) error { } var tracks gortsplib.Tracks - var h264Encoder *rtph264.Encoder - var aacEncoder *rtpaac.Encoder + videoTrackID := -1 + audioTrackID := -1 + var h264Encoder *rtph264.Encoder if videoTrack != nil { h264Encoder = rtph264.NewEncoder(96, nil, nil, nil) + videoTrackID = len(tracks) tracks = append(tracks, videoTrack) } + var aacEncoder *rtpaac.Encoder if audioTrack != nil { clockRate, _ := audioTrack.ClockRate() aacEncoder = rtpaac.NewEncoder(96, clockRate, nil, nil, nil) + audioTrackID = len(tracks) tracks = append(tracks, audioTrack) } - for i, t := range tracks { - t.ID = i - } - pathName, query := pathNameAndQuery(c.conn.URL()) resc := make(chan readpublisher.AnnounceRes) @@ -499,7 +503,7 @@ func (c *Conn) runPublish(ctx context.Context) error { } for _, frame := range frames { - onFrame(videoTrack.ID, frame) + onFrame(videoTrackID, frame) } case av.AAC: @@ -513,7 +517,7 @@ func (c *Conn) runPublish(ctx context.Context) error { } for _, frame := range frames { - onFrame(audioTrack.ID, frame) + onFrame(audioTrackID, frame) } default: diff --git a/internal/rtmpsource/source.go b/internal/rtmpsource/source.go index a62b787f..7e3be649 100644 --- a/internal/rtmpsource/source.go +++ b/internal/rtmpsource/source.go @@ -153,10 +153,13 @@ func (s *Source) runInner() bool { } var tracks gortsplib.Tracks + videoTrackID := -1 + audioTrackID := -1 var h264Encoder *rtph264.Encoder if videoTrack != nil { h264Encoder = rtph264.NewEncoder(96, nil, nil, nil) + videoTrackID = len(tracks) tracks = append(tracks, videoTrack) } @@ -164,13 +167,10 @@ func (s *Source) runInner() bool { if audioTrack != nil { clockRate, _ := audioTrack.ClockRate() aacEncoder = rtpaac.NewEncoder(96, clockRate, nil, nil, nil) + audioTrackID = len(tracks) tracks = append(tracks, audioTrack) } - for i, t := range tracks { - t.ID = i - } - s.log(logger.Info, "ready") cres := make(chan source.ExtSetReadyRes) @@ -232,7 +232,7 @@ func (s *Source) runInner() bool { } for _, pkt := range pkts { - onFrame(videoTrack.ID, pkt) + onFrame(videoTrackID, pkt) } case av.AAC: @@ -246,7 +246,7 @@ func (s *Source) runInner() bool { } for _, pkt := range pkts { - onFrame(audioTrack.ID, pkt) + onFrame(audioTrackID, pkt) } default: diff --git a/main_rtspreadpub_test.go b/main_rtspreadpub_test.go index 077b81ff..6a55a1b9 100644 --- a/main_rtspreadpub_test.go +++ b/main_rtspreadpub_test.go @@ -514,7 +514,7 @@ func TestClientRTSPPublisherOverride(t *testing.T) { }) }() - err = s1.WriteFrame(track.ID, gortsplib.StreamTypeRTP, + err = s1.WriteFrame(0, gortsplib.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) if ca == "enabled" { require.Error(t, err) @@ -523,7 +523,7 @@ func TestClientRTSPPublisherOverride(t *testing.T) { } if ca == "enabled" { - err = s2.WriteFrame(track.ID, gortsplib.StreamTypeRTP, + err = s2.WriteFrame(0, gortsplib.StreamTypeRTP, []byte{0x05, 0x06, 0x07, 0x08}) require.NoError(t, err) } @@ -577,7 +577,7 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) { }) }() - err = source.WriteFrame(track.ID, gortsplib.StreamTypeRTP, input) + err = source.WriteFrame(0, gortsplib.StreamTypeRTP, input) require.NoError(t, err) <-frameRecv @@ -641,7 +641,7 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) { }) }() - err = source.WriteFrame(track.ID, gortsplib.StreamTypeRTP, input) + err = source.WriteFrame(0, gortsplib.StreamTypeRTP, input) require.NoError(t, err) <-frameRecv