From 0df5e2c81acf3a7370e287bc915f43988b24aecd Mon Sep 17 00:00:00 2001 From: Yaroslav Molochko Date: Sun, 6 Jul 2025 21:34:41 +0300 Subject: [PATCH] support routing KLV metadata (#2693) (#4670) Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com> --- go.mod | 4 +- go.sum | 8 +- internal/formatprocessor/klv.go | 119 +++++++++++++++++++++ internal/formatprocessor/klv_test.go | 84 +++++++++++++++ internal/formatprocessor/processor.go | 8 ++ internal/formatprocessor/processor_test.go | 7 ++ internal/protocols/mpegts/from_stream.go | 24 +++++ internal/protocols/mpegts/to_stream.go | 20 ++++ internal/recorder/format_mpegts.go | 26 +++++ internal/unit/klv.go | 7 ++ 10 files changed, 301 insertions(+), 6 deletions(-) create mode 100644 internal/formatprocessor/klv.go create mode 100644 internal/formatprocessor/klv_test.go create mode 100644 internal/unit/klv.go diff --git a/go.mod b/go.mod index 5a0d33a0..1c032c00 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,8 @@ require ( github.com/alecthomas/kong v1.12.0 github.com/asticode/go-astits v1.13.0 github.com/bluenviron/gohlslib/v2 v2.2.0 - github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250705110245-9c1011567a97 - github.com/bluenviron/mediacommon/v2 v2.2.0 + github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250706181149-52489821375e + github.com/bluenviron/mediacommon/v2 v2.2.1-0.20250706163316-d1fe0aa1b8d9 github.com/datarhei/gosrt v0.9.0 github.com/fsnotify/fsnotify v1.9.0 github.com/gin-contrib/pprof v1.5.3 diff --git a/go.sum b/go.sum index 29b2fe79..6f88888e 100644 --- a/go.sum +++ b/go.sum @@ -35,10 +35,10 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib/v2 v2.2.0 h1:eIsCai3IHP0F538h2tCPCRkhQ7XSOaxeceMyPns0o1k= github.com/bluenviron/gohlslib/v2 v2.2.0/go.mod h1:sLyKB5iM6Su1kucNHuDUU9aeN/Hw4WxsV2y9k2IHMGs= -github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250705110245-9c1011567a97 h1:V8m1pyQOYVEJK5RBy1SLg/Y+hgXYFFiMZOd7NhWWLAE= -github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250705110245-9c1011567a97/go.mod h1:rur2QGh1wRU6KINZn8LwU8qTPFt1XafJGtsfs0KYzRo= -github.com/bluenviron/mediacommon/v2 v2.2.0 h1:fGXEX0OEvv5VhGHOv3Q2ABzOtSkIpl9UbwOHrnKWNTk= -github.com/bluenviron/mediacommon/v2 v2.2.0/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0= +github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250706181149-52489821375e h1:XSCp4Q0DZcv2pQs2knAnB/mIr095QtPbshPSGRiPteg= +github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250706181149-52489821375e/go.mod h1:rur2QGh1wRU6KINZn8LwU8qTPFt1XafJGtsfs0KYzRo= +github.com/bluenviron/mediacommon/v2 v2.2.1-0.20250706163316-d1fe0aa1b8d9 h1:cleSKsYkXx8y36auvXw3zone9t9JSFTT/4Kr+VLKvGw= +github.com/bluenviron/mediacommon/v2 v2.2.1-0.20250706163316-d1fe0aa1b8d9/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0= github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ= github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= diff --git a/internal/formatprocessor/klv.go b/internal/formatprocessor/klv.go new file mode 100644 index 00000000..571091ae --- /dev/null +++ b/internal/formatprocessor/klv.go @@ -0,0 +1,119 @@ +package formatprocessor + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpklv" + "github.com/pion/rtp" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/unit" +) + +type klv struct { + RTPMaxPayloadSize int + Format *format.KLV + GenerateRTPPackets bool + Parent logger.Writer + + encoder *rtpklv.Encoder + decoder *rtpklv.Decoder + randomStart uint32 +} + +func (t *klv) initialize() error { + if t.GenerateRTPPackets { + err := t.createEncoder() + if err != nil { + return err + } + + t.randomStart, err = randUint32() + if err != nil { + return err + } + } + + return nil +} + +func (t *klv) createEncoder() error { + t.encoder = &rtpklv.Encoder{ + PayloadMaxSize: t.RTPMaxPayloadSize, + PayloadType: t.Format.PayloadTyp, + } + return t.encoder.Init() +} + +func (t *klv) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.KLV) + + if u.Unit != nil { + // ensure the format processor's encoder is initialized + if t.encoder == nil { + err := t.createEncoder() + if err != nil { + return err + } + } + + pkts, err := t.encoder.Encode(u.Unit) + if err != nil { + return err + } + u.RTPPackets = pkts + + for _, pkt := range u.RTPPackets { + pkt.Timestamp += t.randomStart + uint32(u.PTS) + } + } + + return nil +} + +func (t *klv) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts int64, + hasNonRTSPReaders bool, +) (unit.Unit, error) { + u := &unit.KLV{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + // remove padding + pkt.Padding = false + pkt.PaddingSize = 0 + + if len(pkt.Payload) > t.RTPMaxPayloadSize { + return nil, fmt.Errorf("RTP payload size (%d) is greater than maximum allowed (%d)", + len(pkt.Payload), t.RTPMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.Format.CreateDecoder() + if err != nil { + return nil, err + } + } + + unit, err := t.decoder.Decode(pkt) + if err != nil { + return nil, err + } + + u.Unit = unit + } + + // route packet as is + return u, nil +} diff --git a/internal/formatprocessor/klv_test.go b/internal/formatprocessor/klv_test.go new file mode 100644 index 00000000..4e46a458 --- /dev/null +++ b/internal/formatprocessor/klv_test.go @@ -0,0 +1,84 @@ +package formatprocessor + +import ( + "testing" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediamtx/internal/unit" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestKlvCreateEncoder(t *testing.T) { + forma := &format.KLV{ + PayloadTyp: 96, + } + p, err := New(1472, forma, false, nil) + require.NoError(t, err) + + klvProc := p.(*klv) + err = klvProc.createEncoder() + require.NoError(t, err) +} + +func TestKlvProcessUnit(t *testing.T) { + forma := &format.KLV{ + PayloadTyp: 96, + } + p, err := New(1472, forma, true, nil) + require.NoError(t, err) + + // create test Unit + theTime := time.Now() + when := int64(5000000000) // 5 seconds in nanoseconds + u := unit.KLV{ + Base: unit.Base{ + RTPPackets: nil, + NTP: theTime, + PTS: when, + }, + Unit: []byte{1, 2, 3, 4}, + } + uu := &u + + // process the unit + err = p.ProcessUnit(uu) + require.NoError(t, err) +} + +func TestKlvProcessRTPPacket(t *testing.T) { + forma := &format.KLV{ + PayloadTyp: 96, + } + p, err := New(1472, forma, false, nil) + require.NoError(t, err) + + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 3446, + Timestamp: 175349, + SSRC: 563423, + Padding: true, + }, + Payload: []byte{1, 2, 3, 4}, + PaddingSize: 20, + } + _, err = p.ProcessRTPPacket(pkt, time.Time{}, 0, false) + require.NoError(t, err) + + require.Equal(t, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 3446, + Timestamp: 175349, + SSRC: 563423, + }, + Payload: []byte{1, 2, 3, 4}, + }, pkt) +} diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 51b4a371..59fb999e 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -119,6 +119,14 @@ func New( Parent: parent, } + case *format.KLV: + proc = &klv{ + RTPMaxPayloadSize: rtpMaxPayloadSize, + Format: forma, + GenerateRTPPackets: generateRTPPackets, + Parent: parent, + } + case *format.MPEG4Audio: proc = &mpeg4Audio{ RTPMaxPayloadSize: rtpMaxPayloadSize, diff --git a/internal/formatprocessor/processor_test.go b/internal/formatprocessor/processor_test.go index 9a838f2a..6920c0b0 100644 --- a/internal/formatprocessor/processor_test.go +++ b/internal/formatprocessor/processor_test.go @@ -83,6 +83,13 @@ func TestNew(t *testing.T) { &format.LPCM{}, &lpcm{}, }, + { + "klv", + &format.KLV{ + PayloadTyp: 96, + }, + &klv{}, + }, { "generic", &format.Generic{}, diff --git a/internal/protocols/mpegts/from_stream.go b/internal/protocols/mpegts/from_stream.go index a5b8768e..2311f3f8 100644 --- a/internal/protocols/mpegts/from_stream.go +++ b/internal/protocols/mpegts/from_stream.go @@ -229,6 +229,30 @@ func FromStream( } return bw.Flush() }) + case *format.KLV: + track := &mcmpegts.Track{ + Codec: &mcmpegts.CodecKLV{ + Synchronous: true, + }, + } + + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.KLV) + if tunit.Unit == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteKLV(track, multiplyAndDivide(tunit.PTS, 90000, 90000), tunit.Unit) + if err != nil { + return err + } + return bw.Flush() + }) case *format.MPEG4Audio: co := forma.GetConfig() diff --git a/internal/protocols/mpegts/to_stream.go b/internal/protocols/mpegts/to_stream.go index 1e59e45c..cc3291e6 100644 --- a/internal/protocols/mpegts/to_stream.go +++ b/internal/protocols/mpegts/to_stream.go @@ -139,6 +139,26 @@ func ToStream( return nil }) + case *mpegts.CodecKLV: + medi = &description.Media{ + Type: description.MediaTypeApplication, + Formats: []format.Format{&format.KLV{ + PayloadTyp: 96, + }}, + } + r.OnDataKLV(track, func(pts int64, uni []byte) error { + pts = td.Decode(pts) + + (*stream).WriteUnit(medi, medi.Formats[0], &unit.KLV{ + Base: unit.Base{ + NTP: time.Now(), + PTS: pts, + }, + Unit: uni, + }) + return nil + }) + case *mpegts.CodecMPEG4Audio: medi = &description.Media{ Type: description.MediaTypeAudio, diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index fdaf1638..1313b8bf 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -279,6 +279,32 @@ func (f *formatMPEGTS) initialize() bool { ) }) + case *rtspformat.KLV: + track := addTrack(forma, &mpegts.CodecKLV{ + Synchronous: true, + }) + + f.ri.stream.AddReader( + f.ri, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.KLV) + if tunit.Unit == nil { + return nil + } + + return f.write( + timestampToDuration(tunit.PTS, 90000), + tunit.NTP, + false, + true, + func() error { + return f.mw.WriteKLV(track, multiplyAndDivide(tunit.PTS, 90000, 90000), tunit.Unit) + }, + ) + }) + case *rtspformat.MPEG4Audio: co := forma.GetConfig() if co == nil { diff --git a/internal/unit/klv.go b/internal/unit/klv.go new file mode 100644 index 00000000..c2b5b268 --- /dev/null +++ b/internal/unit/klv.go @@ -0,0 +1,7 @@ +package unit + +// KLV is a KLV data unit. +type KLV struct { + Base + Unit []byte +}