From 3de05c13300a04029ab340b1acea2d65a149a381 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sun, 7 Dec 2025 10:37:55 +0100 Subject: [PATCH] api: always reply with JSON in case of success or failure (#5252) Reply with "status": "ok" in case of success, and with "status": "error" in case of error. This makes the API more accessible and user friendly. --- api/openapi.yaml | 66 +++ internal/api/api.go | 21 +- internal/api/api_config_global.go | 4 +- internal/api/api_config_global_test.go | 2 +- internal/api/api_config_pathdefaults.go | 4 +- internal/api/api_config_pathdefaults_test.go | 2 +- internal/api/api_config_paths.go | 10 +- internal/api/api_config_paths_test.go | 2 +- internal/api/api_hls.go | 2 +- internal/api/api_hls_test.go | 112 ++++ internal/api/api_paths.go | 2 +- internal/api/api_paths_test.go | 136 +++++ internal/api/api_recordings.go | 4 +- internal/api/api_recordings_test.go | 10 +- internal/api/api_rtmp.go | 6 +- internal/api/api_rtmp_test.go | 261 ++++++++++ internal/api/api_rtsp.go | 6 +- internal/api/api_rtsp_test.go | 512 +++++++++++++++++++ internal/api/api_srt.go | 4 +- internal/api/api_srt_test.go | 256 ++++++++++ internal/api/api_test.go | 27 +- internal/api/api_webrtc.go | 4 +- internal/api/api_webrtc_test.go | 224 ++++++++ internal/api/paginate.go | 2 +- internal/api/paginate_test.go | 2 +- internal/core/api_test.go | 2 +- internal/defs/api.go | 8 +- internal/metrics/metrics.go | 10 +- internal/playback/server.go | 11 +- internal/pprof/pprof.go | 11 +- internal/servers/hls/http_server.go | 10 +- internal/servers/webrtc/http_server.go | 26 +- 32 files changed, 1691 insertions(+), 68 deletions(-) create mode 100644 internal/api/api_hls_test.go create mode 100644 internal/api/api_paths_test.go create mode 100644 internal/api/api_rtmp_test.go create mode 100644 internal/api/api_rtsp_test.go create mode 100644 internal/api/api_srt_test.go create mode 100644 internal/api/api_webrtc_test.go diff --git a/api/openapi.yaml b/api/openapi.yaml index eb3da4d2..19835de9 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -15,9 +15,19 @@ security: [] components: schemas: + OK: + type: object + properties: + status: + type: string + enum: [ok] + Error: type: object properties: + status: + type: string + enum: [error] error: type: string @@ -1170,6 +1180,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '500': description: server error. content: @@ -1218,6 +1232,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -1272,6 +1290,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -1385,6 +1407,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -1420,6 +1446,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -1461,6 +1491,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -1496,6 +1530,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -1843,6 +1881,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -2034,6 +2076,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -2147,6 +2193,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -2260,6 +2310,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -2373,6 +2427,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -2486,6 +2544,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: @@ -2605,6 +2667,10 @@ paths: responses: '200': description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OK' '400': description: invalid request. content: diff --git a/internal/api/api.go b/internal/api/api.go index be889cd9..1a28c95c 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -1,5 +1,5 @@ // Package api contains the API server. -package api +package api //nolint:revive import ( "net" @@ -220,10 +220,15 @@ func (a *API) writeError(ctx *gin.Context, status int, err error) { // add error to response ctx.JSON(status, &defs.APIError{ - Error: err.Error(), + Status: "error", + Error: err.Error(), }) } +func (a *API) writeOK(ctx *gin.Context) { + ctx.JSON(http.StatusOK, &defs.APIOK{Status: "ok"}) +} + func (a *API) middlewarePreflightRequests(ctx *gin.Context) { if ctx.Request.Method == http.MethodOptions && ctx.Request.Header.Get("Access-Control-Request-Method") != "" { @@ -246,7 +251,10 @@ func (a *API) middlewareAuth(ctx *gin.Context) { if err != nil { if err.AskCredentials { ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`) - ctx.AbortWithStatus(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } @@ -255,7 +263,10 @@ func (a *API) middlewareAuth(ctx *gin.Context) { // wait some seconds to delay brute force attacks <-time.After(auth.PauseAfterError) - ctx.AbortWithStatus(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } } @@ -269,7 +280,7 @@ func (a *API) onInfo(ctx *gin.Context) { func (a *API) onAuthJwksRefresh(ctx *gin.Context) { a.AuthManager.RefreshJWTJWKS() - ctx.Status(http.StatusOK) + a.writeOK(ctx) } // ReloadConf is called by core. diff --git a/internal/api/api_config_global.go b/internal/api/api_config_global.go index 615bdc57..8d26ffb9 100644 --- a/internal/api/api_config_global.go +++ b/internal/api/api_config_global.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "net/http" @@ -43,5 +43,5 @@ func (a *API) onConfigGlobalPatch(ctx *gin.Context) { // call it in a goroutine go a.Parent.APIConfigSet(newConf) - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_config_global_test.go b/internal/api/api_config_global_test.go index 915e5335..a6c5cbe1 100644 --- a/internal/api/api_config_global_test.go +++ b/internal/api/api_config_global_test.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "bytes" diff --git a/internal/api/api_config_pathdefaults.go b/internal/api/api_config_pathdefaults.go index c8ad6de4..f53597a9 100644 --- a/internal/api/api_config_pathdefaults.go +++ b/internal/api/api_config_pathdefaults.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "net/http" @@ -40,5 +40,5 @@ func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) { a.Conf = newConf a.Parent.APIConfigSet(newConf) - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_config_pathdefaults_test.go b/internal/api/api_config_pathdefaults_test.go index 9a19e137..48798c2b 100644 --- a/internal/api/api_config_pathdefaults_test.go +++ b/internal/api/api_config_pathdefaults_test.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "net/http" diff --git a/internal/api/api_config_paths.go b/internal/api/api_config_paths.go index 409cd5f9..7aedd064 100644 --- a/internal/api/api_config_paths.go +++ b/internal/api/api_config_paths.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "errors" @@ -89,7 +89,7 @@ func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl a.Conf = newConf a.Parent.APIConfigSet(newConf) - ctx.Status(http.StatusOK) + a.writeOK(ctx) } func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl @@ -130,7 +130,7 @@ func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl a.Conf = newConf a.Parent.APIConfigSet(newConf) - ctx.Status(http.StatusOK) + a.writeOK(ctx) } func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl @@ -171,7 +171,7 @@ func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl a.Conf = newConf a.Parent.APIConfigSet(newConf) - ctx.Status(http.StatusOK) + a.writeOK(ctx) } func (a *API) onConfigPathsDelete(ctx *gin.Context) { @@ -205,5 +205,5 @@ func (a *API) onConfigPathsDelete(ctx *gin.Context) { a.Conf = newConf a.Parent.APIConfigSet(newConf) - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_config_paths_test.go b/internal/api/api_config_paths_test.go index 0a2bb60d..881f11d9 100644 --- a/internal/api/api_config_paths_test.go +++ b/internal/api/api_config_paths_test.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "bytes" diff --git a/internal/api/api_hls.go b/internal/api/api_hls.go index 9c8e8411..5b9abfd3 100644 --- a/internal/api/api_hls.go +++ b/internal/api/api_hls.go @@ -1,5 +1,5 @@ //nolint:dupl -package api +package api //nolint:revive import ( "errors" diff --git a/internal/api/api_hls_test.go b/internal/api/api_hls_test.go new file mode 100644 index 00000000..72e184bf --- /dev/null +++ b/internal/api/api_hls_test.go @@ -0,0 +1,112 @@ +package api //nolint:revive + +import ( + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/servers/hls" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/stretchr/testify/require" +) + +type testHLSServer struct { + muxers map[string]*defs.APIHLSMuxer +} + +func (s *testHLSServer) APIMuxersList() (*defs.APIHLSMuxerList, error) { + items := make([]*defs.APIHLSMuxer, 0, len(s.muxers)) + for _, muxer := range s.muxers { + items = append(items, muxer) + } + return &defs.APIHLSMuxerList{Items: items}, nil +} + +func (s *testHLSServer) APIMuxersGet(name string) (*defs.APIHLSMuxer, error) { + muxer, ok := s.muxers[name] + if !ok { + return nil, hls.ErrMuxerNotFound + } + return muxer, nil +} + +func TestHLSMuxersList(t *testing.T) { + now := time.Now() + hlsServer := &testHLSServer{ + muxers: map[string]*defs.APIHLSMuxer{ + "test1": { + Path: "test1", + Created: now, + LastRequest: now.Add(5 * time.Second), + BytesSent: 1234, + }, + "test2": { + Path: "test2", + Created: now.Add(time.Minute), + LastRequest: now.Add(time.Minute + 10*time.Second), + BytesSent: 5678, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + HLSServer: hlsServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIHLSMuxerList + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/hlsmuxers/list", nil, &out) + + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Len(t, out.Items, 2) +} + +func TestHLSMuxersGet(t *testing.T) { + now := time.Now() + hlsServer := &testHLSServer{ + muxers: map[string]*defs.APIHLSMuxer{ + "mypath": { + Path: "mypath", + Created: now, + LastRequest: now.Add(5 * time.Second), + BytesSent: 9999, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + HLSServer: hlsServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIHLSMuxer + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/hlsmuxers/get/mypath", nil, &out) + + require.Equal(t, "mypath", out.Path) + require.Equal(t, uint64(9999), out.BytesSent) +} diff --git a/internal/api/api_paths.go b/internal/api/api_paths.go index 93de2ec5..2d6c8474 100644 --- a/internal/api/api_paths.go +++ b/internal/api/api_paths.go @@ -1,5 +1,5 @@ //nolint:dupl -package api +package api //nolint:revive import ( "errors" diff --git a/internal/api/api_paths_test.go b/internal/api/api_paths_test.go new file mode 100644 index 00000000..4576a9c8 --- /dev/null +++ b/internal/api/api_paths_test.go @@ -0,0 +1,136 @@ +package api //nolint:revive + +import ( + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/stretchr/testify/require" +) + +type testPathManager struct { + paths map[string]*defs.APIPath +} + +func (m *testPathManager) APIPathsList() (*defs.APIPathList, error) { + items := make([]*defs.APIPath, 0, len(m.paths)) + for _, path := range m.paths { + items = append(items, path) + } + return &defs.APIPathList{Items: items}, nil +} + +func (m *testPathManager) APIPathsGet(name string) (*defs.APIPath, error) { + path, ok := m.paths[name] + if !ok { + return nil, conf.ErrPathNotFound + } + return path, nil +} + +func TestPathsList(t *testing.T) { + now := time.Now() + pathManager := &testPathManager{ + paths: map[string]*defs.APIPath{ + "test1": { + Name: "test1", + ConfName: "test1", + Source: &defs.APIPathSourceOrReader{Type: "publisher", ID: "pub1"}, + Ready: true, + ReadyTime: &now, + Tracks: []string{"H264", "Opus"}, + BytesReceived: 1000, + BytesSent: 2000, + Readers: []defs.APIPathSourceOrReader{ + {Type: "reader", ID: "reader1"}, + }, + }, + "test2": { + Name: "test2", + ConfName: "test2", + Ready: false, + Tracks: []string{}, + BytesReceived: 500, + BytesSent: 100, + Readers: []defs.APIPathSourceOrReader{}, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + PathManager: pathManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIPathList + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out) + + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Len(t, out.Items, 2) +} + +func TestPathsGet(t *testing.T) { + now := time.Now() + pathManager := &testPathManager{ + paths: map[string]*defs.APIPath{ + "mystream": { + Name: "mystream", + ConfName: "mystream", + Source: &defs.APIPathSourceOrReader{Type: "rtspSession", ID: "session123"}, + Ready: true, + ReadyTime: &now, + Tracks: []string{"H264", "Opus"}, + BytesReceived: 123456, + BytesSent: 789012, + Readers: []defs.APIPathSourceOrReader{ + {Type: "hlsMuxer", ID: "muxer1"}, + {Type: "webRTCSession", ID: "session456"}, + }, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + PathManager: pathManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIPath + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/get/mystream", nil, &out) + + require.Equal(t, "mystream", out.Name) + require.Equal(t, "mystream", out.ConfName) + require.True(t, out.Ready) + require.NotNil(t, out.Source) + require.Equal(t, "rtspSession", out.Source.Type) + require.Len(t, out.Tracks, 2) + require.Len(t, out.Readers, 2) + require.Equal(t, uint64(123456), out.BytesReceived) + require.Equal(t, uint64(789012), out.BytesSent) +} diff --git a/internal/api/api_recordings.go b/internal/api/api_recordings.go index a490557f..e09a521c 100644 --- a/internal/api/api_recordings.go +++ b/internal/api/api_recordings.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "fmt" @@ -94,5 +94,5 @@ func (a *API) onRecordingDeleteSegment(ctx *gin.Context) { return } - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_recordings_test.go b/internal/api/api_recordings_test.go index 1d1ac7ef..2e39c33c 100644 --- a/internal/api/api_recordings_test.go +++ b/internal/api/api_recordings_test.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "net/http" @@ -174,11 +174,5 @@ func TestRecordingsDeleteSegment(t *testing.T) { v.Set("start", time.Date(2008, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano)) u.RawQuery = v.Encode() - req, err := http.NewRequest(http.MethodDelete, u.String(), nil) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - require.Equal(t, http.StatusOK, res.StatusCode) + httpRequest(t, hc, http.MethodDelete, u.String(), nil, nil) } diff --git a/internal/api/api_rtmp.go b/internal/api/api_rtmp.go index fa398a42..1a4476dc 100644 --- a/internal/api/api_rtmp.go +++ b/internal/api/api_rtmp.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "errors" @@ -64,7 +64,7 @@ func (a *API) onRTMPConnsKick(ctx *gin.Context) { return } - ctx.Status(http.StatusOK) + a.writeOK(ctx) } func (a *API) onRTMPSConnsList(ctx *gin.Context) { @@ -122,5 +122,5 @@ func (a *API) onRTMPSConnsKick(ctx *gin.Context) { return } - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_rtmp_test.go b/internal/api/api_rtmp_test.go new file mode 100644 index 00000000..dbf78f3f --- /dev/null +++ b/internal/api/api_rtmp_test.go @@ -0,0 +1,261 @@ +package api //nolint:revive + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/servers/rtmp" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +type testRTMPServer struct { + conns map[uuid.UUID]*defs.APIRTMPConn +} + +func (s *testRTMPServer) APIConnsList() (*defs.APIRTMPConnList, error) { + items := make([]*defs.APIRTMPConn, 0, len(s.conns)) + for _, conn := range s.conns { + items = append(items, conn) + } + return &defs.APIRTMPConnList{Items: items}, nil +} + +func (s *testRTMPServer) APIConnsGet(id uuid.UUID) (*defs.APIRTMPConn, error) { + conn, ok := s.conns[id] + if !ok { + return nil, rtmp.ErrConnNotFound + } + return conn, nil +} + +func (s *testRTMPServer) APIConnsKick(id uuid.UUID) error { + _, ok := s.conns[id] + if !ok { + return rtmp.ErrConnNotFound + } + return nil +} + +func TestRTMPConnsList(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + isSecure bool + }{ + { + name: "rtmp", + endpoint: "rtmpconns", + isSecure: false, + }, + { + name: "rtmps", + endpoint: "rtmpsconns", + isSecure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id1 := uuid.New() + id2 := uuid.New() + now := time.Now() + + rtmpServer := &testRTMPServer{ + conns: map[uuid.UUID]*defs.APIRTMPConn{ + id1: { + ID: id1, + Created: now, + RemoteAddr: "192.168.1.1:5000", + State: defs.APIRTMPConnStatePublish, + Path: "stream1", + Query: "token=abc", + BytesReceived: 1000, + BytesSent: 2000, + }, + id2: { + ID: id2, + Created: now.Add(time.Minute), + RemoteAddr: "192.168.1.2:5001", + State: defs.APIRTMPConnStateRead, + Path: "stream2", + Query: "", + BytesReceived: 500, + BytesSent: 1500, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + + if ca.isSecure { + api.RTMPSServer = rtmpServer + } else { + api.RTMPServer = rtmpServer + } + + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIRTMPConnList + httpRequest(t, hc, http.MethodGet, fmt.Sprintf("http://localhost:9997/v3/%s/list", ca.endpoint), nil, &out) + + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Len(t, out.Items, 2) + }) + } +} + +func TestRTMPConnsGet(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + path string + isSecure bool + }{ + { + name: "rtmp", + endpoint: "rtmpconns", + path: "mystream", + isSecure: false, + }, + { + name: "rtmps", + endpoint: "rtmpsconns", + path: "secure-stream", + isSecure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id := uuid.New() + now := time.Now() + + rtmpServer := &testRTMPServer{ + conns: map[uuid.UUID]*defs.APIRTMPConn{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + State: defs.APIRTMPConnStatePublish, + Path: ca.path, + Query: "key=value", + BytesReceived: 999999, + BytesSent: 888888, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + + if ca.isSecure { + api.RTMPSServer = rtmpServer + } else { + api.RTMPServer = rtmpServer + } + + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIRTMPConn + httpRequest(t, hc, http.MethodGet, fmt.Sprintf("http://localhost:9997/v3/%s/get/%s", ca.endpoint, id), nil, &out) + + require.Equal(t, id, out.ID) + require.Equal(t, "192.168.1.100:5000", out.RemoteAddr) + require.Equal(t, defs.APIRTMPConnStatePublish, out.State) + require.Equal(t, ca.path, out.Path) + require.Equal(t, uint64(999999), out.BytesReceived) + }) + } +} + +func TestRTMPConnsKick(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + path string + isSecure bool + }{ + { + name: "rtmp", + endpoint: "rtmpconns", + path: "mystream", + isSecure: false, + }, + { + name: "rtmps", + endpoint: "rtmpsconns", + path: "secure-stream", + isSecure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id := uuid.New() + now := time.Now() + + rtmpServer := &testRTMPServer{ + conns: map[uuid.UUID]*defs.APIRTMPConn{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + State: defs.APIRTMPConnStatePublish, + Path: ca.path, + Query: "", + BytesReceived: 1000, + BytesSent: 2000, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + + if ca.isSecure { + api.RTMPSServer = rtmpServer + } else { + api.RTMPServer = rtmpServer + } + + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, fmt.Sprintf("http://localhost:9997/v3/%s/kick/%s", ca.endpoint, id), nil, nil) + }) + } +} diff --git a/internal/api/api_rtsp.go b/internal/api/api_rtsp.go index 422ae40d..54ec9e1f 100644 --- a/internal/api/api_rtsp.go +++ b/internal/api/api_rtsp.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "errors" @@ -102,7 +102,7 @@ func (a *API) onRTSPSessionsKick(ctx *gin.Context) { return } - ctx.Status(http.StatusOK) + a.writeOK(ctx) } func (a *API) onRTSPSConnsList(ctx *gin.Context) { @@ -198,5 +198,5 @@ func (a *API) onRTSPSSessionsKick(ctx *gin.Context) { return } - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_rtsp_test.go b/internal/api/api_rtsp_test.go new file mode 100644 index 00000000..33e10584 --- /dev/null +++ b/internal/api/api_rtsp_test.go @@ -0,0 +1,512 @@ +package api //nolint:revive + +import ( + "encoding/json" + "fmt" + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/servers/rtsp" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +type testRTSPServer struct { + conns map[uuid.UUID]*defs.APIRTSPConn + sessions map[uuid.UUID]*defs.APIRTSPSession +} + +func (s *testRTSPServer) APIConnsList() (*defs.APIRTSPConnsList, error) { + items := make([]*defs.APIRTSPConn, 0, len(s.conns)) + for _, conn := range s.conns { + items = append(items, conn) + } + return &defs.APIRTSPConnsList{Items: items}, nil +} + +func (s *testRTSPServer) APIConnsGet(id uuid.UUID) (*defs.APIRTSPConn, error) { + conn, ok := s.conns[id] + if !ok { + return nil, rtsp.ErrConnNotFound + } + return conn, nil +} + +func (s *testRTSPServer) APISessionsList() (*defs.APIRTSPSessionList, error) { + items := make([]*defs.APIRTSPSession, 0, len(s.sessions)) + for _, session := range s.sessions { + items = append(items, session) + } + return &defs.APIRTSPSessionList{Items: items}, nil +} + +func (s *testRTSPServer) APISessionsGet(id uuid.UUID) (*defs.APIRTSPSession, error) { + session, ok := s.sessions[id] + if !ok { + return nil, rtsp.ErrSessionNotFound + } + return session, nil +} + +func (s *testRTSPServer) APISessionsKick(id uuid.UUID) error { + _, ok := s.sessions[id] + if !ok { + return rtsp.ErrSessionNotFound + } + return nil +} + +func TestRTSPConnsList(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + secure bool + }{ + { + name: "rtsp", + endpoint: "rtspconns", + secure: false, + }, + { + name: "rtsps", + endpoint: "rtspsconns", + secure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id1 := uuid.New() + id2 := uuid.New() + sessionID := uuid.New() + now := time.Now() + + rtspServer := &testRTSPServer{ + conns: map[uuid.UUID]*defs.APIRTSPConn{ + id1: { + ID: id1, + Created: now, + RemoteAddr: "192.168.1.1:5000", + BytesReceived: 1000, + BytesSent: 2000, + Session: &sessionID, + Tunnel: "", + }, + id2: { + ID: id2, + Created: now.Add(time.Minute), + RemoteAddr: "192.168.1.2:5001", + BytesReceived: 500, + BytesSent: 1500, + Session: nil, + Tunnel: "http", + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + if ca.secure { + api.RTSPSServer = rtspServer + } else { + api.RTSPServer = rtspServer + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodGet, + fmt.Sprintf("http://localhost:9997/v3/%s/list", ca.endpoint), nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + var out defs.APIRTSPConnsList + err = json.NewDecoder(res.Body).Decode(&out) + require.NoError(t, err) + + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Len(t, out.Items, 2) + }) + } +} + +func TestRTSPConnsGet(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + secure bool + }{ + { + name: "rtsp", + endpoint: "rtspconns", + secure: false, + }, + { + name: "rtsps", + endpoint: "rtspsconns", + secure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id := uuid.New() + sessionID := uuid.New() + now := time.Now() + + rtspServer := &testRTSPServer{ + conns: map[uuid.UUID]*defs.APIRTSPConn{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + BytesReceived: 999999, + BytesSent: 888888, + Session: &sessionID, + Tunnel: "", + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + if ca.secure { + api.RTSPSServer = rtspServer + } else { + api.RTSPServer = rtspServer + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodGet, + fmt.Sprintf("http://localhost:9997/v3/%s/get/%s", ca.endpoint, id), nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + var out defs.APIRTSPConn + err = json.NewDecoder(res.Body).Decode(&out) + require.NoError(t, err) + + require.Equal(t, id, out.ID) + require.Equal(t, "192.168.1.100:5000", out.RemoteAddr) + require.Equal(t, uint64(999999), out.BytesReceived) + require.NotNil(t, out.Session) + require.Equal(t, sessionID, *out.Session) + }) + } +} + +func TestRTSPSessionsList(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + secure bool + }{ + { + name: "rtsp", + endpoint: "rtspsessions", + secure: false, + }, + { + name: "rtsps", + endpoint: "rtspssessions", + secure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id1 := uuid.New() + id2 := uuid.New() + now := time.Now() + transport := "UDP" + profile := "AVP" + + rtspServer := &testRTSPServer{ + sessions: map[uuid.UUID]*defs.APIRTSPSession{ + id1: { + ID: id1, + Created: now, + RemoteAddr: "192.168.1.1:5000", + State: defs.APIRTSPSessionStatePublish, + Path: "stream1", + Query: "token=abc", + Transport: &transport, + Profile: &profile, + BytesReceived: 1000, + BytesSent: 2000, + RTPPacketsReceived: 100, + RTPPacketsSent: 200, + RTPPacketsLost: 5, + RTPPacketsInError: 2, + RTPPacketsJitter: 0.5, + RTCPPacketsReceived: 10, + RTCPPacketsSent: 15, + RTCPPacketsInError: 1, + }, + id2: { + ID: id2, + Created: now.Add(time.Minute), + RemoteAddr: "192.168.1.2:5001", + State: defs.APIRTSPSessionStateRead, + Path: "stream2", + Query: "", + Transport: nil, + Profile: nil, + BytesReceived: 500, + BytesSent: 1500, + RTPPacketsReceived: 50, + RTPPacketsSent: 150, + RTPPacketsLost: 0, + RTPPacketsInError: 0, + RTPPacketsJitter: 0.1, + RTCPPacketsReceived: 5, + RTCPPacketsSent: 10, + RTCPPacketsInError: 0, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + if ca.secure { + api.RTSPSServer = rtspServer + } else { + api.RTSPServer = rtspServer + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodGet, + fmt.Sprintf("http://localhost:9997/v3/%s/list", ca.endpoint), nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + var out defs.APIRTSPSessionList + err = json.NewDecoder(res.Body).Decode(&out) + require.NoError(t, err) + + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Len(t, out.Items, 2) + }) + } +} + +func TestRTSPSessionsGet(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + secure bool + }{ + { + name: "rtsp", + endpoint: "rtspsessions", + secure: false, + }, + { + name: "rtsps", + endpoint: "rtspssessions", + secure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id := uuid.New() + now := time.Now() + transport := "UDP" + profile := "AVP" + + rtspServer := &testRTSPServer{ + sessions: map[uuid.UUID]*defs.APIRTSPSession{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + State: defs.APIRTSPSessionStatePublish, + Path: "mystream", + Query: "key=value", + Transport: &transport, + Profile: &profile, + BytesReceived: 999999, + BytesSent: 888888, + RTPPacketsReceived: 10000, + RTPPacketsSent: 20000, + RTPPacketsLost: 50, + RTPPacketsInError: 10, + RTPPacketsJitter: 1.5, + RTCPPacketsReceived: 100, + RTCPPacketsSent: 200, + RTCPPacketsInError: 5, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + if ca.secure { + api.RTSPSServer = rtspServer + } else { + api.RTSPServer = rtspServer + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodGet, + fmt.Sprintf("http://localhost:9997/v3/%s/get/%s", ca.endpoint, id), nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + var out defs.APIRTSPSession + err = json.NewDecoder(res.Body).Decode(&out) + require.NoError(t, err) + + require.Equal(t, id, out.ID) + require.Equal(t, "192.168.1.100:5000", out.RemoteAddr) + require.Equal(t, defs.APIRTSPSessionStatePublish, out.State) + require.Equal(t, "mystream", out.Path) + require.Equal(t, uint64(999999), out.BytesReceived) + require.NotNil(t, out.Transport) + require.Equal(t, "UDP", *out.Transport) + }) + } +} + +func TestRTSPSessionsKick(t *testing.T) { + for _, ca := range []struct { + name string + endpoint string + secure bool + }{ + { + name: "rtsp", + endpoint: "rtspsessions", + secure: false, + }, + { + name: "rtsps", + endpoint: "rtspssessions", + secure: true, + }, + } { + t.Run(ca.name, func(t *testing.T) { + id := uuid.New() + now := time.Now() + transport := "UDP" + profile := "AVP" + + rtspServer := &testRTSPServer{ + sessions: map[uuid.UUID]*defs.APIRTSPSession{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + State: defs.APIRTSPSessionStatePublish, + Path: "mystream", + Query: "", + Transport: &transport, + Profile: &profile, + BytesReceived: 1000, + BytesSent: 2000, + RTPPacketsReceived: 100, + RTPPacketsSent: 200, + RTPPacketsLost: 0, + RTPPacketsInError: 0, + RTPPacketsJitter: 0.5, + RTCPPacketsReceived: 10, + RTCPPacketsSent: 15, + RTCPPacketsInError: 0, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + if ca.secure { + api.RTSPSServer = rtspServer + } else { + api.RTSPServer = rtspServer + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:9997/v3/%s/kick/%s", ca.endpoint, id), nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + checkOK(t, res.Body) + }) + } +} diff --git a/internal/api/api_srt.go b/internal/api/api_srt.go index 6029dfd2..b9909df2 100644 --- a/internal/api/api_srt.go +++ b/internal/api/api_srt.go @@ -1,5 +1,5 @@ //nolint:dupl -package api +package api //nolint:revive import ( "errors" @@ -65,5 +65,5 @@ func (a *API) onSRTConnsKick(ctx *gin.Context) { return } - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_srt_test.go b/internal/api/api_srt_test.go new file mode 100644 index 00000000..059acbbe --- /dev/null +++ b/internal/api/api_srt_test.go @@ -0,0 +1,256 @@ +package api //nolint:revive + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/servers/srt" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +type testSRTServer struct { + conns map[uuid.UUID]*defs.APISRTConn +} + +func (s *testSRTServer) APIConnsList() (*defs.APISRTConnList, error) { + items := make([]*defs.APISRTConn, 0, len(s.conns)) + for _, conn := range s.conns { + items = append(items, conn) + } + return &defs.APISRTConnList{Items: items}, nil +} + +func (s *testSRTServer) APIConnsGet(id uuid.UUID) (*defs.APISRTConn, error) { + conn, ok := s.conns[id] + if !ok { + return nil, srt.ErrConnNotFound + } + return conn, nil +} + +func (s *testSRTServer) APIConnsKick(id uuid.UUID) error { + _, ok := s.conns[id] + if !ok { + return srt.ErrConnNotFound + } + return nil +} + +func TestSRTConnsList(t *testing.T) { + id1 := uuid.New() + id2 := uuid.New() + now := time.Now() + + srtServer := &testSRTServer{ + conns: map[uuid.UUID]*defs.APISRTConn{ + id1: { + ID: id1, + Created: now, + RemoteAddr: "192.168.1.1:5000", + State: defs.APISRTConnStatePublish, + Path: "stream1", + Query: "token=abc", + PacketsSent: 1000, + PacketsReceived: 2000, + PacketsSentUnique: 950, + PacketsReceivedUnique: 1950, + BytesReceived: 100000, + BytesSent: 200000, + MsRTT: 10.5, + MbpsSendRate: 5.2, + MbpsReceiveRate: 4.8, + }, + id2: { + ID: id2, + Created: now.Add(time.Minute), + RemoteAddr: "192.168.1.2:5001", + State: defs.APISRTConnStateRead, + Path: "stream2", + Query: "", + PacketsSent: 500, + PacketsReceived: 1500, + PacketsSentUnique: 480, + PacketsReceivedUnique: 1470, + BytesReceived: 50000, + BytesSent: 150000, + MsRTT: 15.2, + MbpsSendRate: 3.5, + MbpsReceiveRate: 3.2, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + SRTServer: srtServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APISRTConnList + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/srtconns/list", nil, &out) + + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Len(t, out.Items, 2) +} + +func TestSRTConnsGet(t *testing.T) { + id := uuid.New() + now := time.Now() + + srtServer := &testSRTServer{ + conns: map[uuid.UUID]*defs.APISRTConn{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + State: defs.APISRTConnStatePublish, + Path: "mystream", + Query: "key=value", + PacketsSent: 10000, + PacketsReceived: 20000, + PacketsSentUnique: 9900, + PacketsReceivedUnique: 19800, + PacketsSendLoss: 50, + PacketsReceivedLoss: 100, + PacketsRetrans: 60, + PacketsReceivedRetrans: 80, + PacketsSentACK: 500, + PacketsReceivedACK: 600, + PacketsSentNAK: 10, + PacketsReceivedNAK: 15, + PacketsSentKM: 2, + PacketsReceivedKM: 2, + UsSndDuration: 1000000, + PacketsReceivedBelated: 5, + PacketsSendDrop: 3, + PacketsReceivedDrop: 4, + PacketsReceivedUndecrypt: 0, + BytesReceived: 999999, + BytesSent: 888888, + BytesSentUnique: 880000, + BytesReceivedUnique: 990000, + BytesReceivedLoss: 5000, + BytesRetrans: 3000, + BytesReceivedRetrans: 4000, + BytesReceivedBelated: 200, + BytesSendDrop: 150, + BytesReceivedDrop: 180, + BytesReceivedUndecrypt: 0, + UsPacketsSendPeriod: 1000.5, + PacketsFlowWindow: 8192, + PacketsFlightSize: 256, + MsRTT: 25.5, + MbpsSendRate: 10.5, + MbpsReceiveRate: 9.8, + MbpsLinkCapacity: 100.0, + BytesAvailSendBuf: 65536, + BytesAvailReceiveBuf: 131072, + MbpsMaxBW: 50.0, + ByteMSS: 1500, + PacketsSendBuf: 128, + BytesSendBuf: 192000, + MsSendBuf: 1000, + MsSendTsbPdDelay: 120, + PacketsReceiveBuf: 256, + BytesReceiveBuf: 384000, + MsReceiveBuf: 2000, + MsReceiveTsbPdDelay: 120, + PacketsReorderTolerance: 10, + PacketsReceivedAvgBelatedTime: 50, + PacketsSendLossRate: 0.5, + PacketsReceivedLossRate: 0.6, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + SRTServer: srtServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APISRTConn + httpRequest(t, hc, http.MethodGet, fmt.Sprintf("http://localhost:9997/v3/srtconns/get/%s", id), nil, &out) + + require.Equal(t, id, out.ID) + require.Equal(t, "192.168.1.100:5000", out.RemoteAddr) + require.Equal(t, defs.APISRTConnStatePublish, out.State) + require.Equal(t, "mystream", out.Path) + require.Equal(t, uint64(999999), out.BytesReceived) + require.Equal(t, uint64(888888), out.BytesSent) + require.Equal(t, 25.5, out.MsRTT) + require.Equal(t, 10.5, out.MbpsSendRate) + require.Equal(t, 9.8, out.MbpsReceiveRate) +} + +func TestSRTConnsKick(t *testing.T) { + id := uuid.New() + now := time.Now() + + srtServer := &testSRTServer{ + conns: map[uuid.UUID]*defs.APISRTConn{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + State: defs.APISRTConnStatePublish, + Path: "mystream", + Query: "", + PacketsSent: 1000, + PacketsReceived: 2000, + PacketsSentUnique: 950, + PacketsReceivedUnique: 1950, + BytesReceived: 100000, + BytesSent: 200000, + MsRTT: 10.5, + MbpsSendRate: 5.2, + MbpsReceiveRate: 4.8, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + SRTServer: srtServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, fmt.Sprintf("http://localhost:9997/v3/srtconns/kick/%s", id), nil, nil) +} diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 2354066d..a4520c18 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "bytes" @@ -65,6 +65,7 @@ func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in any } if out == nil { + checkOK(t, res.Body) return } @@ -73,10 +74,17 @@ func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in any } func checkError(t *testing.T, body io.Reader, msg string) { - var resErr map[string]any - err := json.NewDecoder(body).Decode(&resErr) + var raw map[string]any + err := json.NewDecoder(body).Decode(&raw) require.NoError(t, err) - require.Equal(t, map[string]any{"error": msg}, resErr) + require.Equal(t, map[string]any{"status": "error", "error": msg}, raw) +} + +func checkOK(t *testing.T, body io.Reader) { + var raw map[string]any + err := json.NewDecoder(body).Decode(&raw) + require.NoError(t, err) + require.Equal(t, map[string]any{"status": "ok"}, raw) } func TestPreflightRequest(t *testing.T) { @@ -174,13 +182,7 @@ func TestAuthJWKSRefresh(t *testing.T) { u, err := url.Parse("http://localhost:9997/v3/auth/jwks/refresh") require.NoError(t, err) - req, err := http.NewRequest(http.MethodPost, u.String(), nil) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - require.Equal(t, http.StatusOK, res.StatusCode) + httpRequest(t, hc, http.MethodPost, u.String(), nil, nil) require.True(t, ok) } @@ -227,12 +229,15 @@ func TestAuthError(t *testing.T) { require.Equal(t, http.StatusUnauthorized, res.StatusCode) require.Equal(t, `Basic realm="mediamtx"`, res.Header.Get("WWW-Authenticate")) + checkError(t, res.Body, "authentication error") res, err = hc.Get("http://myuser:mypass@localhost:9997/v3/config/global/get") require.NoError(t, err) defer res.Body.Close() require.Equal(t, http.StatusUnauthorized, res.StatusCode) + require.Equal(t, ``, res.Header.Get("WWW-Authenticate")) + checkError(t, res.Body, "authentication error") require.Equal(t, 2, n) } diff --git a/internal/api/api_webrtc.go b/internal/api/api_webrtc.go index ba54b44f..2f27ca3b 100644 --- a/internal/api/api_webrtc.go +++ b/internal/api/api_webrtc.go @@ -1,5 +1,5 @@ //nolint:dupl -package api +package api //nolint:revive import ( "errors" @@ -65,5 +65,5 @@ func (a *API) onWebRTCSessionsKick(ctx *gin.Context) { return } - ctx.Status(http.StatusOK) + a.writeOK(ctx) } diff --git a/internal/api/api_webrtc_test.go b/internal/api/api_webrtc_test.go new file mode 100644 index 00000000..da547d06 --- /dev/null +++ b/internal/api/api_webrtc_test.go @@ -0,0 +1,224 @@ +package api //nolint:revive + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/servers/webrtc" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +type testWebRTCServer struct { + sessions map[uuid.UUID]*defs.APIWebRTCSession +} + +func (s *testWebRTCServer) APISessionsList() (*defs.APIWebRTCSessionList, error) { + items := make([]*defs.APIWebRTCSession, 0, len(s.sessions)) + for _, session := range s.sessions { + items = append(items, session) + } + return &defs.APIWebRTCSessionList{Items: items}, nil +} + +func (s *testWebRTCServer) APISessionsGet(id uuid.UUID) (*defs.APIWebRTCSession, error) { + session, ok := s.sessions[id] + if !ok { + return nil, webrtc.ErrSessionNotFound + } + return session, nil +} + +func (s *testWebRTCServer) APISessionsKick(id uuid.UUID) error { + _, ok := s.sessions[id] + if !ok { + return webrtc.ErrSessionNotFound + } + return nil +} + +func TestWebRTCSessionsList(t *testing.T) { + id1 := uuid.New() + id2 := uuid.New() + now := time.Now() + + webrtcServer := &testWebRTCServer{ + sessions: map[uuid.UUID]*defs.APIWebRTCSession{ + id1: { + ID: id1, + Created: now, + RemoteAddr: "192.168.1.1:5000", + PeerConnectionEstablished: true, + LocalCandidate: "192.168.1.100:8000", + RemoteCandidate: "192.168.1.1:5000", + State: defs.APIWebRTCSessionStatePublish, + Path: "stream1", + Query: "token=abc", + BytesReceived: 1000, + BytesSent: 2000, + RTPPacketsReceived: 100, + RTPPacketsSent: 200, + RTPPacketsLost: 5, + RTPPacketsJitter: 0.5, + RTCPPacketsReceived: 10, + RTCPPacketsSent: 15, + }, + id2: { + ID: id2, + Created: now.Add(time.Minute), + RemoteAddr: "192.168.1.2:5001", + PeerConnectionEstablished: true, + LocalCandidate: "192.168.1.100:8001", + RemoteCandidate: "192.168.1.2:5001", + State: defs.APIWebRTCSessionStateRead, + Path: "stream2", + Query: "", + BytesReceived: 500, + BytesSent: 1500, + RTPPacketsReceived: 50, + RTPPacketsSent: 150, + RTPPacketsLost: 0, + RTPPacketsJitter: 0.1, + RTCPPacketsReceived: 5, + RTCPPacketsSent: 10, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + WebRTCServer: webrtcServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIWebRTCSessionList + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/webrtcsessions/list", nil, &out) + + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Len(t, out.Items, 2) +} + +func TestWebRTCSessionsGet(t *testing.T) { + id := uuid.New() + now := time.Now() + + webrtcServer := &testWebRTCServer{ + sessions: map[uuid.UUID]*defs.APIWebRTCSession{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + PeerConnectionEstablished: true, + LocalCandidate: "192.168.1.200:8000", + RemoteCandidate: "192.168.1.100:5000", + State: defs.APIWebRTCSessionStatePublish, + Path: "mystream", + Query: "key=value", + BytesReceived: 999999, + BytesSent: 888888, + RTPPacketsReceived: 10000, + RTPPacketsSent: 20000, + RTPPacketsLost: 50, + RTPPacketsJitter: 1.5, + RTCPPacketsReceived: 100, + RTCPPacketsSent: 200, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + WebRTCServer: webrtcServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out defs.APIWebRTCSession + httpRequest(t, hc, http.MethodGet, fmt.Sprintf("http://localhost:9997/v3/webrtcsessions/get/%s", id), nil, &out) + + require.Equal(t, id, out.ID) + require.Equal(t, "192.168.1.100:5000", out.RemoteAddr) + require.Equal(t, defs.APIWebRTCSessionStatePublish, out.State) + require.Equal(t, "mystream", out.Path) + require.True(t, out.PeerConnectionEstablished) + require.Equal(t, "192.168.1.200:8000", out.LocalCandidate) + require.Equal(t, "192.168.1.100:5000", out.RemoteCandidate) + require.Equal(t, uint64(999999), out.BytesReceived) + require.Equal(t, uint64(888888), out.BytesSent) + require.Equal(t, uint64(10000), out.RTPPacketsReceived) + require.Equal(t, uint64(20000), out.RTPPacketsSent) + require.Equal(t, uint64(50), out.RTPPacketsLost) + require.Equal(t, 1.5, out.RTPPacketsJitter) +} + +func TestWebRTCSessionsKick(t *testing.T) { + id := uuid.New() + now := time.Now() + + webrtcServer := &testWebRTCServer{ + sessions: map[uuid.UUID]*defs.APIWebRTCSession{ + id: { + ID: id, + Created: now, + RemoteAddr: "192.168.1.100:5000", + PeerConnectionEstablished: true, + LocalCandidate: "192.168.1.200:8000", + RemoteCandidate: "192.168.1.100:5000", + State: defs.APIWebRTCSessionStatePublish, + Path: "mystream", + Query: "", + BytesReceived: 1000, + BytesSent: 2000, + RTPPacketsReceived: 100, + RTPPacketsSent: 200, + RTPPacketsLost: 0, + RTPPacketsJitter: 0.5, + RTCPPacketsReceived: 10, + RTCPPacketsSent: 15, + }, + }, + } + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + AuthManager: test.NilAuthManager, + WebRTCServer: webrtcServer, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, fmt.Sprintf("http://localhost:9997/v3/webrtcsessions/kick/%s", id), nil, nil) +} diff --git a/internal/api/paginate.go b/internal/api/paginate.go index c8ab3733..b268eca6 100644 --- a/internal/api/paginate.go +++ b/internal/api/paginate.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "fmt" diff --git a/internal/api/paginate_test.go b/internal/api/paginate_test.go index 6ffcd3c1..8094a19e 100644 --- a/internal/api/paginate_test.go +++ b/internal/api/paginate_test.go @@ -1,4 +1,4 @@ -package api +package api //nolint:revive import ( "testing" diff --git a/internal/core/api_test.go b/internal/core/api_test.go index ff42bbca..bb363358 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -69,7 +69,7 @@ func checkError(t *testing.T, msg string, body io.Reader) { var resErr map[string]any err := json.NewDecoder(body).Decode(&resErr) require.NoError(t, err) - require.Equal(t, map[string]any{"error": msg}, resErr) + require.Equal(t, map[string]any{"status": "error", "error": msg}, resErr) } func TestAPIPathsList(t *testing.T) { diff --git a/internal/defs/api.go b/internal/defs/api.go index c0aa11e9..9bc30831 100644 --- a/internal/defs/api.go +++ b/internal/defs/api.go @@ -50,9 +50,15 @@ type APIWebRTCServer interface { APISessionsKick(uuid.UUID) error } +// APIOK is returned on success. +type APIOK struct { + Status string `json:"status"` +} + // APIError is a generic error. type APIError struct { - Error string `json:"error"` + Status string `json:"status"` + Error string `json:"error"` } // APIInfo is a info response. diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 0934870f..d05a07f9 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -157,7 +157,10 @@ func (m *Metrics) middlewareAuth(ctx *gin.Context) { if err != nil { if err.AskCredentials { ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`) - ctx.AbortWithStatus(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } @@ -166,7 +169,10 @@ func (m *Metrics) middlewareAuth(ctx *gin.Context) { // wait some seconds to delay brute force attacks <-time.After(auth.PauseAfterError) - ctx.AbortWithStatus(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } } diff --git a/internal/playback/server.go b/internal/playback/server.go index c5b7f391..15602b47 100644 --- a/internal/playback/server.go +++ b/internal/playback/server.go @@ -9,6 +9,7 @@ import ( "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/httpp" "github.com/gin-gonic/gin" @@ -124,7 +125,10 @@ func (s *Server) doAuth(ctx *gin.Context, pathName string) bool { if err != nil { if err.AskCredentials { ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`) - ctx.Writer.WriteHeader(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return false } @@ -134,7 +138,10 @@ func (s *Server) doAuth(ctx *gin.Context, pathName string) bool { // wait some seconds to delay brute force attacks <-time.After(auth.PauseAfterError) - ctx.Writer.WriteHeader(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return false } diff --git a/internal/pprof/pprof.go b/internal/pprof/pprof.go index 5b171935..5a505c92 100644 --- a/internal/pprof/pprof.go +++ b/internal/pprof/pprof.go @@ -11,6 +11,7 @@ import ( "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/httpp" ) @@ -103,7 +104,10 @@ func (pp *PPROF) middlewareAuth(ctx *gin.Context) { if err != nil { if err.AskCredentials { ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`) - ctx.AbortWithStatus(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } @@ -112,7 +116,10 @@ func (pp *PPROF) middlewareAuth(ctx *gin.Context) { // wait some seconds to delay brute force attacks <-time.After(auth.PauseAfterError) - ctx.AbortWithStatus(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } } diff --git a/internal/servers/hls/http_server.go b/internal/servers/hls/http_server.go index fcf429ec..07b7f1f7 100644 --- a/internal/servers/hls/http_server.go +++ b/internal/servers/hls/http_server.go @@ -157,7 +157,10 @@ func (s *httpServer) onRequest(ctx *gin.Context) { if errors.As(err, &terr) { if terr.AskCredentials { ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`) - ctx.Writer.WriteHeader(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } @@ -166,7 +169,10 @@ func (s *httpServer) onRequest(ctx *gin.Context) { // wait some seconds to delay brute force attacks <-time.After(auth.PauseAfterError) - ctx.Writer.WriteHeader(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } diff --git a/internal/servers/webrtc/http_server.go b/internal/servers/webrtc/http_server.go index f2b89c94..7dbdaa20 100644 --- a/internal/servers/webrtc/http_server.go +++ b/internal/servers/webrtc/http_server.go @@ -49,7 +49,8 @@ func mergePathAndQuery(path string, rawQuery string) string { func writeError(ctx *gin.Context, statusCode int, err error) { ctx.JSON(statusCode, &defs.APIError{ - Error: err.Error(), + Status: "error", + Error: err.Error(), }) } @@ -138,7 +139,10 @@ func (s *httpServer) checkAuthOutsideSession(ctx *gin.Context, pathName string, if errors.As(err, &terr) { if terr.AskCredentials { ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`) - ctx.Writer.WriteHeader(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return false } @@ -200,7 +204,10 @@ func (s *httpServer) onWHIPPost(ctx *gin.Context, pathName string, publish bool) if errors.As(err, &terr) { if terr.AskCredentials { ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`) - ctx.AbortWithStatus(http.StatusUnauthorized) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } @@ -209,7 +216,10 @@ func (s *httpServer) onWHIPPost(ctx *gin.Context, pathName string, publish bool) // wait some seconds to delay brute force attacks <-time.After(auth.PauseAfterError) - writeError(ctx, http.StatusUnauthorized, terr) + ctx.AbortWithStatusJSON(http.StatusUnauthorized, &defs.APIError{ + Status: "error", + Error: "authentication error", + }) return } @@ -274,7 +284,9 @@ func (s *httpServer) onWHIPPatch(ctx *gin.Context, pathName string, rawSecret st return } - ctx.Writer.WriteHeader(http.StatusNoContent) + ctx.AbortWithStatusJSON(http.StatusNoContent, &defs.APIOK{ + Status: "ok", + }) } func (s *httpServer) onWHIPDelete(ctx *gin.Context, pathName string, rawSecret string) { @@ -297,7 +309,9 @@ func (s *httpServer) onWHIPDelete(ctx *gin.Context, pathName string, rawSecret s return } - ctx.Writer.WriteHeader(http.StatusOK) + ctx.AbortWithStatusJSON(http.StatusOK, &defs.APIOK{ + Status: "ok", + }) } func (s *httpServer) onPage(ctx *gin.Context, pathName string, publish bool) {