From e78544f23ef18d2b8bfffdb067b4a7a345f6db07 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 28 Sep 2021 15:44:59 +0200 Subject: [PATCH] update gortsplib --- go.mod | 2 +- go.sum | 4 +- internal/core/hls_muxer.go | 23 +++----- internal/core/rtmp_conn.go | 108 +++++++++++++++++++---------------- internal/core/rtmp_source.go | 26 +++++++-- internal/hls/client.go | 26 +++++++-- 6 files changed, 114 insertions(+), 75 deletions(-) diff --git a/go.mod b/go.mod index fcf5ccb0..c43eeff9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210923181911-321fe06c6dab + github.com/aler9/gortsplib v0.0.0-20210928133338-9f4f3054daa7 github.com/asticode/go-astits v1.9.0 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.7.2 diff --git a/go.sum b/go.sum index 96d5400a..e5b5dacd 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210923181911-321fe06c6dab h1:Dp3zUKCN/74UdxYBvHNKJubuYZeesDDCAB4gx6wZYh4= -github.com/aler9/gortsplib v0.0.0-20210923181911-321fe06c6dab/go.mod h1:DKI+t4Wj5YjkpxmiQhmG3qRG5VMOprDQvto62wMO68c= +github.com/aler9/gortsplib v0.0.0-20210928133338-9f4f3054daa7 h1:PGGpgZejUm7y95ai31cz3XMfwITWu/+xkQoPS4qq+io= +github.com/aler9/gortsplib v0.0.0-20210928133338-9f4f3054daa7/go.mod h1:DKI+t4Wj5YjkpxmiQhmG3qRG5VMOprDQvto62wMO68c= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index adebd3e9..5b53e7cc 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -319,8 +319,6 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) writerDone := make(chan error) go func() { writerDone <- func() error { - var videoBuf [][]byte - for { data, ok := r.ringBuffer.Pull() if !ok { @@ -336,25 +334,18 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) continue } - nalus, pts, err := h264Decoder.DecodeRTP(&pkt) + nalus, pts, err := h264Decoder.DecodeUntilMarker(&pkt) if err != nil { - if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious { + if err != rtph264.ErrMorePacketsNeeded && + err != rtph264.ErrNonStartingPacketAndNoPrevious { r.log(logger.Warn, "unable to decode video track: %v", err) } continue } - videoBuf = append(videoBuf, nalus...) - - // RTP marker means that all the NALUs with the same PTS have been received. - // send them together. - if pkt.Marker { - err := r.muxer.WriteH264(pts, videoBuf) - if err != nil { - return err - } - - videoBuf = nil + err = r.muxer.WriteH264(pts, nalus) + if err != nil { + return err } } else if audioTrack != nil && pair.trackID == audioTrackID { @@ -365,7 +356,7 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) continue } - aus, pts, err := aacDecoder.DecodeRTP(&pkt) + aus, pts, err := aacDecoder.Decode(&pkt) if err != nil { if err != rtpaac.ErrMorePacketsNeeded { r.log(logger.Warn, "unable to decode audio track: %v", err) diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 21540b12..38dd8856 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -280,7 +280,6 @@ func (c *rtmpConn) runRead(ctx context.Context) error { // disable read deadline c.conn.NetConn().SetReadDeadline(time.Time{}) - var videoBuf [][]byte var videoStartPTS time.Duration var videoDTSEst *h264.DTSEstimator videoFirstIDRFound := false @@ -300,7 +299,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { continue } - nalus, pts, err := h264Decoder.DecodeRTP(&pkt) + nalus, pts, err := h264Decoder.DecodeUntilMarker(&pkt) if err != nil { if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious { c.log(logger.Warn, "unable to decode video track: %v", err) @@ -308,6 +307,8 @@ func (c *rtmpConn) runRead(ctx context.Context) error { continue } + var nalusFiltered [][]byte + for _, nalu := range nalus { // remove SPS, PPS and AUD, not needed by RTMP typ := h264.NALUType(nalu[0] & 0x1F) @@ -316,54 +317,47 @@ func (c *rtmpConn) runRead(ctx context.Context) error { continue } - videoBuf = append(videoBuf, nalu) + nalusFiltered = append(nalusFiltered, nalu) } - // RTP marker means that all the NALUs with the same PTS have been received. - // send them together. - if pkt.Marker { - idrPresent := func() bool { - for _, nalu := range nalus { - typ := h264.NALUType(nalu[0] & 0x1F) - if typ == h264.NALUTypeIDR { - return true - } + idrPresent := func() bool { + for _, nalu := range nalus { + typ := h264.NALUType(nalu[0] & 0x1F) + if typ == h264.NALUTypeIDR { + return true } - return false - }() + } + return false + }() - // wait until we receive an IDR - if !videoFirstIDRFound { - if !idrPresent { - videoBuf = nil - continue - } - - videoFirstIDRFound = true - videoStartPTS = pts - videoDTSEst = h264.NewDTSEstimator() + // wait until we receive an IDR + if !videoFirstIDRFound { + if !idrPresent { + continue } - data, err := h264.EncodeAVCC(videoBuf) - if err != nil { - return err - } + videoFirstIDRFound = true + videoStartPTS = pts + videoDTSEst = h264.NewDTSEstimator() + } - pts -= videoStartPTS - dts := videoDTSEst.Feed(pts) + data, err := h264.EncodeAVCC(nalusFiltered) + if err != nil { + return err + } - c.conn.NetConn().SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = c.conn.WritePacket(av.Packet{ - Type: av.H264, - Data: data, - Time: dts, - CTime: pts - dts, - }) - if err != nil { - return err - } + pts -= videoStartPTS + dts := videoDTSEst.Feed(pts) - videoBuf = nil + c.conn.NetConn().SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = c.conn.WritePacket(av.Packet{ + Type: av.H264, + Data: data, + Time: dts, + CTime: pts - dts, + }) + if err != nil { + return err } } else if audioTrack != nil && pair.trackID == audioTrackID { @@ -374,7 +368,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { continue } - aus, pts, err := aacDecoder.DecodeRTP(&pkt) + aus, pts, err := aacDecoder.Decode(&pkt) if err != nil { if err != rtpaac.ErrMorePacketsNeeded { c.log(logger.Warn, "unable to decode audio track: %v", err) @@ -518,13 +512,22 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { continue } - frames, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime) + pkts, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime) if err != nil { return fmt.Errorf("ERR while encoding H264: %v", err) } - for _, frame := range frames { - onFrame(videoTrackID, frame) + bytss := make([][]byte, len(pkts)) + for i, pkt := range pkts { + byts, err := pkt.Marshal() + if err != nil { + return fmt.Errorf("error while encoding H264: %v", err) + } + bytss[i] = byts + } + + for _, byts := range bytss { + onFrame(videoTrackID, byts) } case av.AAC: @@ -532,13 +535,22 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { return fmt.Errorf("ERR: received an AAC frame, but track is not set up") } - frames, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime) + pkts, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime) if err != nil { return fmt.Errorf("ERR while encoding AAC: %v", err) } - for _, frame := range frames { - onFrame(audioTrackID, frame) + bytss := make([][]byte, len(pkts)) + for i, pkt := range pkts { + byts, err := pkt.Marshal() + if err != nil { + return fmt.Errorf("error while encoding AAC: %v", err) + } + bytss[i] = byts + } + + for _, byts := range bytss { + onFrame(audioTrackID, byts) } default: diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 6c160628..c9ce52e0 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -206,8 +206,17 @@ func (s *rtmpSource) runInner() bool { return fmt.Errorf("ERR while encoding H264: %v", err) } - for _, pkt := range pkts { - onFrame(videoTrackID, pkt) + bytss := make([][]byte, len(pkts)) + for i, pkt := range pkts { + byts, err := pkt.Marshal() + if err != nil { + return fmt.Errorf("error while encoding H264: %v", err) + } + bytss[i] = byts + } + + for _, byts := range bytss { + onFrame(videoTrackID, byts) } case av.AAC: @@ -220,8 +229,17 @@ func (s *rtmpSource) runInner() bool { return fmt.Errorf("ERR while encoding AAC: %v", err) } - for _, pkt := range pkts { - onFrame(audioTrackID, pkt) + bytss := make([][]byte, len(pkts)) + for i, pkt := range pkts { + byts, err := pkt.Marshal() + if err != nil { + return fmt.Errorf("error while encoding AAC: %v", err) + } + bytss[i] = byts + } + + for _, byts := range bytss { + onFrame(audioTrackID, byts) } default: diff --git a/internal/hls/client.go b/internal/hls/client.go index d05eb6e7..e8e283f1 100644 --- a/internal/hls/client.go +++ b/internal/hls/client.go @@ -245,8 +245,17 @@ func (p *clientVideoProcessor) doProcess( return fmt.Errorf("error while encoding H264: %v", err) } - for _, pkt := range pkts { - p.onFrame(pkt) + bytss := make([][]byte, len(pkts)) + for i, pkt := range pkts { + byts, err := pkt.Marshal() + if err != nil { + return fmt.Errorf("error while encoding H264: %v", err) + } + bytss[i] = byts + } + + for _, byts := range bytss { + p.onFrame(byts) } return nil @@ -369,8 +378,17 @@ func (p *clientAudioProcessor) doProcess( return fmt.Errorf("error while encoding AAC: %v", err) } - for _, pkt := range pkts { - p.onFrame(pkt) + bytss := make([][]byte, len(pkts)) + for i, pkt := range pkts { + byts, err := pkt.Marshal() + if err != nil { + return fmt.Errorf("error while encoding AAC: %v", err) + } + bytss[i] = byts + } + + for _, byts := range bytss { + p.onFrame(byts) } return nil