From 681db142f7aa6cd9d088aa4a60685cd4d305fae2 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 27 Apr 2021 19:19:47 +0200 Subject: [PATCH] rename client into readpublisher --- internal/clienthls/client.go | 30 +-- internal/clientrtmp/client.go | 52 ++-- internal/clientrtsp/client.go | 68 +++--- internal/path/path.go | 226 +++++++++--------- internal/pathman/pathman.go | 50 ++-- .../readpublisher.go} | 58 ++--- internal/rtmp/client.go | 2 +- internal/serverhls/server.go | 2 +- internal/serverrtmp/server.go | 2 +- internal/serverrtsp/server.go | 2 +- internal/source/source.go | 4 +- 11 files changed, 248 insertions(+), 248 deletions(-) rename internal/{client/client.go => readpublisher/readpublisher.go} (72%) diff --git a/internal/clienthls/client.go b/internal/clienthls/client.go index f059af0e..067b0c3d 100644 --- a/internal/clienthls/client.go +++ b/internal/clienthls/client.go @@ -18,9 +18,9 @@ import ( "github.com/aler9/gortsplib/pkg/rtpaac" "github.com/aler9/gortsplib/pkg/rtph264" - "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/h264" "github.com/aler9/rtsp-simple-server/internal/logger" + "github.com/aler9/rtsp-simple-server/internal/readpublisher" "github.com/aler9/rtsp-simple-server/internal/stats" ) @@ -116,10 +116,10 @@ type trackIDPayloadPair struct { // PathMan is implemented by pathman.PathMan. type PathMan interface { - OnClientSetupPlay(client.SetupPlayReq) + OnReadPublisherSetupPlay(readpublisher.SetupPlayReq) } -// Parent is implemented by clientman.ClientMan. +// Parent is implemented by serverhls.Server. type Parent interface { Log(logger.Level, string, ...interface{}) OnClientClose(*Client) @@ -136,7 +136,7 @@ type Client struct { pathMan PathMan parent Parent - path client.Path + path readpublisher.Path ringBuffer *ringbuffer.RingBuffer tsQueue []*tsFile tsByName map[string]*tsFile @@ -196,8 +196,8 @@ func (c *Client) CloseRequest() { c.parent.OnClientClose(c) } -// IsClient implements client.Client. -func (c *Client) IsClient() {} +// IsReadPublisher implements readpublisher.ReadPublisher. +func (c *Client) IsReadPublisher() {} // IsSource implements path.source. func (c *Client) IsSource() {} @@ -206,7 +206,7 @@ func (c *Client) log(level logger.Level, format string, args ...interface{}) { c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.pathName}, args...)...) } -// PathName returns the path name of the client. +// PathName returns the path name of the readpublisher. func (c *Client) PathName() string { return c.pathName } @@ -223,8 +223,8 @@ func (c *Client) run() { var aacDecoder *rtpaac.Decoder err := func() error { - pres := make(chan client.SetupPlayRes) - c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, c.pathName, nil, pres}) //nolint:govet + pres := make(chan readpublisher.SetupPlayRes) + c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{c, c.pathName, nil, pres}) //nolint:govet res := <-pres if res.Err != nil { @@ -286,7 +286,7 @@ func (c *Client) run() { if c.path != nil { res := make(chan struct{}) - c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res } @@ -315,8 +315,8 @@ func (c *Client) run() { c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount)) - resc := make(chan client.PlayRes) - c.path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet + resc := make(chan readpublisher.PlayRes) + c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, resc}) //nolint:govet <-resc c.log(logger.Info, "is reading from path '%s'", c.pathName) @@ -483,7 +483,7 @@ func (c *Client) run() { <-writerDone res := make(chan struct{}) - c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res c.parent.OnClientClose(c) @@ -495,7 +495,7 @@ func (c *Client) run() { c.log(logger.Info, "ERR: %s", err) res := make(chan struct{}) - c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res c.parent.OnClientClose(c) @@ -504,7 +504,7 @@ func (c *Client) run() { case <-c.terminate: res := make(chan struct{}) - c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res c.ringBuffer.Close() diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index 01f54384..0de8325b 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -18,10 +18,10 @@ import ( "github.com/aler9/gortsplib/pkg/rtph264" "github.com/notedit/rtmp/av" - "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/h264" "github.com/aler9/rtsp-simple-server/internal/logger" + "github.com/aler9/rtsp-simple-server/internal/readpublisher" "github.com/aler9/rtsp-simple-server/internal/rtcpsenderset" "github.com/aler9/rtsp-simple-server/internal/rtmp" "github.com/aler9/rtsp-simple-server/internal/stats" @@ -68,11 +68,11 @@ type trackIDPayloadPair struct { // PathMan is implemented by pathman.PathMan. type PathMan interface { - OnClientSetupPlay(client.SetupPlayReq) - OnClientAnnounce(client.AnnounceReq) + OnReadPublisherSetupPlay(readpublisher.SetupPlayReq) + OnReadPublisherAnnounce(readpublisher.AnnounceReq) } -// Parent is implemented by clientman.ClientMan. +// Parent is implemented by serverrtmp.Server. type Parent interface { Log(logger.Level, string, ...interface{}) OnClientClose(*Client) @@ -149,8 +149,8 @@ func (c *Client) CloseRequest() { c.parent.OnClientClose(c) } -// IsClient implements client.Client. -func (c *Client) IsClient() {} +// IsReadPublisher implements readpublisher.ReadPublisher. +func (c *Client) IsReadPublisher() {} // IsSource implements path.source. func (c *Client) IsSource() {} @@ -195,7 +195,7 @@ func (c *Client) run() { } func (c *Client) runRead() { - var path client.Path + var path readpublisher.Path var videoTrack *gortsplib.Track var h264Decoder *rtph264.Decoder var audioTrack *gortsplib.Track @@ -205,12 +205,12 @@ func (c *Client) runRead() { err := func() error { pathName, query := pathNameAndQuery(c.conn.URL()) - sres := make(chan client.SetupPlayRes) - c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, pathName, query, sres}) //nolint:govet + sres := make(chan readpublisher.SetupPlayRes) + c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{c, pathName, query, sres}) //nolint:govet res := <-sres if res.Err != nil { - if _, ok := res.Err.(client.ErrAuthCritical); ok { + if _, ok := res.Err.(readpublisher.ErrAuthCritical); ok { // wait some seconds to stop brute force attacks select { case <-time.After(pauseAfterAuthError): @@ -257,7 +257,7 @@ func (c *Client) runRead() { if path != nil { res := make(chan struct{}) - path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res } @@ -268,8 +268,8 @@ func (c *Client) runRead() { c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount)) - pres := make(chan client.PlayRes) - path.OnClientPlay(client.PlayReq{c, pres}) //nolint:govet + pres := make(chan readpublisher.PlayRes) + path.OnReadPublisherPlay(readpublisher.PlayReq{c, pres}) //nolint:govet <-pres c.log(logger.Info, "is reading from path '%s'", path.Name()) @@ -370,7 +370,7 @@ func (c *Client) runRead() { } res := make(chan struct{}) - path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res c.parent.OnClientClose(c) @@ -378,7 +378,7 @@ func (c *Client) runRead() { case <-c.terminate: res := make(chan struct{}) - path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res c.ringBuffer.Close() @@ -394,7 +394,7 @@ func (c *Client) runPublish() { var tracks gortsplib.Tracks var h264Encoder *rtph264.Encoder var aacEncoder *rtpaac.Encoder - var path client.Path + var path readpublisher.Path setupDone := make(chan struct{}) go func() { @@ -423,12 +423,12 @@ func (c *Client) runPublish() { pathName, query := pathNameAndQuery(c.conn.URL()) - resc := make(chan client.AnnounceRes) - c.pathMan.OnClientAnnounce(client.AnnounceReq{c, pathName, tracks, query, resc}) //nolint:govet + resc := make(chan readpublisher.AnnounceRes) + c.pathMan.OnReadPublisherAnnounce(readpublisher.AnnounceReq{c, pathName, tracks, query, resc}) //nolint:govet res := <-resc if res.Err != nil { - if _, ok := res.Err.(client.ErrAuthCritical); ok { + if _, ok := res.Err.(readpublisher.ErrAuthCritical); ok { // wait some seconds to stop brute force attacks select { case <-time.After(pauseAfterAuthError): @@ -465,8 +465,8 @@ func (c *Client) runPublish() { readerDone := make(chan error) go func() { readerDone <- func() error { - resc := make(chan client.RecordRes) - path.OnClientRecord(client.RecordReq{Client: c, Res: resc}) + resc := make(chan readpublisher.RecordRes) + path.OnReadPublisherRecord(readpublisher.RecordReq{ReadPublisher: c, Res: resc}) res := <-resc if res.Err != nil { @@ -493,7 +493,7 @@ func (c *Client) runPublish() { }) } - defer func(path client.Path) { + defer func(path readpublisher.Path) { if path.Conf().RunOnPublish != "" { onPublishCmd.Close() } @@ -581,7 +581,7 @@ func (c *Client) runPublish() { } res := make(chan struct{}) - path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res path = nil @@ -593,7 +593,7 @@ func (c *Client) runPublish() { <-readerDone res := make(chan struct{}) - path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res path = nil } @@ -611,7 +611,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, if !ipEqualOrInRange(ip, ips) { c.log(logger.Info, "ERR: ip '%s' not allowed", ip) - return client.ErrAuthCritical{&base.Response{ //nolint:govet + return readpublisher.ErrAuthCritical{&base.Response{ //nolint:govet StatusCode: base.StatusUnauthorized, }} } @@ -623,7 +623,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, if values.Get("user") != user || values.Get("pass") != pass { - return client.ErrAuthCritical{nil} //nolint:govet + return readpublisher.ErrAuthCritical{nil} //nolint:govet } } diff --git a/internal/clientrtsp/client.go b/internal/clientrtsp/client.go index e25a762c..454a6fec 100644 --- a/internal/clientrtsp/client.go +++ b/internal/clientrtsp/client.go @@ -16,9 +16,9 @@ import ( "github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/liberrors" - "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/logger" + "github.com/aler9/rtsp-simple-server/internal/readpublisher" "github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/streamproc" ) @@ -47,12 +47,12 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool { // PathMan is implemented by pathman.PathMan. type PathMan interface { - OnClientDescribe(client.DescribeReq) - OnClientSetupPlay(client.SetupPlayReq) - OnClientAnnounce(client.AnnounceReq) + OnReadPublisherDescribe(readpublisher.DescribeReq) + OnReadPublisherSetupPlay(readpublisher.SetupPlayReq) + OnReadPublisherAnnounce(readpublisher.AnnounceReq) } -// Parent is implemented by clientman.ClientMan. +// Parent is implemented by serverrtsp.Server. type Parent interface { Log(logger.Level, string, ...interface{}) OnClientClose(*Client) @@ -71,7 +71,7 @@ type Client struct { pathMan PathMan parent Parent - path client.Path + path readpublisher.Path authUser string authPass string authValidator *auth.Validator @@ -138,8 +138,8 @@ func (c *Client) CloseRequest() { c.parent.OnClientClose(c) } -// IsClient implements client.Client. -func (c *Client) IsClient() {} +// IsReadPublisher implements readpublisher.ReadPublisher. +func (c *Client) IsReadPublisher() {} // IsSource implements path.source. func (c *Client) IsSource() {} @@ -175,16 +175,16 @@ func (c *Client) run() { } onDescribe := func(ctx *gortsplib.ServerConnDescribeCtx) (*base.Response, []byte, error) { - resc := make(chan client.DescribeRes) - c.pathMan.OnClientDescribe(client.DescribeReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet + resc := make(chan readpublisher.DescribeRes) + c.pathMan.OnReadPublisherDescribe(readpublisher.DescribeReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet res := <-resc if res.Err != nil { switch terr := res.Err.(type) { - case client.ErrAuthNotCritical: + case readpublisher.ErrAuthNotCritical: return terr.Response, nil, nil - case client.ErrAuthCritical: + case readpublisher.ErrAuthCritical: // wait some seconds to stop brute force attacks select { case <-time.After(pauseAfterAuthError): @@ -192,7 +192,7 @@ func (c *Client) run() { } return terr.Response, nil, errTerminated - case client.ErrNoOnePublishing: + case readpublisher.ErrNoOnePublishing: return &base.Response{ StatusCode: base.StatusNotFound, }, nil, res.Err @@ -219,16 +219,16 @@ func (c *Client) run() { } onAnnounce := func(ctx *gortsplib.ServerConnAnnounceCtx) (*base.Response, error) { - resc := make(chan client.AnnounceRes) - c.pathMan.OnClientAnnounce(client.AnnounceReq{c, ctx.Path, ctx.Tracks, ctx.Req, resc}) //nolint:govet + resc := make(chan readpublisher.AnnounceRes) + c.pathMan.OnReadPublisherAnnounce(readpublisher.AnnounceReq{c, ctx.Path, ctx.Tracks, ctx.Req, resc}) //nolint:govet res := <-resc if res.Err != nil { switch terr := res.Err.(type) { - case client.ErrAuthNotCritical: + case readpublisher.ErrAuthNotCritical: return terr.Response, nil - case client.ErrAuthCritical: + case readpublisher.ErrAuthCritical: // wait some seconds to stop brute force attacks select { case <-time.After(pauseAfterAuthError): @@ -267,16 +267,16 @@ func (c *Client) run() { switch c.conn.State() { case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play - resc := make(chan client.SetupPlayRes) - c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet + resc := make(chan readpublisher.SetupPlayRes) + c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{c, ctx.Path, ctx.Req, resc}) //nolint:govet res := <-resc if res.Err != nil { switch terr := res.Err.(type) { - case client.ErrAuthNotCritical: + case readpublisher.ErrAuthNotCritical: return terr.Response, nil - case client.ErrAuthCritical: + case readpublisher.ErrAuthCritical: // wait some seconds to stop brute force attacks select { case <-time.After(pauseAfterAuthError): @@ -284,7 +284,7 @@ func (c *Client) run() { } return terr.Response, errTerminated - case client.ErrNoOnePublishing: + case readpublisher.ErrNoOnePublishing: return &base.Response{ StatusCode: base.StatusNotFound, }, res.Err @@ -400,13 +400,13 @@ func (c *Client) run() { case gortsplib.ServerConnStatePlay: c.playStop() res := make(chan struct{}) - c.path.OnClientPause(client.PauseReq{c, res}) //nolint:govet + c.path.OnReadPublisherPause(readpublisher.PauseReq{c, res}) //nolint:govet <-res case gortsplib.ServerConnStateRecord: c.recordStop() res := make(chan struct{}) - c.path.OnClientPause(client.PauseReq{c, res}) //nolint:govet + c.path.OnReadPublisherPause(readpublisher.PauseReq{c, res}) //nolint:govet <-res } @@ -458,7 +458,7 @@ func (c *Client) run() { if c.path != nil { res := make(chan struct{}) - c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res c.path = nil } @@ -480,7 +480,7 @@ func (c *Client) run() { if c.path != nil { res := make(chan struct{}) - c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet + c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res c.path = nil } @@ -499,7 +499,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, if !ipEqualOrInRange(ip, ips) { c.log(logger.Info, "ERR: ip '%s' not allowed", ip) - return client.ErrAuthCritical{&base.Response{ //nolint:govet + return readpublisher.ErrAuthCritical{&base.Response{ //nolint:govet StatusCode: base.StatusUnauthorized, }} } @@ -543,7 +543,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, if c.authFailures > 3 { c.log(logger.Info, "ERR: unauthorized: %s", err) - return client.ErrAuthCritical{&base.Response{ //nolint:govet + return readpublisher.ErrAuthCritical{&base.Response{ //nolint:govet StatusCode: base.StatusUnauthorized, Header: base.Header{ "WWW-Authenticate": c.authValidator.GenerateHeader(), @@ -555,7 +555,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, c.log(logger.Debug, "WARN: unauthorized: %s", err) } - return client.ErrAuthNotCritical{&base.Response{ //nolint:govet + return readpublisher.ErrAuthNotCritical{&base.Response{ //nolint:govet StatusCode: base.StatusUnauthorized, Header: base.Header{ "WWW-Authenticate": c.authValidator.GenerateHeader(), @@ -570,9 +570,9 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, return nil } -func (c *Client) playStart() client.PlayRes { - resc := make(chan client.PlayRes) - c.path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet +func (c *Client) playStart() readpublisher.PlayRes { + resc := make(chan readpublisher.PlayRes) + c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, resc}) //nolint:govet res := <-resc tracksLen := len(c.conn.SetuppedTracks()) @@ -606,8 +606,8 @@ func (c *Client) playStop() { } func (c *Client) recordStart() error { - resc := make(chan client.RecordRes) - c.path.OnClientRecord(client.RecordReq{Client: c, Res: resc}) + resc := make(chan readpublisher.RecordRes) + c.path.OnReadPublisherRecord(readpublisher.RecordReq{ReadPublisher: c, Res: resc}) res := <-resc if res.Err != nil { diff --git a/internal/path/path.go b/internal/path/path.go index 171a2ca7..5986e6ad 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -11,10 +11,10 @@ import ( "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/base" - "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/logger" + "github.com/aler9/rtsp-simple-server/internal/readpublisher" "github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/sourcertmp" "github.com/aler9/rtsp-simple-server/internal/sourcertsp" @@ -70,10 +70,10 @@ type Path struct { stats *stats.Stats parent Parent - clients map[client.Client]clientState - clientsWg sync.WaitGroup - describeRequests []client.DescribeReq - setupPlayRequests []client.SetupPlayReq + readPublishers map[readpublisher.ReadPublisher]clientState + readPublishersWg sync.WaitGroup + describeRequests []readpublisher.DescribeReq + setupPlayRequests []readpublisher.SetupPlayReq source source.Source sourceTracks gortsplib.Tracks sp *streamproc.StreamProc @@ -92,13 +92,13 @@ type Path struct { // in extSourceSetReady chan source.ExtSetReadyReq extSourceSetNotReady chan source.ExtSetNotReadyReq - clientDescribe chan client.DescribeReq - clientSetupPlay chan client.SetupPlayReq - clientAnnounce chan client.AnnounceReq - clientPlay chan client.PlayReq - clientRecord chan client.RecordReq - clientPause chan client.PauseReq - clientRemove chan client.RemoveReq + clientDescribe chan readpublisher.DescribeReq + clientSetupPlay chan readpublisher.SetupPlayReq + clientAnnounce chan readpublisher.AnnounceReq + clientPlay chan readpublisher.PlayReq + clientRecord chan readpublisher.RecordReq + clientPause chan readpublisher.PauseReq + clientRemove chan readpublisher.RemoveReq terminate chan struct{} } @@ -128,7 +128,7 @@ func New( wg: wg, stats: stats, parent: parent, - clients: make(map[client.Client]clientState), + readPublishers: make(map[readpublisher.ReadPublisher]clientState), readers: newReadersMap(), describeTimer: newEmptyTimer(), sourceCloseTimer: newEmptyTimer(), @@ -136,13 +136,13 @@ func New( closeTimer: newEmptyTimer(), extSourceSetReady: make(chan source.ExtSetReadyReq), extSourceSetNotReady: make(chan source.ExtSetNotReadyReq), - clientDescribe: make(chan client.DescribeReq), - clientSetupPlay: make(chan client.SetupPlayReq), - clientAnnounce: make(chan client.AnnounceReq), - clientPlay: make(chan client.PlayReq), - clientRecord: make(chan client.RecordReq), - clientPause: make(chan client.PauseReq), - clientRemove: make(chan client.RemoveReq), + clientDescribe: make(chan readpublisher.DescribeReq), + clientSetupPlay: make(chan readpublisher.SetupPlayReq), + clientAnnounce: make(chan readpublisher.AnnounceReq), + clientPlay: make(chan readpublisher.PlayReq), + clientRecord: make(chan readpublisher.RecordReq), + clientPause: make(chan readpublisher.PauseReq), + clientRemove: make(chan readpublisher.RemoveReq), terminate: make(chan struct{}), } @@ -186,16 +186,16 @@ outer: select { case <-pa.describeTimer.C: for _, req := range pa.describeRequests { - req.Res <- client.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet } pa.describeRequests = nil for _, req := range pa.setupPlayRequests { - req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet } pa.setupPlayRequests = nil - // set state after removeClient(), so schedule* works once + // set state after removeReadPublisher(), so schedule* works once pa.sourceState = sourceStateNotReady pa.scheduleSourceClose() @@ -234,35 +234,35 @@ outer: close(req.Res) case req := <-pa.clientDescribe: - pa.onClientDescribe(req) + pa.onReadPublisherDescribe(req) case req := <-pa.clientSetupPlay: - pa.onClientSetupPlay(req) + pa.onReadPublisherSetupPlay(req) case req := <-pa.clientAnnounce: - pa.onClientAnnounce(req) + pa.onReadPublisherAnnounce(req) case req := <-pa.clientPlay: - pa.onClientPlay(req) + pa.onReadPublisherPlay(req) case req := <-pa.clientRecord: - pa.onClientRecord(req) + pa.onReadPublisherRecord(req) case req := <-pa.clientPause: - pa.onClientPause(req) + pa.onReadPublisherPause(req) case req := <-pa.clientRemove: - if _, ok := pa.clients[req.Client]; !ok { + if _, ok := pa.readPublishers[req.ReadPublisher]; !ok { close(req.Res) continue } - if pa.clients[req.Client] != clientStatePreRemove { - pa.removeClient(req.Client) + if pa.readPublishers[req.ReadPublisher] != clientStatePreRemove { + pa.removeReadPublisher(req.ReadPublisher) } - delete(pa.clients, req.Client) - pa.clientsWg.Done() + delete(pa.readPublishers, req.ReadPublisher) + pa.readPublishersWg.Done() close(req.Res) case <-pa.terminate: @@ -292,14 +292,14 @@ outer: } for _, req := range pa.describeRequests { - req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet } for _, req := range pa.setupPlayRequests { - req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet } - for c, state := range pa.clients { + for c, state := range pa.readPublishers { if state != clientStatePreRemove { switch state { case clientStatePlay: @@ -312,7 +312,7 @@ outer: c.CloseRequest() } } - pa.clientsWg.Wait() + pa.readPublishersWg.Wait() close(pa.extSourceSetReady) close(pa.extSourceSetNotReady) @@ -345,19 +345,19 @@ func (pa *Path) exhaustChannels() { if !ok { return } - req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pa.clientSetupPlay: if !ok { return } - req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pa.clientAnnounce: if !ok { return } - req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pa.clientPlay: if !ok { @@ -382,12 +382,12 @@ func (pa *Path) exhaustChannels() { return } - if _, ok := pa.clients[req.Client]; !ok { + if _, ok := pa.readPublishers[req.ReadPublisher]; !ok { close(req.Res) continue } - pa.clientsWg.Done() + pa.readPublishersWg.Done() close(req.Res) } } @@ -426,8 +426,8 @@ func (pa *Path) startExternalSource() { } } -func (pa *Path) hasClients() bool { - for _, state := range pa.clients { +func (pa *Path) hasReadPublishers() bool { + for _, state := range pa.readPublishers { if state != clientStatePreRemove { return true } @@ -435,8 +435,8 @@ func (pa *Path) hasClients() bool { return false } -func (pa *Path) hasClientsNotSources() bool { - for c, state := range pa.clients { +func (pa *Path) hasReadPublishersNotSources() bool { + for c, state := range pa.readPublishers { if state != clientStatePreRemove && c != pa.source { return true } @@ -444,14 +444,14 @@ func (pa *Path) hasClientsNotSources() bool { return false } -func (pa *Path) addClient(c client.Client, state clientState) { - pa.clients[c] = state - pa.clientsWg.Add(1) +func (pa *Path) addReadPublisher(c readpublisher.ReadPublisher, state clientState) { + pa.readPublishers[c] = state + pa.readPublishersWg.Add(1) } -func (pa *Path) removeClient(c client.Client) { - state := pa.clients[c] - pa.clients[c] = clientStatePreRemove +func (pa *Path) removeReadPublisher(c readpublisher.ReadPublisher) { + state := pa.readPublishers[c] + pa.readPublishers[c] = clientStatePreRemove switch state { case clientStatePlay: @@ -466,10 +466,10 @@ func (pa *Path) removeClient(c client.Client) { if pa.source == c { pa.source = nil - // close all clients that are reading or waiting to read - for oc, state := range pa.clients { + // close all readPublishers that are reading or waiting to read + for oc, state := range pa.readPublishers { if state != clientStatePreRemove { - pa.removeClient(oc) + pa.removeReadPublisher(oc) oc.CloseRequest() } } @@ -489,12 +489,12 @@ func (pa *Path) onSourceSetReady() { pa.sourceState = sourceStateReady for _, req := range pa.describeRequests { - req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet + req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet } pa.describeRequests = nil for _, req := range pa.setupPlayRequests { - pa.onClientSetupPlayPost(req) + pa.onReadPublisherSetupPlayPost(req) } pa.setupPlayRequests = nil @@ -506,10 +506,10 @@ func (pa *Path) onSourceSetReady() { func (pa *Path) onSourceSetNotReady() { pa.sourceState = sourceStateNotReady - // close all clients that are reading or waiting to read - for c, state := range pa.clients { + // close all readPublishers that are reading or waiting to read + for c, state := range pa.readPublishers { if c != pa.source && state != clientStatePreRemove { - pa.removeClient(c) + pa.removeReadPublisher(c) c.CloseRequest() } } @@ -556,9 +556,9 @@ func (pa *Path) fixedPublisherStart() { } } -func (pa *Path) onClientDescribe(req client.DescribeReq) { - if _, ok := pa.clients[req.Client]; ok { - req.Res <- client.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet +func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) { + if _, ok := pa.readPublishers[req.ReadPublisher]; ok { + req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet return } @@ -566,13 +566,13 @@ func (pa *Path) onClientDescribe(req client.DescribeReq) { pa.scheduleClose() if _, ok := pa.source.(*sourceRedirect); ok { - req.Res <- client.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet return } switch pa.sourceState { case sourceStateReady: - req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet + req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet return case sourceStateWaitingDescribe: @@ -593,22 +593,22 @@ func (pa *Path) onClientDescribe(req client.DescribeReq) { } return pa.conf.Fallback }() - req.Res <- client.DescribeRes{nil, fallbackURL, nil} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, fallbackURL, nil} //nolint:govet return } - req.Res <- client.DescribeRes{nil, "", client.ErrNoOnePublishing{pa.name}} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, "", readpublisher.ErrNoOnePublishing{pa.name}} //nolint:govet return } } -func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) { +func (pa *Path) onReadPublisherSetupPlay(req readpublisher.SetupPlayReq) { pa.fixedPublisherStart() pa.scheduleClose() switch pa.sourceState { case sourceStateReady: - pa.onClientSetupPlayPost(req) + pa.onReadPublisherSetupPlayPost(req) return case sourceStateWaitingDescribe: @@ -616,13 +616,13 @@ func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) { return case sourceStateNotReady: - req.Res <- client.SetupPlayRes{nil, nil, client.ErrNoOnePublishing{pa.name}} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{nil, nil, readpublisher.ErrNoOnePublishing{pa.name}} //nolint:govet return } } -func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) { - if _, ok := pa.clients[req.Client]; !ok { +func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) { + if _, ok := pa.readPublishers[req.ReadPublisher]; !ok { // prevent on-demand source from closing if pa.sourceCloseTimerStarted { pa.sourceCloseTimer = newEmptyTimer() @@ -635,40 +635,40 @@ func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) { pa.runOnDemandCloseTimerStarted = false } - pa.addClient(req.Client, clientStatePrePlay) + pa.addReadPublisher(req.ReadPublisher, clientStatePrePlay) } - req.Res <- client.SetupPlayRes{pa, pa.sourceTracks, nil} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{pa, pa.sourceTracks, nil} //nolint:govet } -func (pa *Path) onClientPlay(req client.PlayReq) { +func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) { atomic.AddInt64(pa.stats.CountReaders, 1) - pa.clients[req.Client] = clientStatePlay - pa.readers.add(req.Client) + pa.readPublishers[req.ReadPublisher] = clientStatePlay + pa.readers.add(req.ReadPublisher) - req.Res <- client.PlayRes{TrackInfos: pa.sp.TrackInfos()} + req.Res <- readpublisher.PlayRes{TrackInfos: pa.sp.TrackInfos()} } -func (pa *Path) onClientAnnounce(req client.AnnounceReq) { - if _, ok := pa.clients[req.Client]; ok { - req.Res <- client.AnnounceRes{nil, fmt.Errorf("already publishing or reading")} //nolint:govet +func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) { + if _, ok := pa.readPublishers[req.ReadPublisher]; ok { + req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("already publishing or reading")} //nolint:govet return } if pa.hasExternalSource() { - req.Res <- client.AnnounceRes{nil, fmt.Errorf("path '%s' is assigned to an external source", pa.name)} //nolint:govet + req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("path '%s' is assigned to an external source", pa.name)} //nolint:govet return } if pa.source != nil { if pa.conf.DisablePublisherOverride { - req.Res <- client.AnnounceRes{nil, fmt.Errorf("another client is already publishing on path '%s'", pa.name)} //nolint:govet + req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("another client is already publishing on path '%s'", pa.name)} //nolint:govet return } pa.Log(logger.Info, "disconnecting existing publisher") - curPublisher := pa.source.(client.Client) - pa.removeClient(curPublisher) + curPublisher := pa.source.(readpublisher.ReadPublisher) + pa.removeReadPublisher(curPublisher) curPublisher.CloseRequest() // prevent path closure @@ -679,30 +679,30 @@ func (pa *Path) onClientAnnounce(req client.AnnounceReq) { } } - pa.addClient(req.Client, clientStatePreRecord) + pa.addReadPublisher(req.ReadPublisher, clientStatePreRecord) - pa.source = req.Client + pa.source = req.ReadPublisher pa.sourceTracks = req.Tracks - req.Res <- client.AnnounceRes{pa, nil} //nolint:govet + req.Res <- readpublisher.AnnounceRes{pa, nil} //nolint:govet } -func (pa *Path) onClientRecord(req client.RecordReq) { - if state, ok := pa.clients[req.Client]; !ok || state != clientStatePreRecord { - req.Res <- client.RecordRes{SP: nil, Err: fmt.Errorf("not recording anymore")} +func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) { + if state, ok := pa.readPublishers[req.ReadPublisher]; !ok || state != clientStatePreRecord { + req.Res <- readpublisher.RecordRes{SP: nil, Err: fmt.Errorf("not recording anymore")} return } atomic.AddInt64(pa.stats.CountPublishers, 1) - pa.clients[req.Client] = clientStateRecord + pa.readPublishers[req.ReadPublisher] = clientStateRecord pa.onSourceSetReady() pa.sp = streamproc.New(pa, len(pa.sourceTracks)) - req.Res <- client.RecordRes{SP: pa.sp, Err: nil} + req.Res <- readpublisher.RecordRes{SP: pa.sp, Err: nil} } -func (pa *Path) onClientPause(req client.PauseReq) { - state, ok := pa.clients[req.Client] +func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) { + state, ok := pa.readPublishers[req.ReadPublisher] if !ok { close(req.Res) return @@ -710,12 +710,12 @@ func (pa *Path) onClientPause(req client.PauseReq) { if state == clientStatePlay { atomic.AddInt64(pa.stats.CountReaders, -1) - pa.clients[req.Client] = clientStatePrePlay - pa.readers.remove(req.Client) + pa.readPublishers[req.ReadPublisher] = clientStatePrePlay + pa.readers.remove(req.ReadPublisher) } else if state == clientStateRecord { atomic.AddInt64(pa.stats.CountPublishers, -1) - pa.clients[req.Client] = clientStatePreRecord + pa.readPublishers[req.ReadPublisher] = clientStatePreRecord pa.onSourceSetNotReady() } @@ -729,7 +729,7 @@ func (pa *Path) scheduleSourceClose() { if pa.sourceCloseTimerStarted || pa.sourceState == sourceStateWaitingDescribe || - pa.hasClients() { + pa.hasReadPublishers() { return } @@ -745,7 +745,7 @@ func (pa *Path) scheduleRunOnDemandClose() { if pa.runOnDemandCloseTimerStarted || pa.sourceState == sourceStateWaitingDescribe || - pa.hasClientsNotSources() { + pa.hasReadPublishersNotSources() { return } @@ -756,7 +756,7 @@ func (pa *Path) scheduleRunOnDemandClose() { func (pa *Path) scheduleClose() { if pa.conf.Regexp != nil && - !pa.hasClients() && + !pa.hasReadPublishers() && pa.source == nil && pa.sourceState != sourceStateWaitingDescribe && !pa.sourceCloseTimerStarted && @@ -795,37 +795,37 @@ func (pa *Path) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) { } // OnPathManDescribe is called by pathman.PathMan. -func (pa *Path) OnPathManDescribe(req client.DescribeReq) { +func (pa *Path) OnPathManDescribe(req readpublisher.DescribeReq) { pa.clientDescribe <- req } // OnPathManSetupPlay is called by pathman.PathMan. -func (pa *Path) OnPathManSetupPlay(req client.SetupPlayReq) { +func (pa *Path) OnPathManSetupPlay(req readpublisher.SetupPlayReq) { pa.clientSetupPlay <- req } // OnPathManAnnounce is called by pathman.PathMan. -func (pa *Path) OnPathManAnnounce(req client.AnnounceReq) { +func (pa *Path) OnPathManAnnounce(req readpublisher.AnnounceReq) { pa.clientAnnounce <- req } -// OnClientRemove is called by a client. -func (pa *Path) OnClientRemove(req client.RemoveReq) { +// OnReadPublisherRemove is called by a readpublisher. +func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) { pa.clientRemove <- req } -// OnClientPlay is called by a client. -func (pa *Path) OnClientPlay(req client.PlayReq) { +// OnReadPublisherPlay is called by a readpublisher. +func (pa *Path) OnReadPublisherPlay(req readpublisher.PlayReq) { pa.clientPlay <- req } -// OnClientRecord is called by a client. -func (pa *Path) OnClientRecord(req client.RecordReq) { +// OnReadPublisherRecord is called by a readpublisher. +func (pa *Path) OnReadPublisherRecord(req readpublisher.RecordReq) { pa.clientRecord <- req } -// OnClientPause is called by a client. -func (pa *Path) OnClientPause(req client.PauseReq) { +// OnReadPublisherPause is called by a readpublisher. +func (pa *Path) OnReadPublisherPause(req readpublisher.PauseReq) { pa.clientPause <- req } diff --git a/internal/pathman/pathman.go b/internal/pathman/pathman.go index 1431c251..febf89a8 100644 --- a/internal/pathman/pathman.go +++ b/internal/pathman/pathman.go @@ -7,10 +7,10 @@ import ( "github.com/aler9/gortsplib/pkg/headers" - "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/path" + "github.com/aler9/rtsp-simple-server/internal/readpublisher" "github.com/aler9/rtsp-simple-server/internal/stats" ) @@ -37,9 +37,9 @@ type PathManager struct { // in confReload chan map[string]*conf.PathConf pathClose chan *path.Path - clientDescribe chan client.DescribeReq - clientSetupPlay chan client.SetupPlayReq - clientAnnounce chan client.AnnounceReq + clientDescribe chan readpublisher.DescribeReq + clientSetupPlay chan readpublisher.SetupPlayReq + clientAnnounce chan readpublisher.AnnounceReq terminate chan struct{} // out @@ -71,9 +71,9 @@ func New( paths: make(map[string]*path.Path), confReload: make(chan map[string]*conf.PathConf), pathClose: make(chan *path.Path), - clientDescribe: make(chan client.DescribeReq), - clientSetupPlay: make(chan client.SetupPlayReq), - clientAnnounce: make(chan client.AnnounceReq), + clientDescribe: make(chan readpublisher.DescribeReq), + clientSetupPlay: make(chan readpublisher.SetupPlayReq), + clientAnnounce: make(chan readpublisher.AnnounceReq), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -145,11 +145,11 @@ outer: case req := <-pm.clientDescribe: pathName, pathConf, err := pm.findPathConf(req.PathName) if err != nil { - req.Res <- client.DescribeRes{nil, "", err} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet continue } - err = req.Client.Authenticate( + err = req.ReadPublisher.Authenticate( pm.authMethods, req.PathName, pathConf.ReadIpsParsed, @@ -157,7 +157,7 @@ outer: pathConf.ReadPass, req.Data) if err != nil { - req.Res <- client.DescribeRes{nil, "", err} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet continue } @@ -171,11 +171,11 @@ outer: case req := <-pm.clientSetupPlay: pathName, pathConf, err := pm.findPathConf(req.PathName) if err != nil { - req.Res <- client.SetupPlayRes{nil, nil, err} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet continue } - err = req.Client.Authenticate( + err = req.ReadPublisher.Authenticate( pm.authMethods, req.PathName, pathConf.ReadIpsParsed, @@ -183,7 +183,7 @@ outer: pathConf.ReadPass, req.Data) if err != nil { - req.Res <- client.SetupPlayRes{nil, nil, err} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet continue } @@ -197,11 +197,11 @@ outer: case req := <-pm.clientAnnounce: pathName, pathConf, err := pm.findPathConf(req.PathName) if err != nil { - req.Res <- client.AnnounceRes{nil, err} //nolint:govet + req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet continue } - err = req.Client.Authenticate( + err = req.ReadPublisher.Authenticate( pm.authMethods, req.PathName, pathConf.PublishIpsParsed, @@ -209,7 +209,7 @@ outer: pathConf.PublishPass, req.Data) if err != nil { - req.Res <- client.AnnounceRes{nil, err} //nolint:govet + req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet continue } @@ -242,19 +242,19 @@ outer: if !ok { return } - req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pm.clientSetupPlay: if !ok { return } - req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pm.clientAnnounce: if !ok { return } - req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet + req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet } } }() @@ -325,17 +325,17 @@ func (pm *PathManager) OnPathClose(pa *path.Path) { pm.pathClose <- pa } -// OnClientDescribe is called by clientman.ClientMan. -func (pm *PathManager) OnClientDescribe(req client.DescribeReq) { +// OnReadPublisherDescribe is called by clientman.ClientMan. +func (pm *PathManager) OnReadPublisherDescribe(req readpublisher.DescribeReq) { pm.clientDescribe <- req } -// OnClientAnnounce is called by clientman.ClientMan. -func (pm *PathManager) OnClientAnnounce(req client.AnnounceReq) { +// OnReadPublisherAnnounce is called by clientman.ClientMan. +func (pm *PathManager) OnReadPublisherAnnounce(req readpublisher.AnnounceReq) { pm.clientAnnounce <- req } -// OnClientSetupPlay is called by clientman.ClientMan. -func (pm *PathManager) OnClientSetupPlay(req client.SetupPlayReq) { +// OnReadPublisherSetupPlay is called by clientman.ClientMan. +func (pm *PathManager) OnReadPublisherSetupPlay(req readpublisher.SetupPlayReq) { pm.clientSetupPlay <- req } diff --git a/internal/client/client.go b/internal/readpublisher/readpublisher.go similarity index 72% rename from internal/client/client.go rename to internal/readpublisher/readpublisher.go index 97aadffa..c099a281 100644 --- a/internal/client/client.go +++ b/internal/readpublisher/readpublisher.go @@ -1,4 +1,4 @@ -package client +package readpublisher import ( "fmt" @@ -45,10 +45,10 @@ func (ErrAuthCritical) Error() string { type Path interface { Name() string Conf() *conf.PathConf - OnClientRemove(RemoveReq) - OnClientPlay(PlayReq) - OnClientRecord(RecordReq) - OnClientPause(PauseReq) + OnReadPublisherRemove(RemoveReq) + OnReadPublisherPlay(PlayReq) + OnReadPublisherRecord(RecordReq) + OnReadPublisherPause(PauseReq) } // DescribeRes is a describe response. @@ -60,10 +60,10 @@ type DescribeRes struct { // DescribeReq is a describe request. type DescribeReq struct { - Client Client - PathName string - Data *base.Request - Res chan DescribeRes + ReadPublisher ReadPublisher + PathName string + Data *base.Request + Res chan DescribeRes } // SetupPlayRes is a setup/play response. @@ -75,10 +75,10 @@ type SetupPlayRes struct { // SetupPlayReq is a setup/play request. type SetupPlayReq struct { - Client Client - PathName string - Data interface{} - Res chan SetupPlayRes + ReadPublisher ReadPublisher + PathName string + Data interface{} + Res chan SetupPlayRes } // AnnounceRes is a announce response. @@ -89,17 +89,17 @@ type AnnounceRes struct { // AnnounceReq is a announce request. type AnnounceReq struct { - Client Client - PathName string - Tracks gortsplib.Tracks - Data interface{} - Res chan AnnounceRes + ReadPublisher ReadPublisher + PathName string + Tracks gortsplib.Tracks + Data interface{} + Res chan AnnounceRes } // RemoveReq is a remove request. type RemoveReq struct { - Client Client - Res chan struct{} + ReadPublisher ReadPublisher + Res chan struct{} } // PlayRes is a play response. @@ -109,8 +109,8 @@ type PlayRes struct { // PlayReq is a play request. type PlayReq struct { - Client Client - Res chan PlayRes + ReadPublisher ReadPublisher + Res chan PlayRes } // RecordRes is a record response. @@ -121,19 +121,19 @@ type RecordRes struct { // RecordReq is a record request. type RecordReq struct { - Client Client - Res chan RecordRes + ReadPublisher ReadPublisher + Res chan RecordRes } // PauseReq is a pause request. type PauseReq struct { - Client Client - Res chan struct{} + ReadPublisher ReadPublisher + Res chan struct{} } -// Client is implemented by all client*. -type Client interface { - IsClient() +// ReadPublisher is an entity that can read/publish from/to a path. +type ReadPublisher interface { + IsReadPublisher() IsSource() Close() CloseRequest() diff --git a/internal/rtmp/client.go b/internal/rtmp/client.go index 15d83762..c4e492cf 100644 --- a/internal/rtmp/client.go +++ b/internal/rtmp/client.go @@ -11,7 +11,7 @@ import ( // DialContext connects to a server in reading mode. func DialContext(ctx context.Context, address string) (*Conn, error) { - // https://github.com/aler9/rtmp/blob/master/format/rtmp/client.go#L74 + // https://github.com/aler9/rtmp/blob/master/format/rtmp/readpublisher.go#L74 u, err := url.Parse(address) if err != nil { diff --git a/internal/serverhls/server.go b/internal/serverhls/server.go index 3291f9f0..2eb69399 100644 --- a/internal/serverhls/server.go +++ b/internal/serverhls/server.go @@ -206,7 +206,7 @@ func (s *Server) doClientClose(c *clienthls.Client) { c.Close() } -// OnClientClose is called by a client. +// OnClientClose is called by a readpublisher. func (s *Server) OnClientClose(c *clienthls.Client) { s.clientClose <- c } diff --git a/internal/serverrtmp/server.go b/internal/serverrtmp/server.go index 51d11370..d5cc7716 100644 --- a/internal/serverrtmp/server.go +++ b/internal/serverrtmp/server.go @@ -186,7 +186,7 @@ func (s *Server) doClientClose(c *clientrtmp.Client) { c.Close() } -// OnClientClose is called by a client. +// OnClientClose is called by a readpublisher. func (s *Server) OnClientClose(c *clientrtmp.Client) { s.clientClose <- c } diff --git a/internal/serverrtsp/server.go b/internal/serverrtsp/server.go index dcbe35ae..f92515e3 100644 --- a/internal/serverrtsp/server.go +++ b/internal/serverrtsp/server.go @@ -230,7 +230,7 @@ func (s *Server) doClientClose(c *clientrtsp.Client) { c.Close() } -// OnClientClose is called by a client. +// OnClientClose is called by a readpublisher. func (s *Server) OnClientClose(c *clientrtsp.Client) { s.clientClose <- c } diff --git a/internal/source/source.go b/internal/source/source.go index 105c65be..707f13a2 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -4,12 +4,12 @@ import ( "github.com/aler9/gortsplib" ) -// Source is implemented by all sources (clients and external sources). +// Source is a source. type Source interface { IsSource() } -// ExtSource is implemented by all external sources. +// ExtSource is an external source. type ExtSource interface { IsSource() IsExtSource()