From bc95f6240b32d934ecc861882f780a028e0773ea Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sun, 20 Jul 2025 12:20:31 +0200 Subject: [PATCH] fix generating timestamp of non-RTSP MPEG-4 video streams (#4758) --- internal/formatprocessor/av1.go | 7 -- internal/formatprocessor/g711_test.go | 2 +- internal/formatprocessor/generic_test.go | 3 +- internal/formatprocessor/h264_test.go | 103 +++++++++++++----- internal/formatprocessor/h265_test.go | 107 ++++++++++++++----- internal/formatprocessor/lpcm_test.go | 2 +- internal/formatprocessor/mpeg4_video.go | 5 +- internal/formatprocessor/mpeg4_video_test.go | 71 ++++++++++++ internal/formatprocessor/opus_test.go | 2 +- internal/recorder/format_fmp4.go | 6 +- 10 files changed, 244 insertions(+), 64 deletions(-) create mode 100644 internal/formatprocessor/mpeg4_video_test.go diff --git a/internal/formatprocessor/av1.go b/internal/formatprocessor/av1.go index baab754d..111ee126 100644 --- a/internal/formatprocessor/av1.go +++ b/internal/formatprocessor/av1.go @@ -13,13 +13,6 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) -// AV1-related parameters -var ( - AV1DefaultSequenceHeader = []byte{ - 8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64, - } -) - type av1 struct { RTPMaxPayloadSize int Format *format.AV1 diff --git a/internal/formatprocessor/g711_test.go b/internal/formatprocessor/g711_test.go index eaa136d0..3e8ca39a 100644 --- a/internal/formatprocessor/g711_test.go +++ b/internal/formatprocessor/g711_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestG711Encode(t *testing.T) { +func TestG711ProcessUnit(t *testing.T) { t.Run("alaw", func(t *testing.T) { forma := &format.G711{ PayloadTyp: 8, diff --git a/internal/formatprocessor/generic_test.go b/internal/formatprocessor/generic_test.go index 80d8d015..5baeca3c 100644 --- a/internal/formatprocessor/generic_test.go +++ b/internal/formatprocessor/generic_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestGenericRemovePadding(t *testing.T) { +func TestGenericProcessRTPPacket(t *testing.T) { forma := &format.Generic{ PayloadTyp: 96, RTPMa: "private/90000", @@ -37,6 +37,7 @@ func TestGenericRemovePadding(t *testing.T) { _, err = p.ProcessRTPPacket(pkt, time.Time{}, 0, false) require.NoError(t, err) + // check that padding has been removed require.Equal(t, &rtp.Packet{ Header: rtp.Header{ Version: 2, diff --git a/internal/formatprocessor/h264_test.go b/internal/formatprocessor/h264_test.go index f7e21719..9b028621 100644 --- a/internal/formatprocessor/h264_test.go +++ b/internal/formatprocessor/h264_test.go @@ -28,7 +28,83 @@ func Logger(cb func(logger.Level, string, ...interface{})) logger.Writer { return &testLogger{cb: cb} } -func TestH264DynamicParams(t *testing.T) { +func TestH264ProcessUnit(t *testing.T) { + forma := &format.H264{} + + p, err := New(1450, forma, true, nil) + require.NoError(t, err) + + u1 := &unit.H264{ + Base: unit.Base{ + PTS: 30000, + }, + AU: [][]byte{ + {7, 4, 5, 6}, // SPS + {8, 1}, // PPS + {5, 1}, // IDR + }, + } + + err = p.ProcessUnit(u1) + require.NoError(t, err) + + require.Equal(t, [][]byte{ + {7, 4, 5, 6}, // SPS + {8, 1}, // PPS + {5, 1}, // IDR + }, u1.AU) + + u2 := &unit.H264{ + Base: unit.Base{ + PTS: 30000 * 2, + }, + AU: [][]byte{ + {5, 2}, // IDR + }, + } + + err = p.ProcessUnit(u2) + require.NoError(t, err) + + // test that params have been added to the SDP + require.Equal(t, []byte{7, 4, 5, 6}, forma.SPS) + require.Equal(t, []byte{8, 1}, forma.PPS) + + // test that params have been added to the frame + require.Equal(t, [][]byte{ + {7, 4, 5, 6}, // SPS + {8, 1}, // PPS + {5, 2}, // IDR + }, u2.AU) + + // test that timestamp had increased + require.Equal(t, u1.RTPPackets[0].Timestamp+30000, u2.RTPPackets[0].Timestamp) +} + +func TestH264ProcessUnitEmpty(t *testing.T) { + forma := &format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + } + + p, err := New(1450, forma, true, nil) + require.NoError(t, err) + + unit := &unit.H264{ + AU: [][]byte{ + {0x07, 0x01, 0x02, 0x03}, // SPS + {0x08, 0x01, 0x02}, // PPS + }, + } + + err = p.ProcessUnit(unit) + require.NoError(t, err) + + // if all NALUs have been removed, no RTP packets shall be generated. + require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) +} + +func TestH264ProcessRTPPacketUpdateParams(t *testing.T) { for _, ca := range []string{"standard", "aggregated"} { t.Run(ca, func(t *testing.T) { forma := &format.H264{ @@ -93,7 +169,7 @@ func TestH264DynamicParams(t *testing.T) { } } -func TestH264OversizedPackets(t *testing.T) { +func TestH264ProcessRTPPacketOversized(t *testing.T) { forma := &format.H264{ PayloadTyp: 96, SPS: []byte{0x01, 0x02, 0x03, 0x04}, @@ -201,29 +277,6 @@ func TestH264OversizedPackets(t *testing.T) { require.True(t, logged) } -func TestH264EmptyPacket(t *testing.T) { - forma := &format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - } - - p, err := New(1450, forma, true, nil) - require.NoError(t, err) - - unit := &unit.H264{ - AU: [][]byte{ - {0x07, 0x01, 0x02, 0x03}, // SPS - {0x08, 0x01, 0x02}, // PPS - }, - } - - err = p.ProcessUnit(unit) - require.NoError(t, err) - - // if all NALUs have been removed, no RTP packets must be generated. - require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) -} - func FuzzRTPH264ExtractParams(f *testing.F) { f.Fuzz(func(_ *testing.T, b []byte) { rtpH264ExtractParams(b) diff --git a/internal/formatprocessor/h265_test.go b/internal/formatprocessor/h265_test.go index bb67a9a3..3235c6c3 100644 --- a/internal/formatprocessor/h265_test.go +++ b/internal/formatprocessor/h265_test.go @@ -15,7 +15,87 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) -func TestH265DynamicParams(t *testing.T) { +func TestH265ProcessUnit(t *testing.T) { + forma := &format.H265{} + + p, err := New(1450, forma, true, nil) + require.NoError(t, err) + + u1 := &unit.H265{ + Base: unit.Base{ + PTS: 30000, + }, + AU: [][]byte{ + {byte(mch265.NALUType_VPS_NUT) << 1, 1, 2, 3}, + {byte(mch265.NALUType_SPS_NUT) << 1, 4, 5, 6}, + {byte(mch265.NALUType_PPS_NUT) << 1, 7, 8, 9}, + {byte(mch265.NALUType_CRA_NUT) << 1, 0}, + }, + } + + err = p.ProcessUnit(u1) + require.NoError(t, err) + + require.Equal(t, [][]byte{ + {byte(mch265.NALUType_VPS_NUT) << 1, 1, 2, 3}, + {byte(mch265.NALUType_SPS_NUT) << 1, 4, 5, 6}, + {byte(mch265.NALUType_PPS_NUT) << 1, 7, 8, 9}, + {byte(mch265.NALUType_CRA_NUT) << 1, 0}, + }, u1.AU) + + u2 := &unit.H265{ + Base: unit.Base{ + PTS: 30000 * 2, + }, + AU: [][]byte{ + {byte(mch265.NALUType_CRA_NUT) << 1, 1}, + }, + } + + err = p.ProcessUnit(u2) + require.NoError(t, err) + + // test that params have been added to the SDP + require.Equal(t, []byte{byte(mch265.NALUType_VPS_NUT) << 1, 1, 2, 3}, forma.VPS) + require.Equal(t, []byte{byte(mch265.NALUType_SPS_NUT) << 1, 4, 5, 6}, forma.SPS) + require.Equal(t, []byte{byte(mch265.NALUType_PPS_NUT) << 1, 7, 8, 9}, forma.PPS) + + // test that params have been added to the frame + require.Equal(t, [][]byte{ + {byte(mch265.NALUType_VPS_NUT) << 1, 1, 2, 3}, + {byte(mch265.NALUType_SPS_NUT) << 1, 4, 5, 6}, + {byte(mch265.NALUType_PPS_NUT) << 1, 7, 8, 9}, + {byte(mch265.NALUType_CRA_NUT) << 1, 1}, + }, u2.AU) + + // test that timestamp had increased + require.Equal(t, u1.RTPPackets[0].Timestamp+30000, u2.RTPPackets[0].Timestamp) +} + +func TestH265ProcessUnitEmpty(t *testing.T) { + forma := &format.H265{ + PayloadTyp: 96, + } + + p, err := New(1450, forma, true, nil) + require.NoError(t, err) + + unit := &unit.H265{ + AU: [][]byte{ + {byte(mch265.NALUType_VPS_NUT) << 1, 10, 11, 12}, // VPS + {byte(mch265.NALUType_SPS_NUT) << 1, 13, 14, 15}, // SPS + {byte(mch265.NALUType_PPS_NUT) << 1, 16, 17, 18}, // PPS + }, + } + + err = p.ProcessUnit(unit) + require.NoError(t, err) + + // if all NALUs have been removed, no RTP packets shall be generated. + require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) +} + +func TestH265ProcessRTPPacketUpdateParams(t *testing.T) { for _, ca := range []string{"standard", "aggregated"} { t.Run(ca, func(t *testing.T) { forma := &format.H265{ @@ -88,7 +168,7 @@ func TestH265DynamicParams(t *testing.T) { } } -func TestH265OversizedPackets(t *testing.T) { +func TestH265ProcessRTPPacketOversized(t *testing.T) { forma := &format.H265{ PayloadTyp: 96, VPS: []byte{byte(mch265.NALUType_VPS_NUT) << 1, 10, 11, 12}, @@ -184,29 +264,6 @@ func TestH265OversizedPackets(t *testing.T) { require.True(t, logged) } -func TestH265EmptyPacket(t *testing.T) { - forma := &format.H265{ - PayloadTyp: 96, - } - - p, err := New(1450, forma, true, nil) - require.NoError(t, err) - - unit := &unit.H265{ - AU: [][]byte{ - {byte(mch265.NALUType_VPS_NUT) << 1, 10, 11, 12}, // VPS - {byte(mch265.NALUType_SPS_NUT) << 1, 13, 14, 15}, // SPS - {byte(mch265.NALUType_PPS_NUT) << 1, 16, 17, 18}, // PPS - }, - } - - err = p.ProcessUnit(unit) - require.NoError(t, err) - - // if all NALUs have been removed, no RTP packets must be generated. - require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) -} - func FuzzRTPH265ExtractParams(f *testing.F) { f.Fuzz(func(_ *testing.T, b []byte) { rtpH265ExtractParams(b) diff --git a/internal/formatprocessor/lpcm_test.go b/internal/formatprocessor/lpcm_test.go index 8e724f20..1ff3122a 100644 --- a/internal/formatprocessor/lpcm_test.go +++ b/internal/formatprocessor/lpcm_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestLPCMEncode(t *testing.T) { +func TestLPCMProcessUnit(t *testing.T) { forma := &format.LPCM{ PayloadTyp: 96, BitDepth: 16, diff --git a/internal/formatprocessor/mpeg4_video.go b/internal/formatprocessor/mpeg4_video.go index dc452e71..c7125ec5 100644 --- a/internal/formatprocessor/mpeg4_video.go +++ b/internal/formatprocessor/mpeg4_video.go @@ -77,6 +77,7 @@ func (t *mpeg4Video) updateTrackParameters(frame []byte) { } func (t *mpeg4Video) remuxFrame(frame []byte) []byte { + // remove config if bytes.HasPrefix(frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { end := bytes.Index(frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) if end >= 0 { @@ -84,6 +85,7 @@ func (t *mpeg4Video) remuxFrame(frame []byte) []byte { } } + // add config if bytes.Contains(frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) { f := make([]byte, len(t.Format.Config)+len(frame)) n := copy(f, t.Format.Config) @@ -105,12 +107,11 @@ func (t *mpeg4Video) ProcessUnit(uu unit.Unit) error { //nolint:dupl if err != nil { return err } + u.RTPPackets = pkts for _, pkt := range u.RTPPackets { pkt.Timestamp += t.randomStart + uint32(u.PTS) } - - u.RTPPackets = pkts } return nil diff --git a/internal/formatprocessor/mpeg4_video_test.go b/internal/formatprocessor/mpeg4_video_test.go new file mode 100644 index 00000000..f14d04c9 --- /dev/null +++ b/internal/formatprocessor/mpeg4_video_test.go @@ -0,0 +1,71 @@ +package formatprocessor + +import ( + "testing" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4video" + "github.com/bluenviron/mediamtx/internal/unit" + "github.com/stretchr/testify/require" +) + +func TestMPEG4VideoProcessUnit(t *testing.T) { + forma := &format.MPEG4Video{ + PayloadTyp: 96, + } + + p, err := New(1450, forma, true, nil) + require.NoError(t, err) + + u1 := &unit.MPEG4Video{ + Base: unit.Base{ + PTS: 30000, + }, + Frame: []byte{ + 0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode), + 0, 0, 1, 0xFF, + 0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode), + 0, 0, 1, 0xF0, + }, + } + + err = p.ProcessUnit(u1) + require.NoError(t, err) + + require.Equal(t, []byte{ + 0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode), + 0, 0, 1, 0xFF, + 0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode), + 0, 0, 1, 0xF0, + }, u1.Frame) + + u2 := &unit.MPEG4Video{ + Base: unit.Base{ + PTS: 30000 * 2, + }, + Frame: []byte{ + 0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode), + 0, 0, 1, 0xF1, + }, + } + + err = p.ProcessUnit(u2) + require.NoError(t, err) + + // test that params have been added to the SDP + require.Equal(t, []byte{ + 0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode), + 0, 0, 1, 0xFF, + }, forma.Config) + + // test that params have been added to the frame + require.Equal(t, []byte{ + 0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode), + 0, 0, 1, 0xFF, + 0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode), + 0, 0, 1, 0xF1, + }, u2.Frame) + + // test that timestamp had increased + require.Equal(t, u1.RTPPackets[0].Timestamp+30000, u2.RTPPackets[0].Timestamp) +} diff --git a/internal/formatprocessor/opus_test.go b/internal/formatprocessor/opus_test.go index 0782e1f0..79a6e19d 100644 --- a/internal/formatprocessor/opus_test.go +++ b/internal/formatprocessor/opus_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestOpusEncode(t *testing.T) { +func TestOpusProcessUnit(t *testing.T) { forma := &format.Opus{ PayloadTyp: 96, ChannelCount: 2, diff --git a/internal/recorder/format_fmp4.go b/internal/recorder/format_fmp4.go index 17ebb2b8..72d52da0 100644 --- a/internal/recorder/format_fmp4.go +++ b/internal/recorder/format_fmp4.go @@ -26,6 +26,10 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) +var av1DefaultSequenceHeader = []byte{ + 8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64, +} + func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int { switch cm { case mpeg1audio.ChannelModeStereo, @@ -133,7 +137,7 @@ func (f *formatFMP4) initialize() bool { switch forma := forma.(type) { case *rtspformat.AV1: codec := &mp4.CodecAV1{ - SequenceHeader: formatprocessor.AV1DefaultSequenceHeader, + SequenceHeader: av1DefaultSequenceHeader, } track := addTrack(forma, codec)