webrtc: improve connectivity mechanism (#2686)

This commit is contained in:
Alessandro Ros 2023-11-12 23:55:28 +01:00 committed by GitHub
parent 4cf8948fe6
commit 687d8685ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 261 additions and 228 deletions

View file

@ -472,26 +472,29 @@ func (p *Core) createResources(initial bool) error {
if p.conf.WebRTC &&
p.webRTCManager == nil {
p.webRTCManager, err = newWebRTCManager(
p.conf.WebRTCAddress,
p.conf.WebRTCEncryption,
p.conf.WebRTCServerKey,
p.conf.WebRTCServerCert,
p.conf.WebRTCAllowOrigin,
p.conf.WebRTCTrustedProxies,
p.conf.WebRTCICEServers2,
p.conf.ReadTimeout,
p.conf.WriteQueueSize,
p.conf.WebRTCICEInterfaces,
p.conf.WebRTCICEHostNAT1To1IPs,
p.conf.WebRTCICEUDPMuxAddress,
p.conf.WebRTCICETCPMuxAddress,
p.externalCmdPool,
p.pathManager,
p.metrics,
p,
)
p.webRTCManager = &webRTCManager{
Address: p.conf.WebRTCAddress,
Encryption: p.conf.WebRTCEncryption,
ServerKey: p.conf.WebRTCServerKey,
ServerCert: p.conf.WebRTCServerCert,
AllowOrigin: p.conf.WebRTCAllowOrigin,
TrustedProxies: p.conf.WebRTCTrustedProxies,
ReadTimeout: p.conf.ReadTimeout,
WriteQueueSize: p.conf.WriteQueueSize,
LocalUDPAddress: p.conf.WebRTCLocalUDPAddress,
LocalTCPAddress: p.conf.WebRTCLocalTCPAddress,
IPsFromInterfaces: p.conf.WebRTCIPsFromInterfaces,
IPsFromInterfacesList: p.conf.WebRTCIPsFromInterfacesList,
AdditionalHosts: p.conf.WebRTCAdditionalHosts,
ICEServers: p.conf.WebRTCICEServers2,
ExternalCmdPool: p.externalCmdPool,
PathManager: p.pathManager,
Metrics: p.metrics,
Parent: p,
}
err = p.webRTCManager.initialize()
if err != nil {
p.webRTCManager = nil
return err
}
}
@ -689,13 +692,14 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.WebRTCServerCert != p.conf.WebRTCServerCert ||
newConf.WebRTCAllowOrigin != p.conf.WebRTCAllowOrigin ||
!reflect.DeepEqual(newConf.WebRTCTrustedProxies, p.conf.WebRTCTrustedProxies) ||
!reflect.DeepEqual(newConf.WebRTCICEServers2, p.conf.WebRTCICEServers2) ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteQueueSize != p.conf.WriteQueueSize ||
!reflect.DeepEqual(newConf.WebRTCICEInterfaces, p.conf.WebRTCICEInterfaces) ||
!reflect.DeepEqual(newConf.WebRTCICEHostNAT1To1IPs, p.conf.WebRTCICEHostNAT1To1IPs) ||
newConf.WebRTCICEUDPMuxAddress != p.conf.WebRTCICEUDPMuxAddress ||
newConf.WebRTCICETCPMuxAddress != p.conf.WebRTCICETCPMuxAddress ||
newConf.WebRTCLocalUDPAddress != p.conf.WebRTCLocalUDPAddress ||
newConf.WebRTCLocalTCPAddress != p.conf.WebRTCLocalTCPAddress ||
newConf.WebRTCIPsFromInterfaces != p.conf.WebRTCIPsFromInterfaces ||
!reflect.DeepEqual(newConf.WebRTCIPsFromInterfacesList, p.conf.WebRTCIPsFromInterfacesList) ||
!reflect.DeepEqual(newConf.WebRTCAdditionalHosts, p.conf.WebRTCAdditionalHosts) ||
!reflect.DeepEqual(newConf.WebRTCICEServers2, p.conf.WebRTCICEServers2) ||
closeMetrics ||
closePathManager ||
closeLogger

View file

@ -376,10 +376,6 @@ func (pa *path) runInner() error {
case <-pa.onDemandPublisherCloseTimer.C:
pa.doOnDemandPublisherCloseTimer()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case newConf := <-pa.chReloadConf:
pa.doReloadConf(newConf)

View file

@ -15,7 +15,6 @@ import (
"time"
"github.com/google/uuid"
"github.com/pion/ice/v2"
"github.com/pion/logging"
pwebrtc "github.com/pion/webrtc/v3"
@ -165,14 +164,24 @@ type webRTCManagerParent interface {
}
type webRTCManager struct {
allowOrigin string
trustedProxies conf.IPsOrCIDRs
iceServers []conf.WebRTCICEServer
writeQueueSize int
externalCmdPool *externalcmd.Pool
pathManager *pathManager
metrics *metrics
parent webRTCManagerParent
Address string
Encryption bool
ServerKey string
ServerCert string
AllowOrigin string
TrustedProxies conf.IPsOrCIDRs
ReadTimeout conf.StringDuration
WriteQueueSize int
LocalUDPAddress string
LocalTCPAddress string
IPsFromInterfaces bool
IPsFromInterfacesList []string
AdditionalHosts []string
ICEServers []conf.WebRTCICEServer
ExternalCmdPool *externalcmd.Pool
PathManager *pathManager
Metrics *metrics
Parent webRTCManagerParent
ctx context.Context
ctxCancel func()
@ -196,127 +205,97 @@ type webRTCManager struct {
done chan struct{}
}
func newWebRTCManager(
address string,
encryption bool,
serverKey string,
serverCert string,
allowOrigin string,
trustedProxies conf.IPsOrCIDRs,
iceServers []conf.WebRTCICEServer,
readTimeout conf.StringDuration,
writeQueueSize int,
iceInterfaces []string,
iceHostNAT1To1IPs []string,
iceUDPMuxAddress string,
iceTCPMuxAddress string,
externalCmdPool *externalcmd.Pool,
pathManager *pathManager,
metrics *metrics,
parent webRTCManagerParent,
) (*webRTCManager, error) {
func (m *webRTCManager) initialize() error {
ctx, ctxCancel := context.WithCancel(context.Background())
m := &webRTCManager{
allowOrigin: allowOrigin,
trustedProxies: trustedProxies,
iceServers: iceServers,
writeQueueSize: writeQueueSize,
externalCmdPool: externalCmdPool,
pathManager: pathManager,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
sessions: make(map[*webRTCSession]struct{}),
sessionsBySecret: make(map[uuid.UUID]*webRTCSession),
chNewSession: make(chan webRTCNewSessionReq),
chCloseSession: make(chan *webRTCSession),
chAddSessionCandidates: make(chan webRTCAddSessionCandidatesReq),
chDeleteSession: make(chan webRTCDeleteSessionReq),
chAPISessionsList: make(chan webRTCManagerAPISessionsListReq),
chAPISessionsGet: make(chan webRTCManagerAPISessionsGetReq),
chAPIConnsKick: make(chan webRTCManagerAPISessionsKickReq),
done: make(chan struct{}),
}
m.ctx = ctx
m.ctxCancel = ctxCancel
m.sessions = make(map[*webRTCSession]struct{})
m.sessionsBySecret = make(map[uuid.UUID]*webRTCSession)
m.chNewSession = make(chan webRTCNewSessionReq)
m.chCloseSession = make(chan *webRTCSession)
m.chAddSessionCandidates = make(chan webRTCAddSessionCandidatesReq)
m.chDeleteSession = make(chan webRTCDeleteSessionReq)
m.chAPISessionsList = make(chan webRTCManagerAPISessionsListReq)
m.chAPISessionsGet = make(chan webRTCManagerAPISessionsGetReq)
m.chAPIConnsKick = make(chan webRTCManagerAPISessionsKickReq)
m.done = make(chan struct{})
var err error
m.httpServer, err = newWebRTCHTTPServer(
address,
encryption,
serverKey,
serverCert,
allowOrigin,
trustedProxies,
readTimeout,
pathManager,
m.Address,
m.Encryption,
m.ServerKey,
m.ServerCert,
m.AllowOrigin,
m.TrustedProxies,
m.ReadTimeout,
m.PathManager,
m,
)
if err != nil {
ctxCancel()
return nil, err
return err
}
var iceUDPMux ice.UDPMux
apiConf := webrtc.APIConf{
LocalRandomUDP: false,
IPsFromInterfaces: m.IPsFromInterfaces,
IPsFromInterfacesList: m.IPsFromInterfacesList,
AdditionalHosts: m.AdditionalHosts,
}
if iceUDPMuxAddress != "" {
m.udpMuxLn, err = net.ListenPacket(restrictnetwork.Restrict("udp", iceUDPMuxAddress))
if m.LocalUDPAddress != "" {
m.udpMuxLn, err = net.ListenPacket(restrictnetwork.Restrict("udp", m.LocalUDPAddress))
if err != nil {
m.httpServer.close()
ctxCancel()
return nil, err
return err
}
iceUDPMux = pwebrtc.NewICEUDPMux(webrtcNilLogger, m.udpMuxLn)
apiConf.ICEUDPMux = pwebrtc.NewICEUDPMux(webrtcNilLogger, m.udpMuxLn)
}
var iceTCPMux ice.TCPMux
if iceTCPMuxAddress != "" {
m.tcpMuxLn, err = net.Listen(restrictnetwork.Restrict("tcp", iceTCPMuxAddress))
if m.LocalTCPAddress != "" {
m.tcpMuxLn, err = net.Listen(restrictnetwork.Restrict("tcp", m.LocalTCPAddress))
if err != nil {
m.udpMuxLn.Close()
m.httpServer.close()
ctxCancel()
return nil, err
return err
}
iceTCPMux = pwebrtc.NewICETCPMux(webrtcNilLogger, m.tcpMuxLn, 8)
apiConf.ICETCPMux = pwebrtc.NewICETCPMux(webrtcNilLogger, m.tcpMuxLn, 8)
}
m.api, err = webrtc.NewAPI(webrtc.APIConf{
ICEInterfaces: iceInterfaces,
ICEHostNAT1To1IPs: iceHostNAT1To1IPs,
ICEUDPMux: iceUDPMux,
ICETCPMux: iceTCPMux,
})
m.api, err = webrtc.NewAPI(apiConf)
if err != nil {
m.udpMuxLn.Close()
m.tcpMuxLn.Close()
m.httpServer.close()
ctxCancel()
return nil, err
return err
}
str := "listener opened on " + address + " (HTTP)"
str := "listener opened on " + m.Address + " (HTTP)"
if m.udpMuxLn != nil {
str += ", " + iceUDPMuxAddress + " (ICE/UDP)"
str += ", " + m.LocalUDPAddress + " (ICE/UDP)"
}
if m.tcpMuxLn != nil {
str += ", " + iceTCPMuxAddress + " (ICE/TCP)"
str += ", " + m.LocalTCPAddress + " (ICE/TCP)"
}
m.Log(logger.Info, str)
if m.metrics != nil {
m.metrics.webRTCManagerSet(m)
if m.Metrics != nil {
m.Metrics.webRTCManagerSet(m)
}
go m.run()
return m, nil
return nil
}
// Log is the main logging function.
func (m *webRTCManager) Log(level logger.Level, format string, args ...interface{}) {
m.parent.Log(level, "[WebRTC] "+format, args...)
m.Parent.Log(level, "[WebRTC] "+format, args...)
}
func (m *webRTCManager) close() {
@ -336,12 +315,12 @@ outer:
case req := <-m.chNewSession:
sx := newWebRTCSession(
m.ctx,
m.writeQueueSize,
m.WriteQueueSize,
m.api,
req,
&wg,
m.externalCmdPool,
m.pathManager,
m.ExternalCmdPool,
m.PathManager,
m,
)
m.sessions[sx] = struct{}{}
@ -441,9 +420,9 @@ func (m *webRTCManager) findSessionByUUID(uuid uuid.UUID) *webRTCSession {
}
func (m *webRTCManager) generateICEServers() ([]pwebrtc.ICEServer, error) {
ret := make([]pwebrtc.ICEServer, len(m.iceServers))
ret := make([]pwebrtc.ICEServer, len(m.ICEServers))
for i, server := range m.iceServers {
for i, server := range m.ICEServers {
if server.Username == "AUTH_SECRET" {
expireDate := time.Now().Add(webrtcTurnSecretExpiration).Unix()