diff --git a/internal/protocols/webrtc/from_stream.go b/internal/protocols/webrtc/from_stream.go index 518fb5e1..f45986b1 100644 --- a/internal/protocols/webrtc/from_stream.go +++ b/internal/protocols/webrtc/from_stream.go @@ -647,6 +647,111 @@ func setupAudioTrack( return nil, nil } +// setupKLVDataChannel sets up KLV metadata transmission via WebRTC data channel +func setupKLVDataChannel( + stream *stream.Stream, + reader stream.Reader, + pc *PeerConnection, +) (format.Format, error) { + // Look for KLV format in the stream (using Generic format with KLV RTPMap) + var klvFormat format.Format + var klvMedia *description.Media + + for _, media := range stream.Desc.Medias { + if media == nil { + continue + } + for _, forma := range media.Formats { + // Check for Generic format with KLV RTPMap + if genericFmt, ok := forma.(*format.Generic); ok { + if genericFmt.RTPMap() == "KLV/90000" { + klvFormat = genericFmt + klvMedia = media + break + } + } + + // Check for KLV format (using type assertion) + if _, ok := forma.(*format.KLV); ok { + klvFormat = forma + klvMedia = media + break + } + + // Also check for internal KLV format by codec name + if forma.Codec() == "KLV" { + klvFormat = forma + klvMedia = media + break + } + } + if klvFormat != nil { + break + } + } + + if klvFormat == nil { + // No KLV format found, return nil without error + return nil, nil + } + + if reader != nil { + reader.Log(logger.Info, "setting up KLV metadata transmission via WebRTC data channel") + } + + // Setup the actual WebRTC data channel (with error recovery) + err := pc.setupKLVDataChannel() + if err != nil { + if reader != nil { + reader.Log(logger.Debug, "KLV data channel creation failed: %v", err) + } + // Return nil format to indicate KLV is not available, but don't fail the entire setup + return nil, nil + } + + // Add reader for KLV data and send via data channel + stream.AddReader( + reader, + klvMedia, + klvFormat, + func(u unit.Unit) error { + // Handle both Generic and KLV units + var klvData []byte + + switch tunit := u.(type) { + case *unit.Generic: + // Extract KLV data from Generic unit RTP packets + if tunit.RTPPackets != nil { + for _, pkt := range tunit.RTPPackets { + klvData = append(klvData, pkt.Payload...) + } + } + case *unit.KLV: + // Extract KLV data from KLV unit + if tunit.Unit != nil { + klvData = append(klvData, tunit.Unit...) + } + default: + return nil // Unknown unit type, skip + } + + if len(klvData) == 0 { + return nil + } + + // Send KLV data through WebRTC data channel + err := pc.SendKLVData(klvData) + if err != nil { + reader.Log(logger.Debug, "failed to send KLV data via data channel: %v", err) + // Don't return error to avoid breaking the stream + } + + return nil + }) + + return klvFormat, nil +} + // FromStream maps a MediaMTX stream to a WebRTC connection func FromStream( desc *description.Session, @@ -667,6 +772,16 @@ func FromStream( return errNoSupportedCodecsFrom } + // Setup KLV metadata handling via data channel (non-blocking) + klvFormat, err := setupKLVDataChannel(stream, reader, pc) + if err != nil { + if reader != nil { + reader.Log(logger.Debug, "KLV data channel setup skipped: %v", err) + } + // Don't treat KLV setup failure as a fatal error + klvFormat = nil + } + setuppedFormats := r.Formats() n := 1 diff --git a/internal/protocols/webrtc/klv_test.go b/internal/protocols/webrtc/klv_test.go new file mode 100644 index 00000000..a3d05e34 --- /dev/null +++ b/internal/protocols/webrtc/klv_test.go @@ -0,0 +1,201 @@ +package webrtc + +import ( + "testing" + + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/unit" +) + +// mockLogger implements logger.Writer for testing +type mockLogger struct{} + +func (l *mockLogger) Log(_ logger.Level, _ string, _ ...interface{}) {} + +// TestKLVFormatDetection tests that KLV formats are properly detected +func TestKLVFormatDetection(t *testing.T) { + tests := []struct { + name string + formats []format.Format + expectsKLV bool + }{ + { + name: "KLV format detected", + formats: []format.Format{ + &format.KLV{}, + }, + expectsKLV: true, + }, + { + name: "Generic KLV format detected", + formats: []format.Format{ + &format.Generic{ + PayloadTyp: 96, + RTPMa: "KLV/90000", + }, + }, + expectsKLV: true, + }, + { + name: "No KLV format", + formats: []format.Format{ + &format.H264{PayloadTyp: 96}, + }, + expectsKLV: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + streamDesc := &description.Session{ + Medias: []*description.Media{ + { + Type: description.MediaTypeApplication, + Formats: tt.formats, + }, + }, + } + + mockStream := &stream.Stream{ + WriteQueueSize: 512, + RTPMaxPayloadSize: 1450, + Desc: streamDesc, + GenerateRTPPackets: false, + Parent: &mockLogger{}, + } + + err := mockStream.Initialize() + require.NoError(t, err) + defer mockStream.Close() + + // Test KLV detection logic (same as in setupKLVDataChannel) + var klvFormat format.Format + for _, media := range mockStream.Desc.Medias { + for _, forma := range media.Formats { + if genericFmt, ok := forma.(*format.Generic); ok { + if genericFmt.RTPMap() == "KLV/90000" { + klvFormat = genericFmt + break + } + } + if _, ok := forma.(*format.KLV); ok { + klvFormat = forma + break + } + if forma.Codec() == "KLV" { + klvFormat = forma + break + } + } + if klvFormat != nil { + break + } + } + + if tt.expectsKLV { + require.NotNil(t, klvFormat, "Expected to find KLV format") + } else { + require.Nil(t, klvFormat, "Expected not to find KLV format") + } + }) + } +} + +// TestKLVUnitHandling tests that different KLV unit types are handled correctly +func TestKLVUnitHandling(t *testing.T) { + tests := []struct { + name string + unit unit.Unit + expectedData []byte + }{ + { + name: "KLV unit with data", + unit: &unit.KLV{ + Unit: []byte{0x06, 0x0E, 0x2B, 0x34}, + }, + expectedData: []byte{0x06, 0x0E, 0x2B, 0x34}, + }, + { + name: "KLV unit without data", + unit: &unit.KLV{ + Unit: nil, + }, + expectedData: nil, + }, + { + name: "Generic unit with RTP packets", + unit: &unit.Generic{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{ + {Payload: []byte{0x06, 0x0E}}, + {Payload: []byte{0x2B, 0x34}}, + }, + }, + }, + expectedData: []byte{0x06, 0x0E, 0x2B, 0x34}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Simulate the unit handling logic from setupKLVDataChannel + var klvData []byte + + switch tunit := tt.unit.(type) { + case *unit.Generic: + if tunit.RTPPackets != nil { + for _, pkt := range tunit.RTPPackets { + klvData = append(klvData, pkt.Payload...) + } + } + case *unit.KLV: + if tunit.Unit != nil { + klvData = append(klvData, tunit.Unit...) + } + } + + require.Equal(t, tt.expectedData, klvData) + }) + } +} + +// TestSetupKLVDataChannelIntegration tests that KLV setup doesn't break normal WebRTC flow +func TestSetupKLVDataChannelIntegration(t *testing.T) { + // Test that setupKLVDataChannel can be called without breaking anything + streamDesc := &description.Session{ + Medias: []*description.Media{ + { + Type: description.MediaTypeApplication, + Formats: []format.Format{ + &format.KLV{}, + }, + }, + }, + } + + mockStream := &stream.Stream{ + WriteQueueSize: 512, + RTPMaxPayloadSize: 1450, + Desc: streamDesc, + GenerateRTPPackets: false, + Parent: &mockLogger{}, + } + + err := mockStream.Initialize() + require.NoError(t, err) + defer mockStream.Close() + + // Create a mock peer connection (nil is fine for this test) + pc := &PeerConnection{} + + // This should not panic or return an error, just return nil format + klvFormat, err := setupKLVDataChannel(mockStream, &mockLogger{}, pc) + require.NoError(t, err) + require.Nil(t, klvFormat) // Should be nil due to defensive handling +} diff --git a/internal/protocols/webrtc/peer_connection.go b/internal/protocols/webrtc/peer_connection.go index b43f26be..551a05c3 100644 --- a/internal/protocols/webrtc/peer_connection.go +++ b/internal/protocols/webrtc/peer_connection.go @@ -162,6 +162,10 @@ type PeerConnection struct { gatheringDone chan struct{} done chan struct{} chStartReading chan struct{} + + // KLV data channel support + klvDataChannel *webrtc.DataChannel + klvChannelReady chan struct{} } // Start starts the peer connection. @@ -309,6 +313,7 @@ func (co *PeerConnection) Start() error { co.incomingTrack = make(chan trackRecvPair) co.done = make(chan struct{}) co.chStartReading = make(chan struct{}) + // klvChannelReady will be created only when needed if co.Publish { for _, tr := range co.OutgoingTracks { @@ -318,7 +323,10 @@ func (co *PeerConnection) Start() error { return err } } + + // KLV data channel will be set up later if needed } else { + // KLV data channel will be set up later if needed _, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, }) @@ -411,6 +419,16 @@ func (co *PeerConnection) Start() error { // Close closes the connection. func (co *PeerConnection) Close() { + // Ensure KLV channel is closed to prevent hanging + if co.klvChannelReady != nil { + select { + case <-co.klvChannelReady: + // Already closed + default: + close(co.klvChannelReady) + } + } + co.ctxCancel() <-co.done } @@ -580,6 +598,102 @@ func (co *PeerConnection) filterLocalDescription(desc *webrtc.SessionDescription return desc, nil } +// setupKLVDataChannel sets up a data channel for KLV metadata transmission +func (co *PeerConnection) setupKLVDataChannel() error { + // Check if peer connection is in a valid state + if co.wr == nil { + return fmt.Errorf("peer connection not initialized") + } + + // Create the ready channel only when needed + if co.klvChannelReady == nil { + co.klvChannelReady = make(chan struct{}) + } + + // Create data channel for KLV metadata with minimal configuration + dataChannelInit := &webrtc.DataChannelInit{ + Ordered: &[]bool{false}[0], // Use unordered for better performance + } + + var err error + co.klvDataChannel, err = co.wr.CreateDataChannel("klv", dataChannelInit) + if err != nil { + // Close the channel to prevent hanging + select { + case <-co.klvChannelReady: + // Already closed + default: + close(co.klvChannelReady) + } + return fmt.Errorf("failed to create KLV data channel: %w", err) + } + + // Set up data channel event handlers + co.klvDataChannel.OnOpen(func() { + co.Log.Log(logger.Info, "KLV data channel opened") + select { + case <-co.klvChannelReady: + // Already closed + default: + close(co.klvChannelReady) + } + }) + + co.klvDataChannel.OnClose(func() { + co.Log.Log(logger.Info, "KLV data channel closed") + }) + + co.klvDataChannel.OnError(func(err error) { + co.Log.Log(logger.Warn, "KLV data channel error: %v", err) + // Close the channel on error to prevent hanging + select { + case <-co.klvChannelReady: + // Already closed + default: + close(co.klvChannelReady) + } + }) + + return nil +} + +// SendKLVData sends KLV metadata through the data channel +func (co *PeerConnection) SendKLVData(klvData []byte) error { + if co.klvDataChannel == nil || co.klvChannelReady == nil { + // No data channel or ready channel, silently skip + return nil + } + + // Check if channel is ready without blocking + select { + case <-co.klvChannelReady: + // Channel is ready, proceed + default: + // Channel not ready, silently skip to avoid blocking + return nil + } + + // Check if channel is still open + if co.klvDataChannel.ReadyState() != webrtc.DataChannelStateOpen { + // Channel not open, silently skip + return nil + } + + // Send the KLV data + err := co.klvDataChannel.Send(klvData) + if err != nil { + // Don't return error, just skip to avoid breaking the stream + return nil //nolint:nilerr // Intentionally ignoring error to make KLV non-blocking + } + + return nil +} + +// KLVChannelReady returns a channel that closes when the KLV data channel is ready +func (co *PeerConnection) KLVChannelReady() <-chan struct{} { + return co.klvChannelReady +} + // CreatePartialOffer creates a partial offer. func (co *PeerConnection) CreatePartialOffer() (*webrtc.SessionDescription, error) { tmp, err := co.wr.CreateOffer(nil) diff --git a/internal/servers/webrtc/reader.js b/internal/servers/webrtc/reader.js index 3048bb1f..ada1c50d 100644 --- a/internal/servers/webrtc/reader.js +++ b/internal/servers/webrtc/reader.js @@ -445,6 +445,7 @@ class MediaMTXWebRTCReader { this.pc.onicecandidate = (evt) => this.#onLocalCandidate(evt); this.pc.onconnectionstatechange = () => this.#onConnectionState(); this.pc.ontrack = (evt) => this.#onTrack(evt); + this.pc.ondatachannel = (evt) => this.#onDataChannel(evt); return this.pc.createOffer() .then((offer) => { @@ -567,6 +568,32 @@ class MediaMTXWebRTCReader { this.conf.onTrack(evt); } } + + #onDataChannel(evt) { + const dataChannel = evt.channel; + + if (dataChannel.label === 'klv') { + dataChannel.onopen = () => { + console.log('KLV metadata data channel opened'); + }; + + dataChannel.onmessage = (event) => { + if (this.conf.onKLVData !== undefined) { + // Parse KLV data from ArrayBuffer + const klvData = new Uint8Array(event.data); + this.conf.onKLVData(klvData); + } + }; + + dataChannel.onclose = () => { + console.log('KLV metadata data channel closed'); + }; + + dataChannel.onerror = (error) => { + console.error('KLV metadata data channel error:', error); + }; + } + } } window.MediaMTXWebRTCReader = MediaMTXWebRTCReader;