Implement KLV datachanel for WebRTC

This implementation adds comprehensive KLV (Key-Length-Value) metadata support
to MediaMTX's WebRTC functionality. KLV metadata is transmitted via WebRTC data
channels, allowing real-time delivery of telemetry and metadata alongside video and audio streams.
This commit is contained in:
Yaroslav Molochko 2025-07-11 21:45:03 +03:00 committed by aler9
parent 653d10fb75
commit 9cd62a8c62
4 changed files with 457 additions and 0 deletions

View file

@ -647,6 +647,111 @@ func setupAudioTrack(
return nil, nil
}
// setupKLVDataChannel sets up KLV metadata transmission via WebRTC data channel
func setupKLVDataChannel(
stream *stream.Stream,
reader stream.Reader,
pc *PeerConnection,
) (format.Format, error) {
// Look for KLV format in the stream (using Generic format with KLV RTPMap)
var klvFormat format.Format
var klvMedia *description.Media
for _, media := range stream.Desc.Medias {
if media == nil {
continue
}
for _, forma := range media.Formats {
// Check for Generic format with KLV RTPMap
if genericFmt, ok := forma.(*format.Generic); ok {
if genericFmt.RTPMap() == "KLV/90000" {
klvFormat = genericFmt
klvMedia = media
break
}
}
// Check for KLV format (using type assertion)
if _, ok := forma.(*format.KLV); ok {
klvFormat = forma
klvMedia = media
break
}
// Also check for internal KLV format by codec name
if forma.Codec() == "KLV" {
klvFormat = forma
klvMedia = media
break
}
}
if klvFormat != nil {
break
}
}
if klvFormat == nil {
// No KLV format found, return nil without error
return nil, nil
}
if reader != nil {
reader.Log(logger.Info, "setting up KLV metadata transmission via WebRTC data channel")
}
// Setup the actual WebRTC data channel (with error recovery)
err := pc.setupKLVDataChannel()
if err != nil {
if reader != nil {
reader.Log(logger.Debug, "KLV data channel creation failed: %v", err)
}
// Return nil format to indicate KLV is not available, but don't fail the entire setup
return nil, nil
}
// Add reader for KLV data and send via data channel
stream.AddReader(
reader,
klvMedia,
klvFormat,
func(u unit.Unit) error {
// Handle both Generic and KLV units
var klvData []byte
switch tunit := u.(type) {
case *unit.Generic:
// Extract KLV data from Generic unit RTP packets
if tunit.RTPPackets != nil {
for _, pkt := range tunit.RTPPackets {
klvData = append(klvData, pkt.Payload...)
}
}
case *unit.KLV:
// Extract KLV data from KLV unit
if tunit.Unit != nil {
klvData = append(klvData, tunit.Unit...)
}
default:
return nil // Unknown unit type, skip
}
if len(klvData) == 0 {
return nil
}
// Send KLV data through WebRTC data channel
err := pc.SendKLVData(klvData)
if err != nil {
reader.Log(logger.Debug, "failed to send KLV data via data channel: %v", err)
// Don't return error to avoid breaking the stream
}
return nil
})
return klvFormat, nil
}
// FromStream maps a MediaMTX stream to a WebRTC connection
func FromStream(
desc *description.Session,
@ -667,6 +772,16 @@ func FromStream(
return errNoSupportedCodecsFrom
}
// Setup KLV metadata handling via data channel (non-blocking)
klvFormat, err := setupKLVDataChannel(stream, reader, pc)
if err != nil {
if reader != nil {
reader.Log(logger.Debug, "KLV data channel setup skipped: %v", err)
}
// Don't treat KLV setup failure as a fatal error
klvFormat = nil
}
setuppedFormats := r.Formats()
n := 1

View file

@ -0,0 +1,201 @@
package webrtc
import (
"testing"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
// mockLogger implements logger.Writer for testing
type mockLogger struct{}
func (l *mockLogger) Log(_ logger.Level, _ string, _ ...interface{}) {}
// TestKLVFormatDetection tests that KLV formats are properly detected
func TestKLVFormatDetection(t *testing.T) {
tests := []struct {
name string
formats []format.Format
expectsKLV bool
}{
{
name: "KLV format detected",
formats: []format.Format{
&format.KLV{},
},
expectsKLV: true,
},
{
name: "Generic KLV format detected",
formats: []format.Format{
&format.Generic{
PayloadTyp: 96,
RTPMa: "KLV/90000",
},
},
expectsKLV: true,
},
{
name: "No KLV format",
formats: []format.Format{
&format.H264{PayloadTyp: 96},
},
expectsKLV: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
streamDesc := &description.Session{
Medias: []*description.Media{
{
Type: description.MediaTypeApplication,
Formats: tt.formats,
},
},
}
mockStream := &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: streamDesc,
GenerateRTPPackets: false,
Parent: &mockLogger{},
}
err := mockStream.Initialize()
require.NoError(t, err)
defer mockStream.Close()
// Test KLV detection logic (same as in setupKLVDataChannel)
var klvFormat format.Format
for _, media := range mockStream.Desc.Medias {
for _, forma := range media.Formats {
if genericFmt, ok := forma.(*format.Generic); ok {
if genericFmt.RTPMap() == "KLV/90000" {
klvFormat = genericFmt
break
}
}
if _, ok := forma.(*format.KLV); ok {
klvFormat = forma
break
}
if forma.Codec() == "KLV" {
klvFormat = forma
break
}
}
if klvFormat != nil {
break
}
}
if tt.expectsKLV {
require.NotNil(t, klvFormat, "Expected to find KLV format")
} else {
require.Nil(t, klvFormat, "Expected not to find KLV format")
}
})
}
}
// TestKLVUnitHandling tests that different KLV unit types are handled correctly
func TestKLVUnitHandling(t *testing.T) {
tests := []struct {
name string
unit unit.Unit
expectedData []byte
}{
{
name: "KLV unit with data",
unit: &unit.KLV{
Unit: []byte{0x06, 0x0E, 0x2B, 0x34},
},
expectedData: []byte{0x06, 0x0E, 0x2B, 0x34},
},
{
name: "KLV unit without data",
unit: &unit.KLV{
Unit: nil,
},
expectedData: nil,
},
{
name: "Generic unit with RTP packets",
unit: &unit.Generic{
Base: unit.Base{
RTPPackets: []*rtp.Packet{
{Payload: []byte{0x06, 0x0E}},
{Payload: []byte{0x2B, 0x34}},
},
},
},
expectedData: []byte{0x06, 0x0E, 0x2B, 0x34},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Simulate the unit handling logic from setupKLVDataChannel
var klvData []byte
switch tunit := tt.unit.(type) {
case *unit.Generic:
if tunit.RTPPackets != nil {
for _, pkt := range tunit.RTPPackets {
klvData = append(klvData, pkt.Payload...)
}
}
case *unit.KLV:
if tunit.Unit != nil {
klvData = append(klvData, tunit.Unit...)
}
}
require.Equal(t, tt.expectedData, klvData)
})
}
}
// TestSetupKLVDataChannelIntegration tests that KLV setup doesn't break normal WebRTC flow
func TestSetupKLVDataChannelIntegration(t *testing.T) {
// Test that setupKLVDataChannel can be called without breaking anything
streamDesc := &description.Session{
Medias: []*description.Media{
{
Type: description.MediaTypeApplication,
Formats: []format.Format{
&format.KLV{},
},
},
},
}
mockStream := &stream.Stream{
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Desc: streamDesc,
GenerateRTPPackets: false,
Parent: &mockLogger{},
}
err := mockStream.Initialize()
require.NoError(t, err)
defer mockStream.Close()
// Create a mock peer connection (nil is fine for this test)
pc := &PeerConnection{}
// This should not panic or return an error, just return nil format
klvFormat, err := setupKLVDataChannel(mockStream, &mockLogger{}, pc)
require.NoError(t, err)
require.Nil(t, klvFormat) // Should be nil due to defensive handling
}

