1
0
Fork 0
forked from External/mediamtx

webrtc: support reading, publishing, proxying LPCM tracks (#3437)

This commit is contained in:
Alessandro Ros 2024-06-09 22:51:16 +02:00 committed by GitHub
parent bf8b68d757
commit eaf47e6598
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 380 additions and 109 deletions

View file

@ -20,7 +20,7 @@ func newGeneric(
generateRTPPackets bool, generateRTPPackets bool,
) (*formatProcessorGeneric, error) { ) (*formatProcessorGeneric, error) {
if generateRTPPackets { if generateRTPPackets {
return nil, fmt.Errorf("we don't know how to generate RTP packets of format %+v", forma) return nil, fmt.Errorf("we don't know how to generate RTP packets of format %T", forma)
} }
return &formatProcessorGeneric{ return &formatProcessorGeneric{

View file

@ -19,6 +19,11 @@ const (
keyFrameInterval = 2 * time.Second keyFrameInterval = 2 * time.Second
) )
const (
mimeTypeMultiopus = "audio/multiopus"
mimeTypeL16 = "audio/L16"
)
var incomingVideoCodecs = []webrtc.RTPCodecParameters{ var incomingVideoCodecs = []webrtc.RTPCodecParameters{
{ {
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
@ -95,7 +100,7 @@ var incomingVideoCodecs = []webrtc.RTPCodecParameters{
var incomingAudioCodecs = []webrtc.RTPCodecParameters{ var incomingAudioCodecs = []webrtc.RTPCodecParameters{
{ {
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeMultiopus, MimeType: mimeTypeMultiopus,
ClockRate: 48000, ClockRate: 48000,
Channels: 3, Channels: 3,
SDPFmtpLine: "channel_mapping=0,2,1;num_streams=2;coupled_streams=1", SDPFmtpLine: "channel_mapping=0,2,1;num_streams=2;coupled_streams=1",
@ -104,7 +109,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{
}, },
{ {
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeMultiopus, MimeType: mimeTypeMultiopus,
ClockRate: 48000, ClockRate: 48000,
Channels: 4, Channels: 4,
SDPFmtpLine: "channel_mapping=0,1,2,3;num_streams=2;coupled_streams=2", SDPFmtpLine: "channel_mapping=0,1,2,3;num_streams=2;coupled_streams=2",
@ -113,7 +118,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{
}, },
{ {
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeMultiopus, MimeType: mimeTypeMultiopus,
ClockRate: 48000, ClockRate: 48000,
Channels: 5, Channels: 5,
SDPFmtpLine: "channel_mapping=0,4,1,2,3;num_streams=3;coupled_streams=2", SDPFmtpLine: "channel_mapping=0,4,1,2,3;num_streams=3;coupled_streams=2",
@ -122,7 +127,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{
}, },
{ {
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeMultiopus, MimeType: mimeTypeMultiopus,
ClockRate: 48000, ClockRate: 48000,
Channels: 6, Channels: 6,
SDPFmtpLine: "channel_mapping=0,4,1,2,3,5;num_streams=4;coupled_streams=2", SDPFmtpLine: "channel_mapping=0,4,1,2,3,5;num_streams=4;coupled_streams=2",
@ -131,7 +136,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{
}, },
{ {
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeMultiopus, MimeType: mimeTypeMultiopus,
ClockRate: 48000, ClockRate: 48000,
Channels: 7, Channels: 7,
SDPFmtpLine: "channel_mapping=0,4,1,2,3,5,6;num_streams=4;coupled_streams=4", SDPFmtpLine: "channel_mapping=0,4,1,2,3,5,6;num_streams=4;coupled_streams=4",
@ -140,7 +145,7 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{
}, },
{ {
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeMultiopus, MimeType: mimeTypeMultiopus,
ClockRate: 48000, ClockRate: 48000,
Channels: 8, Channels: 8,
SDPFmtpLine: "channel_mapping=0,6,1,4,5,2,3,7;num_streams=5;coupled_streams=4", SDPFmtpLine: "channel_mapping=0,6,1,4,5,2,3,7;num_streams=5;coupled_streams=4",
@ -193,6 +198,30 @@ var incomingAudioCodecs = []webrtc.RTPCodecParameters{
}, },
PayloadType: 8, PayloadType: 8,
}, },
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeTypeL16,
ClockRate: 8000,
Channels: 2,
},
PayloadType: 120,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeTypeL16,
ClockRate: 16000,
Channels: 2,
},
PayloadType: 121,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeTypeL16,
ClockRate: 48000,
Channels: 2,
},
PayloadType: 122,
},
} }
// IncomingTrack is an incoming track. // IncomingTrack is an incoming track.
@ -245,7 +274,7 @@ func newIncomingTrack(
PacketizationMode: 1, PacketizationMode: 1,
} }
case strings.ToLower(mimeMultiopus): case strings.ToLower(mimeTypeMultiopus):
t.format = &format.Opus{ t.format = &format.Opus{
PayloadTyp: uint8(track.PayloadType()), PayloadTyp: uint8(track.PayloadType()),
ChannelCount: int(track.Codec().Channels), ChannelCount: int(track.Codec().Channels),
@ -301,6 +330,14 @@ func newIncomingTrack(
ChannelCount: int(channels), ChannelCount: int(channels),
} }
case strings.ToLower(mimeTypeL16):
t.format = &format.LPCM{
PayloadTyp: uint8(track.PayloadType()),
BitDepth: 16,
SampleRate: int(track.Codec().ClockRate),
ChannelCount: int(track.Codec().Channels),
}
default: default:
return nil, fmt.Errorf("unsupported codec: %+v", track.Codec()) return nil, fmt.Errorf("unsupported codec: %+v", track.Codec())
} }

View file

@ -8,10 +8,6 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
) )
const (
mimeMultiopus = "audio/multiopus"
)
// OutgoingTrack is a WebRTC outgoing track // OutgoingTrack is a WebRTC outgoing track
type OutgoingTrack struct { type OutgoingTrack struct {
Format format.Format Format format.Format
@ -62,7 +58,7 @@ func (t *OutgoingTrack) codecParameters() (webrtc.RTPCodecParameters, error) {
if forma.ChannelCount > 2 { if forma.ChannelCount > 2 {
return webrtc.RTPCodecParameters{ return webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{ RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeMultiopus, MimeType: mimeTypeMultiopus,
ClockRate: 48000, ClockRate: 48000,
Channels: uint16(forma.ChannelCount), Channels: uint16(forma.ChannelCount),
}, },
@ -140,6 +136,28 @@ func (t *OutgoingTrack) codecParameters() (webrtc.RTPCodecParameters, error) {
PayloadType: 8, PayloadType: 8,
}, nil }, nil
case *format.LPCM:
if forma.BitDepth != 16 {
return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported LPCM bit depth: %d", forma.BitDepth)
}
if forma.ClockRate() != 8000 && forma.ClockRate() != 16000 && forma.ClockRate() != 48000 {
return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported clock rate: %d", forma.ClockRate())
}
if forma.ChannelCount != 1 && forma.ChannelCount != 2 {
return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported channel count: %d", forma.ChannelCount)
}
return webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: mimeTypeL16,
ClockRate: uint32(forma.ClockRate()),
Channels: uint16(forma.ChannelCount),
},
PayloadType: 96,
}, nil
default: default:
return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported track type: %T", forma) return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported track type: %T", forma)
} }

View file

@ -169,6 +169,51 @@ func TestPeerConnectionPublishRead(t *testing.T) {
ChannelCount: 1, ChannelCount: 1,
}, },
}, },
{
"l16 8000 stereo",
&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 8000,
ChannelCount: 2,
},
&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 8000,
ChannelCount: 2,
},
},
{
"l16 16000 stereo",
&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 16000,
ChannelCount: 2,
},
&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 16000,
ChannelCount: 2,
},
},
{
"l16 48khz stereo",
&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 48000,
ChannelCount: 2,
},
&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 48000,
ChannelCount: 2,
},
},
} { } {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
pc1 := &PeerConnection{ pc1 := &PeerConnection{

View file

@ -158,6 +158,24 @@ const enableMultichannelOpus = (section) => {
return lines.join('\r\n'); return lines.join('\r\n');
}; };
// enableL16 adds 16-bit linear PCM (L16) stereo codecs to an SDP section.
// For each payload type / clock rate pair it appends the payload type to the
// first line of the section (presumably the m= line; confirm against caller)
// and inserts the matching rtpmap and rtcp-fb attributes immediately before
// the last element obtained by splitting the section on CRLF.
// Returns the edited section, re-joined with CRLF.
const enableL16 = (section) => {
  const lines = section.split('\r\n');

  // [payload type, clock rate]; every entry uses 2 channels.
  const codecs = [
    [120, 8000],
    [121, 16000],
    [122, 48000],
  ];

  for (const [payloadType, clockRate] of codecs) {
    // Index 0 is re-read on every pass on purpose: the order of operations
    // must stay "extend first line, then insert attributes" for each codec.
    lines[0] += ` ${payloadType}`;

    const attributes = [
      `a=rtpmap:${payloadType} L16/${clockRate}/2`,
      `a=rtcp-fb:${payloadType} transport-cc`,
    ];
    for (const attribute of attributes) {
      lines.splice(lines.length - 1, 0, attribute);
    }
  }

  return lines.join('\r\n');
};
const enableStereoOpus = (section) => { const enableStereoOpus = (section) => {
let opusPayloadFormat = ''; let opusPayloadFormat = '';
let lines = section.split('\r\n'); let lines = section.split('\r\n');
@ -202,6 +220,10 @@ const editOffer = (sdp) => {
sections[i] = enableMultichannelOpus(sections[i]); sections[i] = enableMultichannelOpus(sections[i]);
} }
if (nonAdvertisedCodecs.includes('L16/48000/2')) {
sections[i] = enableL16(sections[i]);
}
break; break;
} }
} }

