mirror of
https://github.com/bluenviron/mediamtx.git
synced 2025-12-26 04:52:00 -08:00
hls: move muxer into dedicated object
This commit is contained in:
parent
f225363e7d
commit
c3c643c602
8 changed files with 374 additions and 236 deletions
2
go.mod
2
go.mod
|
|
@ -5,7 +5,7 @@ go 1.16
|
||||||
require (
|
require (
|
||||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
|
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
|
||||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
|
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
|
||||||
github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62
|
github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033
|
||||||
github.com/asticode/go-astits v1.9.0
|
github.com/asticode/go-astits v1.9.0
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/fsnotify/fsnotify v1.4.9
|
github.com/fsnotify/fsnotify v1.4.9
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
|
||||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
|
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
|
||||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||||
github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62 h1:PPTqxgdDmDBQcDziEuLqS4VzmMTp5NSd7b3WZqQCtR4=
|
github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033 h1:Bf0hzdN1jUWsb5Mzezq1pd18EIBeKXxk5clIpHZJ1Lk=
|
||||||
github.com/aler9/gortsplib v0.0.0-20210626112538-649c63cf5b62/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI=
|
github.com/aler9/gortsplib v0.0.0-20210724151831-dae5a1f04033/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI=
|
||||||
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
|
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
|
||||||
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
|
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
|
||||||
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
|
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
|
||||||
|
|
|
||||||
|
|
@ -5,10 +5,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
@ -26,12 +24,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// an offset is needed to
|
|
||||||
// - avoid negative PTS values
|
|
||||||
// - avoid PTS < DTS during startup
|
|
||||||
hlsConverterPTSOffset = 2 * time.Second
|
|
||||||
|
|
||||||
segmentMinAUCount = 100
|
|
||||||
closeCheckPeriod = 1 * time.Second
|
closeCheckPeriod = 1 * time.Second
|
||||||
closeAfterInactivity = 60 * time.Second
|
closeAfterInactivity = 60 * time.Second
|
||||||
)
|
)
|
||||||
|
|
@ -116,15 +108,12 @@ type hlsConverter struct {
|
||||||
pathMan hlsConverterPathMan
|
pathMan hlsConverterPathMan
|
||||||
parent hlsConverterParent
|
parent hlsConverterParent
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel func()
|
ctxCancel func()
|
||||||
path readPublisherPath
|
path readPublisherPath
|
||||||
ringBuffer *ringbuffer.RingBuffer
|
ringBuffer *ringbuffer.RingBuffer
|
||||||
tsQueue []*hls.TSFile
|
lastRequestTime *int64
|
||||||
tsByName map[string]*hls.TSFile
|
muxer *hls.Muxer
|
||||||
tsDeleteCount int
|
|
||||||
tsMutex sync.RWMutex
|
|
||||||
lasthlsConverterRequestTime *int64
|
|
||||||
|
|
||||||
// in
|
// in
|
||||||
request chan hlsConverterRequest
|
request chan hlsConverterRequest
|
||||||
|
|
@ -153,12 +142,11 @@ func newHLSConverter(
|
||||||
parent: parent,
|
parent: parent,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
ctxCancel: ctxCancel,
|
ctxCancel: ctxCancel,
|
||||||
lasthlsConverterRequestTime: func() *int64 {
|
lastRequestTime: func() *int64 {
|
||||||
v := time.Now().Unix()
|
v := time.Now().Unix()
|
||||||
return &v
|
return &v
|
||||||
}(),
|
}(),
|
||||||
tsByName: make(map[string]*hls.TSFile),
|
request: make(chan hlsConverterRequest),
|
||||||
request: make(chan hlsConverterRequest),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log(logger.Info, "opened")
|
c.log(logger.Info, "opened")
|
||||||
|
|
@ -294,14 +282,19 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||||
return fmt.Errorf("unable to find a video or audio track")
|
return fmt.Errorf("unable to find a video or audio track")
|
||||||
}
|
}
|
||||||
|
|
||||||
curTSFile := hls.NewTSFile(videoTrack, audioTrack)
|
var err error
|
||||||
c.tsByName[curTSFile.Name()] = curTSFile
|
c.muxer, err = hls.NewMuxer(
|
||||||
c.tsQueue = append(c.tsQueue, curTSFile)
|
c.hlsSegmentCount,
|
||||||
|
c.hlsSegmentDuration,
|
||||||
defer func() {
|
videoTrack,
|
||||||
curTSFile.Close()
|
audioTrack,
|
||||||
}()
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer c.muxer.Close()
|
||||||
|
|
||||||
|
// start request handler only after muxer has been inizialized
|
||||||
requestHandlerTerminate := make(chan struct{})
|
requestHandlerTerminate := make(chan struct{})
|
||||||
requestHandlerDone := make(chan struct{})
|
requestHandlerDone := make(chan struct{})
|
||||||
go c.runRequestHandler(requestHandlerTerminate, requestHandlerDone)
|
go c.runRequestHandler(requestHandlerTerminate, requestHandlerDone)
|
||||||
|
|
@ -322,11 +315,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||||
writerDone := make(chan error)
|
writerDone := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
writerDone <- func() error {
|
writerDone <- func() error {
|
||||||
startPCR := time.Now()
|
|
||||||
var videoBuf [][]byte
|
var videoBuf [][]byte
|
||||||
videoDTSEst := h264.NewDTSEstimator()
|
|
||||||
videoInitialized := false
|
|
||||||
audioAUCount := 0
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
data, ok := c.ringBuffer.Pull()
|
data, ok := c.ringBuffer.Pull()
|
||||||
|
|
@ -343,21 +332,9 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// skip packets that are part of frames sent before
|
|
||||||
// the initialization of the converter
|
|
||||||
if !videoInitialized {
|
|
||||||
typ := pkt.Payload[0] & 0x1F
|
|
||||||
start := pkt.Payload[1] >> 7
|
|
||||||
if typ == 28 && start != 1 { // FU-A
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
videoInitialized = true
|
|
||||||
}
|
|
||||||
|
|
||||||
nalus, pts, err := h264Decoder.DecodeRTP(&pkt)
|
nalus, pts, err := h264Decoder.DecodeRTP(&pkt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != rtph264.ErrMorePacketsNeeded {
|
if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious {
|
||||||
c.log(logger.Warn, "unable to decode video track: %v", err)
|
c.log(logger.Warn, "unable to decode video track: %v", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
@ -382,70 +359,24 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||||
|
|
||||||
// RTP marker means that all the NALUs with the same PTS have been received.
|
// RTP marker means that all the NALUs with the same PTS have been received.
|
||||||
// send them together.
|
// send them together.
|
||||||
marker := (pair.buf[1] >> 7 & 0x1) > 0
|
if pkt.Marker {
|
||||||
if marker {
|
err := c.muxer.WriteH264(pts, videoBuf)
|
||||||
bufferHasIDR := func() bool {
|
|
||||||
for _, nalu := range videoBuf {
|
|
||||||
typ := h264.NALUType(nalu[0] & 0x1F)
|
|
||||||
if typ == h264.NALUTypeIDR {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}()
|
|
||||||
|
|
||||||
// we received a marker packet but
|
|
||||||
// - no IDR has been stored yet in current file
|
|
||||||
// - there's no IDR in the buffer
|
|
||||||
// data cannot be parsed, clear buffer
|
|
||||||
if !bufferHasIDR && !curTSFile.FirstPacketWritten() {
|
|
||||||
videoBuf = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err := func() error {
|
|
||||||
c.tsMutex.Lock()
|
|
||||||
defer c.tsMutex.Unlock()
|
|
||||||
|
|
||||||
if bufferHasIDR {
|
|
||||||
if curTSFile.FirstPacketWritten() &&
|
|
||||||
curTSFile.Duration() >= c.hlsSegmentDuration {
|
|
||||||
if curTSFile != nil {
|
|
||||||
curTSFile.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
curTSFile = hls.NewTSFile(videoTrack, audioTrack)
|
|
||||||
|
|
||||||
c.tsByName[curTSFile.Name()] = curTSFile
|
|
||||||
c.tsQueue = append(c.tsQueue, curTSFile)
|
|
||||||
if len(c.tsQueue) > c.hlsSegmentCount {
|
|
||||||
delete(c.tsByName, c.tsQueue[0].Name())
|
|
||||||
c.tsQueue = c.tsQueue[1:]
|
|
||||||
c.tsDeleteCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
curTSFile.SetPCR(time.Since(startPCR))
|
|
||||||
err := curTSFile.WriteH264(
|
|
||||||
videoDTSEst.Feed(pts+hlsConverterPTSOffset),
|
|
||||||
pts+hlsConverterPTSOffset,
|
|
||||||
bufferHasIDR,
|
|
||||||
videoBuf)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
videoBuf = nil
|
|
||||||
return nil
|
|
||||||
}()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
videoBuf = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if audioTrack != nil && pair.trackID == audioTrackID {
|
} else if audioTrack != nil && pair.trackID == audioTrackID {
|
||||||
aus, pts, err := aacDecoder.Decode(pair.buf)
|
var pkt rtp.Packet
|
||||||
|
err := pkt.Unmarshal(pair.buf)
|
||||||
|
if err != nil {
|
||||||
|
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
aus, pts, err := aacDecoder.DecodeRTP(&pkt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != rtpaac.ErrMorePacketsNeeded {
|
if err != rtpaac.ErrMorePacketsNeeded {
|
||||||
c.log(logger.Warn, "unable to decode audio track: %v", err)
|
c.log(logger.Warn, "unable to decode audio track: %v", err)
|
||||||
|
|
@ -453,52 +384,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = func() error {
|
err = c.muxer.WriteAAC(pts, aus)
|
||||||
c.tsMutex.Lock()
|
|
||||||
defer c.tsMutex.Unlock()
|
|
||||||
|
|
||||||
if videoTrack == nil {
|
|
||||||
if curTSFile.FirstPacketWritten() &&
|
|
||||||
curTSFile.Duration() >= c.hlsSegmentDuration &&
|
|
||||||
audioAUCount >= segmentMinAUCount {
|
|
||||||
|
|
||||||
if curTSFile != nil {
|
|
||||||
curTSFile.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
audioAUCount = 0
|
|
||||||
curTSFile = hls.NewTSFile(videoTrack, audioTrack)
|
|
||||||
c.tsByName[curTSFile.Name()] = curTSFile
|
|
||||||
c.tsQueue = append(c.tsQueue, curTSFile)
|
|
||||||
if len(c.tsQueue) > c.hlsSegmentCount {
|
|
||||||
delete(c.tsByName, c.tsQueue[0].Name())
|
|
||||||
c.tsQueue = c.tsQueue[1:]
|
|
||||||
c.tsDeleteCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if !curTSFile.FirstPacketWritten() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, au := range aus {
|
|
||||||
auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(aacConfig.SampleRate)
|
|
||||||
|
|
||||||
audioAUCount++
|
|
||||||
curTSFile.SetPCR(time.Since(startPCR))
|
|
||||||
err := curTSFile.WriteAAC(
|
|
||||||
aacConfig.SampleRate,
|
|
||||||
aacConfig.ChannelCount,
|
|
||||||
auPTS+hlsConverterPTSOffset,
|
|
||||||
au)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -513,7 +399,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-closeCheckTicker.C:
|
case <-closeCheckTicker.C:
|
||||||
t := time.Unix(atomic.LoadInt64(c.lasthlsConverterRequestTime), 0)
|
t := time.Unix(atomic.LoadInt64(c.lastRequestTime), 0)
|
||||||
if time.Since(t) >= closeAfterInactivity {
|
if time.Since(t) >= closeAfterInactivity {
|
||||||
c.ringBuffer.Close()
|
c.ringBuffer.Close()
|
||||||
<-writerDone
|
<-writerDone
|
||||||
|
|
@ -542,7 +428,7 @@ func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan stru
|
||||||
case preq := <-c.request:
|
case preq := <-c.request:
|
||||||
req := preq
|
req := preq
|
||||||
|
|
||||||
atomic.StoreInt64(c.lasthlsConverterRequestTime, time.Now().Unix())
|
atomic.StoreInt64(c.lastRequestTime, time.Now().Unix())
|
||||||
|
|
||||||
conf := c.path.Conf()
|
conf := c.path.Conf()
|
||||||
|
|
||||||
|
|
@ -569,61 +455,26 @@ func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan stru
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case req.File == "stream.m3u8":
|
case req.File == "stream.m3u8":
|
||||||
func() {
|
r := c.muxer.Playlist()
|
||||||
c.tsMutex.RLock()
|
if r == nil {
|
||||||
defer c.tsMutex.RUnlock()
|
req.W.WriteHeader(http.StatusNotFound)
|
||||||
|
req.Res <- nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if len(c.tsQueue) == 0 {
|
req.W.Header().Set("Content-Type", `application/x-mpegURL`)
|
||||||
req.W.WriteHeader(http.StatusNotFound)
|
req.Res <- r
|
||||||
req.Res <- nil
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cnt := "#EXTM3U\n"
|
|
||||||
cnt += "#EXT-X-VERSION:3\n"
|
|
||||||
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
|
|
||||||
|
|
||||||
targetDuration := func() uint {
|
|
||||||
ret := uint(math.Ceil(c.hlsSegmentDuration.Seconds()))
|
|
||||||
|
|
||||||
// EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION
|
|
||||||
for _, f := range c.tsQueue {
|
|
||||||
v2 := uint(math.Round(f.Duration().Seconds()))
|
|
||||||
if v2 > ret {
|
|
||||||
ret = v2
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret
|
|
||||||
}()
|
|
||||||
cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n"
|
|
||||||
|
|
||||||
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(c.tsDeleteCount), 10) + "\n"
|
|
||||||
|
|
||||||
for _, f := range c.tsQueue {
|
|
||||||
cnt += "#EXTINF:" + strconv.FormatFloat(f.Duration().Seconds(), 'f', -1, 64) + ",\n"
|
|
||||||
cnt += f.Name() + ".ts\n"
|
|
||||||
}
|
|
||||||
|
|
||||||
req.W.Header().Set("Content-Type", `application/x-mpegURL`)
|
|
||||||
req.Res <- bytes.NewReader([]byte(cnt))
|
|
||||||
}()
|
|
||||||
|
|
||||||
case strings.HasSuffix(req.File, ".ts"):
|
case strings.HasSuffix(req.File, ".ts"):
|
||||||
base := strings.TrimSuffix(req.File, ".ts")
|
r := c.muxer.TSFile(req.File)
|
||||||
|
if r == nil {
|
||||||
c.tsMutex.RLock()
|
|
||||||
f, ok := c.tsByName[base]
|
|
||||||
c.tsMutex.RUnlock()
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
req.W.WriteHeader(http.StatusNotFound)
|
req.W.WriteHeader(http.StatusNotFound)
|
||||||
req.Res <- nil
|
req.Res <- nil
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
req.W.Header().Set("Content-Type", `video/MP2T`)
|
req.W.Header().Set("Content-Type", `video/MP2T`)
|
||||||
req.Res <- f.NewReader()
|
req.Res <- r
|
||||||
|
|
||||||
case req.File == "":
|
case req.File == "":
|
||||||
req.Res <- bytes.NewReader([]byte(index))
|
req.Res <- bytes.NewReader([]byte(index))
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/aler9/gortsplib/pkg/rtpaac"
|
"github.com/aler9/gortsplib/pkg/rtpaac"
|
||||||
"github.com/aler9/gortsplib/pkg/rtph264"
|
"github.com/aler9/gortsplib/pkg/rtph264"
|
||||||
"github.com/notedit/rtmp/av"
|
"github.com/notedit/rtmp/av"
|
||||||
|
"github.com/pion/rtp"
|
||||||
|
|
||||||
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
|
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
|
||||||
"github.com/aler9/rtsp-simple-server/internal/h264"
|
"github.com/aler9/rtsp-simple-server/internal/h264"
|
||||||
|
|
@ -290,9 +291,16 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
|
||||||
pair := data.(rtmpConnTrackIDPayloadPair)
|
pair := data.(rtmpConnTrackIDPayloadPair)
|
||||||
|
|
||||||
if videoTrack != nil && pair.trackID == videoTrackID {
|
if videoTrack != nil && pair.trackID == videoTrackID {
|
||||||
nalus, pts, err := h264Decoder.Decode(pair.buf)
|
var pkt rtp.Packet
|
||||||
|
err := pkt.Unmarshal(pair.buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != rtph264.ErrMorePacketsNeeded {
|
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
nalus, pts, err := h264Decoder.DecodeRTP(&pkt)
|
||||||
|
if err != nil {
|
||||||
|
if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious {
|
||||||
c.log(logger.Warn, "unable to decode video track: %v", err)
|
c.log(logger.Warn, "unable to decode video track: %v", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
@ -311,8 +319,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
|
||||||
|
|
||||||
// RTP marker means that all the NALUs with the same PTS have been received.
|
// RTP marker means that all the NALUs with the same PTS have been received.
|
||||||
// send them together.
|
// send them together.
|
||||||
marker := (pair.buf[1] >> 7 & 0x1) > 0
|
if pkt.Marker {
|
||||||
if marker {
|
|
||||||
data, err := h264.EncodeAVCC(videoBuf)
|
data, err := h264.EncodeAVCC(videoBuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -334,7 +341,14 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if audioTrack != nil && pair.trackID == audioTrackID {
|
} else if audioTrack != nil && pair.trackID == audioTrackID {
|
||||||
aus, pts, err := aacDecoder.Decode(pair.buf)
|
var pkt rtp.Packet
|
||||||
|
err := pkt.Unmarshal(pair.buf)
|
||||||
|
if err != nil {
|
||||||
|
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
aus, pts, err := aacDecoder.DecodeRTP(&pkt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != rtpaac.ErrMorePacketsNeeded {
|
if err != rtpaac.ErrMorePacketsNeeded {
|
||||||
c.log(logger.Warn, "unable to decode audio track: %v", err)
|
c.log(logger.Warn, "unable to decode audio track: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ func (m *multiAccessBuffer) Write(p []byte) (int, error) {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *multiAccessBuffer) NewReader() *multiAccessBufferReader {
|
func (m *multiAccessBuffer) NewReader() io.Reader {
|
||||||
return &multiAccessBufferReader{
|
return &multiAccessBufferReader{
|
||||||
m: m,
|
m: m,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
238
internal/hls/muxer.go
Normal file
238
internal/hls/muxer.go
Normal file
|
|
@ -0,0 +1,238 @@
|
||||||
|
package hls
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aler9/gortsplib"
|
||||||
|
"github.com/aler9/gortsplib/pkg/rtpaac"
|
||||||
|
|
||||||
|
"github.com/aler9/rtsp-simple-server/internal/h264"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// an offset is needed to
|
||||||
|
// - avoid negative PTS values
|
||||||
|
// - avoid PTS < DTS during startup
|
||||||
|
ptsOffset = 2 * time.Second
|
||||||
|
|
||||||
|
segmentMinAUCount = 100
|
||||||
|
)
|
||||||
|
|
||||||
|
// Muxer is a HLS muxer.
|
||||||
|
type Muxer struct {
|
||||||
|
hlsSegmentCount int
|
||||||
|
hlsSegmentDuration time.Duration
|
||||||
|
videoTrack *gortsplib.Track
|
||||||
|
audioTrack *gortsplib.Track
|
||||||
|
|
||||||
|
aacConfig rtpaac.MPEG4AudioConfig
|
||||||
|
startPCR time.Time
|
||||||
|
videoDTSEst *h264.DTSEstimator
|
||||||
|
audioAUCount int
|
||||||
|
tsCurrent *tsFile
|
||||||
|
tsQueue []*tsFile
|
||||||
|
tsByName map[string]*tsFile
|
||||||
|
tsDeleteCount int
|
||||||
|
mutex sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMuxer allocates a Muxer.
|
||||||
|
func NewMuxer(
|
||||||
|
hlsSegmentCount int,
|
||||||
|
hlsSegmentDuration time.Duration,
|
||||||
|
videoTrack *gortsplib.Track,
|
||||||
|
audioTrack *gortsplib.Track) (*Muxer, error) {
|
||||||
|
var aacConfig rtpaac.MPEG4AudioConfig
|
||||||
|
if audioTrack != nil {
|
||||||
|
byts, err := audioTrack.ExtractDataAAC()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = aacConfig.Decode(byts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m := &Muxer{
|
||||||
|
hlsSegmentCount: hlsSegmentCount,
|
||||||
|
hlsSegmentDuration: hlsSegmentDuration,
|
||||||
|
videoTrack: videoTrack,
|
||||||
|
audioTrack: audioTrack,
|
||||||
|
aacConfig: aacConfig,
|
||||||
|
startPCR: time.Now(),
|
||||||
|
videoDTSEst: h264.NewDTSEstimator(),
|
||||||
|
tsCurrent: newTSFile(videoTrack != nil, audioTrack != nil),
|
||||||
|
tsByName: make(map[string]*tsFile),
|
||||||
|
}
|
||||||
|
|
||||||
|
m.tsByName[m.tsCurrent.name] = m.tsCurrent
|
||||||
|
m.tsQueue = append(m.tsQueue, m.tsCurrent)
|
||||||
|
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes a Muxer.
|
||||||
|
func (m *Muxer) Close() {
|
||||||
|
m.tsCurrent.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteH264 writes H264 NALUs, grouped by PTS, into the muxer.
|
||||||
|
func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
|
||||||
|
idrPresent := func() bool {
|
||||||
|
for _, nalu := range nalus {
|
||||||
|
typ := h264.NALUType(nalu[0] & 0x1F)
|
||||||
|
if typ == h264.NALUTypeIDR {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}()
|
||||||
|
|
||||||
|
// skip group silently until we find one with a IDR
|
||||||
|
if !m.tsCurrent.firstPacketWritten && !idrPresent {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
if idrPresent {
|
||||||
|
if m.tsCurrent.firstPacketWritten &&
|
||||||
|
m.tsCurrent.duration() >= m.hlsSegmentDuration {
|
||||||
|
if m.tsCurrent != nil {
|
||||||
|
m.tsCurrent.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil)
|
||||||
|
|
||||||
|
m.tsByName[m.tsCurrent.name] = m.tsCurrent
|
||||||
|
m.tsQueue = append(m.tsQueue, m.tsCurrent)
|
||||||
|
if len(m.tsQueue) > m.hlsSegmentCount {
|
||||||
|
delete(m.tsByName, m.tsQueue[0].name)
|
||||||
|
m.tsQueue = m.tsQueue[1:]
|
||||||
|
m.tsDeleteCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.tsCurrent.setPCR(time.Since(m.startPCR))
|
||||||
|
err := m.tsCurrent.writeH264(
|
||||||
|
m.videoDTSEst.Feed(pts+ptsOffset),
|
||||||
|
pts+ptsOffset,
|
||||||
|
idrPresent,
|
||||||
|
nalus)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteAAC writes AAC AUs, grouped by PTS, into the muxer.
|
||||||
|
func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
|
||||||
|
m.mutex.Lock()
|
||||||
|
defer m.mutex.Unlock()
|
||||||
|
|
||||||
|
if m.videoTrack != nil {
|
||||||
|
if m.tsCurrent.firstPacketWritten &&
|
||||||
|
m.tsCurrent.duration() >= m.hlsSegmentDuration &&
|
||||||
|
m.audioAUCount >= segmentMinAUCount {
|
||||||
|
|
||||||
|
if m.tsCurrent != nil {
|
||||||
|
m.tsCurrent.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
m.audioAUCount = 0
|
||||||
|
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil)
|
||||||
|
m.tsByName[m.tsCurrent.name] = m.tsCurrent
|
||||||
|
m.tsQueue = append(m.tsQueue, m.tsCurrent)
|
||||||
|
if len(m.tsQueue) > m.hlsSegmentCount {
|
||||||
|
delete(m.tsByName, m.tsQueue[0].name)
|
||||||
|
m.tsQueue = m.tsQueue[1:]
|
||||||
|
m.tsDeleteCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !m.tsCurrent.firstPacketWritten {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, au := range aus {
|
||||||
|
auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(m.aacConfig.SampleRate)
|
||||||
|
|
||||||
|
m.audioAUCount++
|
||||||
|
m.tsCurrent.setPCR(time.Since(m.startPCR))
|
||||||
|
err := m.tsCurrent.writeAAC(
|
||||||
|
m.aacConfig.SampleRate,
|
||||||
|
m.aacConfig.ChannelCount,
|
||||||
|
auPTS+ptsOffset,
|
||||||
|
au)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Playlist returns a reader to read the HLS playlist in M3U8 format.
|
||||||
|
func (m *Muxer) Playlist() io.Reader {
|
||||||
|
m.mutex.RLock()
|
||||||
|
defer m.mutex.RUnlock()
|
||||||
|
|
||||||
|
if len(m.tsQueue) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cnt := "#EXTM3U\n"
|
||||||
|
cnt += "#EXT-X-VERSION:3\n"
|
||||||
|
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
|
||||||
|
|
||||||
|
targetDuration := func() uint {
|
||||||
|
ret := uint(math.Ceil(m.hlsSegmentDuration.Seconds()))
|
||||||
|
|
||||||
|
// EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION
|
||||||
|
for _, f := range m.tsQueue {
|
||||||
|
v2 := uint(math.Round(f.duration().Seconds()))
|
||||||
|
if v2 > ret {
|
||||||
|
ret = v2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}()
|
||||||
|
cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n"
|
||||||
|
|
||||||
|
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.tsDeleteCount), 10) + "\n"
|
||||||
|
|
||||||
|
for _, f := range m.tsQueue {
|
||||||
|
cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n"
|
||||||
|
cnt += f.name + ".ts\n"
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytes.NewReader([]byte(cnt))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TSFile returns a reader to read a given MPEG-TS file.
|
||||||
|
func (m *Muxer) TSFile(fname string) io.Reader {
|
||||||
|
base := strings.TrimSuffix(fname, ".ts")
|
||||||
|
|
||||||
|
m.mutex.RLock()
|
||||||
|
f, ok := m.tsByName[base]
|
||||||
|
m.mutex.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.newReader()
|
||||||
|
}
|
||||||
54
internal/hls/muxer_test.go
Normal file
54
internal/hls/muxer_test.go
Normal file
|
|
@ -0,0 +1,54 @@
|
||||||
|
package hls
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aler9/gortsplib"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMuxer(t *testing.T) {
|
||||||
|
videoTrack, err := gortsplib.NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
audioTrack, err := gortsplib.NewTrackAAC(97, []byte{17, 144})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
m, err := NewMuxer(3, 5*time.Second, videoTrack, audioTrack)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer m.Close()
|
||||||
|
|
||||||
|
// group without IDR
|
||||||
|
err = m.WriteH264(1*time.Second, [][]byte{
|
||||||
|
{0x06},
|
||||||
|
{0x07},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// group with IDR
|
||||||
|
err = m.WriteH264(2*time.Second, [][]byte{
|
||||||
|
{0x05},
|
||||||
|
{0x06},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = m.WriteAAC(3*time.Second, [][]byte{
|
||||||
|
{0x01, 0x02, 0x03, 0x04},
|
||||||
|
{0x05, 0x06, 0x07, 0x08},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// group without IDR
|
||||||
|
err = m.WriteH264(4*time.Second, [][]byte{
|
||||||
|
{0x06},
|
||||||
|
{0x07},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
byts, err := ioutil.ReadAll(m.Playlist())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Regexp(t, `^#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:5\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:2,\n[0-9]+\.ts\n$`, string(byts))
|
||||||
|
}
|
||||||
|
|
@ -6,15 +6,13 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aler9/gortsplib"
|
|
||||||
"github.com/asticode/go-astits"
|
"github.com/asticode/go-astits"
|
||||||
|
|
||||||
"github.com/aler9/rtsp-simple-server/internal/aac"
|
"github.com/aler9/rtsp-simple-server/internal/aac"
|
||||||
"github.com/aler9/rtsp-simple-server/internal/h264"
|
"github.com/aler9/rtsp-simple-server/internal/h264"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TSFile is a MPEG-TS file.
|
type tsFile struct {
|
||||||
type TSFile struct {
|
|
||||||
name string
|
name string
|
||||||
buf *multiAccessBuffer
|
buf *multiAccessBuffer
|
||||||
mux *astits.Muxer
|
mux *astits.Muxer
|
||||||
|
|
@ -25,30 +23,29 @@ type TSFile struct {
|
||||||
maxPTS time.Duration
|
maxPTS time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTSFile allocates a TSFile.
|
func newTSFile(hasVideoTrack bool, hasAudioTrack bool) *tsFile {
|
||||||
func NewTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *TSFile {
|
t := &tsFile{
|
||||||
t := &TSFile{
|
|
||||||
buf: newMultiAccessBuffer(),
|
buf: newMultiAccessBuffer(),
|
||||||
name: strconv.FormatInt(time.Now().Unix(), 10),
|
name: strconv.FormatInt(time.Now().Unix(), 10),
|
||||||
}
|
}
|
||||||
|
|
||||||
t.mux = astits.NewMuxer(context.Background(), t.buf)
|
t.mux = astits.NewMuxer(context.Background(), t.buf)
|
||||||
|
|
||||||
if videoTrack != nil {
|
if hasVideoTrack {
|
||||||
t.mux.AddElementaryStream(astits.PMTElementaryStream{
|
t.mux.AddElementaryStream(astits.PMTElementaryStream{
|
||||||
ElementaryPID: 256,
|
ElementaryPID: 256,
|
||||||
StreamType: astits.StreamTypeH264Video,
|
StreamType: astits.StreamTypeH264Video,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if audioTrack != nil {
|
if hasAudioTrack {
|
||||||
t.mux.AddElementaryStream(astits.PMTElementaryStream{
|
t.mux.AddElementaryStream(astits.PMTElementaryStream{
|
||||||
ElementaryPID: 257,
|
ElementaryPID: 257,
|
||||||
StreamType: astits.StreamTypeAACAudio,
|
StreamType: astits.StreamTypeAACAudio,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if videoTrack != nil {
|
if hasVideoTrack {
|
||||||
t.pcrTrackIsVideo = true
|
t.pcrTrackIsVideo = true
|
||||||
t.mux.SetPCRPID(256)
|
t.mux.SetPCRPID(256)
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -63,38 +60,23 @@ func NewTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *TSFile
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes a TSFile.
|
func (t *tsFile) close() error {
|
||||||
func (t *TSFile) Close() error {
|
|
||||||
return t.buf.Close()
|
return t.buf.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name returns the file name.
|
func (t *tsFile) duration() time.Duration {
|
||||||
func (t *TSFile) Name() string {
|
|
||||||
return t.name
|
|
||||||
}
|
|
||||||
|
|
||||||
// Duration returns the file duration.
|
|
||||||
func (t *TSFile) Duration() time.Duration {
|
|
||||||
return t.maxPTS - t.minPTS
|
return t.maxPTS - t.minPTS
|
||||||
}
|
}
|
||||||
|
|
||||||
// FirstPacketWritten returns whether a packet ha been written into the file.
|
func (t *tsFile) setPCR(pcr time.Duration) {
|
||||||
func (t *TSFile) FirstPacketWritten() bool {
|
|
||||||
return t.firstPacketWritten
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPCR sets the PCR.
|
|
||||||
func (t *TSFile) SetPCR(pcr time.Duration) {
|
|
||||||
t.pcr = pcr
|
t.pcr = pcr
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReader allocates a reader to read the file.
|
func (t *tsFile) newReader() io.Reader {
|
||||||
func (t *TSFile) NewReader() io.Reader {
|
|
||||||
return t.buf.NewReader()
|
return t.buf.NewReader()
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteH264 writes H264 NALUs into the file.
|
func (t *tsFile) writeH264(dts time.Duration, pts time.Duration, isIDR bool, nalus [][]byte) error {
|
||||||
func (t *TSFile) WriteH264(dts time.Duration, pts time.Duration, isIDR bool, nalus [][]byte) error {
|
|
||||||
if t.pcrTrackIsVideo {
|
if t.pcrTrackIsVideo {
|
||||||
if !t.firstPacketWritten {
|
if !t.firstPacketWritten {
|
||||||
t.firstPacketWritten = true
|
t.firstPacketWritten = true
|
||||||
|
|
@ -143,8 +125,7 @@ func (t *TSFile) WriteH264(dts time.Duration, pts time.Duration, isIDR bool, nal
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteAAC writes AAC AUs into the file.
|
func (t *tsFile) writeAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error {
|
||||||
func (t *TSFile) WriteAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error {
|
|
||||||
if !t.pcrTrackIsVideo {
|
if !t.pcrTrackIsVideo {
|
||||||
if !t.firstPacketWritten {
|
if !t.firstPacketWritten {
|
||||||
t.firstPacketWritten = true
|
t.firstPacketWritten = true
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue