mediamtx/internal/hls/client_processor_mpegts_track.go
Alessandro Ros e5ab731d14
Improve HLS client (#1179)
* hls source: support fMP4 video streams

* hls source: start reading live streams from (end of playlist - starting point)

* hls client: wait processing of current fMP4 segment before downloading another one

* hls client: support fmp4 trun boxes with default sample duration, flags and size

* hls client: merge fmp4 init file reader and writer

* hls client: merge fmp4 part reader and writer

* hls client: improve precision of go <-> mp4 time conversion

* hls client: fix esds generation in go-mp4

* hls client: support audio in separate playlist

* hls client: support an arbitrary number of tracks in fmp4 init files

* hls client: support EXT-X-BYTERANGE

* hls client: support fmp4 segments with multiple parts at once

* hls client: support an arbitrary number of mpeg-ts tracks

* hls client: synchronize tracks around a primary track

* update go-mp4

* hls: synchronize track reproduction around a leading one

* hls client: reset stream if playback is too late

* hls client: add limit on DTS-RTC difference

* hls client: support streams that don't provide codecs in the master playlist again
2022-10-23 14:04:33 +02:00

58 lines
1.2 KiB
Go

package hls
import (
"context"
"time"
"github.com/asticode/go-astits"
)
// clientProcessorMPEGTSTrack consumes the PES packets of a single MPEG-TS
// track: packets are taken from queue, their timestamps are converted
// through ts, and the result is handed to onEntry.
type clientProcessorMPEGTSTrack struct {
	// ts converts raw DTS/PTS values into a time.Duration (see processEntry).
	ts *clientTimeSyncMPEGTS

	// onEntry is invoked once per processed packet with the converted
	// timestamp and the packet payload; a non-nil error stops run.
	onEntry func(pts time.Duration, data []byte) error

	// queue holds the packets waiting to be processed by run.
	// NOTE(review): producers are outside this file — presumably the
	// segment demuxer; confirm against the caller.
	queue chan *astits.PESData
}
// newClientProcessorMPEGTSTrack allocates a track processor bound to the
// given time synchronizer and entry callback. Its packet queue is a buffered
// channel with capacity clientMPEGTSEntryQueueSize.
func newClientProcessorMPEGTSTrack(
	ts *clientTimeSyncMPEGTS,
	onEntry func(time.Duration, []byte) error,
) *clientProcessorMPEGTSTrack {
	t := new(clientProcessorMPEGTSTrack)
	t.ts = ts
	t.onEntry = onEntry
	t.queue = make(chan *astits.PESData, clientMPEGTSEntryQueueSize)
	return t
}
// run drains the track queue until ctx is cancelled or a packet fails to
// process.
//
// It returns nil when ctx is done, and the first error reported by
// processEntry otherwise.
func (t *clientProcessorMPEGTSTrack) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			// Cancellation is treated as a clean shutdown, not as an error.
			return nil

		case entry := <-t.queue:
			if err := t.processEntry(ctx, entry); err != nil {
				return err
			}
		}
	}
}
// processEntry extracts the raw timestamps of a PES packet, converts them
// through the time synchronizer and forwards the payload to onEntry.
//
// The DTS is read from the packet only when the header signals that both PTS
// and DTS are present; otherwise the PTS is used as DTS.
//
// NOTE(review): the optional header and its PTS are dereferenced without a
// nil check — this assumes the producer only enqueues packets that carry a
// PTS; confirm against the caller.
func (t *clientProcessorMPEGTSTrack) processEntry(ctx context.Context, pes *astits.PESData) error {
	opt := pes.Header.OptionalHeader

	rawPTS := opt.PTS.Base

	// Default to PTS; override only when an explicit DTS is signalled.
	rawDTS := rawPTS
	if opt.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent {
		rawDTS = opt.DTS.Base
	}

	pts, err := t.ts.convertAndSync(ctx, rawDTS, rawPTS)
	if err != nil {
		return err
	}

	return t.onEntry(pts, pes.Data)
}