1
0
Fork 0
forked from External/mediamtx

playback: support serving tracks with any time scale (#3211)

This commit is contained in:
Alessandro Ros 2024-04-08 23:56:27 +02:00 committed by GitHub
parent 14bf7f4cc5
commit 55355ad58a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 137 additions and 62 deletions

View file

@ -1,7 +1,9 @@
package playback package playback
import "github.com/bluenviron/mediacommon/pkg/formats/fmp4"
type muxer interface { type muxer interface {
writeInit(init []byte) writeInit(init *fmp4.Init)
setTrack(trackID int) setTrack(trackID int)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error
writeFinalDTS(dts int64) writeFinalDTS(dts int64)

View file

@ -8,13 +8,16 @@ import (
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
) )
var partSize = durationGoToMp4(1*time.Second, fmp4Timescale) const (
partSize = 1 * time.Second
)
type muxerFMP4Track struct { type muxerFMP4Track struct {
id int id int
firstDTS int64 timeScale uint32
lastDTS int64 firstDTS int64
samples []*fmp4.PartSample lastDTS int64
samples []*fmp4.PartSample
} }
func findTrack(tracks []*muxerFMP4Track, id int) *muxerFMP4Track { func findTrack(tracks []*muxerFMP4Track, id int) *muxerFMP4Track {
@ -29,26 +32,29 @@ func findTrack(tracks []*muxerFMP4Track, id int) *muxerFMP4Track {
type muxerFMP4 struct { type muxerFMP4 struct {
w io.Writer w io.Writer
init []byte init *fmp4.Init
nextSequenceNumber uint32 nextSequenceNumber uint32
tracks []*muxerFMP4Track tracks []*muxerFMP4Track
curTrack *muxerFMP4Track curTrack *muxerFMP4Track
outBuf seekablebuffer.Buffer outBuf seekablebuffer.Buffer
} }
func (w *muxerFMP4) writeInit(init []byte) { func (w *muxerFMP4) writeInit(init *fmp4.Init) {
w.init = init w.init = init
w.tracks = make([]*muxerFMP4Track, len(init.Tracks))
for i, track := range init.Tracks {
w.tracks[i] = &muxerFMP4Track{
id: track.ID,
timeScale: track.TimeScale,
firstDTS: -1,
}
}
} }
func (w *muxerFMP4) setTrack(trackID int) { func (w *muxerFMP4) setTrack(trackID int) {
w.curTrack = findTrack(w.tracks, trackID) w.curTrack = findTrack(w.tracks, trackID)
if w.curTrack == nil {
w.curTrack = &muxerFMP4Track{
id: trackID,
firstDTS: -1,
}
w.tracks = append(w.tracks, w.curTrack)
}
} }
func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error { func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error {
@ -75,7 +81,9 @@ func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool
}) })
w.curTrack.lastDTS = dts w.curTrack.lastDTS = dts
if (w.curTrack.lastDTS - w.curTrack.firstDTS) > int64(partSize) { partSizeMP4 := durationGoToMp4(partSize, w.curTrack.timeScale)
if (w.curTrack.lastDTS - w.curTrack.firstDTS) > partSizeMP4 {
err := w.innerFlush(false) err := w.innerFlush(false)
if err != nil { if err != nil {
return err return err
@ -141,11 +149,18 @@ func (w *muxerFMP4) innerFlush(final bool) error {
w.nextSequenceNumber++ w.nextSequenceNumber++
if w.init != nil { if w.init != nil {
_, err := w.w.Write(w.init) err := w.init.Marshal(&w.outBuf)
if err != nil { if err != nil {
return err return err
} }
_, err = w.w.Write(w.outBuf.Bytes())
if err != nil {
return err
}
w.init = nil w.init = nil
w.outBuf.Reset()
} }
err := part.Marshal(&w.outBuf) err := part.Marshal(&w.outBuf)

View file

@ -9,6 +9,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -48,7 +49,7 @@ func seekAndMux(
m muxer, m muxer,
) error { ) error {
if recordFormat == conf.RecordFormatFMP4 { if recordFormat == conf.RecordFormatFMP4 {
var firstInit []byte var firstInit *fmp4.Init
var segmentEnd time.Time var segmentEnd time.Time
err := func() error { err := func() error {
@ -67,7 +68,7 @@ func seekAndMux(
segmentStartOffset := start.Sub(segments[0].Start) segmentStartOffset := start.Sub(segments[0].Start)
segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, m) segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m)
if err != nil { if err != nil {
return err return err
} }
@ -99,7 +100,7 @@ func seekAndMux(
segmentStartOffset := seg.Start.Sub(start) segmentStartOffset := seg.Start.Sub(start)
segmentMaxElapsed, err := segmentFMP4WriteParts(f, segmentStartOffset, duration, m) segmentMaxElapsed, err := segmentFMP4WriteParts(f, segmentStartOffset, duration, firstInit, m)
if err != nil { if err != nil {
return err return err
} }

View file

@ -9,6 +9,7 @@ import (
"os" "os"
"time" "time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
@ -27,7 +28,7 @@ type listEntry struct {
func computeDurationAndConcatenate(recordFormat conf.RecordFormat, segments []*Segment) ([]listEntry, error) { func computeDurationAndConcatenate(recordFormat conf.RecordFormat, segments []*Segment) ([]listEntry, error) {
if recordFormat == conf.RecordFormatFMP4 { if recordFormat == conf.RecordFormatFMP4 {
out := []listEntry{} out := []listEntry{}
var prevInit []byte var prevInit *fmp4.Init
for _, seg := range segments { for _, seg := range segments {
err := func() error { err := func() error {
@ -47,7 +48,7 @@ func computeDurationAndConcatenate(recordFormat conf.RecordFormat, segments []*S
return err return err
} }
maxDuration, err := segmentFMP4ReadMaxDuration(f) maxDuration, err := segmentFMP4ReadMaxDuration(f, init)
if err != nil { if err != nil {
return err return err
} }

View file

@ -5,45 +5,55 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"reflect"
"time" "time"
"github.com/abema/go-mp4" "github.com/abema/go-mp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
) )
const ( const (
sampleFlagIsNonSyncSample = 1 << 16 sampleFlagIsNonSyncSample = 1 << 16
concatenationTolerance = 500 * time.Millisecond concatenationTolerance = 500 * time.Millisecond
fmp4Timescale = 90000
) )
func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { var errTerminated = errors.New("terminated")
timeScale64 := uint64(timeScale)
func durationGoToMp4(v time.Duration, timeScale uint32) int64 {
timeScale64 := int64(timeScale)
secs := v / time.Second secs := v / time.Second
dec := v % time.Second dec := v % time.Second
return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second) return int64(secs)*timeScale64 + int64(dec)*timeScale64/int64(time.Second)
} }
func durationMp4ToGo(v uint64, timeScale uint32) time.Duration { func durationMp4ToGo(v int64, timeScale uint32) time.Duration {
timeScale64 := uint64(timeScale) timeScale64 := int64(timeScale)
secs := v / timeScale64 secs := v / timeScale64
dec := v % timeScale64 dec := v % timeScale64
return time.Duration(secs)*time.Second + time.Duration(dec)*time.Second/time.Duration(timeScale64) return time.Duration(secs)*time.Second + time.Duration(dec)*time.Second/time.Duration(timeScale64)
} }
var errTerminated = errors.New("terminated") func findInitTrack(tracks []*fmp4.InitTrack, id int) *fmp4.InitTrack {
for _, track := range tracks {
if track.ID == id {
return track
}
}
return nil
}
func segmentFMP4CanBeConcatenated( func segmentFMP4CanBeConcatenated(
prevInit []byte, prevInit *fmp4.Init,
prevEnd time.Time, prevEnd time.Time,
curInit []byte, curInit *fmp4.Init,
curStart time.Time, curStart time.Time,
) bool { ) bool {
return bytes.Equal(prevInit, curInit) && return reflect.DeepEqual(prevInit, curInit) &&
!curStart.Before(prevEnd.Add(-concatenationTolerance)) && !curStart.Before(prevEnd.Add(-concatenationTolerance)) &&
!curStart.After(prevEnd.Add(concatenationTolerance)) !curStart.After(prevEnd.Add(concatenationTolerance))
} }
func segmentFMP4ReadInit(r io.ReadSeeker) ([]byte, error) { func segmentFMP4ReadInit(r io.ReadSeeker) (*fmp4.Init, error) {
buf := make([]byte, 8) buf := make([]byte, 8)
_, err := io.ReadFull(r, buf) _, err := io.ReadFull(r, buf)
if err != nil { if err != nil {
@ -81,8 +91,6 @@ func segmentFMP4ReadInit(r io.ReadSeeker) ([]byte, error) {
return nil, err return nil, err
} }
// return ftyp and moov
buf = make([]byte, ftypSize+moovSize) buf = make([]byte, ftypSize+moovSize)
_, err = io.ReadFull(r, buf) _, err = io.ReadFull(r, buf)
@ -90,10 +98,19 @@ func segmentFMP4ReadInit(r io.ReadSeeker) ([]byte, error) {
return nil, err return nil, err
} }
return buf, nil var init fmp4.Init
err = init.Unmarshal(bytes.NewReader(buf))
if err != nil {
return nil, err
}
return &init, nil
} }
func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) { func segmentFMP4ReadMaxDuration(
r io.ReadSeeker,
init *fmp4.Init,
) (time.Duration, error) {
// find and skip ftyp // find and skip ftyp
buf := make([]byte, 8) buf := make([]byte, 8)
@ -203,7 +220,7 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) {
return 0, err return 0, err
} }
maxElapsed := uint64(0) var maxElapsed time.Duration
// foreach traf // foreach traf
@ -220,7 +237,7 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) {
return 0, fmt.Errorf("traf box not found") return 0, fmt.Errorf("traf box not found")
} }
// skip tfhd // parse tfhd
_, err = io.ReadFull(r, buf) _, err = io.ReadFull(r, buf)
if err != nil { if err != nil {
@ -233,11 +250,24 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) {
tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(int64(tfhdSize)-8, io.SeekCurrent) buf2 := make([]byte, tfhdSize-8)
_, err = io.ReadFull(r, buf2)
if err != nil { if err != nil {
return 0, err return 0, err
} }
var tfhd mp4.Tfhd
_, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &tfhd, mp4.Context{})
if err != nil {
return 0, fmt.Errorf("invalid tfhd box: %w", err)
}
track := findInitTrack(init.Tracks, int(tfhd.TrackID))
if track == nil {
return 0, fmt.Errorf("invalid track ID: %v", tfhd.TrackID)
}
// parse tfdt // parse tfdt
_, err = io.ReadFull(r, buf) _, err = io.ReadFull(r, buf)
@ -251,7 +281,7 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) {
tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
buf2 := make([]byte, tfdtSize-8) buf2 = make([]byte, tfdtSize-8)
_, err = io.ReadFull(r, buf2) _, err = io.ReadFull(r, buf2)
if err != nil { if err != nil {
@ -290,33 +320,37 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) {
return 0, fmt.Errorf("invalid trun box: %w", err) return 0, fmt.Errorf("invalid trun box: %w", err)
} }
elapsed := tfdt.BaseMediaDecodeTimeV1 elapsed := int64(tfdt.BaseMediaDecodeTimeV1)
for _, entry := range trun.Entries { for _, entry := range trun.Entries {
elapsed += uint64(entry.SampleDuration) elapsed += int64(entry.SampleDuration)
} }
if elapsed > maxElapsed { elapsedGo := durationMp4ToGo(elapsed, track.TimeScale)
maxElapsed = elapsed
if elapsedGo > maxElapsed {
maxElapsed = elapsedGo
} }
} }
return durationMp4ToGo(maxElapsed, fmp4Timescale), nil return maxElapsed, nil
} }
func segmentFMP4SeekAndMuxParts( func segmentFMP4SeekAndMuxParts(
r io.ReadSeeker, r io.ReadSeeker,
segmentStartOffset time.Duration, segmentStartOffset time.Duration,
duration time.Duration, duration time.Duration,
init *fmp4.Init,
m muxer, m muxer,
) (time.Duration, error) { ) (time.Duration, error) {
segmentStartOffsetMP4 := durationGoToMp4(segmentStartOffset, fmp4Timescale) var segmentStartOffsetMP4 int64
durationMP4 := durationGoToMp4(duration, fmp4Timescale) var durationMP4 int64
moofOffset := uint64(0) moofOffset := uint64(0)
var tfhd *mp4.Tfhd var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt var tfdt *mp4.Tfdt
atLeastOnePartWritten := false atLeastOnePartWritten := false
maxMuxerDTS := int64(0) var timeScale uint32
var maxMuxerDTS time.Duration
breakAtNextMdat := false breakAtNextMdat := false
_, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) { _, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) {
@ -342,7 +376,15 @@ func segmentFMP4SeekAndMuxParts(
} }
tfdt = box.(*mp4.Tfdt) tfdt = box.(*mp4.Tfdt)
track := findInitTrack(init.Tracks, int(tfhd.TrackID))
if track == nil {
return nil, fmt.Errorf("invalid track ID: %v", tfhd.TrackID)
}
m.setTrack(int(tfhd.TrackID)) m.setTrack(int(tfhd.TrackID))
timeScale = track.TimeScale
segmentStartOffsetMP4 = durationGoToMp4(segmentStartOffset, track.TimeScale)
durationMP4 = durationGoToMp4(duration, track.TimeScale)
case "trun": case "trun":
box, _, err := h.ReadPayload() box, _, err := h.ReadPayload()
@ -358,11 +400,11 @@ func segmentFMP4SeekAndMuxParts(
return nil, err return nil, err
} }
muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) - int64(segmentStartOffsetMP4) muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) - segmentStartOffsetMP4
atLeastOneSampleWritten := false atLeastOneSampleWritten := false
for _, e := range trun.Entries { for _, e := range trun.Entries {
if muxerDTS >= int64(durationMP4) { if muxerDTS >= durationMP4 {
breakAtNextMdat = true breakAtNextMdat = true
break break
} }
@ -395,8 +437,10 @@ func segmentFMP4SeekAndMuxParts(
m.writeFinalDTS(muxerDTS) m.writeFinalDTS(muxerDTS)
} }
if muxerDTS > maxMuxerDTS { muxerDTSGo := durationMp4ToGo(muxerDTS, timeScale)
maxMuxerDTS = muxerDTS
if muxerDTSGo > maxMuxerDTS {
maxMuxerDTS = muxerDTSGo
} }
case "mdat": case "mdat":
@ -414,21 +458,23 @@ func segmentFMP4SeekAndMuxParts(
return 0, errNoSegmentsFound return 0, errNoSegmentsFound
} }
return durationMp4ToGo(uint64(maxMuxerDTS), fmp4Timescale), nil return maxMuxerDTS, nil
} }
func segmentFMP4WriteParts( func segmentFMP4WriteParts(
r io.ReadSeeker, r io.ReadSeeker,
segmentStartOffset time.Duration, segmentStartOffset time.Duration,
duration time.Duration, duration time.Duration,
init *fmp4.Init,
m muxer, m muxer,
) (time.Duration, error) { ) (time.Duration, error) {
segmentStartOffsetMP4 := durationGoToMp4(segmentStartOffset, fmp4Timescale) var segmentStartOffsetMP4 int64
durationMP4 := durationGoToMp4(duration, fmp4Timescale) var durationMP4 int64
moofOffset := uint64(0) moofOffset := uint64(0)
var tfhd *mp4.Tfhd var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt var tfdt *mp4.Tfdt
maxMuxerDTS := int64(0) var timeScale uint32
var maxMuxerDTS time.Duration
breakAtNextMdat := false breakAtNextMdat := false
_, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) { _, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) {
@ -454,7 +500,15 @@ func segmentFMP4WriteParts(
} }
tfdt = box.(*mp4.Tfdt) tfdt = box.(*mp4.Tfdt)
track := findInitTrack(init.Tracks, int(tfhd.TrackID))
if track == nil {
return nil, fmt.Errorf("invalid track ID: %v", tfhd.TrackID)
}
m.setTrack(int(tfhd.TrackID)) m.setTrack(int(tfhd.TrackID))
timeScale = track.TimeScale
segmentStartOffsetMP4 = durationGoToMp4(segmentStartOffset, track.TimeScale)
durationMP4 = durationGoToMp4(duration, track.TimeScale)
case "trun": case "trun":
box, _, err := h.ReadPayload() box, _, err := h.ReadPayload()
@ -470,11 +524,11 @@ func segmentFMP4WriteParts(
return nil, err return nil, err
} }
muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) + int64(segmentStartOffsetMP4) muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) + segmentStartOffsetMP4
atLeastOneSampleWritten := false atLeastOneSampleWritten := false
for _, e := range trun.Entries { for _, e := range trun.Entries {
if muxerDTS >= int64(durationMP4) { if muxerDTS >= durationMP4 {
breakAtNextMdat = true breakAtNextMdat = true
break break
} }
@ -503,8 +557,10 @@ func segmentFMP4WriteParts(
m.writeFinalDTS(muxerDTS) m.writeFinalDTS(muxerDTS)
} }
if muxerDTS > maxMuxerDTS { muxerDTSGo := durationMp4ToGo(muxerDTS, timeScale)
maxMuxerDTS = muxerDTS
if muxerDTSGo > maxMuxerDTS {
maxMuxerDTS = muxerDTSGo
} }
case "mdat": case "mdat":
@ -518,5 +574,5 @@ func segmentFMP4WriteParts(
return 0, err return 0, err
} }
return durationMp4ToGo(uint64(maxMuxerDTS), fmp4Timescale), nil return maxMuxerDTS, nil
} }