diff --git a/README.md b/README.md index ffbd9b7d..eeae947a 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@
-_MediaMTX_/ [_rtsp-simple-server_](#note-about-rtsp-simple-server) is a ready-to-use and zero-dependency server and proxy that allows users to publish, read and proxy live video and audio streams. +_MediaMTX_ / [_rtsp-simple-server_](#note-about-rtsp-simple-server) is a ready-to-use and zero-dependency server and proxy that allows users to publish, read and proxy live video and audio streams. Live streams can be published to the server with: diff --git a/go.mod b/go.mod index 884bbbdb..b44c6236 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/alecthomas/kong v0.7.1 github.com/asticode/go-astits v1.11.0 github.com/bluenviron/gohlslib v0.2.4 - github.com/bluenviron/gortsplib/v3 v3.6.1 + github.com/bluenviron/gortsplib/v3 v3.6.2 github.com/bluenviron/mediacommon v0.5.0 github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.0 diff --git a/go.sum b/go.sum index cf22f1ed..63c25c84 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/bluenviron/gohlslib v0.2.4 h1:I0J6b9jVSYWL8paaC/h/r8Fyrq6SQSQj2DGnhHSi4K4= github.com/bluenviron/gohlslib v0.2.4/go.mod h1:PghpVEkRW003EqzSny2mUhl/5w7R0w9aAYgTIRCoaTg= -github.com/bluenviron/gortsplib/v3 v3.6.1 h1:+/kPiwmdRwUasU5thOBATJQ4/yD+vrIEutJyRTB/f+0= -github.com/bluenviron/gortsplib/v3 v3.6.1/go.mod h1:gc6Z8pBUMC9QBqYxcOY9eVxjDPOrmFcwVH61Xs3Gu2A= +github.com/bluenviron/gortsplib/v3 v3.6.2 h1:rbMcohs6h3HwToJAyYcvUJm/m3PzWK5bYkNLZPVkjF8= +github.com/bluenviron/gortsplib/v3 v3.6.2/go.mod h1:U9oODQzfy3xHaWUScGwcGUHxAQWr4b7FpAo7qzu+REE= github.com/bluenviron/mediacommon v0.5.0 h1:YsVFlEknaXWhZGfz+Y1QbuzXLMVSmHODc7OnRqZoITY= github.com/bluenviron/mediacommon v0.5.0/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= diff --git a/internal/core/webrtc_http_server.go b/internal/core/webrtc_http_server.go index 9862717d..7bfff25d 100644 --- a/internal/core/webrtc_http_server.go +++ b/internal/core/webrtc_http_server.go @@ -109,7 +109,7 @@ func marshalICEFragment(offer *webrtc.SessionDescription, candidates []*webrtc.I type webRTCHTTPServerParent interface { logger.Writer genICEServers() []webrtc.ICEServer - sessionNew(req webRTCSessionNewReq) webRTCNewSessionRes + sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes sessionAddCandidates(req webRTCSessionAddCandidatesReq) webRTCSessionAddCandidatesRes } @@ -315,7 +315,9 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { videoBitrate: ctx.Query("video_bitrate"), }) if res.err != nil { - ctx.Writer.WriteHeader(http.StatusInternalServerError) + if res.errStatusCode != 0 { + ctx.Writer.WriteHeader(res.errStatusCode) + } return } diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index 6abe301a..b3d3a02a 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -8,6 +8,7 @@ import ( "fmt" "math/rand" "net" + "net/http" "regexp" "strconv" "strings" @@ -96,10 +97,11 @@ type webRTCManagerAPISessionsKickReq struct { res chan webRTCManagerAPISessionsKickRes } -type webRTCNewSessionRes struct { - sx *webRTCSession - answer []byte - err error +type webRTCSessionNewRes struct { + sx *webRTCSession + answer []byte + err error + errStatusCode int } type webRTCSessionNewReq struct { @@ -110,7 +112,7 @@ type webRTCSessionNewReq struct { videoCodec string audioCodec string videoBitrate string - res chan webRTCNewSessionRes + res chan webRTCSessionNewRes } type webRTCSessionAddCandidatesRes struct { @@ -289,7 +291,7 @@ outer: ) m.sessions[sx] = struct{}{} m.sessionsBySecret[sx.secret] = sx - req.res <- webRTCNewSessionRes{sx: sx} + req.res <- webRTCSessionNewRes{sx: sx} case sx := <-m.chSessionClose: delete(m.sessions, sx) @@ -429,8 +431,8 @@ func (m *webRTCManager) genICEServers() []webrtc.ICEServer { } // sessionNew is called by webRTCHTTPServer. -func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCNewSessionRes { - req.res = make(chan webRTCNewSessionRes) +func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes { + req.res = make(chan webRTCSessionNewRes) select { case m.chSessionNew <- req: @@ -441,11 +443,11 @@ func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCNewSessionRes return res2 case <-res1.sx.ctx.Done(): - return webRTCNewSessionRes{err: fmt.Errorf("terminated")} + return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} } case <-m.ctx.Done(): - return webRTCNewSessionRes{err: fmt.Errorf("terminated")} + return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError} } } diff --git a/internal/core/webrtc_manager_test.go b/internal/core/webrtc_manager_test.go index 9339f811..8b7b88c2 100644 --- a/internal/core/webrtc_manager_test.go +++ b/internal/core/webrtc_manager_test.go @@ -295,6 +295,41 @@ func TestWebRTCRead(t *testing.T) { }, pkt) } +func TestWebRTCReadNotFound(t *testing.T) { + p, ok := newInstance("paths:\n" + + " all:\n") + require.Equal(t, true, ok) + defer p.Close() + + iceServers := whipGetICEServers(t, "http://localhost:8889/stream/whep") + + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{ + ICEServers: iceServers, + }) + require.NoError(t, err) + defer pc.Close() + + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo) + require.NoError(t, err) + + offer, err := pc.CreateOffer(nil) + require.NoError(t, err) + + enc, err := json.Marshal(offer) + require.NoError(t, err) + + req, err := http.NewRequest("POST", "http://localhost:8889/stream/whep", bytes.NewReader(enc)) + require.NoError(t, err) + + req.Header.Set("Content-Type", "application/sdp") + + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusNotFound, res.StatusCode) +} + func TestWebRTCPublish(t *testing.T) { p, ok := newInstance("paths:\n" + " all:\n") diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index df3fd465..c736ee7f 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -5,7 +5,9 @@ import ( "encoding/hex" "encoding/json" "fmt" + "net/http" "strconv" + "strings" "sync" "time" @@ -155,9 +157,9 @@ type webRTCSession struct { created time.Time uuid uuid.UUID secret uuid.UUID - answerSent bool pcMutex sync.RWMutex pc *peerConnection + answerSent bool chAddRemoteCandidates chan webRTCSessionAddCandidatesReq } @@ -218,12 +220,13 @@ func (s *webRTCSession) safePC() *peerConnection { func (s *webRTCSession) run() { defer s.wg.Done() - err := s.runInner() + errStatusCode, err := s.runInner() if !s.answerSent { select { - case s.req.res <- webRTCNewSessionRes{ - err: err, + case s.req.res <- webRTCSessionNewRes{ + err: err, + errStatusCode: errStatusCode, }: case <-s.ctx.Done(): } @@ -234,28 +237,28 @@ func (s *webRTCSession) run() { s.Log(logger.Info, "closed (%v)", err) } -func (s *webRTCSession) runInner() error { +func (s *webRTCSession) runInner() (int, error) { if s.req.publish { return s.runPublish() } return s.runRead() } -func (s *webRTCSession) runPublish() error { +func (s *webRTCSession) runPublish() (int, error) { res := s.pathManager.publisherAdd(pathPublisherAddReq{ author: s, pathName: s.req.pathName, skipAuth: true, }) if res.err != nil { - return res.err + return http.StatusInternalServerError, res.err } defer res.path.publisherRemove(pathPublisherRemoveReq{author: s}) offer, err := s.decodeOffer() if err != nil { - return err + return http.StatusBadRequest, err } pc, err := newPeerConnection( @@ -267,7 +270,7 @@ func (s *webRTCSession) runPublish() error { s.iceTCPMux, s) if err != nil { - return err + return http.StatusBadRequest, err } defer pc.close() @@ -275,14 +278,14 @@ func (s *webRTCSession) runPublish() error { Direction: webrtc.RTPTransceiverDirectionRecvonly, }) if err != nil { - return err + return http.StatusBadRequest, err } _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, }) if err != nil { - return err + return http.StatusBadRequest, err } trackRecv := make(chan trackRecvPair) @@ -296,23 +299,23 @@ func (s *webRTCSession) runPublish() error { err = pc.SetRemoteDescription(*offer) if err != nil { - return err + return http.StatusBadRequest, err } answer, err := pc.CreateAnswer(nil) if err != nil { - return err + return http.StatusBadRequest, err } err = pc.SetLocalDescription(answer) if err != nil { - return err + return http.StatusBadRequest, err } if s.req.videoBitrate != "" { tmp, err := strconv.ParseUint(s.req.videoBitrate, 10, 31) if err != nil { - return err + return http.StatusBadRequest, err } insertTias(&answer, tmp*1024) @@ -320,24 +323,24 @@ func (s *webRTCSession) runPublish() error { err = s.waitGatheringDone(pc) if err != nil { - return err + return http.StatusBadRequest, err } err = s.writeAnswer(pc.LocalDescription()) if err != nil { - return err + return http.StatusBadRequest, err } go s.readRemoteCandidates(pc) err = s.waitUntilConnected(pc) if err != nil { - return err + return 0, err } tracks, err := gatherIncomingTracks(s.ctx, pc, trackRecv) if err != nil { - return err + return 0, err } medias := mediasOfIncomingTracks(tracks) @@ -347,7 +350,7 @@ func (s *webRTCSession) runPublish() error { generateRTPPackets: false, }) if rres.err != nil { - return rres.err + return 0, rres.err } s.Log(logger.Info, "is publishing to path '%s', %s", @@ -360,33 +363,36 @@ func (s *webRTCSession) runPublish() error { select { case <-pc.disconnected: - return fmt.Errorf("peer connection closed") + return 0, fmt.Errorf("peer connection closed") case <-s.ctx.Done(): - return fmt.Errorf("terminated") + return 0, fmt.Errorf("terminated") } } -func (s *webRTCSession) runRead() error { +func (s *webRTCSession) runRead() (int, error) { res := s.pathManager.readerAdd(pathReaderAddReq{ author: s, pathName: s.req.pathName, skipAuth: true, }) if res.err != nil { - return res.err + if strings.HasPrefix(res.err.Error(), "no one is publishing") { + return http.StatusNotFound, res.err + } + return http.StatusInternalServerError, res.err } defer res.path.readerRemove(pathReaderRemoveReq{author: s}) tracks, err := gatherOutgoingTracks(res.stream.medias()) if err != nil { - return err + return http.StatusBadRequest, err } offer, err := s.decodeOffer() if err != nil { - return err + return http.StatusBadRequest, err } pc, err := newPeerConnection( @@ -398,7 +404,7 @@ func (s *webRTCSession) runRead() error { s.iceTCPMux, s) if err != nil { - return err + return http.StatusBadRequest, err } defer pc.close() @@ -406,40 +412,40 @@ func (s *webRTCSession) runRead() error { var err error track.sender, err = pc.AddTrack(track.track) if err != nil { - return err + return http.StatusBadRequest, err } } err = pc.SetRemoteDescription(*offer) if err != nil { - return err + return http.StatusBadRequest, err } answer, err := pc.CreateAnswer(nil) if err != nil { - return err + return http.StatusBadRequest, err } err = pc.SetLocalDescription(answer) if err != nil { - return err + return http.StatusBadRequest, err } err = s.waitGatheringDone(pc) if err != nil { - return err + return http.StatusBadRequest, err } err = s.writeAnswer(pc.LocalDescription()) if err != nil { - return err + return http.StatusBadRequest, err } go s.readRemoteCandidates(pc) err = s.waitUntilConnected(pc) if err != nil { - return err + return 0, err } ringBuffer, _ := ringbuffer.New(uint64(s.readBufferCount)) @@ -468,13 +474,13 @@ func (s *webRTCSession) runRead() error { select { case <-pc.disconnected: - return fmt.Errorf("peer connection closed") + return 0, fmt.Errorf("peer connection closed") case err := <-writeError: - return err + return 0, err case <-s.ctx.Done(): - return fmt.Errorf("terminated") + return 0, fmt.Errorf("terminated") } } @@ -511,7 +517,7 @@ func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) error { } select { - case s.req.res <- webRTCNewSessionRes{ + case s.req.res <- webRTCSessionNewRes{ sx: s, answer: enc, }: