hls: change Muxer letter

This commit is contained in:
aler9 2021-11-05 16:55:00 +01:00
parent e4ba689bfd
commit 6564c3511b

View file

@ -155,7 +155,7 @@ func newHLSMuxer(
parent hlsMuxerParent) *hlsMuxer {
ctx, ctxCancel := context.WithCancel(parentCtx)
r := &hlsMuxer{
m := &hlsMuxer{
hlsAlwaysRemux: hlsAlwaysRemux,
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
@ -173,35 +173,35 @@ func newHLSMuxer(
request: make(chan hlsMuxerRequest),
}
r.log(logger.Info, "opened")
m.log(logger.Info, "opened")
r.wg.Add(1)
go r.run()
m.wg.Add(1)
go m.run()
return r
return m
}
func (r *hlsMuxer) close() {
r.ctxCancel()
func (m *hlsMuxer) close() {
m.ctxCancel()
}
func (r *hlsMuxer) log(level logger.Level, format string, args ...interface{}) {
r.parent.log(level, "[muxer %s] "+format, append([]interface{}{r.pathName}, args...)...)
func (m *hlsMuxer) log(level logger.Level, format string, args ...interface{}) {
m.parent.log(level, "[muxer %s] "+format, append([]interface{}{m.pathName}, args...)...)
}
// PathName returns the path name.
func (r *hlsMuxer) PathName() string {
return r.pathName
func (m *hlsMuxer) PathName() string {
return m.pathName
}
func (r *hlsMuxer) run() {
defer r.wg.Done()
func (m *hlsMuxer) run() {
defer m.wg.Done()
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerReady := make(chan struct{})
innerErr := make(chan error)
go func() {
innerErr <- r.runInner(innerCtx, innerReady)
innerErr <- m.runInner(innerCtx, innerReady)
}()
isReady := false
@ -209,24 +209,24 @@ func (r *hlsMuxer) run() {
err := func() error {
for {
select {
case <-r.ctx.Done():
case <-m.ctx.Done():
innerCtxCancel()
<-innerErr
return errors.New("terminated")
case req := <-r.request:
case req := <-m.request:
if isReady {
req.Res <- r.handleRequest(req)
req.Res <- m.handleRequest(req)
} else {
r.requests = append(r.requests, req)
m.requests = append(m.requests, req)
}
case <-innerReady:
isReady = true
for _, req := range r.requests {
req.Res <- r.handleRequest(req)
for _, req := range m.requests {
req.Res <- m.handleRequest(req)
}
r.requests = nil
m.requests = nil
case err := <-innerErr:
innerCtxCancel()
@ -235,21 +235,21 @@ func (r *hlsMuxer) run() {
}
}()
r.ctxCancel()
m.ctxCancel()
for _, req := range r.requests {
for _, req := range m.requests {
req.Res <- hlsMuxerResponse{Status: http.StatusNotFound}
}
r.parent.onMuxerClose(r)
m.parent.onMuxerClose(m)
r.log(logger.Info, "closed (%v)", err)
m.log(logger.Info, "closed (%v)", err)
}
func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := r.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: r,
PathName: r.pathName,
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := m.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: m,
PathName: m.pathName,
IP: nil,
ValidateCredentials: nil,
})
@ -257,10 +257,10 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
return res.Err
}
r.path = res.Path
m.path = res.Path
defer func() {
r.path.onReaderRemove(pathReaderRemoveReq{Author: r})
m.path.onReaderRemove(pathReaderRemoveReq{Author: m})
}()
var videoTrack *gortsplib.Track
@ -302,28 +302,28 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
}
var err error
r.muxer, err = hls.NewMuxer(
r.hlsSegmentCount,
time.Duration(r.hlsSegmentDuration),
m.muxer, err = hls.NewMuxer(
m.hlsSegmentCount,
time.Duration(m.hlsSegmentDuration),
videoTrack,
audioTrack,
)
if err != nil {
return err
}
defer r.muxer.Close()
defer m.muxer.Close()
innerReady <- struct{}{}
r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount))
m.ringBuffer = ringbuffer.New(uint64(m.readBufferCount))
r.path.onReaderPlay(pathReaderPlayReq{Author: r})
m.path.onReaderPlay(pathReaderPlayReq{Author: m})
writerDone := make(chan error)
go func() {
writerDone <- func() error {
for {
data, ok := r.ringBuffer.Pull()
data, ok := m.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
@ -333,7 +333,7 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
var pkt rtp.Packet
err := pkt.Unmarshal(pair.buf)
if err != nil {
r.log(logger.Warn, "unable to decode RTP packet: %v", err)
m.log(logger.Warn, "unable to decode RTP packet: %v", err)
continue
}
@ -341,12 +341,12 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
if err != nil {
if err != rtph264.ErrMorePacketsNeeded &&
err != rtph264.ErrNonStartingPacketAndNoPrevious {
r.log(logger.Warn, "unable to decode video track: %v", err)
m.log(logger.Warn, "unable to decode video track: %v", err)
}
continue
}
err = r.muxer.WriteH264(pts, nalus)
err = m.muxer.WriteH264(pts, nalus)
if err != nil {
return err
}
@ -354,19 +354,19 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
var pkt rtp.Packet
err := pkt.Unmarshal(pair.buf)
if err != nil {
r.log(logger.Warn, "unable to decode RTP packet: %v", err)
m.log(logger.Warn, "unable to decode RTP packet: %v", err)
continue
}
aus, pts, err := aacDecoder.Decode(&pkt)
if err != nil {
if err != rtpaac.ErrMorePacketsNeeded {
r.log(logger.Warn, "unable to decode audio track: %v", err)
m.log(logger.Warn, "unable to decode audio track: %v", err)
}
continue
}
err = r.muxer.WriteAAC(pts, aus)
err = m.muxer.WriteAAC(pts, aus)
if err != nil {
return err
}
@ -381,9 +381,9 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
for {
select {
case <-closeCheckTicker.C:
t := time.Unix(atomic.LoadInt64(r.lastRequestTime), 0)
if !r.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity {
r.ringBuffer.Close()
t := time.Unix(atomic.LoadInt64(m.lastRequestTime), 0)
if !m.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close()
<-writerDone
return nil
}
@ -392,23 +392,23 @@ func (r *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
return err
case <-innerCtx.Done():
r.ringBuffer.Close()
m.ringBuffer.Close()
<-writerDone
return nil
}
}
}
func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse {
atomic.StoreInt64(r.lastRequestTime, time.Now().Unix())
func (m *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse {
atomic.StoreInt64(m.lastRequestTime, time.Now().Unix())
conf := r.path.Conf()
conf := m.path.Conf()
if conf.ReadIPs != nil {
tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr)
ip := net.ParseIP(tmp)
if !ipEqualOrInRange(ip, conf.ReadIPs) {
r.log(logger.Info, "ip '%s' not allowed", ip)
m.log(logger.Info, "ip '%s' not allowed", ip)
return hlsMuxerResponse{Status: http.StatusUnauthorized}
}
}
@ -432,7 +432,7 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse {
Header: map[string]string{
"Content-Type": `application/x-mpegURL`,
},
Body: r.muxer.PrimaryPlaylist(),
Body: m.muxer.PrimaryPlaylist(),
}
case req.File == "stream.m3u8":
@ -441,11 +441,11 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse {
Header: map[string]string{
"Content-Type": `application/x-mpegURL`,
},
Body: r.muxer.StreamPlaylist(),
Body: m.muxer.StreamPlaylist(),
}
case strings.HasSuffix(req.File, ".ts"):
r := r.muxer.Segment(req.File)
r := m.muxer.Segment(req.File)
if r == nil {
return hlsMuxerResponse{Status: http.StatusNotFound}
}
@ -473,28 +473,28 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse {
}
// onRequest is called by hlsserver.Server (forwarded from ServeHTTP).
func (r *hlsMuxer) onRequest(req hlsMuxerRequest) {
func (m *hlsMuxer) onRequest(req hlsMuxerRequest) {
select {
case r.request <- req:
case <-r.ctx.Done():
case m.request <- req:
case <-m.ctx.Done():
req.Res <- hlsMuxerResponse{Status: http.StatusNotFound}
}
}
// onReaderAccepted implements reader.
func (r *hlsMuxer) onReaderAccepted() {
r.log(logger.Info, "is converting into HLS")
func (m *hlsMuxer) onReaderAccepted() {
m.log(logger.Info, "is converting into HLS")
}
// onReaderFrame implements reader.
func (r *hlsMuxer) onReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
func (m *hlsMuxer) onReaderFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
r.ringBuffer.Push(hlsMuxerTrackIDPayloadPair{trackID, payload})
m.ringBuffer.Push(hlsMuxerTrackIDPayloadPair{trackID, payload})
}
}
// onReaderAPIDescribe implements reader.
func (r *hlsMuxer) onReaderAPIDescribe() interface{} {
func (m *hlsMuxer) onReaderAPIDescribe() interface{} {
return struct {
Type string `json:"type"`
}{"hlsMuxer"}