diff --git a/README.md b/README.md index fe5dde94..ec9bdf4e 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Live streams can be published to the server with: |[SRT clients](#srt-clients)||H265, H264|Opus, MPEG-4 Audio (AAC)| |[SRT servers](#srt-servers)||H265, H264|Opus, MPEG-4 Audio (AAC)| |[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| +|[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| |[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| |[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| |[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| @@ -82,6 +83,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi * [SRT clients](#srt-clients) * [SRT servers](#srt-servers) * [WebRTC clients](#webrtc-clients) + * [WebRTC servers](#webrtc-servers) * [RTSP clients](#rtsp-clients) * [RTSP cameras and servers](#rtsp-cameras-and-servers) * [RTMP clients](#rtmp-clients) @@ -593,6 +595,17 @@ Depending on the network it may be difficult to establish a connection between s Known clients that can publish with WebRTC and WHIP are [FFmpeg](#ffmpeg), [Gstreamer](#gstreamer), [OBS Studio](#obs-studio). +#### WebRTC servers + +In order to ingest into the server a WebRTC stream from an existing server, add the corresponding WHEP URL into the `source` parameter of a path: + +```yml +paths: + proxied: + # url of the source stream, in the format whep://host:port/path (HTTP) or wheps:// (HTTPS) + source: wheps://host:port/path +``` + #### RTSP clients RTSP is a protocol that allows to publish and read streams. It supports different underlying transport protocols and allows to encrypt streams in transit (see [RTSP-specific features](#rtsp-specific-features)). In order to publish a stream to the server with the RTSP protocol, use this URL: diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 3ffddf52..306c2bbf 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -384,6 +384,7 @@ components: - srtSource - udpSource - webRTCSession + - webRTCSource id: type: string diff --git a/internal/conf/path.go b/internal/conf/path.go index f5fc4aeb..d5b8d526 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -149,7 +149,7 @@ func (pconf *PathConf) check(conf *Conf, name string) error { _, err := url.Parse(pconf.Source) if err != nil { - return fmt.Errorf("'%s' is not a valid RTSP URL", pconf.Source) + return fmt.Errorf("'%s' is not a valid URL", pconf.Source) } case strings.HasPrefix(pconf.Source, "rtmp://") || @@ -160,7 +160,7 @@ func (pconf *PathConf) check(conf *Conf, name string) error { u, err := gourl.Parse(pconf.Source) if err != nil { - return fmt.Errorf("'%s' is not a valid RTMP URL", pconf.Source) + return fmt.Errorf("'%s' is not a valid URL", pconf.Source) } if u.User != nil { @@ -180,10 +180,10 @@ func (pconf *PathConf) check(conf *Conf, name string) error { u, err := gourl.Parse(pconf.Source) if err != nil { - return fmt.Errorf("'%s' is not a valid HLS URL", pconf.Source) + return fmt.Errorf("'%s' is not a valid URL", pconf.Source) } if u.Scheme != "http" && u.Scheme != "https" { - return fmt.Errorf("'%s' is not a valid HLS URL", pconf.Source) + return fmt.Errorf("'%s' is not a valid URL", pconf.Source) } if u.User != nil { @@ -217,7 +217,19 @@ func (pconf *PathConf) check(conf *Conf, name string) error { _, err := gourl.Parse(pconf.Source) if err != nil { - return fmt.Errorf("'%s' is not a valid HLS URL", pconf.Source) + return fmt.Errorf("'%s' is not a valid URL", pconf.Source) + } + + case strings.HasPrefix(pconf.Source, "whep://") || + strings.HasPrefix(pconf.Source, "wheps://"): + if pconf.Regexp != nil { + return fmt.Errorf("a path with a regular expression (or path 'all') " + + "cannot have a WebRTC/WHEP source. use another path") + } + + _, err := gourl.Parse(pconf.Source) + if err != nil { + return fmt.Errorf("'%s' is not a valid URL", pconf.Source) } case pconf.Source == "redirect": @@ -348,6 +360,8 @@ func (pconf PathConf) HasStaticSource() bool { strings.HasPrefix(pconf.Source, "https://") || strings.HasPrefix(pconf.Source, "udp://") || strings.HasPrefix(pconf.Source, "srt://") || + strings.HasPrefix(pconf.Source, "whep://") || + strings.HasPrefix(pconf.Source, "wheps://") || pconf.Source == "rpiCamera" } diff --git a/internal/core/core.go b/internal/core/core.go index 01c8519d..0fb7b322 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -420,12 +420,12 @@ func (p *Core) createResources(initial bool) error { p.conf.WebRTCICEServers2, p.conf.ReadTimeout, p.conf.ReadBufferCount, - p.pathManager, - p.metrics, - p, p.conf.WebRTCICEHostNAT1To1IPs, p.conf.WebRTCICEUDPMuxAddress, p.conf.WebRTCICETCPMuxAddress, + p.pathManager, + p.metrics, + p, ) if err != nil { return err @@ -608,11 +608,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { !reflect.DeepEqual(newConf.WebRTCICEServers2, p.conf.WebRTCICEServers2) || newConf.ReadTimeout != p.conf.ReadTimeout || newConf.ReadBufferCount != p.conf.ReadBufferCount || - closeMetrics || - closePathManager || !reflect.DeepEqual(newConf.WebRTCICEHostNAT1To1IPs, p.conf.WebRTCICEHostNAT1To1IPs) || newConf.WebRTCICEUDPMuxAddress != p.conf.WebRTCICEUDPMuxAddress || - newConf.WebRTCICETCPMuxAddress != p.conf.WebRTCICETCPMuxAddress + newConf.WebRTCICETCPMuxAddress != p.conf.WebRTCICETCPMuxAddress || + closeMetrics || + closePathManager closeSRTServer := newConf == nil || newConf.SRT != p.conf.SRT || diff --git a/internal/core/hls_manager_test.go b/internal/core/hls_manager_test.go index 75e2ad24..3b2b943c 100644 --- a/internal/core/hls_manager_test.go +++ b/internal/core/hls_manager_test.go @@ -12,80 +12,72 @@ import ( "github.com/bluenviron/gortsplib/v3" "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/media" - "github.com/gin-gonic/gin" "github.com/pion/rtp" "github.com/stretchr/testify/require" ) type testHTTPAuthenticator struct { - protocol string - action string - - s *http.Server - firstReceived bool + *http.Server } func newTestHTTPAuthenticator(t *testing.T, protocol string, action string) *testHTTPAuthenticator { + firstReceived := false + + ts := &testHTTPAuthenticator{} + + ts.Server = &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "/auth", r.URL.Path) + + var in struct { + IP string `json:"ip"` + User string `json:"user"` + Password string `json:"password"` + Path string `json:"path"` + Protocol string `json:"protocol"` + ID string `json:"id"` + Action string `json:"action"` + Query string `json:"query"` + } + err := json.NewDecoder(r.Body).Decode(&in) + require.NoError(t, err) + + var user string + if action == "publish" { + user = "testpublisher" + } else { + user = "testreader" + } + + if in.IP != "127.0.0.1" || + in.User != user || + in.Password != "testpass" || + in.Path != "teststream" || + in.Protocol != protocol || + (firstReceived && in.ID == "") || + in.Action != action || + (in.Query != "user=testreader&pass=testpass¶m=value" && + in.Query != "user=testpublisher&pass=testpass¶m=value" && + in.Query != "param=value") { + w.WriteHeader(http.StatusBadRequest) + return + } + + firstReceived = true + }), + } + ln, err := net.Listen("tcp", "127.0.0.1:9120") require.NoError(t, err) - ts := &testHTTPAuthenticator{ - protocol: protocol, - action: action, - } - - router := gin.New() - router.POST("/auth", ts.onAuth) - - ts.s = &http.Server{Handler: router} - go ts.s.Serve(ln) + go ts.Server.Serve(ln) return ts } func (ts *testHTTPAuthenticator) close() { - ts.s.Shutdown(context.Background()) -} - -func (ts *testHTTPAuthenticator) onAuth(ctx *gin.Context) { - var in struct { - IP string `json:"ip"` - User string `json:"user"` - Password string `json:"password"` - Path string `json:"path"` - Protocol string `json:"protocol"` - ID string `json:"id"` - Action string `json:"action"` - Query string `json:"query"` - } - err := json.NewDecoder(ctx.Request.Body).Decode(&in) - if err != nil { - ctx.AbortWithStatus(http.StatusBadRequest) - return - } - - var user string - if ts.action == "publish" { - user = "testpublisher" - } else { - user = "testreader" - } - - if in.IP != "127.0.0.1" || - in.User != user || - in.Password != "testpass" || - in.Path != "teststream" || - in.Protocol != ts.protocol || - (ts.firstReceived && in.ID == "") || - in.Action != ts.action || - (in.Query != "user=testreader&pass=testpass¶m=value" && - in.Query != "user=testpublisher&pass=testpass¶m=value" && - in.Query != "param=value") { - ctx.AbortWithStatus(http.StatusBadRequest) - return - } - - ts.firstReceived = true + ts.Server.Shutdown(context.Background()) } func httpPullFile(t *testing.T, hc *http.Client, u string) []byte { diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 9aafbca3..020d3da1 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -18,8 +18,8 @@ import ( type hlsSourceParent interface { logger.Writer - sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes - sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) + setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + setNotReady(req pathSourceStaticSetNotReadyReq) } type hlsSource struct { @@ -35,7 +35,7 @@ func newHLSSource( } func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[hls source] "+format, args...) + s.parent.Log(level, "[HLS source] "+format, args...) } // run implements sourceStaticImpl. @@ -44,7 +44,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan defer func() { if stream != nil { - s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) + s.parent.setNotReady(pathSourceStaticSetNotReadyReq{}) } }() @@ -163,7 +163,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan medias = append(medias, medi) } - res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: true, }) @@ -171,7 +171,6 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan return res.err } - s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) stream = res.stream return nil diff --git a/internal/core/hls_source_test.go b/internal/core/hls_source_test.go index 3f254f4b..6ca98500 100644 --- a/internal/core/hls_source_test.go +++ b/internal/core/hls_source_test.go @@ -171,10 +171,13 @@ func TestHLSSource(t *testing.T) { }, }, medias) - err = c.SetupAll(medias, baseURL) + var forma *formats.H264 + medi := medias.FindFormat(&forma) + + _, err = c.Setup(medi, baseURL, 0, 0) require.NoError(t, err) - c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) { + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { require.Equal(t, &rtp.Packet{ Header: rtp.Header{ Version: 2, diff --git a/internal/core/path.go b/internal/core/path.go index ee2650c5..d7dc158a 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -331,154 +331,7 @@ func (pa *path) run() { }) } - err := func() error { - for { - select { - case <-pa.onDemandStaticSourceReadyTimer.C: - for _, req := range pa.describeRequestsOnHold { - req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.describeRequestsOnHold = nil - - for _, req := range pa.readerAddRequestsOnHold { - req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.readerAddRequestsOnHold = nil - - pa.onDemandStaticSourceStop() - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case <-pa.onDemandStaticSourceCloseTimer.C: - pa.setNotReady() - pa.onDemandStaticSourceStop() - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case <-pa.onDemandPublisherReadyTimer.C: - for _, req := range pa.describeRequestsOnHold { - req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.describeRequestsOnHold = nil - - for _, req := range pa.readerAddRequestsOnHold { - req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.readerAddRequestsOnHold = nil - - pa.onDemandStopPublisher() - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case <-pa.onDemandPublisherCloseTimer.C: - pa.onDemandStopPublisher() - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case newConf := <-pa.chReloadConf: - if pa.conf.HasStaticSource() { - go pa.source.(*sourceStatic).reloadConf(newConf) - } - - pa.confMutex.Lock() - pa.conf = newConf - pa.confMutex.Unlock() - - case req := <-pa.chSourceStaticSetReady: - err := pa.setReady(req.medias, req.generateRTPPackets) - if err != nil { - req.res <- pathSourceStaticSetReadyRes{err: err} - } else { - if pa.conf.HasOnDemandStaticSource() { - pa.onDemandStaticSourceReadyTimer.Stop() - pa.onDemandStaticSourceReadyTimer = newEmptyTimer() - - pa.onDemandStaticSourceScheduleClose() - - for _, req := range pa.describeRequestsOnHold { - req.res <- pathDescribeRes{ - stream: pa.stream, - } - } - pa.describeRequestsOnHold = nil - - for _, req := range pa.readerAddRequestsOnHold { - pa.handleAddReaderPost(req) - } - pa.readerAddRequestsOnHold = nil - } - - req.res <- pathSourceStaticSetReadyRes{stream: pa.stream} - } - - case req := <-pa.chSourceStaticSetNotReady: - pa.setNotReady() - - // send response before calling onDemandStaticSourceStop() - // in order to avoid a deadlock due to sourceStatic.stop() - close(req.res) - - if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial { - pa.onDemandStaticSourceStop() - } - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case req := <-pa.chDescribe: - pa.handleDescribe(req) - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case req := <-pa.chRemovePublisher: - pa.handleRemovePublisher(req) - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case req := <-pa.chAddPublisher: - pa.handleAddPublisher(req) - - case req := <-pa.chStartPublisher: - pa.handleStartPublisher(req) - - case req := <-pa.chStopPublisher: - pa.handleStopPublisher(req) - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case req := <-pa.chAddReader: - pa.handleAddReader(req) - - if pa.shouldClose() { - return fmt.Errorf("not in use") - } - - case req := <-pa.chRemoveReader: - pa.handleRemoveReader(req) - - case req := <-pa.chAPIPathsGet: - pa.handleAPIPathsGet(req) - - case <-pa.ctx.Done(): - return fmt.Errorf("terminated") - } - } - }() + err := pa.runInner() // call before destroying context pa.parent.closePath(pa) @@ -523,6 +376,155 @@ func (pa *path) run() { pa.Log(logger.Debug, "destroyed (%v)", err) } +func (pa *path) runInner() error { + for { + select { + case <-pa.onDemandStaticSourceReadyTimer.C: + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.describeRequestsOnHold = nil + + for _, req := range pa.readerAddRequestsOnHold { + req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.readerAddRequestsOnHold = nil + + pa.onDemandStaticSourceStop() + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case <-pa.onDemandStaticSourceCloseTimer.C: + pa.setNotReady() + pa.onDemandStaticSourceStop() + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case <-pa.onDemandPublisherReadyTimer.C: + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.describeRequestsOnHold = nil + + for _, req := range pa.readerAddRequestsOnHold { + req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.readerAddRequestsOnHold = nil + + pa.onDemandStopPublisher() + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case <-pa.onDemandPublisherCloseTimer.C: + pa.onDemandStopPublisher() + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case newConf := <-pa.chReloadConf: + if pa.conf.HasStaticSource() { + go pa.source.(*sourceStatic).reloadConf(newConf) + } + + pa.confMutex.Lock() + pa.conf = newConf + pa.confMutex.Unlock() + + case req := <-pa.chSourceStaticSetReady: + err := pa.setReady(req.medias, req.generateRTPPackets) + if err != nil { + req.res <- pathSourceStaticSetReadyRes{err: err} + } else { + if pa.conf.HasOnDemandStaticSource() { + pa.onDemandStaticSourceReadyTimer.Stop() + pa.onDemandStaticSourceReadyTimer = newEmptyTimer() + + pa.onDemandStaticSourceScheduleClose() + + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{ + stream: pa.stream, + } + } + pa.describeRequestsOnHold = nil + + for _, req := range pa.readerAddRequestsOnHold { + pa.handleAddReaderPost(req) + } + pa.readerAddRequestsOnHold = nil + } + + req.res <- pathSourceStaticSetReadyRes{stream: pa.stream} + } + + case req := <-pa.chSourceStaticSetNotReady: + pa.setNotReady() + + // send response before calling onDemandStaticSourceStop() + // in order to avoid a deadlock due to sourceStatic.stop() + close(req.res) + + if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial { + pa.onDemandStaticSourceStop() + } + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case req := <-pa.chDescribe: + pa.handleDescribe(req) + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case req := <-pa.chRemovePublisher: + pa.handleRemovePublisher(req) + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case req := <-pa.chAddPublisher: + pa.handleAddPublisher(req) + + case req := <-pa.chStartPublisher: + pa.handleStartPublisher(req) + + case req := <-pa.chStopPublisher: + pa.handleStopPublisher(req) + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case req := <-pa.chAddReader: + pa.handleAddReader(req) + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case req := <-pa.chRemoveReader: + pa.handleRemoveReader(req) + + case req := <-pa.chAPIPathsGet: + pa.handleAPIPathsGet(req) + + case <-pa.ctx.Done(): + return fmt.Errorf("terminated") + } + } +} + func (pa *path) shouldClose() bool { return pa.conf.Regexp != nil && pa.source == nil && diff --git a/internal/core/rpicamera_source.go b/internal/core/rpicamera_source.go index 83bc2541..aff203f0 100644 --- a/internal/core/rpicamera_source.go +++ b/internal/core/rpicamera_source.go @@ -53,8 +53,8 @@ func paramsFromConf(cnf *conf.PathConf) rpicamera.Params { type rpiCameraSourceParent interface { logger.Writer - sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes - sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) + setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + setNotReady(req pathSourceStaticSetNotReadyReq) } type rpiCameraSource struct { @@ -70,7 +70,7 @@ func newRPICameraSource( } func (s *rpiCameraSource) Log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[rpicamera source] "+format, args...) + s.parent.Log(level, "[RPI Camera source] "+format, args...) } // run implements sourceStaticImpl. @@ -87,7 +87,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon onData := func(dts time.Duration, au [][]byte) { if stream == nil { - res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: true, }) @@ -95,7 +95,6 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon return } - s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) stream = res.stream } @@ -116,7 +115,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon defer func() { if stream != nil { - s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) + s.parent.setNotReady(pathSourceStaticSetNotReadyReq{}) } }() diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 93aead5f..ba6283fa 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -20,8 +20,8 @@ import ( type rtmpSourceParent interface { logger.Writer - sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes - sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) + setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + setNotReady(req pathSourceStaticSetNotReadyReq) } type rtmpSource struct { @@ -43,7 +43,7 @@ func newRTMPSource( } func (s *rtmpSource) Log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[rtmp source] "+format, args...) + s.parent.Log(level, "[RTMP source] "+format, args...) } // run implements sourceStaticImpl. @@ -173,7 +173,7 @@ func (s *rtmpSource) runReader(u *url.URL, nconn net.Conn) error { } } - res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: true, }) @@ -181,9 +181,7 @@ func (s *rtmpSource) runReader(u *url.URL, nconn net.Conn) error { return res.err } - defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) - - s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) + defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{}) stream = res.stream diff --git a/internal/core/rtmp_source_test.go b/internal/core/rtmp_source_test.go index 49d19872..561319e5 100644 --- a/internal/core/rtmp_source_test.go +++ b/internal/core/rtmp_source_test.go @@ -119,10 +119,13 @@ func TestRTMPSource(t *testing.T) { medias, baseURL, _, err := c.Describe(u) require.NoError(t, err) - err = c.SetupAll(medias, baseURL) + var forma *formats.H264 + medi := medias.FindFormat(&forma) + + _, err = c.Setup(medi, baseURL, 0, 0) require.NoError(t, err) - c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) { + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { require.Equal(t, []byte{ 0x18, 0x0, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x0, 0x78, 0x2, 0x27, 0xe5, 0x84, 0x0, 0x0, diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 40569cd0..c2bce2fa 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -61,8 +61,8 @@ func createRangeHeader(cnf *conf.PathConf) (*headers.Range, error) { type rtspSourceParent interface { logger.Writer - sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes - sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) + setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + setNotReady(req pathSourceStaticSetNotReadyReq) } type rtspSource struct { @@ -87,7 +87,7 @@ func newRTSPSource( } func (s *rtspSource) Log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[rtsp source] "+format, args...) + s.parent.Log(level, "[RTSP source] "+format, args...) } // run implements sourceStaticImpl. @@ -142,7 +142,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha return err } - res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: false, }) @@ -150,9 +150,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha return res.err } - s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) - - defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) + defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{}) for _, medi := range medias { for _, forma := range medi.Formats { diff --git a/internal/core/rtsp_source_test.go b/internal/core/rtsp_source_test.go index fe0f5203..fc7f0c3d 100644 --- a/internal/core/rtsp_source_test.go +++ b/internal/core/rtsp_source_test.go @@ -9,6 +9,7 @@ import ( "github.com/bluenviron/gortsplib/v3" "github.com/bluenviron/gortsplib/v3/pkg/auth" "github.com/bluenviron/gortsplib/v3/pkg/base" + "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/url" "github.com/pion/rtp" @@ -41,8 +42,8 @@ func TestRTSPSource(t *testing.T) { "tls", } { t.Run(source, func(t *testing.T) { - medi := testMediaH264 - stream := gortsplib.NewServerStream(media.Medias{medi}) + serverMedia := testMediaH264 + stream := gortsplib.NewServerStream(media.Medias{serverMedia}) nonce, err := auth.GenerateNonce2() require.NoError(t, err) @@ -73,7 +74,7 @@ func TestRTSPSource(t *testing.T) { onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { time.Sleep(1 * time.Second) - stream.WritePacketRTP(medi, &rtp.Packet{ + stream.WritePacketRTP(serverMedia, &rtp.Packet{ Header: rtp.Header{ Version: 0x02, PayloadType: 96, @@ -151,10 +152,13 @@ func TestRTSPSource(t *testing.T) { medias, baseURL, _, err := c.Describe(u) require.NoError(t, err) - err = c.SetupAll(medias, baseURL) + var forma *formats.H264 + medi := medias.FindFormat(&forma) + + _, err = c.Setup(medi, baseURL, 0, 0) require.NoError(t, err) - c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) { + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, pkt.Payload) close(received) }) diff --git a/internal/core/source_static.go b/internal/core/source_static.go index b5d45b94..8ce92af4 100644 --- a/internal/core/source_static.go +++ b/internal/core/source_static.go @@ -91,6 +91,12 @@ func newSourceStatic( readTimeout, s) + case strings.HasPrefix(cnf.Source, "whep://") || + strings.HasPrefix(cnf.Source, "wheps://"): + s.impl = newWebRTCSource( + readTimeout, + s) + case cnf.Source == "rpiCamera": s.impl = newRPICameraSource( s) @@ -210,19 +216,26 @@ func (s *sourceStatic) apiSourceDescribe() pathAPISourceOrReader { return s.impl.apiSourceDescribe() } -// sourceStaticImplSetReady is called by a sourceStaticImpl. -func (s *sourceStatic) sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes { +// setReady is called by a sourceStaticImpl. +func (s *sourceStatic) setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes { req.res = make(chan pathSourceStaticSetReadyRes) select { case s.chSourceStaticImplSetReady <- req: - return <-req.res + res := <-req.res + + if res.err == nil { + s.impl.Log(logger.Info, "ready: %s", sourceMediaInfo(req.medias)) + } + + return res + case <-s.ctx.Done(): return pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")} } } -// sourceStaticImplSetNotReady is called by a sourceStaticImpl. -func (s *sourceStatic) sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) { +// setNotReady is called by a sourceStaticImpl. +func (s *sourceStatic) setNotReady(req pathSourceStaticSetNotReadyReq) { req.res = make(chan struct{}) select { case s.chSourceStaticImplSetNotReady <- req: diff --git a/internal/core/srt_source.go b/internal/core/srt_source.go index 18d398bc..c926b55b 100644 --- a/internal/core/srt_source.go +++ b/internal/core/srt_source.go @@ -17,8 +17,8 @@ import ( type srtSourceParent interface { logger.Writer - sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes - sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) + setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + setNotReady(req pathSourceStaticSetNotReadyReq) } type srtSource struct { @@ -39,7 +39,7 @@ func newSRTSource( } func (s *srtSource) Log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[srt source] "+format, args...) + s.parent.Log(level, "[SRT source] "+format, args...) } // run implements sourceStaticImpl. @@ -191,7 +191,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error { medias = append(medias, medi) } - res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: true, }) @@ -199,8 +199,6 @@ func (s *srtSource) runReader(sconn srt.Conn) error { return res.err } - s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) - stream = res.stream for { diff --git a/internal/core/srt_source_test.go b/internal/core/srt_source_test.go index b85ec42b..ce48cfb8 100644 --- a/internal/core/srt_source_test.go +++ b/internal/core/srt_source_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/bluenviron/gortsplib/v3" + "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/url" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/datarhei/gosrt" @@ -81,10 +82,13 @@ func TestSRTSource(t *testing.T) { medias, baseURL, _, err := c.Describe(u) require.NoError(t, err) - err = c.SetupAll(medias, baseURL) + var forma *formats.H264 + medi := medias.FindFormat(&forma) + + _, err = c.Setup(medi, baseURL, 0, 0) require.NoError(t, err) - c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) { + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { require.Equal(t, []byte{5, 1}, pkt.Payload) close(received) }) diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index b28ff112..45bdfdce 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -63,8 +63,8 @@ func (r *packetConnReader) Read(p []byte) (int, error) { type udpSourceParent interface { logger.Writer - sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes - sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) + setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + setNotReady(req pathSourceStaticSetNotReadyReq) } type udpSource struct { @@ -83,7 +83,7 @@ func newUDPSource( } func (s *udpSource) Log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[udp source] "+format, args...) + s.parent.Log(level, "[UDP source] "+format, args...) } // run implements sourceStaticImpl. @@ -239,7 +239,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { medias = append(medias, medi) } - res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: true, }) @@ -247,9 +247,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { return res.err } - defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) - - s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) + defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{}) stream = res.stream diff --git a/internal/core/webrtc_http_server.go b/internal/core/webrtc_http_server.go index 28dbadf7..ad331ecf 100644 --- a/internal/core/webrtc_http_server.go +++ b/internal/core/webrtc_http_server.go @@ -2,24 +2,21 @@ package core import ( _ "embed" - "encoding/json" "fmt" "io" "net" "net/http" - "regexp" - "strconv" "strings" "time" "github.com/gin-gonic/gin" "github.com/google/uuid" - "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/httpserv" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/whip" ) //go:embed webrtc_publish_index.html @@ -28,137 +25,6 @@ var webrtcPublishIndex []byte //go:embed webrtc_read_index.html var webrtcReadIndex []byte -func quoteCredential(v string) string { - b, _ := json.Marshal(v) - s := string(b) - return s[1 : len(s)-1] -} - -func unquoteCredential(v string) string { - var s string - json.Unmarshal([]byte("\""+v+"\""), &s) - return s -} - -func iceServersToLinkHeader(iceServers []webrtc.ICEServer) []string { - ret := make([]string, len(iceServers)) - - for i, server := range iceServers { - link := "<" + server.URLs[0] + ">; rel=\"ice-server\"" - if server.Username != "" { - link += "; username=\"" + quoteCredential(server.Username) + "\"" + - "; credential=\"" + quoteCredential(server.Credential.(string)) + "\"; credential-type=\"password\"" - } - ret[i] = link - } - - return ret -} - -var reLink = regexp.MustCompile(`^<(.+?)>; rel="ice-server"(; username="(.+?)"` + - `; credential="(.+?)"; credential-type="password")?`) - -func linkHeaderToIceServers(link []string) []webrtc.ICEServer { - var ret []webrtc.ICEServer - - for _, li := range link { - m := reLink.FindStringSubmatch(li) - if m != nil { - s := webrtc.ICEServer{ - URLs: []string{m[1]}, - } - - if m[3] != "" { - s.Username = unquoteCredential(m[3]) - s.Credential = unquoteCredential(m[4]) - s.CredentialType = webrtc.ICECredentialTypePassword - } - - ret = append(ret, s) - } - } - - return ret -} - -func unmarshalICEFragment(buf []byte) ([]*webrtc.ICECandidateInit, error) { - buf = append([]byte("v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n"), buf...) - - var sdp sdp.SessionDescription - err := sdp.Unmarshal(buf) - if err != nil { - return nil, err - } - - usernameFragment, ok := sdp.Attribute("ice-ufrag") - if !ok { - return nil, fmt.Errorf("ice-ufrag attribute is missing") - } - - var ret []*webrtc.ICECandidateInit - - for _, media := range sdp.MediaDescriptions { - mid, ok := media.Attribute("mid") - if !ok { - return nil, fmt.Errorf("mid attribute is missing") - } - - tmp, err := strconv.ParseUint(mid, 10, 16) - if err != nil { - return nil, fmt.Errorf("invalid mid attribute") - } - midNum := uint16(tmp) - - for _, attr := range media.Attributes { - if attr.Key == "candidate" { - ret = append(ret, &webrtc.ICECandidateInit{ - Candidate: attr.Value, - SDPMid: &mid, - SDPMLineIndex: &midNum, - UsernameFragment: &usernameFragment, - }) - } - } - } - - return ret, nil -} - -func marshalICEFragment(offer *webrtc.SessionDescription, candidates []*webrtc.ICECandidateInit) ([]byte, error) { - var sdp sdp.SessionDescription - err := sdp.Unmarshal([]byte(offer.SDP)) - if err != nil || len(sdp.MediaDescriptions) == 0 { - return nil, err - } - - firstMedia := sdp.MediaDescriptions[0] - iceUfrag, _ := firstMedia.Attribute("ice-ufrag") - icePwd, _ := firstMedia.Attribute("ice-pwd") - - candidatesByMedia := make(map[uint16][]*webrtc.ICECandidateInit) - for _, candidate := range candidates { - mid := *candidate.SDPMLineIndex - candidatesByMedia[mid] = append(candidatesByMedia[mid], candidate) - } - - frag := "a=ice-ufrag:" + iceUfrag + "\r\n" + - "a=ice-pwd:" + icePwd + "\r\n" - - for mid, media := range sdp.MediaDescriptions { - cbm, ok := candidatesByMedia[uint16(mid)] - if ok { - frag += "m=" + media.MediaName.String() + "\r\n" + - "a=mid:" + strconv.FormatUint(uint64(mid), 10) + "\r\n" - - for _, candidate := range cbm { - frag += "a=" + candidate.Candidate + "\r\n" - } - } - } - - return []byte(frag), nil -} - type webRTCHTTPServerParent interface { logger.Writer generateICEServers() ([]webrtc.ICEServer, error) @@ -358,7 +224,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { ctx.Writer.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH") ctx.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match") - ctx.Writer.Header()["Link"] = iceServersToLinkHeader(servers) + ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers) ctx.Writer.WriteHeader(http.StatusNoContent) case http.MethodPost: @@ -397,7 +263,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { ctx.Writer.Header().Set("E-Tag", res.sx.secret.String()) ctx.Writer.Header().Set("ID", res.sx.uuid.String()) ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag") - ctx.Writer.Header()["Link"] = iceServersToLinkHeader(servers) + ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers) ctx.Writer.Header().Set("Location", ctx.Request.URL.String()) ctx.Writer.WriteHeader(http.StatusCreated) ctx.Writer.Write(res.answer) @@ -419,7 +285,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { return } - candidates, err := unmarshalICEFragment(byts) + candidates, err := whip.ICEFragmentUnmarshal(byts) if err != nil { ctx.Writer.WriteHeader(http.StatusBadRequest) return diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index b57c684e..7ebcc112 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -16,6 +16,7 @@ import ( "github.com/google/uuid" "github.com/pion/ice/v2" + "github.com/pion/interceptor" "github.com/pion/webrtc/v3" "github.com/bluenviron/mediamtx/internal/conf" @@ -25,12 +26,94 @@ import ( const ( webrtcPauseAfterAuthError = 2 * time.Second webrtcHandshakeTimeout = 10 * time.Second - webrtcTrackGatherTimeout = 5 * time.Second + webrtcTrackGatherTimeout = 3 * time.Second webrtcPayloadMaxSize = 1188 // 1200 - 12 (RTP header) webrtcStreamID = "mediamtx" webrtcTurnSecretExpiration = 24 * 3600 * time.Second ) +var videoCodecs = []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeAV1, + ClockRate: 90000, + }, + PayloadType: 96, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, + SDPFmtpLine: "profile-id=0", + }, + PayloadType: 97, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, + SDPFmtpLine: "profile-id=1", + }, + PayloadType: 98, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: 90000, + }, + PayloadType: 99, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", + }, + PayloadType: 100, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", + }, + PayloadType: 101, + }, +} + +var audioCodecs = []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + Channels: 2, + SDPFmtpLine: "minptime=10;useinbandfec=1", + }, + PayloadType: 111, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeG722, + ClockRate: 8000, + }, + PayloadType: 9, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypePCMU, + ClockRate: 8000, + }, + PayloadType: 0, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypePCMA, + ClockRate: 8000, + }, + PayloadType: 8, + }, +} + func randInt63() (int64, error) { var b [8]byte _, err := rand.Read(b[:]) @@ -84,6 +167,53 @@ func randomTurnUser() (string, error) { return string(b), nil } +func webrtcNewAPI( + iceHostNAT1To1IPs []string, + iceUDPMux ice.UDPMux, + iceTCPMux ice.TCPMux, +) (*webrtc.API, error) { + settingsEngine := webrtc.SettingEngine{} + + if len(iceHostNAT1To1IPs) != 0 { + settingsEngine.SetNAT1To1IPs(iceHostNAT1To1IPs, webrtc.ICECandidateTypeHost) + } + + if iceUDPMux != nil { + settingsEngine.SetICEUDPMux(iceUDPMux) + } + + if iceTCPMux != nil { + settingsEngine.SetICETCPMux(iceTCPMux) + settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4}) + } + + mediaEngine := &webrtc.MediaEngine{} + + for _, codec := range videoCodecs { + err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeVideo) + if err != nil { + return nil, err + } + } + + for _, codec := range audioCodecs { + err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeAudio) + if err != nil { + return nil, err + } + } + + interceptorRegistry := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil { + return nil, err + } + + return webrtc.NewAPI( + webrtc.WithSettingEngine(settingsEngine), + webrtc.WithMediaEngine(mediaEngine), + webrtc.WithInterceptorRegistry(interceptorRegistry)), nil +} + type webRTCManagerAPISessionsListRes struct { data *apiWebRTCSessionsList err error @@ -154,16 +284,14 @@ type webRTCManager struct { metrics *metrics parent webRTCManagerParent - ctx context.Context - ctxCancel func() - httpServer *webRTCHTTPServer - udpMuxLn net.PacketConn - tcpMuxLn net.Listener - sessions map[*webRTCSession]struct{} - sessionsBySecret map[uuid.UUID]*webRTCSession - iceHostNAT1To1IPs []string - iceUDPMux ice.UDPMux - iceTCPMux ice.TCPMux + ctx context.Context + ctxCancel func() + httpServer *webRTCHTTPServer + udpMuxLn net.PacketConn + tcpMuxLn net.Listener + api *webrtc.API + sessions map[*webRTCSession]struct{} + sessionsBySecret map[uuid.UUID]*webRTCSession // in chNewSession chan webRTCNewSessionReq @@ -187,12 +315,12 @@ func newWebRTCManager( iceServers []conf.WebRTCICEServer, readTimeout conf.StringDuration, readBufferCount int, - pathManager *pathManager, - metrics *metrics, - parent webRTCManagerParent, iceHostNAT1To1IPs []string, iceUDPMuxAddress string, iceTCPMuxAddress string, + pathManager *pathManager, + metrics *metrics, + parent webRTCManagerParent, ) (*webRTCManager, error) { ctx, ctxCancel := context.WithCancel(context.Background()) @@ -206,7 +334,6 @@ func newWebRTCManager( parent: parent, ctx: ctx, ctxCancel: ctxCancel, - iceHostNAT1To1IPs: iceHostNAT1To1IPs, sessions: make(map[*webRTCSession]struct{}), sessionsBySecret: make(map[uuid.UUID]*webRTCSession), chNewSession: make(chan webRTCNewSessionReq), @@ -235,6 +362,8 @@ func newWebRTCManager( return nil, err } + var iceUDPMux ice.UDPMux + if iceUDPMuxAddress != "" { m.udpMuxLn, err = net.ListenPacket(restrictNetwork("udp", iceUDPMuxAddress)) if err != nil { @@ -242,9 +371,11 @@ func newWebRTCManager( ctxCancel() return nil, err } - m.iceUDPMux = webrtc.NewICEUDPMux(nil, m.udpMuxLn) + iceUDPMux = webrtc.NewICEUDPMux(nil, m.udpMuxLn) } + var iceTCPMux ice.TCPMux + if iceTCPMuxAddress != "" { m.tcpMuxLn, err = net.Listen(restrictNetwork("tcp", iceTCPMuxAddress)) if err != nil { @@ -253,7 +384,16 @@ func newWebRTCManager( ctxCancel() return nil, err } - m.iceTCPMux = webrtc.NewICETCPMux(nil, m.tcpMuxLn, 8) + iceTCPMux = webrtc.NewICETCPMux(nil, m.tcpMuxLn, 8) + } + + m.api, err = webrtcNewAPI(iceHostNAT1To1IPs, iceUDPMux, iceTCPMux) + if err != nil { + m.udpMuxLn.Close() + m.tcpMuxLn.Close() + m.httpServer.close() + ctxCancel() + return nil, err } str := "listener opened on " + address + " (HTTP)" @@ -297,11 +437,9 @@ outer: sx := newWebRTCSession( m.ctx, m.readBufferCount, + m.api, req, &wg, - m.iceHostNAT1To1IPs, - m.iceUDPMux, - m.iceTCPMux, m.pathManager, m, ) diff --git a/internal/core/webrtc_manager_test.go b/internal/core/webrtc_manager_test.go index 99ae4f3b..d1ec4b59 100644 --- a/internal/core/webrtc_manager_test.go +++ b/internal/core/webrtc_manager_test.go @@ -2,9 +2,8 @@ package core import ( "bytes" - "io" + "context" "net/http" - "sync" "testing" "time" @@ -15,106 +14,22 @@ import ( "github.com/pion/rtp" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/webrtcpc" + "github.com/bluenviron/mediamtx/internal/whip" ) -func whipGetICEServers( - t *testing.T, - hc *http.Client, - ur string, -) []webrtc.ICEServer { - req, err := http.NewRequest("OPTIONS", ur, nil) - require.NoError(t, err) +type nilLogger struct{} - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusNoContent, res.StatusCode) - - link, ok := res.Header["Link"] - require.Equal(t, true, ok) - servers := linkHeaderToIceServers(link) - require.NotEqual(t, 0, len(servers)) - - return servers -} - -func whipPostOffer( - t *testing.T, - hc *http.Client, - ur string, - offer *webrtc.SessionDescription, -) (*webrtc.SessionDescription, string) { - req, err := http.NewRequest("POST", ur, bytes.NewReader([]byte(offer.SDP))) - require.NoError(t, err) - - req.Header.Set("Content-Type", "application/sdp") - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusCreated, res.StatusCode) - require.Equal(t, "application/sdp", res.Header.Get("Content-Type")) - require.Equal(t, "application/trickle-ice-sdpfrag", res.Header.Get("Accept-Patch")) - loc := req.URL.Path - if req.URL.RawQuery != "" { - loc += "?" + req.URL.RawQuery - } - require.Equal(t, loc, res.Header.Get("Location")) - - link, ok := res.Header["Link"] - require.Equal(t, true, ok) - servers := linkHeaderToIceServers(link) - require.NotEqual(t, 0, len(servers)) - - etag := res.Header.Get("E-Tag") - require.NotEqual(t, "", etag) - - require.NotEqual(t, "", res.Header.Get("ID")) - - sdp, err := io.ReadAll(res.Body) - require.NoError(t, err) - - answer := &webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: string(sdp), - } - - return answer, etag -} - -func whipPostCandidate( - t *testing.T, - ur string, - offer *webrtc.SessionDescription, - etag string, - candidate *webrtc.ICECandidateInit, -) { - frag, err := marshalICEFragment(offer, []*webrtc.ICECandidateInit{candidate}) - require.NoError(t, err) - - req, err := http.NewRequest("PATCH", ur, bytes.NewReader(frag)) - require.NoError(t, err) - - req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag") - req.Header.Set("If-Match", etag) - - hc := &http.Client{Transport: &http.Transport{}} - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusNoContent, res.StatusCode) +func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { } type webRTCTestClient struct { - pc *webrtc.PeerConnection + pc *webrtcpc.PeerConnection outgoingTrack1 *webrtc.TrackLocalStaticRTP outgoingTrack2 *webrtc.TrackLocalStaticRTP incomingTrack chan *webrtc.TrackRemote - closed chan struct{} } func newWebRTCTestClient( @@ -123,35 +38,16 @@ func newWebRTCTestClient( ur string, publish bool, ) *webRTCTestClient { - iceServers := whipGetICEServers(t, hc, ur) - - pc, err := webrtc.NewPeerConnection(webrtc.Configuration{ - ICEServers: iceServers, - }) + iceServers, err := whip.GetICEServers(context.Background(), hc, ur) require.NoError(t, err) - connected := make(chan struct{}) - closed := make(chan struct{}) - var stateChangeMutex sync.Mutex + c := &webRTCTestClient{} - pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - stateChangeMutex.Lock() - defer stateChangeMutex.Unlock() + api, err := webrtcNewAPI(nil, nil, nil) + require.NoError(t, err) - select { - case <-closed: - return - default: - } - - switch state { - case webrtc.PeerConnectionStateConnected: - close(connected) - - case webrtc.PeerConnectionStateClosed: - close(closed) - } - }) + pc, err := webrtcpc.New(iceServers, api, nilLogger{}) + require.NoError(t, err) var outgoingTrack1 *webrtc.TrackLocalStaticRTP var outgoingTrack2 *webrtc.TrackLocalStaticRTP @@ -198,31 +94,30 @@ func newWebRTCTestClient( offer, err := pc.CreateOffer(nil) require.NoError(t, err) - answer, etag := whipPostOffer(t, hc, ur, &offer) - - // test adding additional candidates, even if it is not mandatory here - gatheringDone := make(chan struct{}) - pc.OnICECandidate(func(i *webrtc.ICECandidate) { - if i != nil { - c := i.ToJSON() - whipPostCandidate(t, ur, &offer, etag, &c) - } else { - close(gatheringDone) - } - }) + res, err := whip.PostOffer(context.Background(), hc, ur, &offer) + require.NoError(t, err) err = pc.SetLocalDescription(offer) require.NoError(t, err) - err = pc.SetRemoteDescription(*answer) + // test adding additional candidates, even if it is not mandatory here +outer: + for { + select { + case c := <-pc.NewLocalCandidate(): + err := whip.PostCandidate(context.Background(), hc, ur, &offer, res.ETag, c) + require.NoError(t, err) + case <-pc.GatheringDone(): + break outer + } + } + + err = pc.SetRemoteDescription(*res.Answer) require.NoError(t, err) - <-gatheringDone - <-connected + <-pc.Connected() if publish { - time.Sleep(200 * time.Millisecond) - err := outgoingTrack1.WriteRTP(&rtp.Packet{ Header: rtp.Header{ Version: 2, @@ -232,7 +127,7 @@ func newWebRTCTestClient( Timestamp: 45343, SSRC: 563423, }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Payload: []byte{1}, }) require.NoError(t, err) @@ -245,25 +140,22 @@ func newWebRTCTestClient( Timestamp: 45343, SSRC: 563423, }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Payload: []byte{2}, }) require.NoError(t, err) time.Sleep(200 * time.Millisecond) } - return &webRTCTestClient{ - pc: pc, - outgoingTrack1: outgoingTrack1, - outgoingTrack2: outgoingTrack2, - incomingTrack: incomingTrack, - closed: closed, - } + c.pc = pc + c.outgoingTrack1 = outgoingTrack1 + c.outgoingTrack2 = outgoingTrack2 + c.incomingTrack = incomingTrack + return c } func (c *webRTCTestClient) close() { c.pc.Close() - <-c.closed } func TestWebRTCRead(t *testing.T) { @@ -359,7 +251,7 @@ func TestWebRTCRead(t *testing.T) { Timestamp: 45343, SSRC: 563423, }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Payload: []byte{3}, }) trak := <-c.incomingTrack @@ -370,13 +262,13 @@ func TestWebRTCRead(t *testing.T) { Header: rtp.Header{ Version: 2, Marker: true, - PayloadType: 102, + PayloadType: 100, SequenceNumber: pkt.SequenceNumber, Timestamp: pkt.Timestamp, SSRC: pkt.SSRC, CSRC: []uint32{}, }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Payload: []byte{3}, }, pkt) }) } @@ -390,7 +282,8 @@ func TestWebRTCReadNotFound(t *testing.T) { hc := &http.Client{Transport: &http.Transport{}} - iceServers := whipGetICEServers(t, hc, "http://localhost:8889/stream/whep") + iceServers, err := whip.GetICEServers(context.Background(), hc, "http://localhost:8889/stream/whep") + require.NoError(t, err) pc, err := webrtc.NewPeerConnection(webrtc.Configuration{ ICEServers: iceServers, @@ -525,7 +418,7 @@ func TestWebRTCPublish(t *testing.T) { received := make(chan struct{}) c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, pkt.Payload) + require.Equal(t, []byte{3}, pkt.Payload) close(received) }) @@ -541,7 +434,7 @@ func TestWebRTCPublish(t *testing.T) { Timestamp: 45343, SSRC: 563423, }, - Payload: []byte{0x05, 0x06, 0x07, 0x08}, + Payload: []byte{3}, }) require.NoError(t, err) diff --git a/internal/core/webrtc_pc.go b/internal/core/webrtc_pc.go deleted file mode 100644 index f5c9b289..00000000 --- a/internal/core/webrtc_pc.go +++ /dev/null @@ -1,279 +0,0 @@ -package core - -import ( - "strconv" - "sync" - - "github.com/pion/ice/v2" - "github.com/pion/interceptor" - "github.com/pion/webrtc/v3" - - "github.com/bluenviron/mediamtx/internal/logger" -) - -var videoCodecs = []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeAV1, - ClockRate: 90000, - }, - PayloadType: 96, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP9, - ClockRate: 90000, - SDPFmtpLine: "profile-id=0", - }, - PayloadType: 97, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP9, - ClockRate: 90000, - SDPFmtpLine: "profile-id=1", - }, - PayloadType: 98, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP8, - ClockRate: 90000, - }, - PayloadType: 99, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeH264, - ClockRate: 90000, - SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", - }, - PayloadType: 100, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeH264, - ClockRate: 90000, - SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", - }, - PayloadType: 101, - }, -} - -var audioCodecs = []webrtc.RTPCodecParameters{ - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeOpus, - ClockRate: 48000, - Channels: 2, - SDPFmtpLine: "minptime=10;useinbandfec=1", - }, - PayloadType: 111, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeG722, - ClockRate: 8000, - }, - PayloadType: 9, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypePCMU, - ClockRate: 8000, - }, - PayloadType: 0, - }, - { - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypePCMA, - ClockRate: 8000, - }, - PayloadType: 8, - }, -} - -type peerConnection struct { - *webrtc.PeerConnection - stateChangeMutex sync.Mutex - localCandidateRecv chan *webrtc.ICECandidateInit - connected chan struct{} - disconnected chan struct{} - closed chan struct{} - gatheringDone chan struct{} -} - -func newPeerConnection( - iceServers []webrtc.ICEServer, - iceHostNAT1To1IPs []string, - iceUDPMux ice.UDPMux, - iceTCPMux ice.TCPMux, - log logger.Writer, -) (*peerConnection, error) { - configuration := webrtc.Configuration{ICEServers: iceServers} - settingsEngine := webrtc.SettingEngine{} - - if len(iceHostNAT1To1IPs) != 0 { - settingsEngine.SetNAT1To1IPs(iceHostNAT1To1IPs, webrtc.ICECandidateTypeHost) - } - - if iceUDPMux != nil { - settingsEngine.SetICEUDPMux(iceUDPMux) - } - - if iceTCPMux != nil { - settingsEngine.SetICETCPMux(iceTCPMux) - settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4}) - } - - mediaEngine := &webrtc.MediaEngine{} - - for _, codec := range videoCodecs { - err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeVideo) - if err != nil { - return nil, err - } - } - - for _, codec := range audioCodecs { - err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeAudio) - if err != nil { - return nil, err - } - } - - interceptorRegistry := &interceptor.Registry{} - if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil { - return nil, err - } - - api := webrtc.NewAPI( - webrtc.WithSettingEngine(settingsEngine), - webrtc.WithMediaEngine(mediaEngine), - webrtc.WithInterceptorRegistry(interceptorRegistry)) - - pc, err := api.NewPeerConnection(configuration) - if err != nil { - return nil, err - } - - co := &peerConnection{ - PeerConnection: pc, - localCandidateRecv: make(chan *webrtc.ICECandidateInit), - connected: make(chan struct{}), - disconnected: make(chan struct{}), - closed: make(chan struct{}), - gatheringDone: make(chan struct{}), - } - - pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - co.stateChangeMutex.Lock() - defer co.stateChangeMutex.Unlock() - - select { - case <-co.closed: - return - default: - } - - log.Log(logger.Debug, "peer connection state: "+state.String()) - - switch state { - case webrtc.PeerConnectionStateConnected: - log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", - co.localCandidate(), co.remoteCandidate()) - - close(co.connected) - - case webrtc.PeerConnectionStateDisconnected: - close(co.disconnected) - - case webrtc.PeerConnectionStateClosed: - close(co.closed) - } - }) - - pc.OnICECandidate(func(i *webrtc.ICECandidate) { - if i != nil { - v := i.ToJSON() - select { - case co.localCandidateRecv <- &v: - case <-co.connected: - case <-co.closed: - } - } else { - close(co.gatheringDone) - } - }) - - return co, nil -} - -func (co *peerConnection) close() { - co.PeerConnection.Close() - <-co.closed -} - -func (co *peerConnection) localCandidate() string { - var cid string - for _, stats := range co.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { - cid = tstats.LocalCandidateID - break - } - } - - if cid != "" { - for _, stats := range co.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { - return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + - tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) - } - } - } - - return "" -} - -func (co *peerConnection) remoteCandidate() string { - var cid string - for _, stats := range co.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { - cid = tstats.RemoteCandidateID - break - } - } - - if cid != "" { - for _, stats := range co.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { - return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + - tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) - } - } - } - - return "" -} - -func (co *peerConnection) bytesReceived() uint64 { - for _, stats := range co.GetStats() { - if tstats, ok := stats.(webrtc.TransportStats); ok { - if tstats.ID == "iceTransport" { - return tstats.BytesReceived - } - } - } - return 0 -} - -func (co *peerConnection) bytesSent() uint64 { - for _, stats := range co.GetStats() { - if tstats, ok := stats.(webrtc.TransportStats); ok { - if tstats.ID == "iceTransport" { - return tstats.BytesSent - } - } - } - return 0 -} diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index 8f6859c5..09c8458a 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -13,11 +13,11 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/ringbuffer" "github.com/google/uuid" - "github.com/pion/ice/v2" "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/webrtcpc" ) type trackRecvPair struct { @@ -25,7 +25,7 @@ type trackRecvPair struct { receiver *webrtc.RTPReceiver } -func mediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias { +func webrtcMediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias { ret := make(media.Medias, len(tracks)) for i, track := range tracks { ret[i] = track.media @@ -33,7 +33,7 @@ func mediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias { return ret } -func mediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias { +func webrtcMediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias { ret := make(media.Medias, len(tracks)) for i, track := range tracks { ret[i] = track.media @@ -41,9 +41,16 @@ func mediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias { return ret } -func waitUntilConnected( +func whipOffer(body []byte) *webrtc.SessionDescription { + return &webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: string(body), + } +} + +func webrtcWaitUntilConnected( ctx context.Context, - pc *peerConnection, + pc *webrtcpc.PeerConnection, ) error { t := time.NewTimer(webrtcHandshakeTimeout) defer t.Stop() @@ -54,7 +61,7 @@ outer: case <-t.C: return fmt.Errorf("deadline exceeded while waiting connection") - case <-pc.connected: + case <-pc.Connected(): break outer case <-ctx.Done(): @@ -65,7 +72,7 @@ outer: return nil } -func gatherOutgoingTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) { +func webrtcGatherOutgoingTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) { var tracks []*webRTCOutgoingTrack videoTrack, err := newWebRTCOutgoingTrackVideo(medias) @@ -94,9 +101,38 @@ func gatherOutgoingTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) { return tracks, nil } -func gatherIncomingTracks( +func webrtcTrackCount(medias []*sdp.MediaDescription) (int, error) { + videoTrack := false + audioTrack := false + trackCount := 0 + + for _, media := range medias { + switch media.MediaName.Media { + case "video": + if videoTrack { + return 0, fmt.Errorf("only a single video and a single audio track are supported") + } + videoTrack = true + + case "audio": + if audioTrack { + return 0, fmt.Errorf("only a single video and a single audio track are supported") + } + audioTrack = true + + default: + return 0, fmt.Errorf("unsupported media '%s'", media.MediaName.Media) + } + + trackCount++ + } + + return trackCount, nil +} + +func webrtcGatherIncomingTracks( ctx context.Context, - pc *peerConnection, + pc *webrtcpc.PeerConnection, trackRecv chan trackRecvPair, trackCount int, ) ([]*webRTCIncomingTrack, error) { @@ -108,6 +144,9 @@ func gatherIncomingTracks( for { select { case <-t.C: + if trackCount == 0 { + return tracks, nil + } return nil, fmt.Errorf("deadline exceeded while waiting tracks") case pair := <-trackRecv: @@ -121,7 +160,7 @@ func gatherIncomingTracks( return tracks, nil } - case <-pc.disconnected: + case <-pc.Disconnected(): return nil, fmt.Errorf("peer connection closed") case <-ctx.Done(): @@ -136,14 +175,12 @@ type webRTCSessionPathManager interface { } type webRTCSession struct { - readBufferCount int - req webRTCNewSessionReq - wg *sync.WaitGroup - iceHostNAT1To1IPs []string - iceUDPMux ice.UDPMux - iceTCPMux ice.TCPMux - pathManager webRTCSessionPathManager - parent *webRTCManager + readBufferCount int + api *webrtc.API + req webRTCNewSessionReq + wg *sync.WaitGroup + pathManager webRTCSessionPathManager + parent *webRTCManager ctx context.Context ctxCancel func() @@ -151,7 +188,7 @@ type webRTCSession struct { uuid uuid.UUID secret uuid.UUID mutex sync.RWMutex - pc *peerConnection + pc *webrtcpc.PeerConnection chNew chan webRTCNewSessionReq chAddCandidates chan webRTCAddSessionCandidatesReq @@ -160,32 +197,28 @@ type webRTCSession struct { func newWebRTCSession( parentCtx context.Context, readBufferCount int, + api *webrtc.API, req webRTCNewSessionReq, wg *sync.WaitGroup, - iceHostNAT1To1IPs []string, - iceUDPMux ice.UDPMux, - iceTCPMux ice.TCPMux, pathManager webRTCSessionPathManager, parent *webRTCManager, ) *webRTCSession { ctx, ctxCancel := context.WithCancel(parentCtx) s := &webRTCSession{ - readBufferCount: readBufferCount, - req: req, - wg: wg, - iceHostNAT1To1IPs: iceHostNAT1To1IPs, - iceUDPMux: iceUDPMux, - iceTCPMux: iceTCPMux, - parent: parent, - pathManager: pathManager, - ctx: ctx, - ctxCancel: ctxCancel, - created: time.Now(), - uuid: uuid.New(), - secret: uuid.New(), - chNew: make(chan webRTCNewSessionReq), - chAddCandidates: make(chan webRTCAddSessionCandidatesReq), + readBufferCount: readBufferCount, + api: api, + req: req, + wg: wg, + parent: parent, + pathManager: pathManager, + ctx: ctx, + ctxCancel: ctxCancel, + created: time.Now(), + uuid: uuid.New(), + secret: uuid.New(), + chNew: make(chan webRTCNewSessionReq), + chAddCandidates: make(chan webRTCAddSessionCandidatesReq), } s.Log(logger.Info, "created by %s", req.remoteAddr) @@ -276,18 +309,16 @@ func (s *webRTCSession) runPublish() (int, error) { return http.StatusInternalServerError, err } - pc, err := newPeerConnection( + pc, err := webrtcpc.New( servers, - s.iceHostNAT1To1IPs, - s.iceUDPMux, - s.iceTCPMux, + s.api, s) if err != nil { return http.StatusBadRequest, err } - defer pc.close() + defer pc.Close() - offer := s.offer() + offer := whipOffer(s.req.offer) var sdp sdp.SessionDescription err = sdp.Unmarshal([]byte(offer.SDP)) @@ -295,29 +326,9 @@ func (s *webRTCSession) runPublish() (int, error) { return http.StatusBadRequest, err } - videoTrack := false - audioTrack := false - trackCount := 0 - - for _, media := range sdp.MediaDescriptions { - switch media.MediaName.Media { - case "video": - if videoTrack { - return http.StatusBadRequest, fmt.Errorf("only a single video and a single audio track are supported") - } - videoTrack = true - - case "audio": - if audioTrack { - return http.StatusBadRequest, fmt.Errorf("only a single video and a single audio track are supported") - } - audioTrack = true - - default: - return http.StatusBadRequest, fmt.Errorf("unsupported media '%s'", media.MediaName.Media) - } - - trackCount++ + trackCount, err := webrtcTrackCount(sdp.MediaDescriptions) + if err != nil { + return http.StatusBadRequest, err } _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{ @@ -339,7 +350,7 @@ func (s *webRTCSession) runPublish() (int, error) { pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { select { case trackRecv <- trackRecvPair{track, receiver}: - case <-pc.closed: + case <-s.ctx.Done(): } }) @@ -358,19 +369,16 @@ func (s *webRTCSession) runPublish() (int, error) { return http.StatusBadRequest, err } - err = s.waitGatheringDone(pc) + err = pc.WaitGatheringDone(s.ctx) if err != nil { return http.StatusBadRequest, err } - tmp := pc.LocalDescription() - answer = *tmp - - s.writeAnswer(&answer) + s.writeAnswer(pc.LocalDescription()) go s.readRemoteCandidates(pc) - err = waitUntilConnected(s.ctx, pc) + err = webrtcWaitUntilConnected(s.ctx, pc) if err != nil { return 0, err } @@ -379,11 +387,11 @@ func (s *webRTCSession) runPublish() (int, error) { s.pc = pc s.mutex.Unlock() - tracks, err := gatherIncomingTracks(s.ctx, pc, trackRecv, trackCount) + tracks, err := webrtcGatherIncomingTracks(s.ctx, pc, trackRecv, trackCount) if err != nil { return 0, err } - medias := mediasOfIncomingTracks(tracks) + medias := webrtcMediasOfIncomingTracks(tracks) rres := res.path.startPublisher(pathStartPublisherReq{ author: s, @@ -403,7 +411,7 @@ func (s *webRTCSession) runPublish() (int, error) { } select { - case <-pc.disconnected: + case <-pc.Disconnected(): return 0, fmt.Errorf("peer connection closed") case <-s.ctx.Done(): @@ -443,7 +451,7 @@ func (s *webRTCSession) runRead() (int, error) { defer res.path.removeReader(pathRemoveReaderReq{author: s}) - tracks, err := gatherOutgoingTracks(res.stream.Medias()) + tracks, err := webrtcGatherOutgoingTracks(res.stream.Medias()) if err != nil { return http.StatusBadRequest, err } @@ -453,16 +461,14 @@ func (s *webRTCSession) runRead() (int, error) { return http.StatusInternalServerError, err } - pc, err := newPeerConnection( + pc, err := webrtcpc.New( servers, - s.iceHostNAT1To1IPs, - s.iceUDPMux, - s.iceTCPMux, + s.api, s) if err != nil { return http.StatusBadRequest, err } - defer pc.close() + defer pc.Close() for _, track := range tracks { var err error @@ -472,7 +478,7 @@ func (s *webRTCSession) runRead() (int, error) { } } - offer := s.offer() + offer := whipOffer(s.req.offer) err = pc.SetRemoteDescription(*offer) if err != nil { @@ -489,19 +495,16 @@ func (s *webRTCSession) runRead() (int, error) { return http.StatusBadRequest, err } - err = s.waitGatheringDone(pc) + err = pc.WaitGatheringDone(s.ctx) if err != nil { return http.StatusBadRequest, err } - tmp := pc.LocalDescription() - answer = *tmp - - s.writeAnswer(&answer) + s.writeAnswer(pc.LocalDescription()) go s.readRemoteCandidates(pc) - err = waitUntilConnected(s.ctx, pc) + err = webrtcWaitUntilConnected(s.ctx, pc) if err != nil { return 0, err } @@ -522,7 +525,7 @@ func (s *webRTCSession) runRead() (int, error) { defer res.stream.RemoveReader(s) s.Log(logger.Info, "is reading from path '%s', %s", - res.path.name, sourceMediaInfo(mediasOfOutgoingTracks(tracks))) + res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks))) go func() { for { @@ -535,7 +538,7 @@ func (s *webRTCSession) runRead() (int, error) { }() select { - case <-pc.disconnected: + case <-pc.Disconnected(): return 0, fmt.Errorf("peer connection closed") case err := <-writeError: @@ -546,25 +549,6 @@ func (s *webRTCSession) runRead() (int, error) { } } -func (s *webRTCSession) offer() *webrtc.SessionDescription { - return &webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: string(s.req.offer), - } -} - -func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error { - for { - select { - case <-pc.localCandidateRecv: - case <-pc.gatheringDone: - return nil - case <-s.ctx.Done(): - return fmt.Errorf("terminated") - } - } -} - func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) { s.req.res <- webRTCNewSessionRes{ sx: s, @@ -572,7 +556,7 @@ func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) { } } -func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) { +func (s *webRTCSession) readRemoteCandidates(pc *webrtcpc.PeerConnection) { for { select { case req := <-s.chAddCandidates: @@ -639,10 +623,10 @@ func (s *webRTCSession) apiItem() *apiWebRTCSession { if s.pc != nil { peerConnectionEstablished = true - localCandidate = s.pc.localCandidate() - remoteCandidate = s.pc.remoteCandidate() - bytesReceived = s.pc.bytesReceived() - bytesSent = s.pc.bytesSent() + localCandidate = s.pc.LocalCandidate() + remoteCandidate = s.pc.RemoteCandidate() + bytesReceived = s.pc.BytesReceived() + bytesSent = s.pc.BytesSent() } return &apiWebRTCSession{ diff --git a/internal/core/webrtc_source.go b/internal/core/webrtc_source.go new file mode 100644 index 00000000..fe086e53 --- /dev/null +++ b/internal/core/webrtc_source.go @@ -0,0 +1,175 @@ +package core + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/webrtcpc" + "github.com/bluenviron/mediamtx/internal/whip" +) + +type webRTCSourceParent interface { + logger.Writer + setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + setNotReady(req pathSourceStaticSetNotReadyReq) +} + +type webRTCSource struct { + readTimeout conf.StringDuration + + parent webRTCSourceParent +} + +func newWebRTCSource( + readTimeout conf.StringDuration, + parent webRTCSourceParent, +) *webRTCSource { + s := &webRTCSource{ + readTimeout: readTimeout, + parent: parent, + } + + return s +} + +func (s *webRTCSource) Log(level logger.Level, format string, args ...interface{}) { + s.parent.Log(level, "[WebRTC source] "+format, args...) +} + +// run implements sourceStaticImpl. +func (s *webRTCSource) run(ctx context.Context, cnf *conf.PathConf, _ chan *conf.PathConf) error { + s.Log(logger.Debug, "connecting") + + u, err := url.Parse(cnf.Source) + if err != nil { + return err + } + + u.Scheme = strings.ReplaceAll(u.Scheme, "whep", "http") + + c := &http.Client{ + Timeout: time.Duration(s.readTimeout), + } + + iceServers, err := whip.GetICEServers(ctx, c, u.String()) + if err != nil { + return err + } + + api, err := webrtcNewAPI(nil, nil, nil) + if err != nil { + return err + } + + pc, err := webrtcpc.New(iceServers, api, s) + if err != nil { + return err + } + defer pc.Close() + + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo) + if err != nil { + return err + } + + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio) + if err != nil { + return err + } + + offer, err := pc.CreateOffer(nil) + if err != nil { + return err + } + + err = pc.SetLocalDescription(offer) + if err != nil { + return err + } + + err = pc.WaitGatheringDone(ctx) + if err != nil { + return err + } + + res, err := whip.PostOffer(ctx, c, u.String(), pc.LocalDescription()) + if err != nil { + return err + } + + var sdp sdp.SessionDescription + err = sdp.Unmarshal([]byte(res.Answer.SDP)) + if err != nil { + return err + } + + // check that there are at most two tracks + _, err = webrtcTrackCount(sdp.MediaDescriptions) + if err != nil { + return err + } + + trackRecv := make(chan trackRecvPair) + + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + select { + case trackRecv <- trackRecvPair{track, receiver}: + case <-ctx.Done(): + } + }) + + err = pc.SetRemoteDescription(*res.Answer) + if err != nil { + return err + } + + err = webrtcWaitUntilConnected(ctx, pc) + if err != nil { + return err + } + + tracks, err := webrtcGatherIncomingTracks(ctx, pc, trackRecv, 0) + if err != nil { + return err + } + medias := webrtcMediasOfIncomingTracks(tracks) + + rres := s.parent.setReady(pathSourceStaticSetReadyReq{ + medias: medias, + generateRTPPackets: true, + }) + if rres.err != nil { + return rres.err + } + + defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{}) + + for _, track := range tracks { + track.start(rres.stream) + } + + select { + case <-pc.Disconnected(): + return fmt.Errorf("peer connection closed") + + case <-ctx.Done(): + return fmt.Errorf("terminated") + } +} + +// apiSourceDescribe implements sourceStaticImpl. +func (*webRTCSource) apiSourceDescribe() pathAPISourceOrReader { + return pathAPISourceOrReader{ + Type: "webRTCSource", + ID: "", + } +} diff --git a/internal/core/webrtc_source_test.go b/internal/core/webrtc_source_test.go new file mode 100644 index 00000000..2e9c9e0a --- /dev/null +++ b/internal/core/webrtc_source_test.go @@ -0,0 +1,188 @@ +package core + +import ( + "context" + "io" + "net" + "net/http" + "testing" + + "github.com/bluenviron/gortsplib/v3" + "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/bluenviron/gortsplib/v3/pkg/url" + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/webrtcpc" +) + +func TestWebRTCSource(t *testing.T) { + state := 0 + + api, err := webrtcNewAPI(nil, nil, nil) + require.NoError(t, err) + + pc, err := webrtcpc.New(nil, api, nilLogger{}) + require.NoError(t, err) + defer pc.Close() + + outgoingTrack1, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: 90000, + }, + "vp8", + webrtcStreamID, + ) + require.NoError(t, err) + + _, err = pc.AddTrack(outgoingTrack1) + require.NoError(t, err) + + outgoingTrack2, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + Channels: 2, + }, + "opus", + webrtcStreamID, + ) + require.NoError(t, err) + + _, err = pc.AddTrack(outgoingTrack2) + require.NoError(t, err) + + httpServ := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch state { + case 0: + require.Equal(t, http.MethodOptions, r.Method) + require.Equal(t, "/my/resource", r.URL.Path) + + w.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH") + w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match") + w.WriteHeader(http.StatusNoContent) + + case 1: + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "/my/resource", r.URL.Path) + require.Equal(t, "application/sdp", r.Header.Get("Content-Type")) + + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + offer := whipOffer(body) + + err = pc.SetRemoteDescription(*offer) + require.NoError(t, err) + + answer, err := pc.CreateAnswer(nil) + require.NoError(t, err) + + err = pc.SetLocalDescription(answer) + require.NoError(t, err) + + err = pc.WaitGatheringDone(context.Background()) + require.NoError(t, err) + + w.Header().Set("Content-Type", "application/sdp") + w.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag") + w.Header().Set("E-Tag", "test_etag") + w.Header().Set("Location", "/my/resource/sessionid") + w.WriteHeader(http.StatusCreated) + w.Write([]byte(pc.LocalDescription().SDP)) + + go func() { + <-pc.Connected() + + err = outgoingTrack1.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{1}, + }) + require.NoError(t, err) + + err = outgoingTrack2.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 97, + SequenceNumber: 1123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{2}, + }) + require.NoError(t, err) + }() + + default: + t.Errorf("should not happen since there should not be additional candidates") + } + state++ + }), + } + + ln, err := net.Listen("tcp", "localhost:5555") + require.NoError(t, err) + + go httpServ.Serve(ln) + defer httpServ.Shutdown(context.Background()) + + p, ok := newInstance("paths:\n" + + " proxied:\n" + + " source: whep://localhost:5555/my/resource\n" + + " sourceOnDemand: yes\n") + require.Equal(t, true, ok) + defer p.Close() + + c := gortsplib.Client{} + + u, err := url.Parse("rtsp://127.0.0.1:8554/proxied") + require.NoError(t, err) + + err = c.Start(u.Scheme, u.Host) + require.NoError(t, err) + defer c.Close() + + medias, baseURL, _, err := c.Describe(u) + require.NoError(t, err) + + var forma *formats.VP8 + medi := medias.FindFormat(&forma) + + _, err = c.Setup(medi, baseURL, 0, 0) + require.NoError(t, err) + + received := make(chan struct{}) + + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + require.Equal(t, []byte{3}, pkt.Payload) + close(received) + }) + + _, err = c.Play(nil) + require.NoError(t, err) + + err = outgoingTrack1.WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 124, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{3}, + }) + require.NoError(t, err) + + <-received +} diff --git a/internal/highleveltests/hls_manager_test.go b/internal/highleveltests/hls_manager_test.go new file mode 100644 index 00000000..3f83aeab --- /dev/null +++ b/internal/highleveltests/hls_manager_test.go @@ -0,0 +1,93 @@ +//go:build enable_highlevel_tests +// +build enable_highlevel_tests + +package highleveltests + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestHLSServerRead(t *testing.T) { + p, ok := newInstance("paths:\n" + + " all:\n") + require.Equal(t, true, ok) + defer p.Close() + + cnt1, err := newContainer("ffmpeg", "source", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.mkv", + "-c", "copy", + "-f", "rtsp", + "rtsp://127.0.0.1:8554/test/stream", + }) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + cnt2, err := newContainer("ffmpeg", "dest", []string{ + "-i", "http://127.0.0.1:8888/test/stream/index.m3u8", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt2.close() + require.Equal(t, 0, cnt2.wait()) +} + +func TestHLSServerAuth(t *testing.T) { + for _, result := range []string{ + "success", + "fail", + } { + t.Run(result, func(t *testing.T) { + conf := "paths:\n" + + " all:\n" + + " readUser: testreader\n" + + " readPass: testpass\n" + + " readIPs: [127.0.0.0/16]\n" + + p, ok := newInstance(conf) + require.Equal(t, true, ok) + defer p.Close() + + cnt1, err := newContainer("ffmpeg", "source", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.mkv", + "-c", "copy", + "-f", "rtsp", + "rtsp://testpublisher:testpass@127.0.0.1:8554/teststream?param=value", + }) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + var usr string + if result == "success" { + usr = "testreader" + } else { + usr = "testreader2" + } + + hc := &http.Client{Transport: &http.Transport{}} + + res, err := hc.Get("http://" + usr + ":testpass@127.0.0.1:8888/teststream/index.m3u8?param=value") + require.NoError(t, err) + defer res.Body.Close() + + if result == "success" { + require.Equal(t, http.StatusOK, res.StatusCode) + } else { + require.Equal(t, http.StatusUnauthorized, res.StatusCode) + } + }) + } +} diff --git a/internal/highleveltests/hls_server_test.go b/internal/highleveltests/hls_server_test.go deleted file mode 100644 index dcb631f4..00000000 --- a/internal/highleveltests/hls_server_test.go +++ /dev/null @@ -1,182 +0,0 @@ -//go:build enable_highlevel_tests -// +build enable_highlevel_tests - -package highleveltests - -import ( - "context" - "encoding/json" - "net" - "net/http" - "testing" - "time" - - "github.com/gin-gonic/gin" - "github.com/stretchr/testify/require" -) - -type testHTTPAuthenticator struct { - action string - - s *http.Server -} - -func newTestHTTPAuthenticator(t *testing.T, action string) *testHTTPAuthenticator { - ln, err := net.Listen("tcp", "127.0.0.1:9120") - require.NoError(t, err) - - ts := &testHTTPAuthenticator{ - action: action, - } - - router := gin.New() - router.POST("/auth", ts.onAuth) - - ts.s = &http.Server{Handler: router} - go ts.s.Serve(ln) - - return ts -} - -func (ts *testHTTPAuthenticator) close() { - ts.s.Shutdown(context.Background()) -} - -func (ts *testHTTPAuthenticator) onAuth(ctx *gin.Context) { - var in struct { - IP string `json:"ip"` - User string `json:"user"` - Password string `json:"password"` - Path string `json:"path"` - Action string `json:"action"` - Query string `json:"query"` - } - err := json.NewDecoder(ctx.Request.Body).Decode(&in) - if err != nil { - ctx.AbortWithStatus(http.StatusBadRequest) - return - } - - var user string - if ts.action == "publish" { - user = "testpublisher" - } else { - user = "testreader" - } - - if in.IP != "127.0.0.1" || - in.User != user || - in.Password != "testpass" || - in.Path != "teststream" || - in.Action != ts.action || - (in.Query != "user=testreader&pass=testpass¶m=value" && - in.Query != "user=testpublisher&pass=testpass¶m=value" && - in.Query != "param=value") { - ctx.AbortWithStatus(http.StatusBadRequest) - return - } -} - -func TestHLSServerRead(t *testing.T) { - p, ok := newInstance("paths:\n" + - " all:\n") - require.Equal(t, true, ok) - defer p.Close() - - cnt1, err := newContainer("ffmpeg", "source", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.mkv", - "-c", "copy", - "-f", "rtsp", - "rtsp://127.0.0.1:8554/test/stream", - }) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - cnt2, err := newContainer("ffmpeg", "dest", []string{ - "-i", "http://127.0.0.1:8888/test/stream/index.m3u8", - "-vframes", "1", - "-f", "image2", - "-y", "/dev/null", - }) - require.NoError(t, err) - defer cnt2.close() - require.Equal(t, 0, cnt2.wait()) -} - -func TestHLSServerAuth(t *testing.T) { - for _, mode := range []string{ - "internal", - "external", - } { - for _, result := range []string{ - "success", - "fail", - } { - t.Run(mode+"_"+result, func(t *testing.T) { - var conf string - if mode == "internal" { - conf = "paths:\n" + - " all:\n" + - " readUser: testreader\n" + - " readPass: testpass\n" + - " readIPs: [127.0.0.0/16]\n" - } else { - conf = "externalAuthenticationURL: http://127.0.0.1:9120/auth\n" + - "paths:\n" + - " all:\n" - } - - p, ok := newInstance(conf) - require.Equal(t, true, ok) - defer p.Close() - - var a *testHTTPAuthenticator - if mode == "external" { - a = newTestHTTPAuthenticator(t, "publish") - } - - cnt1, err := newContainer("ffmpeg", "source", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.mkv", - "-c", "copy", - "-f", "rtsp", - "rtsp://testpublisher:testpass@127.0.0.1:8554/teststream?param=value", - }) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - if mode == "external" { - a.close() - a = newTestHTTPAuthenticator(t, "read") - defer a.close() - } - - var usr string - if result == "success" { - usr = "testreader" - } else { - usr = "testreader2" - } - - hc := &http.Client{Transport: &http.Transport{}} - - res, err := hc.Get("http://" + usr + ":testpass@127.0.0.1:8888/teststream/index.m3u8?param=value") - require.NoError(t, err) - defer res.Body.Close() - - if result == "success" { - require.Equal(t, http.StatusOK, res.StatusCode) - } else { - require.Equal(t, http.StatusUnauthorized, res.StatusCode) - } - }) - } - } -} diff --git a/internal/webrtcpc/pc.go b/internal/webrtcpc/pc.go new file mode 100644 index 00000000..b9bca32b --- /dev/null +++ b/internal/webrtcpc/pc.go @@ -0,0 +1,196 @@ +// Package webrtcpc contains a WebRTC peer connection wrapper. +package webrtcpc + +import ( + "context" + "fmt" + "strconv" + "sync" + + "github.com/pion/webrtc/v3" + + "github.com/bluenviron/mediamtx/internal/logger" +) + +// PeerConnection is a wrapper around webrtc.PeerConnection. +type PeerConnection struct { + *webrtc.PeerConnection + stateChangeMutex sync.Mutex + newLocalCandidate chan *webrtc.ICECandidateInit + connected chan struct{} + disconnected chan struct{} + closed chan struct{} + gatheringDone chan struct{} +} + +// New allocates a PeerConnection. +func New( + iceServers []webrtc.ICEServer, + api *webrtc.API, + log logger.Writer, +) (*PeerConnection, error) { + configuration := webrtc.Configuration{ICEServers: iceServers} + + pc, err := api.NewPeerConnection(configuration) + if err != nil { + return nil, err + } + + co := &PeerConnection{ + PeerConnection: pc, + newLocalCandidate: make(chan *webrtc.ICECandidateInit), + connected: make(chan struct{}), + disconnected: make(chan struct{}), + closed: make(chan struct{}), + gatheringDone: make(chan struct{}), + } + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + co.stateChangeMutex.Lock() + defer co.stateChangeMutex.Unlock() + + select { + case <-co.closed: + return + default: + } + + log.Log(logger.Debug, "peer connection state: "+state.String()) + + switch state { + case webrtc.PeerConnectionStateConnected: + log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", + co.LocalCandidate(), co.RemoteCandidate()) + + close(co.connected) + + case webrtc.PeerConnectionStateDisconnected: + close(co.disconnected) + + case webrtc.PeerConnectionStateClosed: + close(co.closed) + } + }) + + pc.OnICECandidate(func(i *webrtc.ICECandidate) { + if i != nil { + v := i.ToJSON() + select { + case co.newLocalCandidate <- &v: + case <-co.connected: + case <-co.closed: + } + } else { + close(co.gatheringDone) + } + }) + + return co, nil +} + +// Close closes the connection. +func (co *PeerConnection) Close() { + co.PeerConnection.Close() + <-co.closed +} + +// Connected returns when connected. +func (co *PeerConnection) Connected() <-chan struct{} { + return co.connected +} + +// Disconnected returns when disconnected. +func (co *PeerConnection) Disconnected() <-chan struct{} { + return co.disconnected +} + +// NewLocalCandidate returns when there's a new local candidate. +func (co *PeerConnection) NewLocalCandidate() <-chan *webrtc.ICECandidateInit { + return co.newLocalCandidate +} + +// GatheringDone returns when candidate gathering is complete. +func (co *PeerConnection) GatheringDone() <-chan struct{} { + return co.gatheringDone +} + +// WaitGatheringDone waits until candidate gathering is complete. +func (co *PeerConnection) WaitGatheringDone(ctx context.Context) error { + for { + select { + case <-co.NewLocalCandidate(): + case <-co.GatheringDone(): + return nil + case <-ctx.Done(): + return fmt.Errorf("terminated") + } + } +} + +// LocalCandidate returns the local candidate. +func (co *PeerConnection) LocalCandidate() string { + var cid string + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { + cid = tstats.LocalCandidateID + break + } + } + + if cid != "" { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { + return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + + tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) + } + } + } + + return "" +} + +// RemoteCandidate returns the remote candidate. +func (co *PeerConnection) RemoteCandidate() string { + var cid string + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { + cid = tstats.RemoteCandidateID + break + } + } + + if cid != "" { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { + return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + + tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) + } + } + } + + return "" +} + +// BytesReceived returns received bytes. +func (co *PeerConnection) BytesReceived() uint64 { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesReceived + } + } + } + return 0 +} + +// BytesSent returns sent bytes. +func (co *PeerConnection) BytesSent() uint64 { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesSent + } + } + } + return 0 +} diff --git a/internal/websocket/serverconn_test.go b/internal/websocket/serverconn_test.go index cc0407fa..6761986d 100644 --- a/internal/websocket/serverconn_test.go +++ b/internal/websocket/serverconn_test.go @@ -15,22 +15,22 @@ func TestServerConn(t *testing.T) { pingReceived := make(chan struct{}) pingInterval = 100 * time.Millisecond - handler := func(w http.ResponseWriter, r *http.Request) { - c, err := NewServerConn(w, r) - require.NoError(t, err) - defer c.Close() + s := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := NewServerConn(w, r) + require.NoError(t, err) + defer c.Close() - err = c.WriteJSON("testing") - require.NoError(t, err) + err = c.WriteJSON("testing") + require.NoError(t, err) - <-pingReceived + <-pingReceived + }), } ln, err := net.Listen("tcp", "localhost:6344") require.NoError(t, err) - defer ln.Close() - s := &http.Server{Handler: http.HandlerFunc(handler)} go s.Serve(ln) defer s.Shutdown(context.Background()) diff --git a/internal/whip/get_ice_servers.go b/internal/whip/get_ice_servers.go new file mode 100644 index 00000000..907026ae --- /dev/null +++ b/internal/whip/get_ice_servers.go @@ -0,0 +1,33 @@ +package whip + +import ( + "context" + "fmt" + "net/http" + + "github.com/pion/webrtc/v3" +) + +// GetICEServers posts a WHIP/WHEP request for ICE servers. +func GetICEServers( + ctx context.Context, + hc *http.Client, + ur string, +) ([]webrtc.ICEServer, error) { + req, err := http.NewRequestWithContext(ctx, "OPTIONS", ur, nil) + if err != nil { + return nil, err + } + + res, err := hc.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNoContent { + return nil, fmt.Errorf("bad status code: %v", res.StatusCode) + } + + return LinkHeaderUnmarshal(res.Header["Link"]) +} diff --git a/internal/whip/ice_fragment.go b/internal/whip/ice_fragment.go new file mode 100644 index 00000000..2c615ff7 --- /dev/null +++ b/internal/whip/ice_fragment.go @@ -0,0 +1,83 @@ +package whip + +import ( + "fmt" + "strconv" + + "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" +) + +// ICEFragmentUnmarshal decodes an ICE fragment. +func ICEFragmentUnmarshal(buf []byte) ([]*webrtc.ICECandidateInit, error) { + buf = append([]byte("v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n"), buf...) + + var sdp sdp.SessionDescription + err := sdp.Unmarshal(buf) + if err != nil { + return nil, err + } + + var ret []*webrtc.ICECandidateInit + + for _, media := range sdp.MediaDescriptions { + mid, ok := media.Attribute("mid") + if !ok { + return nil, fmt.Errorf("mid attribute is missing") + } + + tmp, err := strconv.ParseUint(mid, 10, 16) + if err != nil { + return nil, fmt.Errorf("invalid mid attribute") + } + midNum := uint16(tmp) + + for _, attr := range media.Attributes { + if attr.Key == "candidate" { + ret = append(ret, &webrtc.ICECandidateInit{ + Candidate: attr.Value, + SDPMid: &mid, + SDPMLineIndex: &midNum, + }) + } + } + } + + return ret, nil +} + +// ICEFragmentMarshal encodes an ICE fragment. +func ICEFragmentMarshal(offer string, candidates []*webrtc.ICECandidateInit) ([]byte, error) { + var sdp sdp.SessionDescription + err := sdp.Unmarshal([]byte(offer)) + if err != nil || len(sdp.MediaDescriptions) == 0 { + return nil, err + } + + firstMedia := sdp.MediaDescriptions[0] + iceUfrag, _ := firstMedia.Attribute("ice-ufrag") + icePwd, _ := firstMedia.Attribute("ice-pwd") + + candidatesByMedia := make(map[uint16][]*webrtc.ICECandidateInit) + for _, candidate := range candidates { + mid := *candidate.SDPMLineIndex + candidatesByMedia[mid] = append(candidatesByMedia[mid], candidate) + } + + frag := "a=ice-ufrag:" + iceUfrag + "\r\n" + + "a=ice-pwd:" + icePwd + "\r\n" + + for mid, media := range sdp.MediaDescriptions { + cbm, ok := candidatesByMedia[uint16(mid)] + if ok { + frag += "m=" + media.MediaName.String() + "\r\n" + + "a=mid:" + strconv.FormatUint(uint64(mid), 10) + "\r\n" + + for _, candidate := range cbm { + frag += "a=candidate:" + candidate.Candidate + "\r\n" + } + } + } + + return []byte(frag), nil +} diff --git a/internal/whip/ice_fragment_test.go b/internal/whip/ice_fragment_test.go new file mode 100644 index 00000000..7042d536 --- /dev/null +++ b/internal/whip/ice_fragment_test.go @@ -0,0 +1,208 @@ +package whip + +import ( + "testing" + + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" +) + +func stringPtr(v string) *string { + return &v +} + +func uint16Ptr(v uint16) *uint16 { + return &v +} + +var iceFragmentCases = []struct { + name string + offer string + candidates []*webrtc.ICECandidateInit + enc string +}{ + { + "a", + "v=0\n" + + "o=- 8429658789122714282 1690995382 IN IP4 0.0.0.0\n" + + "s=-\n" + + "t=0 0\n" + + "a=fingerprint:sha-256 EA:05:9D:04:8F:56:41:92:3E:D5:2B:55:03:" + + "1B:5A:2C:3D:D8:B3:FB:1B:D9:F7:1F:DA:77:0E:B9:E0:3D:B6:FF\n" + + "a=extmap-allow-mixed\n" + + "a=group:BUNDLE 0\n" + + "m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 123 118 45 46 116\n" + + "c=IN IP4 0.0.0.0\n" + + "a=setup:actpass\n" + + "a=mid:0\n" + + "a=ice-ufrag:tUQMzoQAVLzlvBys\n" + + "a=ice-pwd:pimyGfJcjjRwvUjnmGOODSjtIxyDljQj\n" + + "a=rtcp-mux\n" + + "a=rtcp-rsize\n" + + "a=rtpmap:96 VP8/90000\n" + + "a=rtcp-fb:96 goog-remb \n" + + "a=rtcp-fb:96 ccm fir\n" + + "a=rtcp-fb:96 nack \n" + + "a=rtcp-fb:96 nack pli\n" + + "a=rtcp-fb:96 nack \n" + + "a=rtcp-fb:96 nack pli\n" + + "a=rtcp-fb:96 transport-cc \n" + + "a=rtpmap:97 rtx/90000\n" + + "a=fmtp:97 apt=96\n" + + "a=rtcp-fb:97 nack \n" + + "a=rtcp-fb:97 nack pli\n" + + "a=rtcp-fb:97 transport-cc \n" + + "a=rtpmap:98 VP9/90000\n" + + "a=fmtp:98 profile-id=0\n" + + "a=rtcp-fb:98 goog-remb \n" + + "a=rtcp-fb:98 ccm fir\n" + + "a=rtcp-fb:98 nack \n" + + "a=rtcp-fb:98 nack pli\n" + + "a=rtcp-fb:98 nack \n" + + "a=rtcp-fb:98 nack pli\n" + + "a=rtcp-fb:98 transport-cc \n" + + "a=rtpmap:99 rtx/90000\n" + + "a=fmtp:99 apt=98\n" + + "a=rtcp-fb:99 nack \n" + + "a=rtcp-fb:99 nack pli\n" + + "a=rtcp-fb:99 transport-cc \n" + + "a=rtpmap:100 VP9/90000\n" + + "a=fmtp:100 profile-id=1\n" + + "a=rtcp-fb:100 goog-remb \n" + + "a=rtcp-fb:100 ccm fir\n" + + "a=rtcp-fb:100 nack \n" + + "a=rtcp-fb:100 nack pli\n" + + "a=rtcp-fb:100 nack \n" + + "a=rtcp-fb:100 nack pli\n" + + "a=rtcp-fb:100 transport-cc \n" + + "a=rtpmap:101 rtx/90000\n" + + "a=fmtp:101 apt=100\n" + + "a=rtcp-fb:101 nack \n" + + "a=rtcp-fb:101 nack pli\n" + + "a=rtcp-fb:101 transport-cc \n" + + "a=rtpmap:102 H264/90000\n" + + "a=fmtp:102 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f\n" + + "a=rtcp-fb:102 goog-remb \n" + + "a=rtcp-fb:102 ccm fir\n" + + "a=rtcp-fb:102 nack \n" + + "a=rtcp-fb:102 nack pli\n" + + "a=rtcp-fb:102 nack \n" + + "a=rtcp-fb:102 nack pli\n" + + "a=rtcp-fb:102 transport-cc \n" + + "a=rtpmap:121 rtx/90000\n" + + "a=fmtp:121 apt=102\n" + + "a=rtcp-fb:121 nack \n" + + "a=rtcp-fb:121 nack pli\n" + + "a=rtcp-fb:121 transport-cc \n" + + "a=rtpmap:127 H264/90000\n" + + "a=fmtp:127 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f\n" + + "a=rtcp-fb:127 goog-remb \n" + + "a=rtcp-fb:127 ccm fir\n" + + "a=rtcp-fb:127 nack \n" + + "a=rtcp-fb:127 nack pli\n" + + "a=rtcp-fb:127 nack \n" + + "a=rtcp-fb:127 nack pli\n" + + "a=rtcp-fb:127 transport-cc \n" + + "a=rtpmap:120 rtx/90000\n" + + "a=fmtp:120 apt=127\n" + + "a=rtcp-fb:120 nack \n" + + "a=rtcp-fb:120 nack pli\n" + + "a=rtcp-fb:120 transport-cc \n" + + "a=rtpmap:125 H264/90000\n" + + "a=fmtp:125 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f\n" + + "a=rtcp-fb:125 goog-remb \n" + + "a=rtcp-fb:125 ccm fir\n" + + "a=rtcp-fb:125 nack \n" + + "a=rtcp-fb:125 nack pli\n" + + "a=rtcp-fb:125 nack \n" + + "a=rtcp-fb:125 nack pli\n" + + "a=rtcp-fb:125 transport-cc \n" + + "a=rtpmap:107 rtx/90000\n" + + "a=fmtp:107 apt=125\n" + + "a=rtcp-fb:107 nack \n" + + "a=rtcp-fb:107 nack pli\n" + + "a=rtcp-fb:107 transport-cc \n" + + "a=rtpmap:108 H264/90000\n" + + "a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f\n" + + "a=rtcp-fb:108 goog-remb \n" + + "a=rtcp-fb:108 ccm fir\n" + + "a=rtcp-fb:108 nack \n" + + "a=rtcp-fb:108 nack pli\n" + + "a=rtcp-fb:108 nack \n" + + "a=rtcp-fb:108 nack pli\n" + + "a=rtcp-fb:108 transport-cc \n" + + "a=rtpmap:109 rtx/90000\n" + + "a=fmtp:109 apt=108\n" + + "a=rtcp-fb:109 nack \n" + + "a=rtcp-fb:109 nack pli\n" + + "a=rtcp-fb:109 transport-cc \n" + + "a=rtpmap:123 H264/90000\n" + + "a=fmtp:123 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032\n" + + "a=rtcp-fb:123 goog-remb \n" + + "a=rtcp-fb:123 ccm fir\n" + + "a=rtcp-fb:123 nack \n" + + "a=rtcp-fb:123 nack pli\n" + + "a=rtcp-fb:123 nack \n" + + "a=rtcp-fb:123 nack pli\n" + + "a=rtcp-fb:123 transport-cc \n" + + "a=rtpmap:118 rtx/90000\n" + + "a=fmtp:118 apt=123\n" + + "a=rtcp-fb:118 nack \n" + + "a=rtcp-fb:118 nack pli\n" + + "a=rtcp-fb:118 transport-cc \n" + + "a=rtpmap:45 AV1/90000\n" + + "a=rtcp-fb:45 goog-remb \n" + + "a=rtcp-fb:45 ccm fir\n" + + "a=rtcp-fb:45 nack \n" + + "a=rtcp-fb:45 nack pli\n" + + "a=rtcp-fb:45 nack \n" + + "a=rtcp-fb:45 nack pli\n" + + "a=rtcp-fb:45 transport-cc \n" + + "a=rtpmap:46 rtx/90000\n" + + "a=fmtp:46 apt=45\n" + + "a=rtcp-fb:46 nack \n" + + "a=rtcp-fb:46 nack pli\n" + + "a=rtcp-fb:46 transport-cc \n" + + "a=rtpmap:116 ulpfec/90000\n" + + "a=rtcp-fb:116 nack \n" + + "a=rtcp-fb:116 nack pli\n" + + "a=rtcp-fb:116 transport-cc \n" + + "a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\n" + + "a=ssrc:3421396091 cname:BmFVQDtOlcBwXZCl\n" + + "a=ssrc:3421396091 msid:BmFVQDtOlcBwXZCl CLgunVCazXXKLyEx\n" + + "a=ssrc:3421396091 mslabel:BmFVQDtOlcBwXZCl\n" + + "a=ssrc:3421396091 label:CLgunVCazXXKLyEx\n" + + "a=msid:BmFVQDtOlcBwXZCl CLgunVCazXXKLyEx\n" + + "a=sendrecv\n", + []*webrtc.ICECandidateInit{{ + Candidate: "3628911098 1 udp 2130706431 192.168.3.218 49462 typ host", + SDPMid: stringPtr("0"), + SDPMLineIndex: uint16Ptr(0), + }}, + "a=ice-ufrag:tUQMzoQAVLzlvBys\r\n" + + "a=ice-pwd:pimyGfJcjjRwvUjnmGOODSjtIxyDljQj\r\n" + + "m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 123 118 45 46 116\r\n" + + "a=mid:0\r\n" + + "a=candidate:3628911098 1 udp 2130706431 192.168.3.218 49462 typ host\r\n", + }, +} + +func TestICEFragmentUnmarshal(t *testing.T) { + for _, ca := range iceFragmentCases { + t.Run(ca.name, func(t *testing.T) { + candidates, err := ICEFragmentUnmarshal([]byte(ca.enc)) + require.NoError(t, err) + require.Equal(t, ca.candidates, candidates) + }) + } +} + +func TestICEFragmentMarshal(t *testing.T) { + for _, ca := range iceFragmentCases { + t.Run(ca.name, func(t *testing.T) { + byts, err := ICEFragmentMarshal(ca.offer, ca.candidates) + require.NoError(t, err) + require.Equal(t, ca.enc, string(byts)) + }) + } +} diff --git a/internal/whip/link_header.go b/internal/whip/link_header.go new file mode 100644 index 00000000..8828c9af --- /dev/null +++ b/internal/whip/link_header.go @@ -0,0 +1,66 @@ +package whip + +import ( + "encoding/json" + "fmt" + "regexp" + + "github.com/pion/webrtc/v3" +) + +func quoteCredential(v string) string { + b, _ := json.Marshal(v) + s := string(b) + return s[1 : len(s)-1] +} + +func unquoteCredential(v string) string { + var s string + json.Unmarshal([]byte("\""+v+"\""), &s) + return s +} + +// LinkHeaderMarshal encodes a link header. +func LinkHeaderMarshal(iceServers []webrtc.ICEServer) []string { + ret := make([]string, len(iceServers)) + + for i, server := range iceServers { + link := "<" + server.URLs[0] + ">; rel=\"ice-server\"" + if server.Username != "" { + link += "; username=\"" + quoteCredential(server.Username) + "\"" + + "; credential=\"" + quoteCredential(server.Credential.(string)) + "\"; credential-type=\"password\"" + } + ret[i] = link + } + + return ret +} + +var reLink = regexp.MustCompile(`^<(.+?)>; rel="ice-server"(; username="(.+?)"` + + `; credential="(.+?)"; credential-type="password")?`) + +// LinkHeaderUnmarshal decodes a link header. +func LinkHeaderUnmarshal(link []string) ([]webrtc.ICEServer, error) { + ret := make([]webrtc.ICEServer, len(link)) + + for i, li := range link { + m := reLink.FindStringSubmatch(li) + if m == nil { + return nil, fmt.Errorf("invalid link header: '%s'", li) + } + + s := webrtc.ICEServer{ + URLs: []string{m[1]}, + } + + if m[3] != "" { + s.Username = unquoteCredential(m[3]) + s.Credential = unquoteCredential(m[4]) + s.CredentialType = webrtc.ICECredentialTypePassword + } + + ret[i] = s + } + + return ret, nil +} diff --git a/internal/whip/link_header_test.go b/internal/whip/link_header_test.go new file mode 100644 index 00000000..80bcdc35 --- /dev/null +++ b/internal/whip/link_header_test.go @@ -0,0 +1,52 @@ +package whip + +import ( + "testing" + + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" +) + +var linkHeaderCases = []struct { + name string + enc []string + dec []webrtc.ICEServer +}{ + { + "a", + []string{ + `; rel="ice-server"`, + `; rel="ice-server"; username="myuser\"a?2;B"; ` + + `credential="mypwd"; credential-type="password"`, + }, + []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + { + URLs: []string{"turns:turn.example.com"}, + Username: "myuser\"a?2;B", + Credential: "mypwd", + }, + }, + }, +} + +func TestLinkHeaderUnmarshal(t *testing.T) { + for _, ca := range linkHeaderCases { + t.Run(ca.name, func(t *testing.T) { + dec, err := LinkHeaderUnmarshal(ca.enc) + require.NoError(t, err) + require.Equal(t, ca.dec, dec) + }) + } +} + +func TestLinkHeaderMarshal(t *testing.T) { + for _, ca := range linkHeaderCases { + t.Run(ca.name, func(t *testing.T) { + enc := LinkHeaderMarshal(ca.dec) + require.Equal(t, ca.enc, enc) + }) + } +} diff --git a/internal/whip/post_candidate.go b/internal/whip/post_candidate.go new file mode 100644 index 00000000..22b54226 --- /dev/null +++ b/internal/whip/post_candidate.go @@ -0,0 +1,46 @@ +// Package whip contains WebRTC / WHIP utilities. +package whip + +import ( + "bytes" + "context" + "fmt" + "net/http" + + "github.com/pion/webrtc/v3" +) + +// PostCandidate posts a WHIP/WHEP candidate. +func PostCandidate( + ctx context.Context, + hc *http.Client, + ur string, + offer *webrtc.SessionDescription, + etag string, + candidate *webrtc.ICECandidateInit, +) error { + frag, err := ICEFragmentMarshal(offer.SDP, []*webrtc.ICECandidateInit{candidate}) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, "PATCH", ur, bytes.NewReader(frag)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag") + req.Header.Set("If-Match", etag) + + res, err := hc.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusNoContent { + return fmt.Errorf("bad status code: %v", res.StatusCode) + } + + return nil +} diff --git a/internal/whip/post_offer.go b/internal/whip/post_offer.go new file mode 100644 index 00000000..2b21c4d3 --- /dev/null +++ b/internal/whip/post_offer.go @@ -0,0 +1,76 @@ +package whip + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + + "github.com/pion/webrtc/v3" +) + +// PostOfferResponse is the response to a post offer. +type PostOfferResponse struct { + Answer *webrtc.SessionDescription + Location string + ETag string +} + +// PostOffer posts a WHIP/WHEP offer. +func PostOffer( + ctx context.Context, + hc *http.Client, + ur string, + offer *webrtc.SessionDescription, +) (*PostOfferResponse, error) { + req, err := http.NewRequestWithContext(ctx, "POST", ur, bytes.NewReader([]byte(offer.SDP))) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/sdp") + + res, err := hc.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusCreated { + return nil, fmt.Errorf("bad status code: %v", res.StatusCode) + } + + contentType := res.Header.Get("Content-Type") + if contentType != "application/sdp" { + return nil, fmt.Errorf("bad Content-Type: expected 'application/sdp', got '%s'", contentType) + } + + acceptPatch := res.Header.Get("Accept-Patch") + if acceptPatch != "application/trickle-ice-sdpfrag" { + return nil, fmt.Errorf("wrong Accept-Patch: expected 'application/trickle-ice-sdpfrag', got '%s'", acceptPatch) + } + + Location := res.Header.Get("Location") + + etag := res.Header.Get("E-Tag") + if etag == "" { + return nil, fmt.Errorf("E-Tag is missing") + } + + sdp, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + answer := &webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: string(sdp), + } + + return &PostOfferResponse{ + Answer: answer, + Location: Location, + ETag: etag, + }, nil +} diff --git a/mediamtx.yml b/mediamtx.yml index 575598a2..f29d2019 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -255,6 +255,8 @@ paths: # * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server with HTTPS # * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port # * srt://existing-url -> the stream is pulled from another SRT server + # * whep://existing-url -> the stream is pulled from another WebRTC server + # * wheps://existing-url -> the stream is pulled from another WebRTC server with HTTPS # * redirect -> the stream is provided by another path or server # * rpiCamera -> the stream is provided by a Raspberry Pi Camera source: publisher