From 5ad2ea89245d88506267f6b58f8a64717ab24397 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 27 Aug 2022 12:41:30 +0200 Subject: [PATCH] hls: move mpegts writer into dedicated folder --- internal/hls/mpegts/writer.go | 200 ++++++++++++++++++ internal/hls/muxer_variant_fmp4_part.go | 6 +- internal/hls/muxer_variant_fmp4_segment.go | 6 +- internal/hls/muxer_variant_mpegts_segment.go | 148 +++---------- .../hls/muxer_variant_mpegts_segmenter.go | 52 ++--- 5 files changed, 246 insertions(+), 166 deletions(-) create mode 100644 internal/hls/mpegts/writer.go diff --git a/internal/hls/mpegts/writer.go b/internal/hls/mpegts/writer.go new file mode 100644 index 00000000..c02fc95c --- /dev/null +++ b/internal/hls/mpegts/writer.go @@ -0,0 +1,200 @@ +package mpegts + +import ( + "bytes" + "context" + "time" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/h264" + "github.com/aler9/gortsplib/pkg/mpeg4audio" + "github.com/asticode/go-astits" +) + +const ( + pcrOffset = 400 * time.Millisecond // 2 samples @ 5fps +) + +type writerFunc func(p []byte) (int, error) + +func (f writerFunc) Write(p []byte) (int, error) { + return f(p) +} + +// Writer is a MPEG-TS writer. +type Writer struct { + videoTrack *gortsplib.TrackH264 + audioTrack *gortsplib.TrackMPEG4Audio + + buf *bytes.Buffer + inner *astits.Muxer + pcrCounter int +} + +// NewWriter allocates a Writer. +func NewWriter( + videoTrack *gortsplib.TrackH264, + audioTrack *gortsplib.TrackMPEG4Audio, +) *Writer { + w := &Writer{ + videoTrack: videoTrack, + audioTrack: audioTrack, + buf: bytes.NewBuffer(nil), + } + + w.inner = astits.NewMuxer( + context.Background(), + writerFunc(func(p []byte) (int, error) { + return w.buf.Write(p) + })) + + if videoTrack != nil { + w.inner.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 256, + StreamType: astits.StreamTypeH264Video, + }) + } + + if audioTrack != nil { + w.inner.AddElementaryStream(astits.PMTElementaryStream{ + ElementaryPID: 257, + StreamType: astits.StreamTypeAACAudio, + }) + } + + if videoTrack != nil { + w.inner.SetPCRPID(256) + } else { + w.inner.SetPCRPID(257) + } + + // WriteTable() is not necessary + // since it's called automatically when WriteData() is called with + // * PID == PCRPID + // * AdaptationField != nil + // * RandomAccessIndicator = true + + return w +} + +// GenerateSegment generates a MPEG-TS segment. +func (w *Writer) GenerateSegment() []byte { + w.pcrCounter = 0 + ret := w.buf.Bytes() + w.buf = bytes.NewBuffer(nil) + return ret +} + +// WriteH264 writes a group of H264 NALUs. +func (w *Writer) WriteH264( + pcr time.Duration, + dts time.Duration, + pts time.Duration, + idrPresent bool, + nalus [][]byte, +) error { + // prepend an AUD. This is required by video.js and iOS + nalus = append([][]byte{{byte(h264.NALUTypeAccessUnitDelimiter), 240}}, nalus...) + + enc, err := h264.AnnexBMarshal(nalus) + if err != nil { + return err + } + + var af *astits.PacketAdaptationField + + if idrPresent { + af = &astits.PacketAdaptationField{} + af.RandomAccessIndicator = true + } + + // send PCR once in a while + if w.pcrCounter == 0 { + if af == nil { + af = &astits.PacketAdaptationField{} + } + af.HasPCR = true + af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)} + w.pcrCounter = 3 + } + w.pcrCounter-- + + oh := &astits.PESOptionalHeader{ + MarkerBits: 2, + } + + if dts == pts { + oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS + oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)} + } else { + oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent + oh.DTS = &astits.ClockReference{Base: int64((dts + pcrOffset).Seconds() * 90000)} + oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)} + } + + _, err = w.inner.WriteData(&astits.MuxerData{ + PID: 256, + AdaptationField: af, + PES: &astits.PESData{ + Header: &astits.PESHeader{ + OptionalHeader: oh, + StreamID: 224, // video + }, + Data: enc, + }, + }) + return err +} + +// WriteAAC writes an AAC AU. +func (w *Writer) WriteAAC( + pcr time.Duration, + pts time.Duration, + au []byte, +) error { + pkts := mpeg4audio.ADTSPackets{ + { + Type: w.audioTrack.Config.Type, + SampleRate: w.audioTrack.Config.SampleRate, + ChannelCount: w.audioTrack.Config.ChannelCount, + AU: au, + }, + } + + enc, err := pkts.Marshal() + if err != nil { + return err + } + + af := &astits.PacketAdaptationField{ + RandomAccessIndicator: true, + } + + if w.videoTrack == nil { + // send PCR once in a while + if w.pcrCounter == 0 { + af.HasPCR = true + af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)} + w.pcrCounter = 3 + } + w.pcrCounter-- + } + + _, err = w.inner.WriteData(&astits.MuxerData{ + PID: 257, + AdaptationField: af, + PES: &astits.PESData{ + Header: &astits.PESHeader{ + OptionalHeader: &astits.PESOptionalHeader{ + MarkerBits: 2, + PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, + PTS: &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)}, + }, + PacketLength: uint16(len(enc) + 8), + StreamID: 192, // audio + }, + Data: enc, + }, + }) + return err +} diff --git a/internal/hls/muxer_variant_fmp4_part.go b/internal/hls/muxer_variant_fmp4_part.go index 63b402f8..01b8525e 100644 --- a/internal/hls/muxer_variant_fmp4_part.go +++ b/internal/hls/muxer_variant_fmp4_part.go @@ -24,7 +24,7 @@ type muxerVariantFMP4Part struct { isIndependent bool videoSamples []*fmp4.VideoSample audioSamples []*fmp4.AudioSample - renderedContent []byte + content []byte renderedDuration time.Duration } @@ -51,7 +51,7 @@ func (p *muxerVariantFMP4Part) name() string { } func (p *muxerVariantFMP4Part) reader() io.Reader { - return bytes.NewReader(p.renderedContent) + return bytes.NewReader(p.content) } func (p *muxerVariantFMP4Part) duration() time.Duration { @@ -73,7 +73,7 @@ func (p *muxerVariantFMP4Part) duration() time.Duration { func (p *muxerVariantFMP4Part) finalize() error { if len(p.videoSamples) > 0 || len(p.audioSamples) > 0 { var err error - p.renderedContent, err = fmp4.GeneratePart( + p.content, err = fmp4.GeneratePart( p.videoTrack, p.audioTrack, p.videoSamples, diff --git a/internal/hls/muxer_variant_fmp4_segment.go b/internal/hls/muxer_variant_fmp4_segment.go index ebfa8569..687af22a 100644 --- a/internal/hls/muxer_variant_fmp4_segment.go +++ b/internal/hls/muxer_variant_fmp4_segment.go @@ -26,11 +26,11 @@ func (mbr *partsReader) Read(p []byte) (int, error) { return n, io.EOF } - copied := copy(p[n:], mbr.parts[mbr.curPart].renderedContent[mbr.curPos:]) + copied := copy(p[n:], mbr.parts[mbr.curPart].content[mbr.curPos:]) mbr.curPos += copied n += copied - if mbr.curPos == len(mbr.parts[mbr.curPart].renderedContent) { + if mbr.curPos == len(mbr.parts[mbr.curPart].content) { mbr.curPart++ mbr.curPos = 0 } @@ -111,7 +111,7 @@ func (s *muxerVariantFMP4Segment) finalize( return err } - if s.currentPart.renderedContent != nil { + if s.currentPart.content != nil { s.onPartFinalized(s.currentPart) s.parts = append(s.parts, s.currentPart) } diff --git a/internal/hls/muxer_variant_mpegts_segment.go b/internal/hls/muxer_variant_mpegts_segment.go index 16215655..6aacdf19 100644 --- a/internal/hls/muxer_variant_mpegts_segment.go +++ b/internal/hls/muxer_variant_mpegts_segment.go @@ -8,28 +8,23 @@ import ( "time" "github.com/aler9/gortsplib" - "github.com/aler9/gortsplib/pkg/h264" - "github.com/aler9/gortsplib/pkg/mpeg4audio" - "github.com/asticode/go-astits" -) -const ( - mpegtsPCROffset = 400 * time.Millisecond // 2 samples @ 5fps + "github.com/aler9/rtsp-simple-server/internal/hls/mpegts" ) type muxerVariantMPEGTSSegment struct { segmentMaxSize uint64 videoTrack *gortsplib.TrackH264 audioTrack *gortsplib.TrackMPEG4Audio - writeData func(*astits.MuxerData) (int, error) + writer *mpegts.Writer - startTime time.Time - name string - buf bytes.Buffer - startDTS *time.Duration - endDTS time.Duration - pcrSendCounter int - audioAUCount int + size uint64 + startTime time.Time + name string + startDTS *time.Duration + endDTS time.Duration + audioAUCount int + content []byte } func newMuxerVariantMPEGTSSegment( @@ -37,22 +32,17 @@ func newMuxerVariantMPEGTSSegment( segmentMaxSize uint64, videoTrack *gortsplib.TrackH264, audioTrack *gortsplib.TrackMPEG4Audio, - writeData func(*astits.MuxerData) (int, error), + writer *mpegts.Writer, ) *muxerVariantMPEGTSSegment { t := &muxerVariantMPEGTSSegment{ segmentMaxSize: segmentMaxSize, videoTrack: videoTrack, audioTrack: audioTrack, - writeData: writeData, + writer: writer, startTime: startTime, name: strconv.FormatInt(startTime.Unix(), 10), } - // WriteTable() is called automatically when WriteData() is called with - // - PID == PCRPID - // - AdaptationField != nil - // - RandomAccessIndicator = true - return t } @@ -60,16 +50,13 @@ func (t *muxerVariantMPEGTSSegment) duration() time.Duration { return t.endDTS - *t.startDTS } -func (t *muxerVariantMPEGTSSegment) write(p []byte) (int, error) { - if uint64(len(p)+t.buf.Len()) > t.segmentMaxSize { - return 0, fmt.Errorf("reached maximum segment size") - } - - return t.buf.Write(p) +func (t *muxerVariantMPEGTSSegment) reader() io.Reader { + return bytes.NewReader(t.content) } -func (t *muxerVariantMPEGTSSegment) reader() io.Reader { - return bytes.NewReader(t.buf.Bytes()) +func (t *muxerVariantMPEGTSSegment) finalize(endDTS time.Duration) { + t.endDTS = endDTS + t.content = t.writer.GenerateSegment() } func (t *muxerVariantMPEGTSSegment) writeH264( @@ -79,56 +66,16 @@ func (t *muxerVariantMPEGTSSegment) writeH264( idrPresent bool, nalus [][]byte, ) error { - // prepend an AUD. This is required by video.js and iOS - nalus = append([][]byte{{byte(h264.NALUTypeAccessUnitDelimiter), 240}}, nalus...) - - enc, err := h264.AnnexBMarshal(nalus) - if err != nil { - return err + size := uint64(0) + for _, nalu := range nalus { + size += uint64(len(nalu)) } - - var af *astits.PacketAdaptationField - - if idrPresent { - af = &astits.PacketAdaptationField{} - af.RandomAccessIndicator = true + if (t.size + size) > t.segmentMaxSize { + return fmt.Errorf("reached maximum segment size") } + t.size += size - // send PCR once in a while - if t.pcrSendCounter == 0 { - if af == nil { - af = &astits.PacketAdaptationField{} - } - af.HasPCR = true - af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)} - t.pcrSendCounter = 3 - } - t.pcrSendCounter-- - - oh := &astits.PESOptionalHeader{ - MarkerBits: 2, - } - - if dts == pts { - oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS - oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)} - } else { - oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent - oh.DTS = &astits.ClockReference{Base: int64((dts + mpegtsPCROffset).Seconds() * 90000)} - oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)} - } - - _, err = t.writeData(&astits.MuxerData{ - PID: 256, - AdaptationField: af, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: oh, - StreamID: 224, // video - }, - Data: enc, - }, - }) + err := t.writer.WriteH264(pcr, dts, pts, idrPresent, nalus) if err != nil { return err } @@ -136,7 +83,6 @@ func (t *muxerVariantMPEGTSSegment) writeH264( if t.startDTS == nil { t.startDTS = &dts } - t.endDTS = dts return nil @@ -147,50 +93,13 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( pts time.Duration, au []byte, ) error { - pkts := mpeg4audio.ADTSPackets{ - { - Type: t.audioTrack.Config.Type, - SampleRate: t.audioTrack.Config.SampleRate, - ChannelCount: t.audioTrack.Config.ChannelCount, - AU: au, - }, + size := uint64(len(au)) + if (t.size + size) > t.segmentMaxSize { + return fmt.Errorf("reached maximum segment size") } + t.size += size - enc, err := pkts.Marshal() - if err != nil { - return err - } - - af := &astits.PacketAdaptationField{ - RandomAccessIndicator: true, - } - - if t.videoTrack == nil { - // send PCR once in a while - if t.pcrSendCounter == 0 { - af.HasPCR = true - af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)} - t.pcrSendCounter = 3 - } - t.pcrSendCounter-- - } - - _, err = t.writeData(&astits.MuxerData{ - PID: 257, - AdaptationField: af, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, - PTS: &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)}, - }, - PacketLength: uint16(len(enc) + 8), - StreamID: 192, // audio - }, - Data: enc, - }, - }) + err := t.writer.WriteAAC(pcr, pts, au) if err != nil { return err } @@ -201,7 +110,6 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( if t.startDTS == nil { t.startDTS = &pts } - t.endDTS = pts } diff --git a/internal/hls/muxer_variant_mpegts_segmenter.go b/internal/hls/muxer_variant_mpegts_segmenter.go index b97bfeb2..278eb576 100644 --- a/internal/hls/muxer_variant_mpegts_segmenter.go +++ b/internal/hls/muxer_variant_mpegts_segmenter.go @@ -1,24 +1,18 @@ package hls import ( - "context" "time" "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/h264" - "github.com/asticode/go-astits" + + "github.com/aler9/rtsp-simple-server/internal/hls/mpegts" ) const ( mpegtsSegmentMinAUCount = 100 ) -type writerFunc func(p []byte) (int, error) - -func (f writerFunc) Write(p []byte) (int, error) { - return f(p) -} - type muxerVariantMPEGTSSegmenter struct { segmentDuration time.Duration segmentMaxSize uint64 @@ -26,7 +20,7 @@ type muxerVariantMPEGTSSegmenter struct { audioTrack *gortsplib.TrackMPEG4Audio onSegmentReady func(*muxerVariantMPEGTSSegment) - writer *astits.Muxer + writer *mpegts.Writer currentSegment *muxerVariantMPEGTSSegment videoDTSExtractor *h264.DTSExtractor startPCR time.Time @@ -48,31 +42,9 @@ func newMuxerVariantMPEGTSSegmenter( onSegmentReady: onSegmentReady, } - m.writer = astits.NewMuxer( - context.Background(), - writerFunc(func(p []byte) (int, error) { - return m.currentSegment.write(p) - })) - - if videoTrack != nil { - m.writer.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 256, - StreamType: astits.StreamTypeH264Video, - }) - } - - if audioTrack != nil { - m.writer.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 257, - StreamType: astits.StreamTypeAACAudio, - }) - } - - if videoTrack != nil { - m.writer.SetPCRPID(256) - } else { - m.writer.SetPCRPID(257) - } + m.writer = mpegts.NewWriter( + videoTrack, + audioTrack) return m } @@ -117,7 +89,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt // create first segment m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize, - m.videoTrack, m.audioTrack, m.writer.WriteData) + m.videoTrack, m.audioTrack, m.writer) } else { if !idrPresent && !nonIDRPresent { return nil @@ -135,10 +107,10 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt // switch segment if idrPresent && (dts-*m.currentSegment.startDTS) >= m.segmentDuration { - m.currentSegment.endDTS = dts + m.currentSegment.finalize(dts) m.onSegmentReady(m.currentSegment) m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize, - m.videoTrack, m.audioTrack, m.writer.WriteData) + m.videoTrack, m.audioTrack, m.writer) } } @@ -166,17 +138,17 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(pts time.Duration, au []byte) err // create first segment m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize, - m.videoTrack, m.audioTrack, m.writer.WriteData) + m.videoTrack, m.audioTrack, m.writer) } else { pts -= m.startDTS // switch segment if m.currentSegment.audioAUCount >= mpegtsSegmentMinAUCount && (pts-*m.currentSegment.startDTS) >= m.segmentDuration { - m.currentSegment.endDTS = pts + m.currentSegment.finalize(pts) m.onSegmentReady(m.currentSegment) m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize, - m.videoTrack, m.audioTrack, m.writer.WriteData) + m.videoTrack, m.audioTrack, m.writer) } } } else {