diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index 38d9223e..6123263d 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -56,6 +56,11 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values) { return pathName, ur.Query() } +type trackIDBufPair struct { + trackID int + buf []byte +} + // Parent is implemented by clientman.ClientMan. type Parent interface { Log(logger.Level, string, ...interface{}) @@ -313,22 +318,61 @@ func (c *Client) runRead() { return fmt.Errorf("terminated") } + pair := data.(trackIDBufPair) + now := time.Now() - switch tdata := data.(type) { - case *rtph264.NALUAndTimestamp: - if !videoInitialized { - videoInitialized = true - videoStartDTS = now - videoPTS = tdata.Timestamp + if c.videoTrack != nil && pair.trackID == c.videoTrack.ID { + nts, err := c.h264Decoder.Decode(pair.buf) + if err != nil { + if err != rtph264.ErrMorePacketsNeeded { + c.log(logger.Debug, "ERR while decoding video track: %v", err) + } + continue } - // aggregate NALUs by PTS - if tdata.Timestamp != videoPTS { + for _, nt := range nts { + if !videoInitialized { + videoInitialized = true + videoStartDTS = now + videoPTS = nt.Timestamp + } + + // aggregate NALUs by PTS + if nt.Timestamp != videoPTS { + pkt := av.Packet{ + Type: av.H264, + Data: h264.FillNALUsAVCC(videoBuf), + Time: now.Sub(videoStartDTS), + } + + c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) + err := c.conn.WritePacket(pkt) + if err != nil { + return err + } + + videoBuf = nil + } + + videoPTS = nt.Timestamp + videoBuf = append(videoBuf, nt.NALU) + } + continue + } + + if c.audioTrack != nil && pair.trackID == c.audioTrack.ID { + ats, err := c.aacDecoder.Decode(pair.buf) + if err != nil { + c.log(logger.Debug, "ERR while decoding audio track: %v", err) + continue + } + + for _, at := range ats { pkt := av.Packet{ - Type: av.H264, - Data: h264.FillNALUsAVCC(videoBuf), - Time: now.Sub(videoStartDTS), + Type: av.AAC, + Data: at.AU, + Time: at.Timestamp, } c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) @@ -336,25 +380,8 @@ func (c *Client) runRead() { if err != nil { return err } - - videoBuf = nil - } - - videoPTS = tdata.Timestamp - videoBuf = append(videoBuf, tdata.NALU) - - case *rtpaac.AUAndTimestamp: - pkt := av.Packet{ - Type: av.AAC, - Data: tdata.AU, - Time: tdata.Timestamp, - } - - c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) - err := c.conn.WritePacket(pkt) - if err != nil { - return err } + continue } } }() @@ -616,36 +643,6 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, // OnIncomingFrame implements path.Reader. func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, buf []byte) { if streamType == gortsplib.StreamTypeRTP { - if c.videoTrack != nil { - if trackID == c.videoTrack.ID { - nts, err := c.h264Decoder.Decode(buf) - if err != nil { - if err != rtph264.ErrMorePacketsNeeded { - c.log(logger.Debug, "ERR while decoding video track: %v", err) - } - return - } - - for _, nt := range nts { - c.ringBuffer.Push(nt) - } - return - } - } - - if c.audioTrack != nil { - if trackID == c.audioTrack.ID { - ats, err := c.aacDecoder.Decode(buf) - if err != nil { - c.log(logger.Debug, "ERR while decoding audio track: %v", err) - return - } - - for _, at := range ats { - c.ringBuffer.Push(at) - } - return - } - } + c.ringBuffer.Push(trackIDBufPair{trackID, buf}) } }