1
0
Fork 0
forked from External/mediamtx

playaback: use a fixed fMP4 part duration (#3203)

This commit is contained in:
Alessandro Ros 2024-04-07 22:33:49 +02:00 committed by GitHub
parent 3144b31185
commit d5a18bf78f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 73 additions and 105 deletions

View file

@ -3,8 +3,7 @@ package playback
type muxer interface {
writeInit(init []byte)
setTrack(trackID int)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error
writeFinalDTS(dts int64)
flush() error
finalFlush() error
}

View file

@ -2,15 +2,17 @@ package playback
import (
"io"
"time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
)
var partSize = durationGoToMp4(1*time.Second, fmp4Timescale)
type muxerFMP4Track struct {
started bool
id int
firstDTS uint64
firstDTS int64
lastDTS int64
samples []*fmp4.PartSample
}
@ -42,58 +44,27 @@ func (w *muxerFMP4) setTrack(trackID int) {
w.curTrack = findTrack(w.tracks, trackID)
if w.curTrack == nil {
w.curTrack = &muxerFMP4Track{
id: trackID,
id: trackID,
firstDTS: -1,
}
w.tracks = append(w.tracks, w.curTrack)
}
}
func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) {
if !w.curTrack.started {
if dts >= 0 {
w.curTrack.started = true
w.curTrack.firstDTS = uint64(dts)
func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error {
if dts >= 0 {
if w.curTrack.firstDTS < 0 {
w.curTrack.firstDTS = dts
// reset GOP preceding the first frame
if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
w.curTrack.samples = nil
}
w.curTrack.lastDTS = dts
} else {
ptsOffset = 0
if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
}
}
} else {
if w.curTrack.samples == nil {
w.curTrack.firstDTS = uint64(dts)
} else {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}
w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}
@ -103,25 +74,48 @@ func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool
Payload: payload,
})
w.curTrack.lastDTS = dts
if (w.curTrack.lastDTS - w.curTrack.firstDTS) > int64(partSize) {
err := w.innerFlush(false)
if err != nil {
return err
}
}
} else {
// store GOP preceding the first frame, with PTSOffset = 0 and Duration = 0
if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
}
}
return nil
}
func (w *muxerFMP4) writeFinalDTS(dts int64) {
if w.curTrack.started && w.curTrack.samples != nil {
if w.curTrack.firstDTS >= 0 {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}
w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}
}
func (w *muxerFMP4) flush2(final bool) error {
func (w *muxerFMP4) innerFlush(final bool) error {
var part fmp4.Part
for _, track := range w.tracks {
if track.started && (len(track.samples) > 1 || (final && len(track.samples) != 0)) {
if track.firstDTS >= 0 && (len(track.samples) > 1 || (final && len(track.samples) != 0)) {
// do not write the final sample
// in order to allow changing its duration to compensate NTP-DTS differences
var samples []*fmp4.PartSample
if !final {
samples = track.samples[:len(track.samples)-1]
@ -131,15 +125,13 @@ func (w *muxerFMP4) flush2(final bool) error {
part.Tracks = append(part.Tracks, &fmp4.PartTrack{
ID: track.id,
BaseTime: track.firstDTS,
BaseTime: uint64(track.firstDTS),
Samples: samples,
})
if !final {
track.samples = track.samples[len(track.samples)-1:]
track.firstDTS = uint64(track.lastDTS)
} else {
track.samples = nil
track.firstDTS = track.lastDTS
}
}
}
@ -173,9 +165,5 @@ func (w *muxerFMP4) flush2(final bool) error {
}
func (w *muxerFMP4) flush() error {
return w.flush2(false)
}
func (w *muxerFMP4) finalFlush() error {
return w.flush2(true)
return w.innerFlush(true)
}

View file

@ -110,14 +110,13 @@ func seekAndMux(
}()
if err != nil {
if errors.Is(err, errStopIteration) {
return nil
break
}
return err
}
}
err = m.finalFlush()
err = m.flush()
if err != nil {
return err
}

View file

@ -229,6 +229,7 @@ func TestOnGet(t *testing.T) {
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-04-500000.mp4"))
s := &Server{
Address: "127.0.0.1:9996",
@ -252,7 +253,7 @@ func TestOnGet(t *testing.T) {
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "2")
v.Set("duration", "3")
v.Set("format", "fmp4")
u.RawQuery = v.Encode()
@ -283,6 +284,15 @@ func TestOnGet(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
{
Duration: 90000,
Payload: []byte{7, 8},
},
},
},
},
@ -292,27 +302,11 @@ func TestOnGet(t *testing.T) {
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
},
},
},
{
SequenceNumber: 2,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 90000,
BaseTime: 180000,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
Payload: []byte{7, 8},
Payload: []byte{9, 10},
},
},
},
@ -385,6 +379,11 @@ func TestOnGetDifferentInit(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
},
},
@ -456,17 +455,6 @@ func TestOnGetNTPCompensation(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
},
},
},
},
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 45000, // 90 - 45
IsNonSyncSample: true,
@ -481,11 +469,11 @@ func TestOnGetNTPCompensation(t *testing.T) {
},
},
{
SequenceNumber: 2,
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 135000, // 180 - 45
BaseTime: 135000,
Samples: []*fmp4.PartSample{
{
Duration: 90000,

View file

@ -374,12 +374,15 @@ func segmentFMP4SeekAndMuxParts(
return nil, err
}
m.writeSample(
err = m.writeSample(
muxerDTS,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
payload,
)
if err != nil {
return nil, err
}
muxerDTS += int64(e.SampleDuration)
}
@ -389,12 +392,6 @@ func segmentFMP4SeekAndMuxParts(
if muxerDTS > maxMuxerDTS {
maxMuxerDTS = muxerDTS
}
case "mdat":
err := m.flush()
if err != nil {
return nil, err
}
}
return nil, nil
})
@ -474,12 +471,15 @@ func segmentFMP4WriteParts(
return nil, err
}
m.writeSample(
err = m.writeSample(
muxerDTS,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
payload,
)
if err != nil {
return nil, err
}
muxerDTS += int64(e.SampleDuration)
}
@ -489,12 +489,6 @@ func segmentFMP4WriteParts(
if muxerDTS > maxMuxerDTS {
maxMuxerDTS = muxerDTS
}
case "mdat":
err := m.flush()
if err != nil {
return nil, err
}
}
return nil, nil
})