From ef4b925209b4581f60aece8fab76cdc95beea30b Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 7 Sep 2021 12:02:44 +0200 Subject: [PATCH] hls: fix discontinuity of TS counters between segments --- internal/hls/muxer.go | 130 +++--------- ...yplaylist.go => muxer_primary_playlist.go} | 10 +- ...amplaylist.go => muxer_stream_playlist.go} | 20 +- internal/hls/muxer_ts_generator.go | 190 ++++++++++++++++++ .../hls/{segment.go => muxer_ts_segment.go} | 118 +++-------- 5 files changed, 253 insertions(+), 215 deletions(-) rename internal/hls/{primaryplaylist.go => muxer_primary_playlist.go} (81%) rename internal/hls/{streamplaylist.go => muxer_stream_playlist.go} (80%) create mode 100644 internal/hls/muxer_ts_generator.go rename internal/hls/{segment.go => muxer_ts_segment.go} (53%) diff --git a/internal/hls/muxer.go b/internal/hls/muxer.go index 13b9105f..9455afe0 100644 --- a/internal/hls/muxer.go +++ b/internal/hls/muxer.go @@ -5,8 +5,6 @@ import ( "time" "github.com/aler9/gortsplib" - - "github.com/aler9/rtsp-simple-server/internal/h264" ) const ( @@ -18,20 +16,9 @@ const ( // Muxer is a HLS muxer. type Muxer struct { - hlsSegmentCount int - hlsSegmentDuration time.Duration - videoTrack *gortsplib.Track - audioTrack *gortsplib.Track - - h264Conf *gortsplib.TrackConfigH264 - aacConf *gortsplib.TrackConfigAAC - videoDTSEst *h264.DTSEstimator - audioAUCount int - currentSegment *segment - startPCR time.Time - startPTS time.Duration - primaryPlaylist *primaryPlaylist - streamPlaylist *streamPlaylist + primaryPlaylist *muxerPrimaryPlaylist + streamPlaylist *muxerStreamPlaylist + tsGenerator *muxerTSGenerator } // NewMuxer allocates a Muxer. @@ -58,16 +45,23 @@ func NewMuxer( } } + primaryPlaylist := newMuxerPrimaryPlaylist(videoTrack, audioTrack, h264Conf) + + streamPlaylist := newMuxerStreamPlaylist(hlsSegmentCount) + + tsGenerator := newMuxerTSGenerator( + hlsSegmentCount, + hlsSegmentDuration, + videoTrack, + audioTrack, + h264Conf, + aacConf, + streamPlaylist) + m := &Muxer{ - hlsSegmentCount: hlsSegmentCount, - hlsSegmentDuration: hlsSegmentDuration, - videoTrack: videoTrack, - audioTrack: audioTrack, - h264Conf: h264Conf, - aacConf: aacConf, - currentSegment: newSegment(videoTrack, audioTrack, h264Conf, aacConf), - primaryPlaylist: newPrimaryPlaylist(videoTrack, audioTrack, h264Conf), - streamPlaylist: newStreamPlaylist(hlsSegmentCount), + primaryPlaylist: primaryPlaylist, + streamPlaylist: streamPlaylist, + tsGenerator: tsGenerator, } return m, nil @@ -80,93 +74,15 @@ func (m *Muxer) Close() { // WriteH264 writes H264 NALUs, grouped by PTS, into the muxer. func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error { - idrPresent := func() bool { - for _, nalu := range nalus { - typ := h264.NALUType(nalu[0] & 0x1F) - if typ == h264.NALUTypeIDR { - return true - } - } - return false - }() - - // skip group silently until we find one with a IDR - if !m.currentSegment.firstPacketWritten && !idrPresent { - return nil - } - - if m.currentSegment.firstPacketWritten { - if idrPresent && - m.currentSegment.duration() >= m.hlsSegmentDuration { - m.streamPlaylist.pushSegment(m.currentSegment) - - m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264Conf, m.aacConf) - m.currentSegment.setStartPCR(m.startPCR) - } - } else { - m.startPCR = time.Now() - m.startPTS = pts - m.currentSegment.setStartPCR(m.startPCR) - m.videoDTSEst = h264.NewDTSEstimator() - } - - pts -= m.startPTS - - err := m.currentSegment.writeH264( - m.videoDTSEst.Feed(pts)+pcrOffset, - pts+pcrOffset, - idrPresent, - nalus) - if err != nil { - return err - } - - return nil + return m.tsGenerator.writeH264(pts, nalus) } // WriteAAC writes AAC AUs, grouped by PTS, into the muxer. func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error { - if m.videoTrack == nil { - if m.currentSegment.firstPacketWritten { - if m.audioAUCount >= segmentMinAUCount && - m.currentSegment.duration() >= m.hlsSegmentDuration { - m.audioAUCount = 0 - - m.streamPlaylist.pushSegment(m.currentSegment) - - m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264Conf, m.aacConf) - m.currentSegment.setStartPCR(m.startPCR) - } - } else { - m.startPCR = time.Now() - m.startPTS = pts - m.currentSegment.setStartPCR(m.startPCR) - } - } else { - if !m.currentSegment.firstPacketWritten { - return nil - } - } - - pts = pts - m.startPTS + pcrOffset - - for _, au := range aus { - err := m.currentSegment.writeAAC(pts, au) - if err != nil { - return err - } - - if m.videoTrack == nil { - m.audioAUCount++ - } - - pts += 1000 * time.Second / time.Duration(m.aacConf.SampleRate) - } - - return nil + return m.tsGenerator.writeAAC(pts, aus) } -// PrimaryPlaylist returns a reader to read the primary playlist +// PrimaryPlaylist returns a reader to read the primary playlist. func (m *Muxer) PrimaryPlaylist() io.Reader { return m.primaryPlaylist.reader() } @@ -176,7 +92,7 @@ func (m *Muxer) StreamPlaylist() io.Reader { return m.streamPlaylist.reader() } -// Segment returns a reader to read a segment. +// Segment returns a reader to read a segment listed in the stream playlist. func (m *Muxer) Segment(fname string) io.Reader { return m.streamPlaylist.segment(fname) } diff --git a/internal/hls/primaryplaylist.go b/internal/hls/muxer_primary_playlist.go similarity index 81% rename from internal/hls/primaryplaylist.go rename to internal/hls/muxer_primary_playlist.go index 01316fa2..04f7f2e8 100644 --- a/internal/hls/primaryplaylist.go +++ b/internal/hls/muxer_primary_playlist.go @@ -9,7 +9,7 @@ import ( "github.com/aler9/gortsplib" ) -type primaryPlaylist struct { +type muxerPrimaryPlaylist struct { videoTrack *gortsplib.Track audioTrack *gortsplib.Track h264Conf *gortsplib.TrackConfigH264 @@ -17,12 +17,12 @@ type primaryPlaylist struct { cnt []byte } -func newPrimaryPlaylist( +func newMuxerPrimaryPlaylist( videoTrack *gortsplib.Track, audioTrack *gortsplib.Track, h264Conf *gortsplib.TrackConfigH264, -) *primaryPlaylist { - p := &primaryPlaylist{ +) *muxerPrimaryPlaylist { + p := &muxerPrimaryPlaylist{ videoTrack: videoTrack, audioTrack: audioTrack, h264Conf: h264Conf, @@ -45,6 +45,6 @@ func newPrimaryPlaylist( return p } -func (p *primaryPlaylist) reader() io.Reader { +func (p *muxerPrimaryPlaylist) reader() io.Reader { return bytes.NewReader(p.cnt) } diff --git a/internal/hls/streamplaylist.go b/internal/hls/muxer_stream_playlist.go similarity index 80% rename from internal/hls/streamplaylist.go rename to internal/hls/muxer_stream_playlist.go index 9fc3a9c3..17b2cce4 100644 --- a/internal/hls/streamplaylist.go +++ b/internal/hls/muxer_stream_playlist.go @@ -21,27 +21,27 @@ func (r *asyncReader) Read(buf []byte) (int, error) { return r.reader.Read(buf) } -type streamPlaylist struct { +type muxerStreamPlaylist struct { hlsSegmentCount int mutex sync.Mutex cond *sync.Cond closed bool - segments []*segment - segmentByName map[string]*segment + segments []*muxerTSSegment + segmentByName map[string]*muxerTSSegment segmentDeleteCount int } -func newStreamPlaylist(hlsSegmentCount int) *streamPlaylist { - p := &streamPlaylist{ +func newMuxerStreamPlaylist(hlsSegmentCount int) *muxerStreamPlaylist { + p := &muxerStreamPlaylist{ hlsSegmentCount: hlsSegmentCount, - segmentByName: make(map[string]*segment), + segmentByName: make(map[string]*muxerTSSegment), } p.cond = sync.NewCond(&p.mutex) return p } -func (p *streamPlaylist) close() { +func (p *muxerStreamPlaylist) close() { func() { p.mutex.Lock() defer p.mutex.Unlock() @@ -51,7 +51,7 @@ func (p *streamPlaylist) close() { p.cond.Broadcast() } -func (p *streamPlaylist) reader() io.Reader { +func (p *muxerStreamPlaylist) reader() io.Reader { return &asyncReader{generator: func() []byte { p.mutex.Lock() defer p.mutex.Unlock() @@ -94,7 +94,7 @@ func (p *streamPlaylist) reader() io.Reader { }} } -func (p *streamPlaylist) segment(fname string) io.Reader { +func (p *muxerStreamPlaylist) segment(fname string) io.Reader { base := strings.TrimSuffix(fname, ".ts") p.mutex.Lock() @@ -108,7 +108,7 @@ func (p *streamPlaylist) segment(fname string) io.Reader { return f.reader() } -func (p *streamPlaylist) pushSegment(t *segment) { +func (p *muxerStreamPlaylist) pushSegment(t *muxerTSSegment) { func() { p.mutex.Lock() defer p.mutex.Unlock() diff --git a/internal/hls/muxer_ts_generator.go b/internal/hls/muxer_ts_generator.go new file mode 100644 index 00000000..7c0f7f72 --- /dev/null +++ b/internal/hls/muxer_ts_generator.go @@ -0,0 +1,190 @@ +package hls + +import ( + "context" + "time" + + "github.com/aler9/gortsplib" + "github.com/asticode/go-astits" + + "github.com/aler9/rtsp-simple-server/internal/aac" + "github.com/aler9/rtsp-simple-server/internal/h264" +) + +type muxerTSGenerator struct { + hlsSegmentCount int + hlsSegmentDuration time.Duration + videoTrack *gortsplib.Track + audioTrack *gortsplib.Track + h264Conf *gortsplib.TrackConfigH264 + aacConf *gortsplib.TrackConfigAAC + streamPlaylist *muxerStreamPlaylist + + tm *astits.Muxer + currentSegment *muxerTSSegment + videoDTSEst *h264.DTSEstimator + audioAUCount int + startPCR time.Time + startPTS time.Duration +} + +func newMuxerTSGenerator( + hlsSegmentCount int, + hlsSegmentDuration time.Duration, + videoTrack *gortsplib.Track, + audioTrack *gortsplib.Track, + h264Conf *gortsplib.TrackConfigH264, + aacConf *gortsplib.TrackConfigAAC, + streamPlaylist *muxerStreamPlaylist, +) *muxerTSGenerator { + m := &muxerTSGenerator{ + hlsSegmentCount: hlsSegmentCount, + hlsSegmentDuration: hlsSegmentDuration, + videoTrack: videoTrack, + audioTrack: audioTrack, + streamPlaylist: streamPlaylist, + h264Conf: h264Conf, + aacConf: aacConf, + } + + m.tm = astits.NewMuxer(context.Background(), m) + + if videoTrack != nil { + m.tm.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 256, + StreamType: astits.StreamTypeH264Video, + }) + } + + if audioTrack != nil { + m.tm.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 257, + StreamType: astits.StreamTypeAACAudio, + }) + } + + if videoTrack != nil { + m.tm.SetPCRPID(256) + } else { + m.tm.SetPCRPID(257) + } + + m.currentSegment = newMuxerTSSegment(m.videoTrack, m) + + return m +} + +func (m *muxerTSGenerator) Write(p []byte) (int, error) { + return m.currentSegment.write(p) +} + +func (m *muxerTSGenerator) writeH264(pts time.Duration, nalus [][]byte) error { + idrPresent := func() bool { + for _, nalu := range nalus { + typ := h264.NALUType(nalu[0] & 0x1F) + if typ == h264.NALUTypeIDR { + return true + } + } + return false + }() + + // skip group silently until we find one with a IDR + if !m.currentSegment.firstPacketWritten && !idrPresent { + return nil + } + + // switch segment or initialize the first segment + if m.currentSegment.firstPacketWritten { + if idrPresent && + m.currentSegment.duration() >= m.hlsSegmentDuration { + m.streamPlaylist.pushSegment(m.currentSegment) + m.currentSegment = newMuxerTSSegment(m.videoTrack, m) + } + } else { + m.startPCR = time.Now() + m.startPTS = pts + m.videoDTSEst = h264.NewDTSEstimator() + } + + dts := m.videoDTSEst.Feed(pts-m.startPTS) + pcrOffset + pts = pts - m.startPTS + pcrOffset + + filteredNALUs := [][]byte{ + // prepend an AUD. This is required by video.js and iOS + {byte(h264.NALUTypeAccessUnitDelimiter), 240}, + } + + for _, nalu := range nalus { + // remove existing SPS, PPS, AUD + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: + continue + } + + // add SPS and PPS before IDR + if typ == h264.NALUTypeIDR { + filteredNALUs = append(filteredNALUs, m.h264Conf.SPS) + filteredNALUs = append(filteredNALUs, m.h264Conf.PPS) + } + + filteredNALUs = append(filteredNALUs, nalu) + } + + enc, err := h264.EncodeAnnexB(filteredNALUs) + if err != nil { + return err + } + + return m.currentSegment.writeH264(m.startPCR, dts, pts, idrPresent, enc) +} + +func (m *muxerTSGenerator) writeAAC(pts time.Duration, aus [][]byte) error { + // switch segment or initialize the first segment + if m.videoTrack == nil { + if m.currentSegment.firstPacketWritten { + if m.audioAUCount >= segmentMinAUCount && + m.currentSegment.duration() >= m.hlsSegmentDuration { + m.audioAUCount = 0 + m.streamPlaylist.pushSegment(m.currentSegment) + m.currentSegment = newMuxerTSSegment(m.videoTrack, m) + } + } else { + m.startPCR = time.Now() + m.startPTS = pts + } + } else { + if !m.currentSegment.firstPacketWritten { + return nil + } + } + + pts = pts - m.startPTS + pcrOffset + + for _, au := range aus { + enc, err := aac.EncodeADTS([]*aac.ADTSPacket{ + { + SampleRate: m.aacConf.SampleRate, + ChannelCount: m.aacConf.ChannelCount, + AU: au, + }, + }) + if err != nil { + return err + } + + err = m.currentSegment.writeAAC(m.startPCR, pts, enc) + if err != nil { + return err + } + + if m.videoTrack == nil { + m.audioAUCount++ + } + + pts += 1000 * time.Second / time.Duration(m.aacConf.SampleRate) + } + + return nil +} diff --git a/internal/hls/segment.go b/internal/hls/muxer_ts_segment.go similarity index 53% rename from internal/hls/segment.go rename to internal/hls/muxer_ts_segment.go index 22a87e4b..44d56563 100644 --- a/internal/hls/segment.go +++ b/internal/hls/muxer_ts_segment.go @@ -2,68 +2,36 @@ package hls import ( "bytes" - "context" "io" "strconv" "time" "github.com/aler9/gortsplib" "github.com/asticode/go-astits" - - "github.com/aler9/rtsp-simple-server/internal/aac" - "github.com/aler9/rtsp-simple-server/internal/h264" ) -type segment struct { +type muxerTSSegment struct { videoTrack *gortsplib.Track - h264Conf *gortsplib.TrackConfigH264 - aacConf *gortsplib.TrackConfigAAC + tsgen *muxerTSGenerator name string buf bytes.Buffer - mux *astits.Muxer firstPacketWritten bool minPTS time.Duration maxPTS time.Duration - startPCR time.Time pcrSendCounter int } -func newSegment( +func newMuxerTSSegment( videoTrack *gortsplib.Track, - audioTrack *gortsplib.Track, - h264Conf *gortsplib.TrackConfigH264, - aacConf *gortsplib.TrackConfigAAC, -) *segment { - t := &segment{ + tsgen *muxerTSGenerator, +) *muxerTSSegment { + t := &muxerTSSegment{ videoTrack: videoTrack, - h264Conf: h264Conf, - aacConf: aacConf, + tsgen: tsgen, name: strconv.FormatInt(time.Now().Unix(), 10), } - t.mux = astits.NewMuxer(context.Background(), &t.buf) - - if videoTrack != nil { - t.mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 256, - StreamType: astits.StreamTypeH264Video, - }) - } - - if audioTrack != nil { - t.mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 257, - StreamType: astits.StreamTypeAACAudio, - }) - } - - if videoTrack != nil { - t.mux.SetPCRPID(256) - } else { - t.mux.SetPCRPID(257) - } - // WriteTable() is called automatically when WriteData() is called with // - PID == PCRPID // - AdaptationField != nil @@ -72,23 +40,24 @@ func newSegment( return t } -func (t *segment) duration() time.Duration { +func (t *muxerTSSegment) duration() time.Duration { return t.maxPTS - t.minPTS } -func (t *segment) setStartPCR(startPCR time.Time) { - t.startPCR = startPCR +func (t *muxerTSSegment) write(p []byte) (int, error) { + return t.buf.Write(p) } -func (t *segment) reader() io.Reader { +func (t *muxerTSSegment) reader() io.Reader { return bytes.NewReader(t.buf.Bytes()) } -func (t *segment) writeH264( +func (t *muxerTSSegment) writeH264( + startPCR time.Time, dts time.Duration, pts time.Duration, - isIDR bool, - nalus [][]byte) error { + idrPresent bool, + enc []byte) error { if !t.firstPacketWritten { t.firstPacketWritten = true t.minPTS = pts @@ -102,36 +71,9 @@ func (t *segment) writeH264( } } - filteredNALUs := [][]byte{ - // prepend an AUD. This is required by video.js and iOS - {byte(h264.NALUTypeAccessUnitDelimiter), 240}, - } - - for _, nalu := range nalus { - // remove existing SPS, PPS, AUD - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: - continue - } - - // add SPS and PPS before IDR - if typ == h264.NALUTypeIDR { - filteredNALUs = append(filteredNALUs, t.h264Conf.SPS) - filteredNALUs = append(filteredNALUs, t.h264Conf.PPS) - } - - filteredNALUs = append(filteredNALUs, nalu) - } - - enc, err := h264.EncodeAnnexB(filteredNALUs) - if err != nil { - return err - } - var af *astits.PacketAdaptationField - if isIDR { + if idrPresent { if af == nil { af = &astits.PacketAdaptationField{} } @@ -144,7 +86,7 @@ func (t *segment) writeH264( af = &astits.PacketAdaptationField{} } af.HasPCR = true - af.PCR = &astits.ClockReference{Base: int64(time.Since(t.startPCR).Seconds() * 90000)} + af.PCR = &astits.ClockReference{Base: int64(time.Since(startPCR).Seconds() * 90000)} t.pcrSendCounter = 3 } t.pcrSendCounter-- @@ -162,7 +104,7 @@ func (t *segment) writeH264( oh.PTS = &astits.ClockReference{Base: int64(pts.Seconds() * 90000)} } - _, err = t.mux.WriteData(&astits.MuxerData{ + _, err := t.tsgen.tm.WriteData(&astits.MuxerData{ PID: 256, AdaptationField: af, PES: &astits.PESData{ @@ -176,9 +118,10 @@ func (t *segment) writeH264( return err } -func (t *segment) writeAAC( +func (t *muxerTSSegment) writeAAC( + startPCR time.Time, pts time.Duration, - au []byte) error { + enc []byte) error { if t.videoTrack == nil { if !t.firstPacketWritten { t.firstPacketWritten = true @@ -194,17 +137,6 @@ func (t *segment) writeAAC( } } - adtsPkt, err := aac.EncodeADTS([]*aac.ADTSPacket{ - { - SampleRate: t.aacConf.SampleRate, - ChannelCount: t.aacConf.ChannelCount, - AU: au, - }, - }) - if err != nil { - return err - } - af := &astits.PacketAdaptationField{ RandomAccessIndicator: true, } @@ -214,12 +146,12 @@ func (t *segment) writeAAC( // send PCR once in a while if t.pcrSendCounter == 0 { af.HasPCR = true - af.PCR = &astits.ClockReference{Base: int64(time.Since(t.startPCR).Seconds() * 90000)} + af.PCR = &astits.ClockReference{Base: int64(time.Since(startPCR).Seconds() * 90000)} t.pcrSendCounter = 3 } } - _, err = t.mux.WriteData(&astits.MuxerData{ + _, err := t.tsgen.tm.WriteData(&astits.MuxerData{ PID: 257, AdaptationField: af, PES: &astits.PESData{ @@ -229,10 +161,10 @@ func (t *segment) writeAAC( PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, PTS: &astits.ClockReference{Base: int64(pts.Seconds() * 90000)}, }, - PacketLength: uint16(len(adtsPkt) + 8), + PacketLength: uint16(len(enc) + 8), StreamID: 192, // = audio }, - Data: adtsPkt, + Data: enc, }, }) return err