diff --git a/internal/api/api.go b/internal/api/api.go index f2e597bd..be889cd9 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -2,32 +2,21 @@ package api import ( - "errors" - "fmt" "net" "net/http" - "os" "reflect" "sort" - "strings" "sync" "time" "github.com/gin-gonic/gin" - "github.com/google/uuid" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" - "github.com/bluenviron/mediamtx/internal/conf/jsonwrapper" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/httpp" "github.com/bluenviron/mediamtx/internal/recordstore" - "github.com/bluenviron/mediamtx/internal/servers/hls" - "github.com/bluenviron/mediamtx/internal/servers/rtmp" - "github.com/bluenviron/mediamtx/internal/servers/rtsp" - "github.com/bluenviron/mediamtx/internal/servers/srt" - "github.com/bluenviron/mediamtx/internal/servers/webrtc" ) func interfaceIsEmpty(i any) bool { @@ -271,276 +260,6 @@ func (a *API) middlewareAuth(ctx *gin.Context) { } } -func (a *API) onConfigGlobalGet(ctx *gin.Context) { - a.mutex.RLock() - c := a.Conf - a.mutex.RUnlock() - - ctx.JSON(http.StatusOK, c.Global()) -} - -func (a *API) onConfigGlobalPatch(ctx *gin.Context) { - var c conf.OptionalGlobal - err := jsonwrapper.Decode(ctx.Request.Body, &c) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.mutex.Lock() - defer a.mutex.Unlock() - - newConf := a.Conf.Clone() - - newConf.PatchGlobal(&c) - - err = newConf.Validate(nil) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.Conf = newConf - - // since reloading the configuration can cause the shutdown of the API, - // call it in a goroutine - go a.Parent.APIConfigSet(newConf) - - ctx.Status(http.StatusOK) -} - -func (a *API) onConfigPathDefaultsGet(ctx *gin.Context) { - a.mutex.RLock() - c := a.Conf - a.mutex.RUnlock() - - ctx.JSON(http.StatusOK, c.PathDefaults) -} - -func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) { - var p conf.OptionalPath - err := jsonwrapper.Decode(ctx.Request.Body, &p) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.mutex.Lock() - defer a.mutex.Unlock() - - newConf := a.Conf.Clone() - - newConf.PatchPathDefaults(&p) - - err = newConf.Validate(nil) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.Conf = newConf - a.Parent.APIConfigSet(newConf) - - ctx.Status(http.StatusOK) -} - -func (a *API) onConfigPathsList(ctx *gin.Context) { - a.mutex.RLock() - c := a.Conf - a.mutex.RUnlock() - - data := &defs.APIPathConfList{ - Items: make([]*conf.Path, len(c.Paths)), - } - - for i, key := range sortedKeys(c.Paths) { - data.Items[i] = c.Paths[key] - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onConfigPathsGet(ctx *gin.Context) { - confName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - a.mutex.RLock() - c := a.Conf - a.mutex.RUnlock() - - p, ok := c.Paths[confName] - if !ok { - a.writeError(ctx, http.StatusNotFound, fmt.Errorf("path configuration not found")) - return - } - - ctx.JSON(http.StatusOK, p) -} - -func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl - confName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - var p conf.OptionalPath - err := jsonwrapper.Decode(ctx.Request.Body, &p) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.mutex.Lock() - defer a.mutex.Unlock() - - newConf := a.Conf.Clone() - - err = newConf.AddPath(confName, &p) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - err = newConf.Validate(nil) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.Conf = newConf - a.Parent.APIConfigSet(newConf) - - ctx.Status(http.StatusOK) -} - -func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl - confName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - var p conf.OptionalPath - err := jsonwrapper.Decode(ctx.Request.Body, &p) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.mutex.Lock() - defer a.mutex.Unlock() - - newConf := a.Conf.Clone() - - err = newConf.PatchPath(confName, &p) - if err != nil { - if errors.Is(err, conf.ErrPathNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusBadRequest, err) - } - return - } - - err = newConf.Validate(nil) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.Conf = newConf - a.Parent.APIConfigSet(newConf) - - ctx.Status(http.StatusOK) -} - -func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl - confName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - var p conf.OptionalPath - err := jsonwrapper.Decode(ctx.Request.Body, &p) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.mutex.Lock() - defer a.mutex.Unlock() - - newConf := a.Conf.Clone() - - err = newConf.ReplacePath(confName, &p) - if err != nil { - if errors.Is(err, conf.ErrPathNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusBadRequest, err) - } - return - } - - err = newConf.Validate(nil) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.Conf = newConf - a.Parent.APIConfigSet(newConf) - - ctx.Status(http.StatusOK) -} - -func (a *API) onConfigPathsDelete(ctx *gin.Context) { - confName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - a.mutex.Lock() - defer a.mutex.Unlock() - - newConf := a.Conf.Clone() - - err := newConf.RemovePath(confName) - if err != nil { - if errors.Is(err, conf.ErrPathNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusBadRequest, err) - } - return - } - - err = newConf.Validate(nil) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - a.Conf = newConf - a.Parent.APIConfigSet(newConf) - - ctx.Status(http.StatusOK) -} - func (a *API) onInfo(ctx *gin.Context) { ctx.JSON(http.StatusOK, &defs.APIInfo{ Version: a.Version, @@ -553,590 +272,6 @@ func (a *API) onAuthJwksRefresh(ctx *gin.Context) { ctx.Status(http.StatusOK) } -func (a *API) onPathsList(ctx *gin.Context) { - data, err := a.PathManager.APIPathsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onPathsGet(ctx *gin.Context) { - pathName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - data, err := a.PathManager.APIPathsGet(pathName) - if err != nil { - if errors.Is(err, conf.ErrPathNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPConnsList(ctx *gin.Context) { - data, err := a.RTSPServer.APIConnsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPConnsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.RTSPServer.APIConnsGet(uuid) - if err != nil { - if errors.Is(err, rtsp.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPSessionsList(ctx *gin.Context) { - data, err := a.RTSPServer.APISessionsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPSessionsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.RTSPServer.APISessionsGet(uuid) - if err != nil { - if errors.Is(err, rtsp.ErrSessionNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPSessionsKick(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - err = a.RTSPServer.APISessionsKick(uuid) - if err != nil { - if errors.Is(err, rtsp.ErrSessionNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.Status(http.StatusOK) -} - -func (a *API) onRTSPSConnsList(ctx *gin.Context) { - data, err := a.RTSPSServer.APIConnsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPSConnsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.RTSPSServer.APIConnsGet(uuid) - if err != nil { - if errors.Is(err, rtsp.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPSSessionsList(ctx *gin.Context) { - data, err := a.RTSPSServer.APISessionsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPSSessionsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.RTSPSServer.APISessionsGet(uuid) - if err != nil { - if errors.Is(err, rtsp.ErrSessionNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTSPSSessionsKick(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - err = a.RTSPSServer.APISessionsKick(uuid) - if err != nil { - if errors.Is(err, rtsp.ErrSessionNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.Status(http.StatusOK) -} - -func (a *API) onRTMPConnsList(ctx *gin.Context) { - data, err := a.RTMPServer.APIConnsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTMPConnsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.RTMPServer.APIConnsGet(uuid) - if err != nil { - if errors.Is(err, rtmp.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTMPConnsKick(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - err = a.RTMPServer.APIConnsKick(uuid) - if err != nil { - if errors.Is(err, rtmp.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.Status(http.StatusOK) -} - -func (a *API) onRTMPSConnsList(ctx *gin.Context) { - data, err := a.RTMPSServer.APIConnsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTMPSConnsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.RTMPSServer.APIConnsGet(uuid) - if err != nil { - if errors.Is(err, rtmp.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRTMPSConnsKick(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - err = a.RTMPSServer.APIConnsKick(uuid) - if err != nil { - if errors.Is(err, rtmp.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.Status(http.StatusOK) -} - -func (a *API) onHLSMuxersList(ctx *gin.Context) { - data, err := a.HLSServer.APIMuxersList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onHLSMuxersGet(ctx *gin.Context) { - pathName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - data, err := a.HLSServer.APIMuxersGet(pathName) - if err != nil { - if errors.Is(err, hls.ErrMuxerNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onWebRTCSessionsList(ctx *gin.Context) { - data, err := a.WebRTCServer.APISessionsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onWebRTCSessionsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.WebRTCServer.APISessionsGet(uuid) - if err != nil { - if errors.Is(err, webrtc.ErrSessionNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onWebRTCSessionsKick(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - err = a.WebRTCServer.APISessionsKick(uuid) - if err != nil { - if errors.Is(err, webrtc.ErrSessionNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.Status(http.StatusOK) -} - -func (a *API) onSRTConnsList(ctx *gin.Context) { - data, err := a.SRTServer.APIConnsList() - if err != nil { - a.writeError(ctx, http.StatusInternalServerError, err) - return - } - - data.ItemCount = len(data.Items) - pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onSRTConnsGet(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - data, err := a.SRTServer.APIConnsGet(uuid) - if err != nil { - if errors.Is(err, srt.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onSRTConnsKick(ctx *gin.Context) { - uuid, err := uuid.Parse(ctx.Param("id")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - err = a.SRTServer.APIConnsKick(uuid) - if err != nil { - if errors.Is(err, srt.ErrConnNotFound) { - a.writeError(ctx, http.StatusNotFound, err) - } else { - a.writeError(ctx, http.StatusInternalServerError, err) - } - return - } - - ctx.Status(http.StatusOK) -} - -func (a *API) onRecordingsList(ctx *gin.Context) { - a.mutex.RLock() - c := a.Conf - a.mutex.RUnlock() - - pathNames := recordstore.FindAllPathsWithSegments(c.Paths) - - data := defs.APIRecordingList{} - - data.ItemCount = len(pathNames) - pageCount, err := paginate(&pathNames, ctx.Query("itemsPerPage"), ctx.Query("page")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - data.PageCount = pageCount - - data.Items = make([]*defs.APIRecording, len(pathNames)) - - for i, pathName := range pathNames { - pathConf, _, _ := conf.FindPathConf(c.Paths, pathName) - data.Items[i] = recordingsOfPath(pathConf, pathName) - } - - ctx.JSON(http.StatusOK, data) -} - -func (a *API) onRecordingsGet(ctx *gin.Context) { - pathName, ok := paramName(ctx) - if !ok { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) - return - } - - a.mutex.RLock() - c := a.Conf - a.mutex.RUnlock() - - pathConf, _, err := conf.FindPathConf(c.Paths, pathName) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - ctx.JSON(http.StatusOK, recordingsOfPath(pathConf, pathName)) -} - -func (a *API) onRecordingDeleteSegment(ctx *gin.Context) { - pathName := ctx.Query("path") - - start, err := time.Parse(time.RFC3339, ctx.Query("start")) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid 'start' parameter: %w", err)) - return - } - - a.mutex.RLock() - c := a.Conf - a.mutex.RUnlock() - - pathConf, _, err := conf.FindPathConf(c.Paths, pathName) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - pathFormat := recordstore.PathAddExtension( - strings.ReplaceAll(pathConf.RecordPath, "%path", pathName), - pathConf.RecordFormat, - ) - - segmentPath := recordstore.Path{ - Start: start, - }.Encode(pathFormat) - - err = os.Remove(segmentPath) - if err != nil { - a.writeError(ctx, http.StatusBadRequest, err) - return - } - - ctx.Status(http.StatusOK) -} - // ReloadConf is called by core. func (a *API) ReloadConf(conf *conf.Conf) { a.mutex.Lock() diff --git a/internal/api/api_config_global.go b/internal/api/api_config_global.go new file mode 100644 index 00000000..615bdc57 --- /dev/null +++ b/internal/api/api_config_global.go @@ -0,0 +1,47 @@ +package api + +import ( + "net/http" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/conf/jsonwrapper" + "github.com/gin-gonic/gin" +) + +func (a *API) onConfigGlobalGet(ctx *gin.Context) { + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + ctx.JSON(http.StatusOK, c.Global()) +} + +func (a *API) onConfigGlobalPatch(ctx *gin.Context) { + var c conf.OptionalGlobal + err := jsonwrapper.Decode(ctx.Request.Body, &c) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.mutex.Lock() + defer a.mutex.Unlock() + + newConf := a.Conf.Clone() + + newConf.PatchGlobal(&c) + + err = newConf.Validate(nil) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.Conf = newConf + + // since reloading the configuration can cause the shutdown of the API, + // call it in a goroutine + go a.Parent.APIConfigSet(newConf) + + ctx.Status(http.StatusOK) +} diff --git a/internal/api/api_config_global_test.go b/internal/api/api_config_global_test.go new file mode 100644 index 00000000..915e5335 --- /dev/null +++ b/internal/api/api_config_global_test.go @@ -0,0 +1,124 @@ +package api + +import ( + "bytes" + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/auth" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/stretchr/testify/require" +) + +func TestConfigGlobalGet(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + checked := false + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: &test.AuthManager{ + AuthenticateImpl: func(req *auth.Request) *auth.Error { + require.Equal(t, conf.AuthActionAPI, req.Action) + require.Equal(t, "myuser", req.Credentials.User) + require.Equal(t, "mypass", req.Credentials.Pass) + checked = true + return nil + }, + }, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/config/global/get", nil, &out) + require.Equal(t, true, out["api"]) + + require.True(t, checked) +} + +func TestConfigGlobalPatch(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", + map[string]any{ + "rtmp": false, + "readTimeout": "7s", + "protocols": []string{"tcp"}, + "readBufferCount": 4096, // test setting a deprecated parameter + }, nil) + + time.Sleep(500 * time.Millisecond) + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/global/get", nil, &out) + require.Equal(t, false, out["rtmp"]) + require.Equal(t, "7s", out["readTimeout"]) + require.Equal(t, []any{"tcp"}, out["protocols"]) + require.Equal(t, float64(4096), out["readBufferCount"]) +} + +func TestConfigGlobalPatchUnknownField(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + b := map[string]any{ + "test": "asd", + } + + byts, err := json.Marshal(b) + require.NoError(t, err) + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodPatch, "http://localhost:9997/v3/config/global/patch", + bytes.NewReader(byts)) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusBadRequest, res.StatusCode) + checkError(t, res.Body, "json: unknown field \"test\"") +} diff --git a/internal/api/api_config_pathdefaults.go b/internal/api/api_config_pathdefaults.go new file mode 100644 index 00000000..c8ad6de4 --- /dev/null +++ b/internal/api/api_config_pathdefaults.go @@ -0,0 +1,44 @@ +package api + +import ( + "net/http" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/conf/jsonwrapper" + "github.com/gin-gonic/gin" +) + +func (a *API) onConfigPathDefaultsGet(ctx *gin.Context) { + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + ctx.JSON(http.StatusOK, c.PathDefaults) +} + +func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) { + var p conf.OptionalPath + err := jsonwrapper.Decode(ctx.Request.Body, &p) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.mutex.Lock() + defer a.mutex.Unlock() + + newConf := a.Conf.Clone() + + newConf.PatchPathDefaults(&p) + + err = newConf.Validate(nil) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.Conf = newConf + a.Parent.APIConfigSet(newConf) + + ctx.Status(http.StatusOK) +} diff --git a/internal/api/api_config_pathdefaults_test.go b/internal/api/api_config_pathdefaults_test.go new file mode 100644 index 00000000..9a19e137 --- /dev/null +++ b/internal/api/api_config_pathdefaults_test.go @@ -0,0 +1,66 @@ +package api + +import ( + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/stretchr/testify/require" +) + +func TestConfigPathDefaultsGet(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) + require.Equal(t, "publisher", out["source"]) +} + +func TestConfigPathDefaultsPatch(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/pathdefaults/patch", + map[string]any{ + "recordFormat": "fmp4", + }, nil) + + time.Sleep(500 * time.Millisecond) + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) + require.Equal(t, "fmp4", out["recordFormat"]) +} diff --git a/internal/api/api_config_paths.go b/internal/api/api_config_paths.go new file mode 100644 index 00000000..409cd5f9 --- /dev/null +++ b/internal/api/api_config_paths.go @@ -0,0 +1,209 @@ +package api + +import ( + "errors" + "fmt" + "net/http" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/conf/jsonwrapper" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/gin-gonic/gin" +) + +func (a *API) onConfigPathsList(ctx *gin.Context) { + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + data := &defs.APIPathConfList{ + Items: make([]*conf.Path, len(c.Paths)), + } + + for i, key := range sortedKeys(c.Paths) { + data.Items[i] = c.Paths[key] + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onConfigPathsGet(ctx *gin.Context) { + confName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + p, ok := c.Paths[confName] + if !ok { + a.writeError(ctx, http.StatusNotFound, fmt.Errorf("path configuration not found")) + return + } + + ctx.JSON(http.StatusOK, p) +} + +func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl + confName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + var p conf.OptionalPath + err := jsonwrapper.Decode(ctx.Request.Body, &p) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.mutex.Lock() + defer a.mutex.Unlock() + + newConf := a.Conf.Clone() + + err = newConf.AddPath(confName, &p) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + err = newConf.Validate(nil) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.Conf = newConf + a.Parent.APIConfigSet(newConf) + + ctx.Status(http.StatusOK) +} + +func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl + confName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + var p conf.OptionalPath + err := jsonwrapper.Decode(ctx.Request.Body, &p) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.mutex.Lock() + defer a.mutex.Unlock() + + newConf := a.Conf.Clone() + + err = newConf.PatchPath(confName, &p) + if err != nil { + if errors.Is(err, conf.ErrPathNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + err = newConf.Validate(nil) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.Conf = newConf + a.Parent.APIConfigSet(newConf) + + ctx.Status(http.StatusOK) +} + +func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl + confName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + var p conf.OptionalPath + err := jsonwrapper.Decode(ctx.Request.Body, &p) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.mutex.Lock() + defer a.mutex.Unlock() + + newConf := a.Conf.Clone() + + err = newConf.ReplacePath(confName, &p) + if err != nil { + if errors.Is(err, conf.ErrPathNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + err = newConf.Validate(nil) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.Conf = newConf + a.Parent.APIConfigSet(newConf) + + ctx.Status(http.StatusOK) +} + +func (a *API) onConfigPathsDelete(ctx *gin.Context) { + confName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + a.mutex.Lock() + defer a.mutex.Unlock() + + newConf := a.Conf.Clone() + + err := newConf.RemovePath(confName) + if err != nil { + if errors.Is(err, conf.ErrPathNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + err = newConf.Validate(nil) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + a.Conf = newConf + a.Parent.APIConfigSet(newConf) + + ctx.Status(http.StatusOK) +} diff --git a/internal/api/api_config_paths_test.go b/internal/api/api_config_paths_test.go new file mode 100644 index 00000000..0a2bb60d --- /dev/null +++ b/internal/api/api_config_paths_test.go @@ -0,0 +1,314 @@ +package api + +import ( + "bytes" + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/stretchr/testify/require" +) + +func TestConfigPathsList(t *testing.T) { + cnf := tempConf(t, "api: yes\n"+ + "paths:\n"+ + " path1:\n"+ + " readUser: myuser1\n"+ + " readPass: mypass1\n"+ + " path2:\n"+ + " readUser: myuser2\n"+ + " readPass: mypass2\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + type pathConfig map[string]any + + type listRes struct { + ItemCount int `json:"itemCount"` + PageCount int `json:"pageCount"` + Items []pathConfig `json:"items"` + } + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out listRes + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/list", nil, &out) + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Equal(t, "path1", out.Items[0]["name"]) + require.Equal(t, "myuser1", out.Items[0]["readUser"]) + require.Equal(t, "mypass1", out.Items[0]["readPass"]) + require.Equal(t, "path2", out.Items[1]["name"]) + require.Equal(t, "myuser2", out.Items[1]["readUser"]) + require.Equal(t, "mypass2", out.Items[1]["readPass"]) +} + +func TestConfigPathsGet(t *testing.T) { + cnf := tempConf(t, "api: yes\n"+ + "paths:\n"+ + " my/path:\n"+ + " readUser: myuser\n"+ + " readPass: mypass\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "my/path", out["name"]) + require.Equal(t, "myuser", out["readUser"]) +} + +func TestConfigPathsAdd(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", + map[string]any{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + "disablePublisherOverride": true, // test setting a deprecated parameter + "rpiCameraVFlip": true, + }, nil) + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "rtsp://127.0.0.1:9999/mypath", out["source"]) + require.Equal(t, true, out["sourceOnDemand"]) + require.Equal(t, true, out["disablePublisherOverride"]) + require.Equal(t, true, out["rpiCameraVFlip"]) +} + +func TestConfigPathsAddUnknownField(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + b := map[string]any{ + "test": "asd", + } + + byts, err := json.Marshal(b) + require.NoError(t, err) + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodPost, + "http://localhost:9997/v3/config/paths/add/my/path", bytes.NewReader(byts)) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusBadRequest, res.StatusCode) + checkError(t, res.Body, "json: unknown field \"test\"") +} + +func TestConfigPathsPatch(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", + map[string]any{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + "disablePublisherOverride": true, // test setting a deprecated parameter + "rpiCameraVFlip": true, + }, nil) + + httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/paths/patch/my/path", + map[string]any{ + "source": "rtsp://127.0.0.1:9998/mypath", + "sourceOnDemand": true, + }, nil) + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) + require.Equal(t, true, out["sourceOnDemand"]) + require.Equal(t, true, out["disablePublisherOverride"]) + require.Equal(t, true, out["rpiCameraVFlip"]) +} + +func TestConfigPathsReplace(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", + map[string]any{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + "disablePublisherOverride": true, // test setting a deprecated parameter + "rpiCameraVFlip": true, + }, nil) + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/replace/my/path", + map[string]any{ + "source": "rtsp://127.0.0.1:9998/mypath", + "sourceOnDemand": true, + }, nil) + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) + require.Equal(t, true, out["sourceOnDemand"]) + require.Equal(t, nil, out["disablePublisherOverride"]) + require.Equal(t, false, out["rpiCameraVFlip"]) +} + +func TestConfigPathsReplaceNonExisting(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/replace/my/path", + map[string]any{ + "source": "rtsp://127.0.0.1:9998/mypath", + "sourceOnDemand": true, + }, nil) + + var out map[string]any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) + require.Equal(t, true, out["sourceOnDemand"]) + require.Equal(t, nil, out["disablePublisherOverride"]) + require.Equal(t, false, out["rpiCameraVFlip"]) +} + +func TestConfigPathsDelete(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", + map[string]any{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + }, nil) + + httpRequest(t, hc, http.MethodDelete, "http://localhost:9997/v3/config/paths/delete/my/path", nil, nil) + + req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusNotFound, res.StatusCode) + checkError(t, res.Body, "path configuration not found") +} diff --git a/internal/api/api_hls.go b/internal/api/api_hls.go new file mode 100644 index 00000000..9c8e8411 --- /dev/null +++ b/internal/api/api_hls.go @@ -0,0 +1,49 @@ +//nolint:dupl +package api + +import ( + "errors" + "fmt" + "net/http" + + "github.com/bluenviron/mediamtx/internal/servers/hls" + "github.com/gin-gonic/gin" +) + +func (a *API) onHLSMuxersList(ctx *gin.Context) { + data, err := a.HLSServer.APIMuxersList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onHLSMuxersGet(ctx *gin.Context) { + pathName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + data, err := a.HLSServer.APIMuxersGet(pathName) + if err != nil { + if errors.Is(err, hls.ErrMuxerNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} diff --git a/internal/api/api_paths.go b/internal/api/api_paths.go new file mode 100644 index 00000000..93de2ec5 --- /dev/null +++ b/internal/api/api_paths.go @@ -0,0 +1,49 @@ +//nolint:dupl +package api + +import ( + "errors" + "fmt" + "net/http" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/gin-gonic/gin" +) + +func (a *API) onPathsList(ctx *gin.Context) { + data, err := a.PathManager.APIPathsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onPathsGet(ctx *gin.Context) { + pathName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + data, err := a.PathManager.APIPathsGet(pathName) + if err != nil { + if errors.Is(err, conf.ErrPathNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} diff --git a/internal/api/api_recordings.go b/internal/api/api_recordings.go new file mode 100644 index 00000000..a490557f --- /dev/null +++ b/internal/api/api_recordings.go @@ -0,0 +1,98 @@ +package api + +import ( + "fmt" + "net/http" + "os" + "strings" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/recordstore" + "github.com/gin-gonic/gin" +) + +func (a *API) onRecordingsList(ctx *gin.Context) { + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + pathNames := recordstore.FindAllPathsWithSegments(c.Paths) + + data := defs.APIRecordingList{} + + data.ItemCount = len(pathNames) + pageCount, err := paginate(&pathNames, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + data.Items = make([]*defs.APIRecording, len(pathNames)) + + for i, pathName := range pathNames { + pathConf, _, _ := conf.FindPathConf(c.Paths, pathName) + data.Items[i] = recordingsOfPath(pathConf, pathName) + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRecordingsGet(ctx *gin.Context) { + pathName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + pathConf, _, err := conf.FindPathConf(c.Paths, pathName) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + ctx.JSON(http.StatusOK, recordingsOfPath(pathConf, pathName)) +} + +func (a *API) onRecordingDeleteSegment(ctx *gin.Context) { + pathName := ctx.Query("path") + + start, err := time.Parse(time.RFC3339, ctx.Query("start")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid 'start' parameter: %w", err)) + return + } + + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + pathConf, _, err := conf.FindPathConf(c.Paths, pathName) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + pathFormat := recordstore.PathAddExtension( + strings.ReplaceAll(pathConf.RecordPath, "%path", pathName), + pathConf.RecordFormat, + ) + + segmentPath := recordstore.Path{ + Start: start, + }.Encode(pathFormat) + + err = os.Remove(segmentPath) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + ctx.Status(http.StatusOK) +} diff --git a/internal/api/api_recordings_test.go b/internal/api/api_recordings_test.go new file mode 100644 index 00000000..1d1ac7ef --- /dev/null +++ b/internal/api/api_recordings_test.go @@ -0,0 +1,184 @@ +package api + +import ( + "net/http" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/stretchr/testify/require" +) + +func TestRecordingsList(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cnf := tempConf(t, "pathDefaults:\n"+ + " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ + "paths:\n"+ + " mypath1:\n"+ + " all_others:\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err = api.Initialize() + require.NoError(t, err) + defer api.Close() + + err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) + require.NoError(t, err) + + err = os.Mkdir(filepath.Join(dir, "mypath2"), 0o755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-500000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath2", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/recordings/list", nil, &out) + require.Equal(t, map[string]any{ + "itemCount": float64(2), + "pageCount": float64(1), + "items": []any{ + map[string]any{ + "name": "mypath1", + "segments": []any{ + map[string]any{ + "start": time.Date(2008, 11, 7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano), + }, + map[string]any{ + "start": time.Date(2009, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano), + }, + }, + }, + map[string]any{ + "name": "mypath2", + "segments": []any{ + map[string]any{ + "start": time.Date(2009, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano), + }, + }, + }, + }, + }, out) +} + +func TestRecordingsGet(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cnf := tempConf(t, "pathDefaults:\n"+ + " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ + "paths:\n"+ + " all_others:\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err = api.Initialize() + require.NoError(t, err) + defer api.Close() + + err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + var out any + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/recordings/get/mypath1", nil, &out) + require.Equal(t, map[string]any{ + "name": "mypath1", + "segments": []any{ + map[string]any{ + "start": time.Date(2008, 11, 7, 11, 22, 0, 0, time.Local).Format(time.RFC3339Nano), + }, + map[string]any{ + "start": time.Date(2009, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano), + }, + }, + }, out) +} + +func TestRecordingsDeleteSegment(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cnf := tempConf(t, "pathDefaults:\n"+ + " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ + "paths:\n"+ + " all_others:\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.Duration(10 * time.Second), + WriteTimeout: conf.Duration(10 * time.Second), + Conf: cnf, + AuthManager: test.NilAuthManager, + Parent: &testParent{}, + } + err = api.Initialize() + require.NoError(t, err) + defer api.Close() + + err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + tr := &http.Transport{} + defer tr.CloseIdleConnections() + hc := &http.Client{Transport: tr} + + u, err := url.Parse("http://localhost:9997/v3/recordings/deletesegment") + require.NoError(t, err) + + v := url.Values{} + v.Set("path", "mypath1") + v.Set("start", time.Date(2008, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano)) + u.RawQuery = v.Encode() + + req, err := http.NewRequest(http.MethodDelete, u.String(), nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + require.Equal(t, http.StatusOK, res.StatusCode) +} diff --git a/internal/api/api_rtmp.go b/internal/api/api_rtmp.go new file mode 100644 index 00000000..fa398a42 --- /dev/null +++ b/internal/api/api_rtmp.go @@ -0,0 +1,126 @@ +package api + +import ( + "errors" + "net/http" + + "github.com/bluenviron/mediamtx/internal/servers/rtmp" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +func (a *API) onRTMPConnsList(ctx *gin.Context) { + data, err := a.RTMPServer.APIConnsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTMPConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.RTMPServer.APIConnsGet(uuid) + if err != nil { + if errors.Is(err, rtmp.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTMPConnsKick(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + err = a.RTMPServer.APIConnsKick(uuid) + if err != nil { + if errors.Is(err, rtmp.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.Status(http.StatusOK) +} + +func (a *API) onRTMPSConnsList(ctx *gin.Context) { + data, err := a.RTMPSServer.APIConnsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTMPSConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.RTMPSServer.APIConnsGet(uuid) + if err != nil { + if errors.Is(err, rtmp.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTMPSConnsKick(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + err = a.RTMPSServer.APIConnsKick(uuid) + if err != nil { + if errors.Is(err, rtmp.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.Status(http.StatusOK) +} diff --git a/internal/api/api_rtsp.go b/internal/api/api_rtsp.go new file mode 100644 index 00000000..422ae40d --- /dev/null +++ b/internal/api/api_rtsp.go @@ -0,0 +1,202 @@ +package api + +import ( + "errors" + "net/http" + + "github.com/bluenviron/mediamtx/internal/servers/rtsp" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +func (a *API) onRTSPConnsList(ctx *gin.Context) { + data, err := a.RTSPServer.APIConnsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.RTSPServer.APIConnsGet(uuid) + if err != nil { + if errors.Is(err, rtsp.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPSessionsList(ctx *gin.Context) { + data, err := a.RTSPServer.APISessionsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPSessionsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.RTSPServer.APISessionsGet(uuid) + if err != nil { + if errors.Is(err, rtsp.ErrSessionNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPSessionsKick(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + err = a.RTSPServer.APISessionsKick(uuid) + if err != nil { + if errors.Is(err, rtsp.ErrSessionNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.Status(http.StatusOK) +} + +func (a *API) onRTSPSConnsList(ctx *gin.Context) { + data, err := a.RTSPSServer.APIConnsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPSConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.RTSPSServer.APIConnsGet(uuid) + if err != nil { + if errors.Is(err, rtsp.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPSSessionsList(ctx *gin.Context) { + data, err := a.RTSPSServer.APISessionsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPSSessionsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.RTSPSServer.APISessionsGet(uuid) + if err != nil { + if errors.Is(err, rtsp.ErrSessionNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRTSPSSessionsKick(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + err = a.RTSPSServer.APISessionsKick(uuid) + if err != nil { + if errors.Is(err, rtsp.ErrSessionNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.Status(http.StatusOK) +} diff --git a/internal/api/api_srt.go b/internal/api/api_srt.go new file mode 100644 index 00000000..6029dfd2 --- /dev/null +++ b/internal/api/api_srt.go @@ -0,0 +1,69 @@ +//nolint:dupl +package api + +import ( + "errors" + "net/http" + + "github.com/bluenviron/mediamtx/internal/servers/srt" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +func (a *API) onSRTConnsList(ctx *gin.Context) { + data, err := a.SRTServer.APIConnsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onSRTConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.SRTServer.APIConnsGet(uuid) + if err != nil { + if errors.Is(err, srt.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onSRTConnsKick(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + err = a.SRTServer.APIConnsKick(uuid) + if err != nil { + if errors.Is(err, srt.ErrConnNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.Status(http.StatusOK) +} diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 0612018a..2354066d 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -8,7 +8,6 @@ import ( "net/http" "net/url" "os" - "path/filepath" "testing" "time" @@ -73,7 +72,7 @@ func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in any require.NoError(t, err) } -func checkError(t *testing.T, msg string, body io.Reader) { +func checkError(t *testing.T, body io.Reader, msg string) { var resErr map[string]any err := json.NewDecoder(body).Decode(&resErr) require.NoError(t, err) @@ -147,642 +146,6 @@ func TestInfo(t *testing.T) { }, out) } -func TestConfigGlobalGet(t *testing.T) { - cnf := tempConf(t, "api: yes\n") - checked := false - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: &test.AuthManager{ - AuthenticateImpl: func(req *auth.Request) *auth.Error { - require.Equal(t, conf.AuthActionAPI, req.Action) - require.Equal(t, "myuser", req.Credentials.User) - require.Equal(t, "mypass", req.Credentials.Pass) - checked = true - return nil - }, - }, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/config/global/get", nil, &out) - require.Equal(t, true, out["api"]) - - require.True(t, checked) -} - -func TestConfigGlobalPatch(t *testing.T) { - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", - map[string]any{ - "rtmp": false, - "readTimeout": "7s", - "protocols": []string{"tcp"}, - "readBufferCount": 4096, // test setting a deprecated parameter - }, nil) - - time.Sleep(500 * time.Millisecond) - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/global/get", nil, &out) - require.Equal(t, false, out["rtmp"]) - require.Equal(t, "7s", out["readTimeout"]) - require.Equal(t, []any{"tcp"}, out["protocols"]) - require.Equal(t, float64(4096), out["readBufferCount"]) -} - -func TestConfigGlobalPatchUnknownField(t *testing.T) { //nolint:dupl - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - b := map[string]any{ - "test": "asd", - } - - byts, err := json.Marshal(b) - require.NoError(t, err) - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - req, err := http.NewRequest(http.MethodPatch, "http://localhost:9997/v3/config/global/patch", - bytes.NewReader(byts)) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusBadRequest, res.StatusCode) - checkError(t, "json: unknown field \"test\"", res.Body) -} - -func TestConfigPathDefaultsGet(t *testing.T) { - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) - require.Equal(t, "publisher", out["source"]) -} - -func TestConfigPathDefaultsPatch(t *testing.T) { - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/pathdefaults/patch", - map[string]any{ - "recordFormat": "fmp4", - }, nil) - - time.Sleep(500 * time.Millisecond) - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) - require.Equal(t, "fmp4", out["recordFormat"]) -} - -func TestConfigPathsList(t *testing.T) { - cnf := tempConf(t, "api: yes\n"+ - "paths:\n"+ - " path1:\n"+ - " readUser: myuser1\n"+ - " readPass: mypass1\n"+ - " path2:\n"+ - " readUser: myuser2\n"+ - " readPass: mypass2\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - type pathConfig map[string]any - - type listRes struct { - ItemCount int `json:"itemCount"` - PageCount int `json:"pageCount"` - Items []pathConfig `json:"items"` - } - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - var out listRes - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/list", nil, &out) - require.Equal(t, 2, out.ItemCount) - require.Equal(t, 1, out.PageCount) - require.Equal(t, "path1", out.Items[0]["name"]) - require.Equal(t, "myuser1", out.Items[0]["readUser"]) - require.Equal(t, "mypass1", out.Items[0]["readPass"]) - require.Equal(t, "path2", out.Items[1]["name"]) - require.Equal(t, "myuser2", out.Items[1]["readUser"]) - require.Equal(t, "mypass2", out.Items[1]["readPass"]) -} - -func TestConfigPathsGet(t *testing.T) { - cnf := tempConf(t, "api: yes\n"+ - "paths:\n"+ - " my/path:\n"+ - " readUser: myuser\n"+ - " readPass: mypass\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "my/path", out["name"]) - require.Equal(t, "myuser", out["readUser"]) -} - -func TestConfigPathsAdd(t *testing.T) { - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", - map[string]any{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - "disablePublisherOverride": true, // test setting a deprecated parameter - "rpiCameraVFlip": true, - }, nil) - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "rtsp://127.0.0.1:9999/mypath", out["source"]) - require.Equal(t, true, out["sourceOnDemand"]) - require.Equal(t, true, out["disablePublisherOverride"]) - require.Equal(t, true, out["rpiCameraVFlip"]) -} - -func TestConfigPathsAddUnknownField(t *testing.T) { //nolint:dupl - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - b := map[string]any{ - "test": "asd", - } - - byts, err := json.Marshal(b) - require.NoError(t, err) - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - req, err := http.NewRequest(http.MethodPost, - "http://localhost:9997/v3/config/paths/add/my/path", bytes.NewReader(byts)) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusBadRequest, res.StatusCode) - checkError(t, "json: unknown field \"test\"", res.Body) -} - -func TestConfigPathsPatch(t *testing.T) { //nolint:dupl - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", - map[string]any{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - "disablePublisherOverride": true, // test setting a deprecated parameter - "rpiCameraVFlip": true, - }, nil) - - httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/paths/patch/my/path", - map[string]any{ - "source": "rtsp://127.0.0.1:9998/mypath", - "sourceOnDemand": true, - }, nil) - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) - require.Equal(t, true, out["sourceOnDemand"]) - require.Equal(t, true, out["disablePublisherOverride"]) - require.Equal(t, true, out["rpiCameraVFlip"]) -} - -func TestConfigPathsReplace(t *testing.T) { //nolint:dupl - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", - map[string]any{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - "disablePublisherOverride": true, // test setting a deprecated parameter - "rpiCameraVFlip": true, - }, nil) - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/replace/my/path", - map[string]any{ - "source": "rtsp://127.0.0.1:9998/mypath", - "sourceOnDemand": true, - }, nil) - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) - require.Equal(t, true, out["sourceOnDemand"]) - require.Equal(t, nil, out["disablePublisherOverride"]) - require.Equal(t, false, out["rpiCameraVFlip"]) -} - -func TestConfigPathsReplaceNonExisting(t *testing.T) { //nolint:dupl - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/replace/my/path", - map[string]any{ - "source": "rtsp://127.0.0.1:9998/mypath", - "sourceOnDemand": true, - }, nil) - - var out map[string]any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) - require.Equal(t, true, out["sourceOnDemand"]) - require.Equal(t, nil, out["disablePublisherOverride"]) - require.Equal(t, false, out["rpiCameraVFlip"]) -} - -func TestConfigPathsDelete(t *testing.T) { - cnf := tempConf(t, "api: yes\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err := api.Initialize() - require.NoError(t, err) - defer api.Close() - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", - map[string]any{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - }, nil) - - httpRequest(t, hc, http.MethodDelete, "http://localhost:9997/v3/config/paths/delete/my/path", nil, nil) - - req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusNotFound, res.StatusCode) - checkError(t, "path configuration not found", res.Body) -} - -func TestRecordingsList(t *testing.T) { - dir, err := os.MkdirTemp("", "mediamtx-playback") - require.NoError(t, err) - defer os.RemoveAll(dir) - - cnf := tempConf(t, "pathDefaults:\n"+ - " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ - "paths:\n"+ - " mypath1:\n"+ - " all_others:\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err = api.Initialize() - require.NoError(t, err) - defer api.Close() - - err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) - require.NoError(t, err) - - err = os.Mkdir(filepath.Join(dir, "mypath2"), 0o755) - require.NoError(t, err) - - err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-500000.mp4"), []byte(""), 0o644) - require.NoError(t, err) - - err = os.WriteFile(filepath.Join(dir, "mypath1", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) - require.NoError(t, err) - - err = os.WriteFile(filepath.Join(dir, "mypath2", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) - require.NoError(t, err) - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - var out any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/recordings/list", nil, &out) - require.Equal(t, map[string]any{ - "itemCount": float64(2), - "pageCount": float64(1), - "items": []any{ - map[string]any{ - "name": "mypath1", - "segments": []any{ - map[string]any{ - "start": time.Date(2008, 11, 7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano), - }, - map[string]any{ - "start": time.Date(2009, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano), - }, - }, - }, - map[string]any{ - "name": "mypath2", - "segments": []any{ - map[string]any{ - "start": time.Date(2009, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano), - }, - }, - }, - }, - }, out) -} - -func TestRecordingsGet(t *testing.T) { - dir, err := os.MkdirTemp("", "mediamtx-playback") - require.NoError(t, err) - defer os.RemoveAll(dir) - - cnf := tempConf(t, "pathDefaults:\n"+ - " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ - "paths:\n"+ - " all_others:\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err = api.Initialize() - require.NoError(t, err) - defer api.Close() - - err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) - require.NoError(t, err) - - err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) - require.NoError(t, err) - - err = os.WriteFile(filepath.Join(dir, "mypath1", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) - require.NoError(t, err) - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - var out any - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/recordings/get/mypath1", nil, &out) - require.Equal(t, map[string]any{ - "name": "mypath1", - "segments": []any{ - map[string]any{ - "start": time.Date(2008, 11, 7, 11, 22, 0, 0, time.Local).Format(time.RFC3339Nano), - }, - map[string]any{ - "start": time.Date(2009, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano), - }, - }, - }, out) -} - -func TestRecordingsDeleteSegment(t *testing.T) { - dir, err := os.MkdirTemp("", "mediamtx-playback") - require.NoError(t, err) - defer os.RemoveAll(dir) - - cnf := tempConf(t, "pathDefaults:\n"+ - " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ - "paths:\n"+ - " all_others:\n") - - api := API{ - Address: "localhost:9997", - ReadTimeout: conf.Duration(10 * time.Second), - WriteTimeout: conf.Duration(10 * time.Second), - Conf: cnf, - AuthManager: test.NilAuthManager, - Parent: &testParent{}, - } - err = api.Initialize() - require.NoError(t, err) - defer api.Close() - - err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) - require.NoError(t, err) - - err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-900000.mp4"), []byte(""), 0o644) - require.NoError(t, err) - - tr := &http.Transport{} - defer tr.CloseIdleConnections() - hc := &http.Client{Transport: tr} - - u, err := url.Parse("http://localhost:9997/v3/recordings/deletesegment") - require.NoError(t, err) - - v := url.Values{} - v.Set("path", "mypath1") - v.Set("start", time.Date(2008, 11, 7, 11, 22, 0, 900000000, time.Local).Format(time.RFC3339Nano)) - u.RawQuery = v.Encode() - - req, err := http.NewRequest(http.MethodDelete, u.String(), nil) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - require.Equal(t, http.StatusOK, res.StatusCode) -} - func TestAuthJWKSRefresh(t *testing.T) { ok := false diff --git a/internal/api/api_webrtc.go b/internal/api/api_webrtc.go new file mode 100644 index 00000000..ba54b44f --- /dev/null +++ b/internal/api/api_webrtc.go @@ -0,0 +1,69 @@ +//nolint:dupl +package api + +import ( + "errors" + "net/http" + + "github.com/bluenviron/mediamtx/internal/servers/webrtc" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +func (a *API) onWebRTCSessionsList(ctx *gin.Context) { + data, err := a.WebRTCServer.APISessionsList() + if err != nil { + a.writeError(ctx, http.StatusInternalServerError, err) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onWebRTCSessionsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + data, err := a.WebRTCServer.APISessionsGet(uuid) + if err != nil { + if errors.Is(err, webrtc.ErrSessionNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onWebRTCSessionsKick(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + err = a.WebRTCServer.APISessionsKick(uuid) + if err != nil { + if errors.Is(err, webrtc.ErrSessionNotFound) { + a.writeError(ctx, http.StatusNotFound, err) + } else { + a.writeError(ctx, http.StatusInternalServerError, err) + } + return + } + + ctx.Status(http.StatusOK) +}