From 6564c3511beabcaa41a6c0caccdc4eb2fe568e41 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Fri, 5 Nov 2021 16:55:00 +0100 Subject: [PATCH] hls: change Muxer letter --- internal/core/hls_muxer.go | 126 ++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 63 deletions(-) diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 77303584..1aa47ee7 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -155,7 +155,7 @@ func newHLSMuxer( parent hlsMuxerParent) *hlsMuxer { ctx, ctxCancel := context.WithCancel(parentCtx) - r := &hlsMuxer{ + m := &hlsMuxer{ hlsAlwaysRemux: hlsAlwaysRemux, hlsSegmentCount: hlsSegmentCount, hlsSegmentDuration: hlsSegmentDuration, @@ -173,35 +173,35 @@ func newHLSMuxer( request: make(chan hlsMuxerRequest), } - r.log(logger.Info, "opened") + m.log(logger.Info, "opened") - r.wg.Add(1) - go r.run() + m.wg.Add(1) + go m.run() - return r + return m } -func (r *hlsMuxer) close() { - r.ctxCancel() +func (m *hlsMuxer) close() { + m.ctxCancel() } -func (r *hlsMuxer) log(level logger.Level, format string, args ...interface{}) { - r.parent.log(level, "[muxer %s] "+format, append([]interface{}{r.pathName}, args...)...) +func (m *hlsMuxer) log(level logger.Level, format string, args ...interface{}) { + m.parent.log(level, "[muxer %s] "+format, append([]interface{}{m.pathName}, args...)...) } // PathName returns the path name. -func (r *hlsMuxer) PathName() string { - return r.pathName +func (m *hlsMuxer) PathName() string { + return m.pathName } -func (r *hlsMuxer) run() { - defer r.wg.Done() +func (m *hlsMuxer) run() { + defer m.wg.Done() innerCtx, innerCtxCancel := context.WithCancel(context.Background()) innerReady := make(chan struct{}) innerErr := make(chan error) go func() { - innerErr <- r.runInner(innerCtx, innerReady) + innerErr <- m.runInner(innerCtx, innerReady) }() isReady := false @@ -209,24 +209,24 @@ func (r *hlsMuxer) run() { err := func() error { for { select { - case <-r.ctx.Done(): + case <-m.ctx.Done(): innerCtxCancel() <-innerErr return errors.New("terminated") - case req := <-r.request: + case req := <-m.request: if isReady { - req.Res <- r.handleRequest(req) + req.Res <- m.handleRequest(req) } else { - r.requests = append(r.requests, req) + m.requests = append(m.requests, req) } case <-innerReady: isReady = true - for _, req := range r.requests { - req.Res <- r.handleRequest(req) + for _, req := range m.requests { + req.Res <- m.handleRequest(req) } - r.requests = nil + m.requests = nil case err := <-innerErr: innerCtxCancel() @@ -235,21 +235,21 @@ func (r *hlsMuxer) run() { } }() - r.ctxCancel() + m.ctxCancel() - for _, req := range r.requests { + for _, req := range m.requests { req.Res <- hlsMuxerResponse{Status: http.StatusNotFound} } - r.parent.onMuxerClose(r) + m.parent.onMuxerClose(m) - r.log(logger.Info, "closed (%v)", err) + m.log(logger.Info, "closed (%v)", err) } -func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { - res := r.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{ - Author: r, - PathName: r.pathName, +func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { + res := m.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{ + Author: m, + PathName: m.pathName, IP: nil, ValidateCredentials: nil, }) @@ -257,10 +257,10 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) return res.Err } - r.path = res.Path + m.path = res.Path defer func() { - r.path.onReaderRemove(pathReaderRemoveReq{Author: r}) + m.path.onReaderRemove(pathReaderRemoveReq{Author: m}) }() var videoTrack *gortsplib.Track @@ -302,28 +302,28 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) } var err error - r.muxer, err = hls.NewMuxer( - r.hlsSegmentCount, - time.Duration(r.hlsSegmentDuration), + m.muxer, err = hls.NewMuxer( + m.hlsSegmentCount, + time.Duration(m.hlsSegmentDuration), videoTrack, audioTrack, ) if err != nil { return err } - defer r.muxer.Close() + defer m.muxer.Close() innerReady <- struct{}{} - r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount)) + m.ringBuffer = ringbuffer.New(uint64(m.readBufferCount)) - r.path.onReaderPlay(pathReaderPlayReq{Author: r}) + m.path.onReaderPlay(pathReaderPlayReq{Author: m}) writerDone := make(chan error) go func() { writerDone <- func() error { for { - data, ok := r.ringBuffer.Pull() + data, ok := m.ringBuffer.Pull() if !ok { return fmt.Errorf("terminated") } @@ -333,7 +333,7 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) var pkt rtp.Packet err := pkt.Unmarshal(pair.buf) if err != nil { - r.log(logger.Warn, "unable to decode RTP packet: %v", err) + m.log(logger.Warn, "unable to decode RTP packet: %v", err) continue } @@ -341,12 +341,12 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) if err != nil { if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious { - r.log(logger.Warn, "unable to decode video track: %v", err) + m.log(logger.Warn, "unable to decode video track: %v", err) } continue } - err = r.muxer.WriteH264(pts, nalus) + err = m.muxer.WriteH264(pts, nalus) if err != nil { return err } @@ -354,19 +354,19 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) var pkt rtp.Packet err := pkt.Unmarshal(pair.buf) if err != nil { - r.log(logger.Warn, "unable to decode RTP packet: %v", err) + m.log(logger.Warn, "unable to decode RTP packet: %v", err) continue } aus, pts, err := aacDecoder.Decode(&pkt) if err != nil { if err != rtpaac.ErrMorePacketsNeeded { - r.log(logger.Warn, "unable to decode audio track: %v", err) + m.log(logger.Warn, "unable to decode audio track: %v", err) } continue } - err = r.muxer.WriteAAC(pts, aus) + err = m.muxer.WriteAAC(pts, aus) if err != nil { return err } @@ -381,9 +381,9 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) for { select { case <-closeCheckTicker.C: - t := time.Unix(atomic.LoadInt64(r.lastRequestTime), 0) - if !r.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity { - r.ringBuffer.Close() + t := time.Unix(atomic.LoadInt64(m.lastRequestTime), 0) + if !m.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity { + m.ringBuffer.Close() <-writerDone return nil } @@ -392,23 +392,23 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) return err case <-innerCtx.Done(): - r.ringBuffer.Close() + m.ringBuffer.Close() <-writerDone return nil } } } -func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse { - atomic.StoreInt64(r.lastRequestTime, time.Now().Unix()) +func (m *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse { + atomic.StoreInt64(m.lastRequestTime, time.Now().Unix()) - conf := r.path.Conf() + conf := m.path.Conf() if conf.ReadIPs != nil { tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr) ip := net.ParseIP(tmp) if !ipEqualOrInRange(ip, conf.ReadIPs) { - r.log(logger.Info, "ip '%s' not allowed", ip) + m.log(logger.Info, "ip '%s' not allowed", ip) return hlsMuxerResponse{Status: http.StatusUnauthorized} } } @@ -432,7 +432,7 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse { Header: map[string]string{ "Content-Type": `application/x-mpegURL`, }, - Body: r.muxer.PrimaryPlaylist(), + Body: m.muxer.PrimaryPlaylist(), } case req.File == "stream.m3u8": @@ -441,11 +441,11 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse { Header: map[string]string{ "Content-Type": `application/x-mpegURL`, }, - Body: r.muxer.StreamPlaylist(), + Body: m.muxer.StreamPlaylist(), } case strings.HasSuffix(req.File, ".ts"): - r := r.muxer.Segment(req.File) + r := m.muxer.Segment(req.File) if r == nil { return hlsMuxerResponse{Status: http.StatusNotFound} } @@ -473,28 +473,28 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse { } // onRequest is called by hlsserver.Server (forwarded from ServeHTTP). -func (r *hlsMuxer) onRequest(req hlsMuxerRequest) { +func (m *hlsMuxer) onRequest(req hlsMuxerRequest) { select { - case r.request <- req: - case <-r.ctx.Done(): + case m.request <- req: + case <-m.ctx.Done(): req.Res <- hlsMuxerResponse{Status: http.StatusNotFound} } } // onReaderAccepted implements reader. -func (r *hlsMuxer) onReaderAccepted() { - r.log(logger.Info, "is converting into HLS") +func (m *hlsMuxer) onReaderAccepted() { + m.log(logger.Info, "is converting into HLS") } // onReaderFrame implements reader. -func (r *hlsMuxer) onReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { +func (m *hlsMuxer) onReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { if streamType == gortsplib.StreamTypeRTP { - r.ringBuffer.Push(hlsMuxerTrackIDPayloadPair{trackID, payload}) + m.ringBuffer.Push(hlsMuxerTrackIDPayloadPair{trackID, payload}) } } // onReaderAPIDescribe implements reader. -func (r *hlsMuxer) onReaderAPIDescribe() interface{} { +func (m *hlsMuxer) onReaderAPIDescribe() interface{} { return struct { Type string `json:"type"` }{"hlsMuxer"}