View file

@ -5,10 +5,12 @@ import (
"context" "context"
"net/http" "net/http"
"net/url" "net/url"
"reflect"
"testing" "testing"
"time" "time"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/auth"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
@ -336,107 +338,214 @@ func TestServerPublish(t *testing.T) {
} }
func TestServerRead(t *testing.T) { func TestServerRead(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} for _, ca := range []struct {
name string
stream, err := stream.New( medias []*description.Media
1460, unit unit.Unit
desc, outRTPPayload []byte
true, }{
test.NilLogger, {
) "av1",
require.NoError(t, err) []*description.Media{{
Type: description.MediaTypeVideo,
path := &dummyPath{stream: stream} Formats: []format.Format{&format.AV1{
PayloadTyp: 96,
pathManager := &dummyPathManager{path: path} }},
}},
s := &Server{ &unit.AV1{
Address: "127.0.0.1:8886", TU: [][]byte{{1, 2}},
Encryption: false, },
ServerKey: "", []byte{0, 2, 1, 2},
ServerCert: "", },
AllowOrigin: "", // TODO: check why this doesn't work
TrustedProxies: conf.IPNetworks{}, /*{
ReadTimeout: conf.StringDuration(10 * time.Second), "vp9",
WriteQueueSize: 512, []*description.Media{{
LocalUDPAddress: "127.0.0.1:8887", Type: description.MediaTypeVideo,
LocalTCPAddress: "127.0.0.1:8887", Formats: []format.Format{&format.VP9{
IPsFromInterfaces: true, PayloadTyp: 96,
IPsFromInterfacesList: []string{}, }},
AdditionalHosts: []string{}, }},
ICEServers: []conf.WebRTCICEServer{}, &unit.VP9{
HandshakeTimeout: conf.StringDuration(10 * time.Second), Frame: []byte{1, 2},
TrackGatherTimeout: conf.StringDuration(2 * time.Second), },
ExternalCmdPool: nil, []byte{1, 2},
PathManager: pathManager, },*/
Parent: test.NilLogger, {
} "vp8",
err = s.Initialize() []*description.Media{{
require.NoError(t, err) Type: description.MediaTypeVideo,
defer s.Close() Formats: []format.Format{&format.VP8{
PayloadTyp: 96,
u, err := url.Parse("http://myuser:mypass@localhost:8886/teststream/whep?param=value") }},
require.NoError(t, err) }},
&unit.VP8{
tr := &http.Transport{} Frame: []byte{1, 2},
defer tr.CloseIdleConnections() },
hc := &http.Client{Transport: tr} []byte{0x10, 1, 2},
},
wc := &webrtc.WHIPClient{ {
HTTPClient: hc, "h264",
URL: u, []*description.Media{test.MediaH264},
Log: test.NilLogger, &unit.H264{
}
writerDone := make(chan struct{})
defer func() { <-writerDone }()
writerTerminate := make(chan struct{})
defer close(writerTerminate)
go func() {
defer close(writerDone)
for {
select {
case <-time.After(100 * time.Millisecond):
case <-writerTerminate:
return
}
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{
NTP: time.Time{},
},
AU: [][]byte{ AU: [][]byte{
{5, 1}, {5, 1},
}, },
}) },
} []byte{
}() 0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9,
0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00,
tracks, err := wc.Read(context.Background()) 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0,
require.NoError(t, err) 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06,
defer checkClose(t, wc.Close) 0x07, 0x08, 0x00, 0x02, 0x05, 0x01,
},
pkt, err := tracks[0].ReadRTP()
require.NoError(t, err)
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 104,
SequenceNumber: pkt.SequenceNumber,
Timestamp: pkt.Timestamp,
SSRC: pkt.SSRC,
CSRC: []uint32{},
}, },
Payload: []byte{ {
0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, "opus",
0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00, []*description.Media{{
0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0, Type: description.MediaTypeAudio,
0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, Formats: []format.Format{&format.Opus{
0x07, 0x08, 0x00, 0x02, 0x05, 0x01, PayloadTyp: 96,
ChannelCount: 2,
}},
}},
&unit.Opus{
Packets: [][]byte{{1, 2}},
},
[]byte{1, 2},
}, },
}, pkt) // TODO: check why this doesn't work
/*{
"g722",
[]*description.Media{{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.G722{}},
}},
&unit.Generic{
Base: unit.Base{
RTPPackets: []*rtp.Packet{{
Header: rtp.Header{},
Payload: []byte{1, 2},
}},
},
},
[]byte{1, 2},
},*/
{
"g711",
[]*description.Media{{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.G711{
MULaw: true,
SampleRate: 8000,
ChannelCount: 1,
}},
}},
&unit.G711{
Samples: []byte{1, 2, 3},
},
[]byte{1, 2, 3},
},
{
"lpcm",
[]*description.Media{{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 48000,
ChannelCount: 2,
}},
}},
&unit.LPCM{
Samples: []byte{1, 2, 3, 4},
},
[]byte{1, 2, 3, 4},
},
} {
t.Run(ca.name, func(t *testing.T) {
desc := &description.Session{Medias: ca.medias}
stream, err := stream.New(
1460,
desc,
true,
test.NilLogger,
)
require.NoError(t, err)
path := &dummyPath{stream: stream}
pathManager := &dummyPathManager{path: path}
s := &Server{
Address: "127.0.0.1:8886",
Encryption: false,
ServerKey: "",
ServerCert: "",
AllowOrigin: "",
TrustedProxies: conf.IPNetworks{},
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteQueueSize: 512,
LocalUDPAddress: "127.0.0.1:8887",
LocalTCPAddress: "127.0.0.1:8887",
IPsFromInterfaces: true,
IPsFromInterfacesList: []string{},
AdditionalHosts: []string{},
ICEServers: []conf.WebRTCICEServer{},
HandshakeTimeout: conf.StringDuration(10 * time.Second),
TrackGatherTimeout: conf.StringDuration(2 * time.Second),
ExternalCmdPool: nil,
PathManager: pathManager,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
u, err := url.Parse("http://myuser:mypass@localhost:8886/teststream/whep?param=value")
require.NoError(t, err)
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
wc := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
Log: test.NilLogger,
}
writerDone := make(chan struct{})
defer func() { <-writerDone }()
writerTerminate := make(chan struct{})
defer close(writerTerminate)
go func() {
defer close(writerDone)
for {
select {
case <-time.After(100 * time.Millisecond):
case <-writerTerminate:
return
}
r := reflect.New(reflect.TypeOf(ca.unit).Elem())
r.Elem().Set(reflect.ValueOf(ca.unit).Elem())
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit))
}
}()
tracks, err := wc.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, wc.Close)
pkt, err := tracks[0].ReadRTP()
require.NoError(t, err)
require.Equal(t, ca.outRTPPayload, pkt.Payload)
})
}
} }
func TestServerPostNotFound(t *testing.T) { func TestServerPostNotFound(t *testing.T) {

View file

@ -14,6 +14,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/gortsplib/v4/pkg/rtptime"
@ -259,6 +260,45 @@ func findAudioTrack(
} }
} }
var lpcmFormat *format.LPCM
media = stream.Desc().FindFormat(&lpcmFormat)
if lpcmFormat != nil {
return lpcmFormat, func(track *webrtc.OutgoingTrack) error {
encoder := &rtplpcm.Encoder{
PayloadType: 96,
BitDepth: 16,
ChannelCount: lpcmFormat.ChannelCount,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err := encoder.Init()
if err != nil {
return err
}
stream.AddReader(writer, media, lpcmFormat, func(u unit.Unit) error {
tunit := u.(*unit.LPCM)
if tunit.Samples == nil {
return nil
}
packets, err := encoder.Encode(tunit.Samples)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
track.WriteRTP(pkt) //nolint:errcheck
}
return nil
})
return nil
}
}
return nil, nil return nil, nil
} }