From c3c643c602cc8e1abb8fdf01d3ea4038eef3ef75 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 24 Jul 2021 18:31:54 +0200 Subject: [PATCH] hls: move muxer into dedicated object --- go.mod | 2 +- go.sum | 4 +- internal/core/hls_converter.go | 243 ++++++------------------------ internal/core/rtmp_conn.go | 24 ++- internal/hls/multiaccessbuffer.go | 2 +- internal/hls/muxer.go | 238 +++++++++++++++++++++++++++++ internal/hls/muxer_test.go | 54 +++++++ internal/hls/tsfile.go | 43 ++---- 8 files changed, 374 insertions(+), 236 deletions(-) create mode 100644 internal/hls/muxer.go create mode 100644 internal/hls/muxer_test.go diff --git a/go.mod b/go.mod index d11c6335..e790d042 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62 + github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033 github.com/asticode/go-astits v1.9.0 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 diff --git a/go.sum b/go.sum index 02416957..563c07b9 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62 h1:PPTqxgdDmDBQcDziEuLqS4VzmMTp5NSd7b3WZqQCtR4= -github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI= +github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033 h1:Bf0hzdN1jUWsb5Mzezq1pd18EIBeKXxk5clIpHZJ1Lk= +github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/core/hls_converter.go b/internal/core/hls_converter.go index c472d91a..dfff47d2 100644 --- a/internal/core/hls_converter.go +++ b/internal/core/hls_converter.go @@ -5,10 +5,8 @@ import ( "context" "fmt" "io" - "math" "net" "net/http" - "strconv" "strings" "sync" "sync/atomic" @@ -26,12 +24,6 @@ import ( ) const ( - // an offset is needed to - // - avoid negative PTS values - // - avoid PTS < DTS during startup - hlsConverterPTSOffset = 2 * time.Second - - segmentMinAUCount = 100 closeCheckPeriod = 1 * time.Second closeAfterInactivity = 60 * time.Second ) @@ -116,15 +108,12 @@ type hlsConverter struct { pathMan hlsConverterPathMan parent hlsConverterParent - ctx context.Context - ctxCancel func() - path readPublisherPath - ringBuffer *ringbuffer.RingBuffer - tsQueue []*hls.TSFile - tsByName map[string]*hls.TSFile - tsDeleteCount int - tsMutex sync.RWMutex - lasthlsConverterRequestTime *int64 + ctx context.Context + ctxCancel func() + path readPublisherPath + ringBuffer *ringbuffer.RingBuffer + lastRequestTime *int64 + muxer *hls.Muxer // in request chan hlsConverterRequest @@ -153,12 +142,11 @@ func newHLSConverter( parent: parent, ctx: ctx, ctxCancel: ctxCancel, - lasthlsConverterRequestTime: func() *int64 { + lastRequestTime: func() *int64 { v := time.Now().Unix() return &v }(), - tsByName: make(map[string]*hls.TSFile), - request: make(chan hlsConverterRequest), + request: make(chan hlsConverterRequest), } c.log(logger.Info, "opened") @@ -294,14 +282,19 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error { return fmt.Errorf("unable to find a video or audio track") } - curTSFile := hls.NewTSFile(videoTrack, audioTrack) - c.tsByName[curTSFile.Name()] = curTSFile - c.tsQueue = append(c.tsQueue, curTSFile) - - defer func() { - curTSFile.Close() - }() + var err error + c.muxer, err = hls.NewMuxer( + c.hlsSegmentCount, + c.hlsSegmentDuration, + videoTrack, + audioTrack, + ) + if err != nil { + return err + } + defer c.muxer.Close() + // start request handler only after muxer has been inizialized requestHandlerTerminate := make(chan struct{}) requestHandlerDone := make(chan struct{}) go c.runRequestHandler(requestHandlerTerminate, requestHandlerDone) @@ -322,11 +315,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error { writerDone := make(chan error) go func() { writerDone <- func() error { - startPCR := time.Now() var videoBuf [][]byte - videoDTSEst := h264.NewDTSEstimator() - videoInitialized := false - audioAUCount := 0 for { data, ok := c.ringBuffer.Pull() @@ -343,21 +332,9 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error { continue } - // skip packets that are part of frames sent before - // the initialization of the converter - if !videoInitialized { - typ := pkt.Payload[0] & 0x1F - start := pkt.Payload[1] >> 7 - if typ == 28 && start != 1 { // FU-A - continue - } - - videoInitialized = true - } - nalus, pts, err := h264Decoder.DecodeRTP(&pkt) if err != nil { - if err != rtph264.ErrMorePacketsNeeded { + if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious { c.log(logger.Warn, "unable to decode video track: %v", err) } continue @@ -382,70 +359,24 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error { // RTP marker means that all the NALUs with the same PTS have been received. // send them together. - marker := (pair.buf[1] >> 7 & 0x1) > 0 - if marker { - bufferHasIDR := func() bool { - for _, nalu := range videoBuf { - typ := h264.NALUType(nalu[0] & 0x1F) - if typ == h264.NALUTypeIDR { - return true - } - } - return false - }() - - // we received a marker packet but - // - no IDR has been stored yet in current file - // - there's no IDR in the buffer - // data cannot be parsed, clear buffer - if !bufferHasIDR && !curTSFile.FirstPacketWritten() { - videoBuf = nil - continue - } - - err := func() error { - c.tsMutex.Lock() - defer c.tsMutex.Unlock() - - if bufferHasIDR { - if curTSFile.FirstPacketWritten() && - curTSFile.Duration() >= c.hlsSegmentDuration { - if curTSFile != nil { - curTSFile.Close() - } - - curTSFile = hls.NewTSFile(videoTrack, audioTrack) - - c.tsByName[curTSFile.Name()] = curTSFile - c.tsQueue = append(c.tsQueue, curTSFile) - if len(c.tsQueue) > c.hlsSegmentCount { - delete(c.tsByName, c.tsQueue[0].Name()) - c.tsQueue = c.tsQueue[1:] - c.tsDeleteCount++ - } - } - } - - curTSFile.SetPCR(time.Since(startPCR)) - err := curTSFile.WriteH264( - videoDTSEst.Feed(pts+hlsConverterPTSOffset), - pts+hlsConverterPTSOffset, - bufferHasIDR, - videoBuf) - if err != nil { - return err - } - - videoBuf = nil - return nil - }() + if pkt.Marker { + err := c.muxer.WriteH264(pts, videoBuf) if err != nil { return err } + + videoBuf = nil } } else if audioTrack != nil && pair.trackID == audioTrackID { - aus, pts, err := aacDecoder.Decode(pair.buf) + var pkt rtp.Packet + err := pkt.Unmarshal(pair.buf) + if err != nil { + c.log(logger.Warn, "unable to decode RTP packet: %v", err) + continue + } + + aus, pts, err := aacDecoder.DecodeRTP(&pkt) if err != nil { if err != rtpaac.ErrMorePacketsNeeded { c.log(logger.Warn, "unable to decode audio track: %v", err) @@ -453,52 +384,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error { continue } - err = func() error { - c.tsMutex.Lock() - defer c.tsMutex.Unlock() - - if videoTrack == nil { - if curTSFile.FirstPacketWritten() && - curTSFile.Duration() >= c.hlsSegmentDuration && - audioAUCount >= segmentMinAUCount { - - if curTSFile != nil { - curTSFile.Close() - } - - audioAUCount = 0 - curTSFile = hls.NewTSFile(videoTrack, audioTrack) - c.tsByName[curTSFile.Name()] = curTSFile - c.tsQueue = append(c.tsQueue, curTSFile) - if len(c.tsQueue) > c.hlsSegmentCount { - delete(c.tsByName, c.tsQueue[0].Name()) - c.tsQueue = c.tsQueue[1:] - c.tsDeleteCount++ - } - } - } else { - if !curTSFile.FirstPacketWritten() { - return nil - } - } - - for i, au := range aus { - auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(aacConfig.SampleRate) - - audioAUCount++ - curTSFile.SetPCR(time.Since(startPCR)) - err := curTSFile.WriteAAC( - aacConfig.SampleRate, - aacConfig.ChannelCount, - auPTS+hlsConverterPTSOffset, - au) - if err != nil { - return err - } - } - - return nil - }() + err = c.muxer.WriteAAC(pts, aus) if err != nil { return err } @@ -513,7 +399,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error { for { select { case <-closeCheckTicker.C: - t := time.Unix(atomic.LoadInt64(c.lasthlsConverterRequestTime), 0) + t := time.Unix(atomic.LoadInt64(c.lastRequestTime), 0) if time.Since(t) >= closeAfterInactivity { c.ringBuffer.Close() <-writerDone @@ -542,7 +428,7 @@ func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan stru case preq := <-c.request: req := preq - atomic.StoreInt64(c.lasthlsConverterRequestTime, time.Now().Unix()) + atomic.StoreInt64(c.lastRequestTime, time.Now().Unix()) conf := c.path.Conf() @@ -569,61 +455,26 @@ func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan stru switch { case req.File == "stream.m3u8": - func() { - c.tsMutex.RLock() - defer c.tsMutex.RUnlock() + r := c.muxer.Playlist() + if r == nil { + req.W.WriteHeader(http.StatusNotFound) + req.Res <- nil + continue + } - if len(c.tsQueue) == 0 { - req.W.WriteHeader(http.StatusNotFound) - req.Res <- nil - return - } - - cnt := "#EXTM3U\n" - cnt += "#EXT-X-VERSION:3\n" - cnt += "#EXT-X-ALLOW-CACHE:NO\n" - - targetDuration := func() uint { - ret := uint(math.Ceil(c.hlsSegmentDuration.Seconds())) - - // EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION - for _, f := range c.tsQueue { - v2 := uint(math.Round(f.Duration().Seconds())) - if v2 > ret { - ret = v2 - } - } - - return ret - }() - cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n" - - cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(c.tsDeleteCount), 10) + "\n" - - for _, f := range c.tsQueue { - cnt += "#EXTINF:" + strconv.FormatFloat(f.Duration().Seconds(), 'f', -1, 64) + ",\n" - cnt += f.Name() + ".ts\n" - } - - req.W.Header().Set("Content-Type", `application/x-mpegURL`) - req.Res <- bytes.NewReader([]byte(cnt)) - }() + req.W.Header().Set("Content-Type", `application/x-mpegURL`) + req.Res <- r case strings.HasSuffix(req.File, ".ts"): - base := strings.TrimSuffix(req.File, ".ts") - - c.tsMutex.RLock() - f, ok := c.tsByName[base] - c.tsMutex.RUnlock() - - if !ok { + r := c.muxer.TSFile(req.File) + if r == nil { req.W.WriteHeader(http.StatusNotFound) req.Res <- nil continue } req.W.Header().Set("Content-Type", `video/MP2T`) - req.Res <- f.NewReader() + req.Res <- r case req.File == "": req.Res <- bytes.NewReader([]byte(index)) diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 7cd3c188..c37f549b 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -17,6 +17,7 @@ import ( "github.com/aler9/gortsplib/pkg/rtpaac" "github.com/aler9/gortsplib/pkg/rtph264" "github.com/notedit/rtmp/av" + "github.com/pion/rtp" "github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/h264" @@ -290,9 +291,16 @@ func (c *rtmpConn) runRead(ctx context.Context) error { pair := data.(rtmpConnTrackIDPayloadPair) if videoTrack != nil && pair.trackID == videoTrackID { - nalus, pts, err := h264Decoder.Decode(pair.buf) + var pkt rtp.Packet + err := pkt.Unmarshal(pair.buf) if err != nil { - if err != rtph264.ErrMorePacketsNeeded { + c.log(logger.Warn, "unable to decode RTP packet: %v", err) + continue + } + + nalus, pts, err := h264Decoder.DecodeRTP(&pkt) + if err != nil { + if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious { c.log(logger.Warn, "unable to decode video track: %v", err) } continue @@ -311,8 +319,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { // RTP marker means that all the NALUs with the same PTS have been received. // send them together. - marker := (pair.buf[1] >> 7 & 0x1) > 0 - if marker { + if pkt.Marker { data, err := h264.EncodeAVCC(videoBuf) if err != nil { return err @@ -334,7 +341,14 @@ func (c *rtmpConn) runRead(ctx context.Context) error { } } else if audioTrack != nil && pair.trackID == audioTrackID { - aus, pts, err := aacDecoder.Decode(pair.buf) + var pkt rtp.Packet + err := pkt.Unmarshal(pair.buf) + if err != nil { + c.log(logger.Warn, "unable to decode RTP packet: %v", err) + continue + } + + aus, pts, err := aacDecoder.DecodeRTP(&pkt) if err != nil { if err != rtpaac.ErrMorePacketsNeeded { c.log(logger.Warn, "unable to decode audio track: %v", err) diff --git a/internal/hls/multiaccessbuffer.go b/internal/hls/multiaccessbuffer.go index cc02c74e..f70ca201 100644 --- a/internal/hls/multiaccessbuffer.go +++ b/internal/hls/multiaccessbuffer.go @@ -61,7 +61,7 @@ func (m *multiAccessBuffer) Write(p []byte) (int, error) { return n, nil } -func (m *multiAccessBuffer) NewReader() *multiAccessBufferReader { +func (m *multiAccessBuffer) NewReader() io.Reader { return &multiAccessBufferReader{ m: m, } diff --git a/internal/hls/muxer.go b/internal/hls/muxer.go new file mode 100644 index 00000000..23f7faef --- /dev/null +++ b/internal/hls/muxer.go @@ -0,0 +1,238 @@ +package hls + +import ( + "bytes" + "io" + "math" + "strconv" + "strings" + "sync" + "time" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/rtpaac" + + "github.com/aler9/rtsp-simple-server/internal/h264" +) + +const ( + // an offset is needed to + // - avoid negative PTS values + // - avoid PTS < DTS during startup + ptsOffset = 2 * time.Second + + segmentMinAUCount = 100 +) + +// Muxer is a HLS muxer. +type Muxer struct { + hlsSegmentCount int + hlsSegmentDuration time.Duration + videoTrack *gortsplib.Track + audioTrack *gortsplib.Track + + aacConfig rtpaac.MPEG4AudioConfig + startPCR time.Time + videoDTSEst *h264.DTSEstimator + audioAUCount int + tsCurrent *tsFile + tsQueue []*tsFile + tsByName map[string]*tsFile + tsDeleteCount int + mutex sync.RWMutex +} + +// NewMuxer allocates a Muxer. +func NewMuxer( + hlsSegmentCount int, + hlsSegmentDuration time.Duration, + videoTrack *gortsplib.Track, + audioTrack *gortsplib.Track) (*Muxer, error) { + var aacConfig rtpaac.MPEG4AudioConfig + if audioTrack != nil { + byts, err := audioTrack.ExtractDataAAC() + if err != nil { + return nil, err + } + + err = aacConfig.Decode(byts) + if err != nil { + return nil, err + } + } + + m := &Muxer{ + hlsSegmentCount: hlsSegmentCount, + hlsSegmentDuration: hlsSegmentDuration, + videoTrack: videoTrack, + audioTrack: audioTrack, + aacConfig: aacConfig, + startPCR: time.Now(), + videoDTSEst: h264.NewDTSEstimator(), + tsCurrent: newTSFile(videoTrack != nil, audioTrack != nil), + tsByName: make(map[string]*tsFile), + } + + m.tsByName[m.tsCurrent.name] = m.tsCurrent + m.tsQueue = append(m.tsQueue, m.tsCurrent) + + return m, nil +} + +// Close closes a Muxer. +func (m *Muxer) Close() { + m.tsCurrent.close() +} + +// WriteH264 writes H264 NALUs, grouped by PTS, into the muxer. +func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error { + idrPresent := func() bool { + for _, nalu := range nalus { + typ := h264.NALUType(nalu[0] & 0x1F) + if typ == h264.NALUTypeIDR { + return true + } + } + return false + }() + + // skip group silently until we find one with a IDR + if !m.tsCurrent.firstPacketWritten && !idrPresent { + return nil + } + + m.mutex.Lock() + defer m.mutex.Unlock() + + if idrPresent { + if m.tsCurrent.firstPacketWritten && + m.tsCurrent.duration() >= m.hlsSegmentDuration { + if m.tsCurrent != nil { + m.tsCurrent.close() + } + + m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil) + + m.tsByName[m.tsCurrent.name] = m.tsCurrent + m.tsQueue = append(m.tsQueue, m.tsCurrent) + if len(m.tsQueue) > m.hlsSegmentCount { + delete(m.tsByName, m.tsQueue[0].name) + m.tsQueue = m.tsQueue[1:] + m.tsDeleteCount++ + } + } + } + + m.tsCurrent.setPCR(time.Since(m.startPCR)) + err := m.tsCurrent.writeH264( + m.videoDTSEst.Feed(pts+ptsOffset), + pts+ptsOffset, + idrPresent, + nalus) + if err != nil { + return err + } + + return nil +} + +// WriteAAC writes AAC AUs, grouped by PTS, into the muxer. +func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.videoTrack != nil { + if m.tsCurrent.firstPacketWritten && + m.tsCurrent.duration() >= m.hlsSegmentDuration && + m.audioAUCount >= segmentMinAUCount { + + if m.tsCurrent != nil { + m.tsCurrent.close() + } + + m.audioAUCount = 0 + m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil) + m.tsByName[m.tsCurrent.name] = m.tsCurrent + m.tsQueue = append(m.tsQueue, m.tsCurrent) + if len(m.tsQueue) > m.hlsSegmentCount { + delete(m.tsByName, m.tsQueue[0].name) + m.tsQueue = m.tsQueue[1:] + m.tsDeleteCount++ + } + } + } else { + if !m.tsCurrent.firstPacketWritten { + return nil + } + } + + for i, au := range aus { + auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(m.aacConfig.SampleRate) + + m.audioAUCount++ + m.tsCurrent.setPCR(time.Since(m.startPCR)) + err := m.tsCurrent.writeAAC( + m.aacConfig.SampleRate, + m.aacConfig.ChannelCount, + auPTS+ptsOffset, + au) + if err != nil { + return err + } + } + + return nil +} + +// Playlist returns a reader to read the HLS playlist in M3U8 format. +func (m *Muxer) Playlist() io.Reader { + m.mutex.RLock() + defer m.mutex.RUnlock() + + if len(m.tsQueue) == 0 { + return nil + } + + cnt := "#EXTM3U\n" + cnt += "#EXT-X-VERSION:3\n" + cnt += "#EXT-X-ALLOW-CACHE:NO\n" + + targetDuration := func() uint { + ret := uint(math.Ceil(m.hlsSegmentDuration.Seconds())) + + // EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION + for _, f := range m.tsQueue { + v2 := uint(math.Round(f.duration().Seconds())) + if v2 > ret { + ret = v2 + } + } + + return ret + }() + cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n" + + cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.tsDeleteCount), 10) + "\n" + + for _, f := range m.tsQueue { + cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n" + cnt += f.name + ".ts\n" + } + + return bytes.NewReader([]byte(cnt)) +} + +// TSFile returns a reader to read a given MPEG-TS file. +func (m *Muxer) TSFile(fname string) io.Reader { + base := strings.TrimSuffix(fname, ".ts") + + m.mutex.RLock() + f, ok := m.tsByName[base] + m.mutex.RUnlock() + + if !ok { + return nil + } + + return f.newReader() +} diff --git a/internal/hls/muxer_test.go b/internal/hls/muxer_test.go new file mode 100644 index 00000000..07e0eeaa --- /dev/null +++ b/internal/hls/muxer_test.go @@ -0,0 +1,54 @@ +package hls + +import ( + "io/ioutil" + "testing" + "time" + + "github.com/aler9/gortsplib" + "github.com/stretchr/testify/require" +) + +func TestMuxer(t *testing.T) { + videoTrack, err := gortsplib.NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}) + require.NoError(t, err) + + audioTrack, err := gortsplib.NewTrackAAC(97, []byte{17, 144}) + require.NoError(t, err) + + m, err := NewMuxer(3, 5*time.Second, videoTrack, audioTrack) + require.NoError(t, err) + defer m.Close() + + // group without IDR + err = m.WriteH264(1*time.Second, [][]byte{ + {0x06}, + {0x07}, + }) + require.NoError(t, err) + + // group with IDR + err = m.WriteH264(2*time.Second, [][]byte{ + {0x05}, + {0x06}, + }) + require.NoError(t, err) + + err = m.WriteAAC(3*time.Second, [][]byte{ + {0x01, 0x02, 0x03, 0x04}, + {0x05, 0x06, 0x07, 0x08}, + }) + require.NoError(t, err) + + // group without IDR + err = m.WriteH264(4*time.Second, [][]byte{ + {0x06}, + {0x07}, + }) + require.NoError(t, err) + + byts, err := ioutil.ReadAll(m.Playlist()) + require.NoError(t, err) + + require.Regexp(t, `^#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:5\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:2,\n[0-9]+\.ts\n$`, string(byts)) +} diff --git a/internal/hls/tsfile.go b/internal/hls/tsfile.go index 77397fdb..7e814952 100644 --- a/internal/hls/tsfile.go +++ b/internal/hls/tsfile.go @@ -6,15 +6,13 @@ import ( "strconv" "time" - "github.com/aler9/gortsplib" "github.com/asticode/go-astits" "github.com/aler9/rtsp-simple-server/internal/aac" "github.com/aler9/rtsp-simple-server/internal/h264" ) -// TSFile is a MPEG-TS file. -type TSFile struct { +type tsFile struct { name string buf *multiAccessBuffer mux *astits.Muxer @@ -25,30 +23,29 @@ type TSFile struct { maxPTS time.Duration } -// NewTSFile allocates a TSFile. -func NewTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *TSFile { - t := &TSFile{ +func newTSFile(hasVideoTrack bool, hasAudioTrack bool) *tsFile { + t := &tsFile{ buf: newMultiAccessBuffer(), name: strconv.FormatInt(time.Now().Unix(), 10), } t.mux = astits.NewMuxer(context.Background(), t.buf) - if videoTrack != nil { + if hasVideoTrack { t.mux.AddElementaryStream(astits.PMTElementaryStream{ ElementaryPID: 256, StreamType: astits.StreamTypeH264Video, }) } - if audioTrack != nil { + if hasAudioTrack { t.mux.AddElementaryStream(astits.PMTElementaryStream{ ElementaryPID: 257, StreamType: astits.StreamTypeAACAudio, }) } - if videoTrack != nil { + if hasVideoTrack { t.pcrTrackIsVideo = true t.mux.SetPCRPID(256) } else { @@ -63,38 +60,23 @@ func NewTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *TSFile return t } -// Close closes a TSFile. -func (t *TSFile) Close() error { +func (t *tsFile) close() error { return t.buf.Close() } -// Name returns the file name. -func (t *TSFile) Name() string { - return t.name -} - -// Duration returns the file duration. -func (t *TSFile) Duration() time.Duration { +func (t *tsFile) duration() time.Duration { return t.maxPTS - t.minPTS } -// FirstPacketWritten returns whether a packet ha been written into the file. -func (t *TSFile) FirstPacketWritten() bool { - return t.firstPacketWritten -} - -// SetPCR sets the PCR. -func (t *TSFile) SetPCR(pcr time.Duration) { +func (t *tsFile) setPCR(pcr time.Duration) { t.pcr = pcr } -// NewReader allocates a reader to read the file. -func (t *TSFile) NewReader() io.Reader { +func (t *tsFile) newReader() io.Reader { return t.buf.NewReader() } -// WriteH264 writes H264 NALUs into the file. -func (t *TSFile) WriteH264(dts time.Duration, pts time.Duration, isIDR bool, nalus [][]byte) error { +func (t *tsFile) writeH264(dts time.Duration, pts time.Duration, isIDR bool, nalus [][]byte) error { if t.pcrTrackIsVideo { if !t.firstPacketWritten { t.firstPacketWritten = true @@ -143,8 +125,7 @@ func (t *TSFile) WriteH264(dts time.Duration, pts time.Duration, isIDR bool, nal return err } -// WriteAAC writes AAC AUs into the file. -func (t *TSFile) WriteAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error { +func (t *tsFile) writeAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error { if !t.pcrTrackIsVideo { if !t.firstPacketWritten { t.firstPacketWritten = true