diff --git a/internal/core/hls_remuxer.go b/internal/core/hls_remuxer.go index 5edada55..8b365e5f 100644 --- a/internal/core/hls_remuxer.go +++ b/internal/core/hls_remuxer.go @@ -111,6 +111,7 @@ type hlsRemuxer struct { ringBuffer *ringbuffer.RingBuffer lastRequestTime *int64 muxer *hls.Muxer + requests []hlsRemuxerRequest // in request chan hlsRemuxerRequest @@ -177,22 +178,44 @@ func (r *hlsRemuxer) PathName() string { func (r *hlsRemuxer) run() { defer r.wg.Done() - innerCtx, innerCtxCancel := context.WithCancel(context.Background()) - runErr := make(chan error) + remuxerCtx, remuxerCtxCancel := context.WithCancel(context.Background()) + remuxerReady := make(chan struct{}) + remuxerErr := make(chan error) go func() { - runErr <- r.runInner(innerCtx) + remuxerErr <- r.runRemuxer(remuxerCtx, remuxerReady) }() - select { - case err := <-runErr: - innerCtxCancel() - if err != nil { - r.log(logger.Info, "ERR: %s", err) - } + isReady := false - case <-r.ctx.Done(): - innerCtxCancel() - <-runErr +outer: + for { + select { + case <-r.ctx.Done(): + remuxerCtxCancel() + <-remuxerErr + break outer + + case req := <-r.request: + if isReady { + r.handleRequest(req) + } else { + r.requests = append(r.requests, req) + } + + case <-remuxerReady: + isReady = true + for _, req := range r.requests { + r.handleRequest(req) + } + r.requests = nil + + case err := <-remuxerErr: + remuxerCtxCancel() + if err != nil { + r.log(logger.Info, "ERR: %s", err) + } + break outer + } } r.ctxCancel() @@ -200,14 +223,13 @@ func (r *hlsRemuxer) run() { r.parent.OnRemuxerClose(r) } -func (r *hlsRemuxer) runInner(innerCtx context.Context) error { +func (r *hlsRemuxer) runRemuxer(remuxerCtx context.Context, remuxerReady chan struct{}) error { res := r.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{ Author: r, PathName: r.pathName, IP: nil, ValidateCredentials: nil, }) - if res.Err != nil { return res.Err } @@ -283,21 +305,11 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error { } defer r.muxer.Close() - // start request handler only after muxer has been inizialized - requestHandlerTerminate := make(chan struct{}) - requestHandlerDone := make(chan struct{}) - go r.runRequestHandler(requestHandlerTerminate, requestHandlerDone) - - defer func() { - close(requestHandlerTerminate) - <-requestHandlerDone - }() + remuxerReady <- struct{}{} r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount)) - r.path.OnReaderPlay(pathReaderPlayReq{ - Author: r, - }) + r.path.OnReaderPlay(pathReaderPlayReq{Author: r}) writerDone := make(chan error) go func() { @@ -396,7 +408,7 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error { case err := <-writerDone: return err - case <-innerCtx.Done(): + case <-remuxerCtx.Done(): r.ringBuffer.Close() <-writerDone return nil @@ -404,74 +416,62 @@ func (r *hlsRemuxer) runInner(innerCtx context.Context) error { } } -func (r *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct{}) { - defer close(done) +func (r *hlsRemuxer) handleRequest(req hlsRemuxerRequest) { + atomic.StoreInt64(r.lastRequestTime, time.Now().Unix()) - for { - select { - case <-terminate: + conf := r.path.Conf() + + if conf.ReadIPsParsed != nil { + tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr) + ip := net.ParseIP(tmp) + if !ipEqualOrInRange(ip, conf.ReadIPsParsed) { + r.log(logger.Info, "ERR: ip '%s' not allowed", ip) + req.W.WriteHeader(http.StatusUnauthorized) + req.Res <- nil return - - case preq := <-r.request: - req := preq - - atomic.StoreInt64(r.lastRequestTime, time.Now().Unix()) - - conf := r.path.Conf() - - if conf.ReadIPsParsed != nil { - tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr) - ip := net.ParseIP(tmp) - if !ipEqualOrInRange(ip, conf.ReadIPsParsed) { - r.log(logger.Info, "ERR: ip '%s' not allowed", ip) - req.W.WriteHeader(http.StatusUnauthorized) - req.Res <- nil - continue - } - } - - if conf.ReadUser != "" { - user, pass, ok := req.Req.BasicAuth() - if !ok || user != conf.ReadUser || pass != conf.ReadPass { - req.W.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`) - req.W.WriteHeader(http.StatusUnauthorized) - req.Res <- nil - continue - } - } - - switch { - case req.File == "stream.m3u8": - r := r.muxer.Playlist() - if r == nil { - req.W.WriteHeader(http.StatusNotFound) - req.Res <- nil - continue - } - - req.W.Header().Set("Content-Type", `application/x-mpegURL`) - req.Res <- r - - case strings.HasSuffix(req.File, ".ts"): - r := r.muxer.TSFile(req.File) - if r == nil { - req.W.WriteHeader(http.StatusNotFound) - req.Res <- nil - continue - } - - req.W.Header().Set("Content-Type", `video/MP2T`) - req.Res <- r - - case req.File == "": - req.Res <- bytes.NewReader([]byte(index)) - - default: - req.W.WriteHeader(http.StatusNotFound) - req.Res <- nil - } } } + + if conf.ReadUser != "" { + user, pass, ok := req.Req.BasicAuth() + if !ok || user != conf.ReadUser || pass != conf.ReadPass { + req.W.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`) + req.W.WriteHeader(http.StatusUnauthorized) + req.Res <- nil + return + } + } + + switch { + case req.File == "stream.m3u8": + r := r.muxer.Playlist() + if r == nil { + req.W.WriteHeader(http.StatusNotFound) + req.Res <- nil + return + } + + req.W.Header().Set("Content-Type", `application/x-mpegURL`) + req.Res <- r + + case strings.HasSuffix(req.File, ".ts"): + r := r.muxer.TSFile(req.File) + if r == nil { + req.W.WriteHeader(http.StatusNotFound) + req.Res <- nil + return + } + + req.W.Header().Set("Content-Type", `video/MP2T`) + req.Res <- r + + case req.File == "": + req.Res <- bytes.NewReader([]byte(index)) + + default: + req.W.WriteHeader(http.StatusNotFound) + req.Res <- nil + } } // OnRequest is called by hlsserver.Server (forwarded from ServeHTTP).