From fb0122ba186fa21ee7e44d94fdfb8be75ffcbe8b Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 5 Apr 2021 18:14:06 +0200 Subject: [PATCH] RTMP client: speed up video reading by 1 frame --- go.mod | 2 +- go.sum | 4 +- internal/clientrtmp/client.go | 73 ++++++++++++++--------------------- internal/h264/nalutype.go | 40 +++++++++---------- internal/sourcertmp/source.go | 26 ++++++------- 5 files changed, 65 insertions(+), 80 deletions(-) diff --git a/go.mod b/go.mod index 7ff93531..6ea920a9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210404174403-c2d5ced43bcf + github.com/aler9/gortsplib v0.0.0-20210405155604-491b36cdc4b2 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index 3b88b4f6..76b63cd5 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210404174403-c2d5ced43bcf h1:X8ToQ1H9I7gLp37fxH7mo/U0y+GgF+kwmHpeht4MUHE= -github.com/aler9/gortsplib v0.0.0-20210404174403-c2d5ced43bcf/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= +github.com/aler9/gortsplib v0.0.0-20210405155604-491b36cdc4b2 h1:AYmBhFE5DTGoQ8XKqmklWO3+CCYYR1X18XL/b/sndmM= +github.com/aler9/gortsplib v0.0.0-20210405155604-491b36cdc4b2/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index 4b46f24a..ac40847e 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -223,6 +223,7 @@ func (c *Client) runRead() { var videoTrack *gortsplib.Track var h264Decoder *rtph264.Decoder var audioTrack *gortsplib.Track + var audioClockRate int var aacDecoder *rtpaac.Decoder err = func() error { @@ -241,8 +242,8 @@ func (c *Client) runRead() { } audioTrack = t - clockRate, _ := audioTrack.ClockRate() - aacDecoder = rtpaac.NewDecoder(clockRate) + audioClockRate, _ = audioTrack.ClockRate() + aacDecoder = rtpaac.NewDecoder(audioClockRate) } } @@ -284,9 +285,8 @@ func (c *Client) runRead() { go func() { writerDone <- func() error { videoInitialized := false - var videoStartDTS time.Time var videoBuf [][]byte - var videoPTS time.Duration + var videoStartDTS time.Time for { data, ok := c.ringBuffer.Pull() @@ -295,10 +295,8 @@ func (c *Client) runRead() { } pair := data.(trackIDPayloadPair) - now := time.Now() - if videoTrack != nil && pair.trackID == videoTrack.ID { - nts, err := h264Decoder.Decode(pair.buf) + nalus, _, err := h264Decoder.Decode(pair.buf) if err != nil { if err != rtph264.ErrMorePacketsNeeded { c.log(logger.Warn, "unable to decode video track: %v", err) @@ -306,46 +304,44 @@ func (c *Client) runRead() { continue } - for _, nt := range nts { + if !videoInitialized { + videoInitialized = true + videoStartDTS = time.Now() + } + + for _, nalu := range nalus { // remove SPS, PPS and AUD, not needed by RTMP - typ := h264.NALUType(nt.NALU[0] & 0x1F) + typ := h264.NALUType(nalu[0] & 0x1F) switch typ { case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: continue } - if !videoInitialized { - videoInitialized = true - videoStartDTS = now - videoPTS = nt.Timestamp - } + videoBuf = append(videoBuf, nalu) + } - // aggregate NALUs by PTS. - // for instance, aggregate a SEI and a IDR. - // this delays the stream by one frame, but is required by RTMP. - if nt.Timestamp != videoPTS { - c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) - err := c.conn.WriteH264(videoBuf, now.Sub(videoStartDTS)) - if err != nil { - return err - } - videoBuf = nil + // RTP marker means that all the NALUs with the same PTS have been received. + // send them together. + marker := (pair.buf[1] >> 7 & 0x1) > 0 + if marker { + c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) + err := c.conn.WriteH264(videoBuf, time.Since(videoStartDTS)) + if err != nil { + return err } - - videoPTS = nt.Timestamp - videoBuf = append(videoBuf, nt.NALU) + videoBuf = nil } } else if audioTrack != nil && pair.trackID == audioTrack.ID { - ats, err := aacDecoder.Decode(pair.buf) + aus, pts, err := aacDecoder.Decode(pair.buf) if err != nil { c.log(logger.Warn, "unable to decode audio track: %v", err) continue } - for _, at := range ats { + for i, au := range aus { c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) - err := c.conn.WriteAAC(at.AU, at.Timestamp) + err := c.conn.WriteAAC(au, pts+time.Duration(i)*1000*time.Second/time.Duration(audioClockRate)) if err != nil { return err } @@ -520,8 +516,7 @@ func (c *Client) runPublish() { return err } - ts := pkt.Time + pkt.CTime - var nts []*rtph264.NALUAndTimestamp + var outNALUs [][]byte for _, nalu := range nalus { // remove SPS, PPS and AUD, not needed by RTSP / RTMP typ := h264.NALUType(nalu[0] & 0x1F) @@ -530,13 +525,10 @@ func (c *Client) runPublish() { continue } - nts = append(nts, &rtph264.NALUAndTimestamp{ - Timestamp: ts, - NALU: nalu, - }) + outNALUs = append(outNALUs, nalu) } - frames, err := h264Encoder.Encode(nts) + frames, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime) if err != nil { return fmt.Errorf("ERR while encoding H264: %v", err) } @@ -550,12 +542,7 @@ func (c *Client) runPublish() { return fmt.Errorf("ERR: received an AAC frame, but track is not set up") } - frames, err := aacEncoder.Encode([]*rtpaac.AUAndTimestamp{ - { - Timestamp: pkt.Time + pkt.CTime, - AU: pkt.Data, - }, - }) + frames, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime) if err != nil { return fmt.Errorf("ERR while encoding AAC: %v", err) } diff --git a/internal/h264/nalutype.go b/internal/h264/nalutype.go index 120f727c..4430dbc9 100644 --- a/internal/h264/nalutype.go +++ b/internal/h264/nalutype.go @@ -14,7 +14,7 @@ const ( NALUTypeDataPartitionB NALUType = 3 NALUTypeDataPartitionC NALUType = 4 NALUTypeIDR NALUType = 5 - NALUTypeSei NALUType = 6 + NALUTypeSEI NALUType = 6 NALUTypeSPS NALUType = 7 NALUTypePPS NALUType = 8 NALUTypeAccessUnitDelimiter NALUType = 9 @@ -32,12 +32,12 @@ const ( NALUTypeSliceExtensionDepth NALUType = 21 NALUTypeReserved22 NALUType = 22 NALUTypeReserved23 NALUType = 23 - NALUTypeStapA NALUType = 24 - NALUTypeStapB NALUType = 25 - NALUTypeMtap16 NALUType = 26 - NALUTypeMtap24 NALUType = 27 - NALUTypeFuA NALUType = 28 - NALUTypeFuB NALUType = 29 + NALUTypeSTAPA NALUType = 24 + NALUTypeSTAPB NALUType = 25 + NALUTypeMTAP16 NALUType = 26 + NALUTypeMTAP24 NALUType = 27 + NALUTypeFUA NALUType = 28 + NALUTypeFUB NALUType = 29 ) // String implements fmt.Stringer. @@ -53,7 +53,7 @@ func (nt NALUType) String() string { return "DataPartitionC" case NALUTypeIDR: return "IDR" - case NALUTypeSei: + case NALUTypeSEI: return "Sei" case NALUTypeSPS: return "SPS" @@ -89,18 +89,18 @@ func (nt NALUType) String() string { return "Reserved22" case NALUTypeReserved23: return "Reserved23" - case NALUTypeStapA: - return "StapA" - case NALUTypeStapB: - return "StapB" - case NALUTypeMtap16: - return "Mtap16" - case NALUTypeMtap24: - return "Mtap24" - case NALUTypeFuA: - return "FuA" - case NALUTypeFuB: - return "FuB" + case NALUTypeSTAPA: + return "STAPA" + case NALUTypeSTAPB: + return "STAPB" + case NALUTypeMTAP16: + return "MTAP16" + case NALUTypeMTAP24: + return "MTAP24" + case NALUTypeFUA: + return "FUA" + case NALUTypeFUB: + return "FUB" } return fmt.Sprintf("unknown (%d)", nt) } diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index e650116c..e5035563 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -214,16 +214,19 @@ func (s *Source) runInner() bool { return err } - ts := pkt.Time + pkt.CTime - var nts []*rtph264.NALUAndTimestamp - for _, nt := range nalus { - nts = append(nts, &rtph264.NALUAndTimestamp{ - Timestamp: ts, - NALU: nt, - }) + var outNALUs [][]byte + for _, nalu := range nalus { + // remove SPS, PPS and AUD, not needed by RTSP / RTMP + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: + continue + } + + outNALUs = append(outNALUs, nalu) } - frames, err := h264Encoder.Encode(nts) + frames, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime) if err != nil { return fmt.Errorf("ERR while encoding H264: %v", err) } @@ -237,12 +240,7 @@ func (s *Source) runInner() bool { return fmt.Errorf("ERR: received an AAC frame, but track is not set up") } - frames, err := aacEncoder.Encode([]*rtpaac.AUAndTimestamp{ - { - Timestamp: pkt.Time + pkt.CTime, - AU: pkt.Data, - }, - }) + frames, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime) if err != nil { return fmt.Errorf("ERR while encoding AAC: %v", err) }