View file

@ -162,6 +162,10 @@ type PeerConnection struct {
gatheringDone chan struct{}
done chan struct{}
chStartReading chan struct{}
// KLV data channel support
klvDataChannel *webrtc.DataChannel
klvChannelReady chan struct{}
}
// Start starts the peer connection.
@ -309,6 +313,7 @@ func (co *PeerConnection) Start() error {
co.incomingTrack = make(chan trackRecvPair)
co.done = make(chan struct{})
co.chStartReading = make(chan struct{})
// klvChannelReady will be created only when needed
if co.Publish {
for _, tr := range co.OutgoingTracks {
@ -318,7 +323,10 @@ func (co *PeerConnection) Start() error {
return err
}
}
// KLV data channel will be set up later if needed
} else {
// KLV data channel will be set up later if needed
_, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
@ -411,6 +419,16 @@ func (co *PeerConnection) Start() error {
// Close closes the connection.
func (co *PeerConnection) Close() {
// Ensure KLV channel is closed to prevent hanging
if co.klvChannelReady != nil {
select {
case <-co.klvChannelReady:
// Already closed
default:
close(co.klvChannelReady)
}
}
co.ctxCancel()
<-co.done
}
@ -580,6 +598,102 @@ func (co *PeerConnection) filterLocalDescription(desc *webrtc.SessionDescription
return desc, nil
}
// setupKLVDataChannel sets up a data channel for KLV metadata transmission
func (co *PeerConnection) setupKLVDataChannel() error {
// Check if peer connection is in a valid state
if co.wr == nil {
return fmt.Errorf("peer connection not initialized")
}
// Create the ready channel only when needed
if co.klvChannelReady == nil {
co.klvChannelReady = make(chan struct{})
}
// Create data channel for KLV metadata with minimal configuration
dataChannelInit := &webrtc.DataChannelInit{
Ordered: &[]bool{false}[0], // Use unordered for better performance
}
var err error
co.klvDataChannel, err = co.wr.CreateDataChannel("klv", dataChannelInit)
if err != nil {
// Close the channel to prevent hanging
select {
case <-co.klvChannelReady:
// Already closed
default:
close(co.klvChannelReady)
}
return fmt.Errorf("failed to create KLV data channel: %w", err)
}
// Set up data channel event handlers
co.klvDataChannel.OnOpen(func() {
co.Log.Log(logger.Info, "KLV data channel opened")
select {
case <-co.klvChannelReady:
// Already closed
default:
close(co.klvChannelReady)
}
})
co.klvDataChannel.OnClose(func() {
co.Log.Log(logger.Info, "KLV data channel closed")
})
co.klvDataChannel.OnError(func(err error) {
co.Log.Log(logger.Warn, "KLV data channel error: %v", err)
// Close the channel on error to prevent hanging
select {
case <-co.klvChannelReady:
// Already closed
default:
close(co.klvChannelReady)
}
})
return nil
}
// SendKLVData sends KLV metadata through the data channel
func (co *PeerConnection) SendKLVData(klvData []byte) error {
if co.klvDataChannel == nil || co.klvChannelReady == nil {
// No data channel or ready channel, silently skip
return nil
}
// Check if channel is ready without blocking
select {
case <-co.klvChannelReady:
// Channel is ready, proceed
default:
// Channel not ready, silently skip to avoid blocking
return nil
}
// Check if channel is still open
if co.klvDataChannel.ReadyState() != webrtc.DataChannelStateOpen {
// Channel not open, silently skip
return nil
}
// Send the KLV data
err := co.klvDataChannel.Send(klvData)
if err != nil {
// Don't return error, just skip to avoid breaking the stream
return nil //nolint:nilerr // Intentionally ignoring error to make KLV non-blocking
}
return nil
}
// KLVChannelReady returns a channel that closes when the KLV data channel is ready
func (co *PeerConnection) KLVChannelReady() <-chan struct{} {
return co.klvChannelReady
}
// CreatePartialOffer creates a partial offer.
func (co *PeerConnection) CreatePartialOffer() (*webrtc.SessionDescription, error) {
tmp, err := co.wr.CreateOffer(nil)

View file

@ -445,6 +445,7 @@ class MediaMTXWebRTCReader {
this.pc.onicecandidate = (evt) => this.#onLocalCandidate(evt);
this.pc.onconnectionstatechange = () => this.#onConnectionState();
this.pc.ontrack = (evt) => this.#onTrack(evt);
this.pc.ondatachannel = (evt) => this.#onDataChannel(evt);
return this.pc.createOffer()
.then((offer) => {
@ -567,6 +568,32 @@ class MediaMTXWebRTCReader {
this.conf.onTrack(evt);
}
}
#onDataChannel(evt) {
const dataChannel = evt.channel;
if (dataChannel.label === 'klv') {
dataChannel.onopen = () => {
console.log('KLV metadata data channel opened');
};
dataChannel.onmessage = (event) => {
if (this.conf.onKLVData !== undefined) {
// Parse KLV data from ArrayBuffer
const klvData = new Uint8Array(event.data);
this.conf.onKLVData(klvData);
}
};
dataChannel.onclose = () => {
console.log('KLV metadata data channel closed');
};
dataChannel.onerror = (error) => {
console.error('KLV metadata data channel error:', error);
};
}
}
}
window.MediaMTXWebRTCReader = MediaMTXWebRTCReader;