diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index 0de8325b..c699e648 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -152,7 +152,7 @@ func (c *Client) CloseRequest() { // IsReadPublisher implements readpublisher.ReadPublisher. func (c *Client) IsReadPublisher() {} -// IsSource implements path.source. +// IsSource implements source.Source. func (c *Client) IsSource() {} func (c *Client) log(level logger.Level, format string, args ...interface{}) { diff --git a/internal/clientrtsp/client.go b/internal/clientrtsp/client.go index 454a6fec..6b32190a 100644 --- a/internal/clientrtsp/client.go +++ b/internal/clientrtsp/client.go @@ -141,7 +141,7 @@ func (c *Client) CloseRequest() { // IsReadPublisher implements readpublisher.ReadPublisher. func (c *Client) IsReadPublisher() {} -// IsSource implements path.source. +// IsSource implements source.Source. func (c *Client) IsSource() {} func (c *Client) log(level logger.Level, format string, args ...interface{}) { diff --git a/internal/clienthls/client.go b/internal/converterhls/converter.go similarity index 90% rename from internal/clienthls/client.go rename to internal/converterhls/converter.go index 067b0c3d..f26c453d 100644 --- a/internal/clienthls/client.go +++ b/internal/converterhls/converter.go @@ -1,4 +1,4 @@ -package clienthls +package converterhls import ( "bytes" @@ -122,11 +122,11 @@ type PathMan interface { // Parent is implemented by serverhls.Server. type Parent interface { Log(logger.Level, string, ...interface{}) - OnClientClose(*Client) + OnConverterClose(*Converter) } -// Client is a HLS client. -type Client struct { +// Converter is an HLS converter. +type Converter struct { hlsSegmentCount int hlsSegmentDuration time.Duration readBufferCount int @@ -149,7 +149,7 @@ type Client struct { terminate chan struct{} } -// New allocates a Client. +// New allocates a Converter. func New( hlsSegmentCount int, hlsSegmentDuration time.Duration, @@ -158,9 +158,9 @@ func New( stats *stats.Stats, pathName string, pathMan PathMan, - parent Parent) *Client { + parent Parent) *Converter { - c := &Client{ + c := &Converter{ hlsSegmentCount: hlsSegmentCount, hlsSegmentDuration: hlsSegmentDuration, readBufferCount: readBufferCount, @@ -175,8 +175,7 @@ func New( terminate: make(chan struct{}), } - atomic.AddInt64(c.stats.CountClients, 1) - c.log(logger.Info, "connected") + c.log(logger.Info, "created") c.wg.Add(1) go c.run() @@ -184,34 +183,33 @@ func New( return c } -// Close closes a Client. -func (c *Client) Close() { - atomic.AddInt64(c.stats.CountClients, -1) - c.log(logger.Info, "disconnected") +// Close closes a Converter. +func (c *Converter) Close() { + c.log(logger.Info, "destroyed") close(c.terminate) } -// CloseRequest closes a Client. -func (c *Client) CloseRequest() { - c.parent.OnClientClose(c) +// CloseRequest closes a Converter. +func (c *Converter) CloseRequest() { + c.parent.OnConverterClose(c) } // IsReadPublisher implements readpublisher.ReadPublisher. -func (c *Client) IsReadPublisher() {} +func (c *Converter) IsReadPublisher() {} -// IsSource implements path.source. -func (c *Client) IsSource() {} +// IsSource implements source.Source. +func (c *Converter) IsSource() {} -func (c *Client) log(level logger.Level, format string, args ...interface{}) { - c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.pathName}, args...)...) +func (c *Converter) log(level logger.Level, format string, args ...interface{}) { + c.parent.Log(level, "[converter %s] "+format, append([]interface{}{c.pathName}, args...)...) } // PathName returns the path name of the readpublisher. -func (c *Client) PathName() string { +func (c *Converter) PathName() string { return c.pathName } -func (c *Client) run() { +func (c *Converter) run() { defer c.wg.Done() var videoTrack *gortsplib.Track @@ -290,7 +288,7 @@ func (c *Client) run() { <-res } - c.parent.OnClientClose(c) + c.parent.OnConverterClose(c) <-c.terminate close(c.request) @@ -319,7 +317,7 @@ func (c *Client) run() { c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, resc}) //nolint:govet <-resc - c.log(logger.Info, "is reading from path '%s'", c.pathName) + c.log(logger.Info, "is reading") writerDone := make(chan error) go func() { @@ -486,7 +484,7 @@ func (c *Client) run() { c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res - c.parent.OnClientClose(c) + c.parent.OnConverterClose(c) <-c.terminate return } @@ -498,7 +496,7 @@ func (c *Client) run() { c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet <-res - c.parent.OnClientClose(c) + c.parent.OnConverterClose(c) <-c.terminate return @@ -514,7 +512,7 @@ func (c *Client) run() { } } -func (c *Client) runRequestHandler(done chan struct{}) { +func (c *Converter) runRequestHandler(done chan struct{}) { defer close(done) for preq := range c.request { @@ -594,20 +592,20 @@ func (c *Client) runRequestHandler(done chan struct{}) { } } -// OnRequest is called by clientman.ClientMan. -func (c *Client) OnRequest(req Request) { +// OnRequest is called by serverhls.Server. +func (c *Converter) OnRequest(req Request) { c.request <- req } // Authenticate performs an authentication. -func (c *Client) Authenticate(authMethods []headers.AuthMethod, +func (c *Converter) Authenticate(authMethods []headers.AuthMethod, pathName string, ips []interface{}, user string, pass string, req interface{}) error { return nil } // OnFrame implements path.Reader. -func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { +func (c *Converter) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { if streamType == gortsplib.StreamTypeRTP { c.ringBuffer.Push(trackIDPayloadPair{trackID, payload}) } diff --git a/internal/clienthls/multiaccessbuffer.go b/internal/converterhls/multiaccessbuffer.go similarity index 98% rename from internal/clienthls/multiaccessbuffer.go rename to internal/converterhls/multiaccessbuffer.go index ff06ecad..18d2be01 100644 --- a/internal/clienthls/multiaccessbuffer.go +++ b/internal/converterhls/multiaccessbuffer.go @@ -1,4 +1,4 @@ -package clienthls +package converterhls import ( "bytes" diff --git a/internal/clienthls/multiaccessbuffer_test.go b/internal/converterhls/multiaccessbuffer_test.go similarity index 97% rename from internal/clienthls/multiaccessbuffer_test.go rename to internal/converterhls/multiaccessbuffer_test.go index 1d5f4313..67b29c50 100644 --- a/internal/clienthls/multiaccessbuffer_test.go +++ b/internal/converterhls/multiaccessbuffer_test.go @@ -1,4 +1,4 @@ -package clienthls +package converterhls import ( "io" diff --git a/internal/clienthls/tsfile.go b/internal/converterhls/tsfile.go similarity index 99% rename from internal/clienthls/tsfile.go rename to internal/converterhls/tsfile.go index 79ba9e18..930f770f 100644 --- a/internal/clienthls/tsfile.go +++ b/internal/converterhls/tsfile.go @@ -1,4 +1,4 @@ -package clienthls +package converterhls import ( "context" diff --git a/internal/serverhls/server.go b/internal/serverhls/server.go index 2eb69399..90853b6e 100644 --- a/internal/serverhls/server.go +++ b/internal/serverhls/server.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/aler9/rtsp-simple-server/internal/clienthls" + "github.com/aler9/rtsp-simple-server/internal/converterhls" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/pathman" "github.com/aler9/rtsp-simple-server/internal/stats" @@ -29,13 +29,13 @@ type Server struct { pathMan *pathman.PathManager parent Parent - ln net.Listener - wg sync.WaitGroup - clients map[string]*clienthls.Client + ln net.Listener + wg sync.WaitGroup + converters map[string]*converterhls.Converter // in - request chan clienthls.Request - clientClose chan *clienthls.Client + request chan converterhls.Request + clientClose chan *converterhls.Converter terminate chan struct{} // out @@ -66,9 +66,9 @@ func New( pathMan: pathMan, parent: parent, ln: ln, - clients: make(map[string]*clienthls.Client), - request: make(chan clienthls.Request), - clientClose: make(chan *clienthls.Client), + converters: make(map[string]*converterhls.Converter), + request: make(chan converterhls.Request), + clientClose: make(chan *converterhls.Converter), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -101,9 +101,9 @@ outer: for { select { case req := <-s.request: - c, ok := s.clients[req.Path] + c, ok := s.converters[req.Path] if !ok { - c = clienthls.New( + c = converterhls.New( s.hlsSegmentCount, s.hlsSegmentDuration, s.readBufferCount, @@ -112,15 +112,15 @@ outer: req.Path, s.pathMan, s) - s.clients[req.Path] = c + s.converters[req.Path] = c } c.OnRequest(req) case c := <-s.clientClose: - if c2, ok := s.clients[c.PathName()]; !ok || c2 != c { + if c2, ok := s.converters[c.PathName()]; !ok || c2 != c { continue } - s.doClientClose(c) + s.doConverterClose(c) case <-s.terminate: break outer @@ -144,8 +144,8 @@ outer: } }() - for _, c := range s.clients { - s.doClientClose(c) + for _, c := range s.converters { + s.doConverterClose(c) } hs.Shutdown(context.Background()) @@ -156,7 +156,7 @@ outer: // ServeHTTP implements http.Handler. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.Log(logger.Info, "%s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) + s.Log(logger.Info, "[client %s] %s %s", r.RemoteAddr, r.Method, r.URL.Path) // remove leading prefix path := r.URL.Path[1:] @@ -174,7 +174,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } cres := make(chan io.Reader) - s.request <- clienthls.Request{ + s.request <- converterhls.Request{ Path: parts[0], Subpath: parts[1], Req: r, @@ -201,12 +201,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (s *Server) doClientClose(c *clienthls.Client) { - delete(s.clients, c.PathName()) +func (s *Server) doConverterClose(c *converterhls.Converter) { + delete(s.converters, c.PathName()) c.Close() } -// OnClientClose is called by a readpublisher. -func (s *Server) OnClientClose(c *clienthls.Client) { +// OnConverterClose is called by converterhls.Converter. +func (s *Server) OnConverterClose(c *converterhls.Converter) { s.clientClose <- c } diff --git a/internal/serverrtmp/server.go b/internal/serverrtmp/server.go index d5cc7716..3c368ac5 100644 --- a/internal/serverrtmp/server.go +++ b/internal/serverrtmp/server.go @@ -186,7 +186,7 @@ func (s *Server) doClientClose(c *clientrtmp.Client) { c.Close() } -// OnClientClose is called by a readpublisher. +// OnClientClose is called by clientrtmp.Client. func (s *Server) OnClientClose(c *clientrtmp.Client) { s.clientClose <- c } diff --git a/internal/serverrtsp/server.go b/internal/serverrtsp/server.go index f92515e3..b224a402 100644 --- a/internal/serverrtsp/server.go +++ b/internal/serverrtsp/server.go @@ -230,7 +230,7 @@ func (s *Server) doClientClose(c *clientrtsp.Client) { c.Close() } -// OnClientClose is called by a readpublisher. +// OnClientClose is called by clientrtsp.Client. func (s *Server) OnClientClose(c *clientrtsp.Client) { s.clientClose <- c }