hls: rename converter into remuxer

This commit is contained in:
aler9 2021-07-29 12:16:06 +02:00
parent 0a2764ed05
commit f16acb937f
2 changed files with 43 additions and 43 deletions

View file

@ -76,7 +76,7 @@ create();
</html>
`
type hlsConverterRequest struct {
type hlsRemuxerRequest struct {
Dir string
File string
Req *http.Request
@ -84,29 +84,29 @@ type hlsConverterRequest struct {
Res chan io.Reader
}
type hlsConverterTrackIDPayloadPair struct {
type hlsRemuxerTrackIDPayloadPair struct {
trackID int
buf []byte
}
type hlsConverterPathMan interface {
type hlsRemuxerPathMan interface {
OnReadPublisherSetupPlay(readPublisherSetupPlayReq)
}
type hlsConverterParent interface {
type hlsRemuxerParent interface {
Log(logger.Level, string, ...interface{})
OnConverterClose(*hlsConverter)
OnRemuxerClose(*hlsRemuxer)
}
type hlsConverter struct {
type hlsRemuxer struct {
hlsSegmentCount int
hlsSegmentDuration time.Duration
readBufferCount int
wg *sync.WaitGroup
stats *stats
pathName string
pathMan hlsConverterPathMan
parent hlsConverterParent
pathMan hlsRemuxerPathMan
parent hlsRemuxerParent
ctx context.Context
ctxCancel func()
@ -116,10 +116,10 @@ type hlsConverter struct {
muxer *hls.Muxer
// in
request chan hlsConverterRequest
request chan hlsRemuxerRequest
}
func newHLSConverter(
func newHLSRemuxer(
parentCtx context.Context,
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
@ -127,11 +127,11 @@ func newHLSConverter(
wg *sync.WaitGroup,
stats *stats,
pathName string,
pathMan hlsConverterPathMan,
parent hlsConverterParent) *hlsConverter {
pathMan hlsRemuxerPathMan,
parent hlsRemuxerParent) *hlsRemuxer {
ctx, ctxCancel := context.WithCancel(parentCtx)
c := &hlsConverter{
c := &hlsRemuxer{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
readBufferCount: readBufferCount,
@ -146,7 +146,7 @@ func newHLSConverter(
v := time.Now().Unix()
return &v
}(),
request: make(chan hlsConverterRequest),
request: make(chan hlsRemuxerRequest),
}
c.log(logger.Info, "opened")
@ -157,31 +157,31 @@ func newHLSConverter(
return c
}
// ParentClose closes a Converter.
func (c *hlsConverter) ParentClose() {
// ParentClose closes a Remuxer.
func (c *hlsRemuxer) ParentClose() {
c.log(logger.Info, "closed")
}
func (c *hlsConverter) Close() {
func (c *hlsRemuxer) Close() {
c.ctxCancel()
}
// IsReadPublisher implements readPublisher.
func (c *hlsConverter) IsReadPublisher() {}
func (c *hlsRemuxer) IsReadPublisher() {}
// IsSource implements source.
func (c *hlsConverter) IsSource() {}
func (c *hlsRemuxer) IsSource() {}
func (c *hlsConverter) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[converter %s] "+format, append([]interface{}{c.pathName}, args...)...)
func (c *hlsRemuxer) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[remuxer %s] "+format, append([]interface{}{c.pathName}, args...)...)
}
// PathName returns the path name of the readPublisher
func (c *hlsConverter) PathName() string {
func (c *hlsRemuxer) PathName() string {
return c.pathName
}
func (c *hlsConverter) run() {
func (c *hlsRemuxer) run() {
defer c.wg.Done()
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
@ -210,10 +210,10 @@ func (c *hlsConverter) run() {
<-res
}
c.parent.OnConverterClose(c)
c.parent.OnRemuxerClose(c)
}
func (c *hlsConverter) runInner(innerCtx context.Context) error {
func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
pres := make(chan readPublisherSetupPlayRes)
c.pathMan.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
Author: c,
@ -310,7 +310,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
c.path.OnReadPublisherPlay(readPublisherPlayReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is converting into HLS")
c.log(logger.Info, "is remuxing into HLS")
writerDone := make(chan error)
go func() {
@ -322,7 +322,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
if !ok {
return fmt.Errorf("terminated")
}
pair := data.(hlsConverterTrackIDPayloadPair)
pair := data.(hlsRemuxerTrackIDPayloadPair)
if videoTrack != nil && pair.trackID == videoTrackID {
var pkt rtp.Packet
@ -417,7 +417,7 @@ func (c *hlsConverter) runInner(innerCtx context.Context) error {
}
}
func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan struct{}) {
func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct{}) {
defer close(done)
for {
@ -488,7 +488,7 @@ func (c *hlsConverter) runRequestHandler(terminate chan struct{}, done chan stru
}
// OnRequest is called by hlsserver.Server.
func (c *hlsConverter) OnRequest(req hlsConverterRequest) {
func (c *hlsRemuxer) OnRequest(req hlsRemuxerRequest) {
select {
case c.request <- req:
case <-c.ctx.Done():
@ -498,8 +498,8 @@ func (c *hlsConverter) OnRequest(req hlsConverterRequest) {
}
// OnFrame implements path.Reader.
func (c *hlsConverter) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
func (c *hlsRemuxer) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(hlsConverterTrackIDPayloadPair{trackID, payload})
c.ringBuffer.Push(hlsRemuxerTrackIDPayloadPair{trackID, payload})
}
}

View file

@ -30,11 +30,11 @@ type hlsServer struct {
ctxCancel func()
wg sync.WaitGroup
ln net.Listener
converters map[string]*hlsConverter
converters map[string]*hlsRemuxer
// in
request chan hlsConverterRequest
connClose chan *hlsConverter
request chan hlsRemuxerRequest
connClose chan *hlsRemuxer
}
func newHLSServer(
@ -66,9 +66,9 @@ func newHLSServer(
ctx: ctx,
ctxCancel: ctxCancel,
ln: ln,
converters: make(map[string]*hlsConverter),
request: make(chan hlsConverterRequest),
connClose: make(chan *hlsConverter),
converters: make(map[string]*hlsRemuxer),
request: make(chan hlsRemuxerRequest),
connClose: make(chan *hlsRemuxer),
}
s.Log(logger.Info, "listener opened on "+address)
@ -101,7 +101,7 @@ outer:
case req := <-s.request:
c, ok := s.converters[req.Dir]
if !ok {
c = newHLSConverter(
c = newHLSRemuxer(
s.ctx,
s.hlsSegmentCount,
s.hlsSegmentDuration,
@ -119,7 +119,7 @@ outer:
if c2, ok := s.converters[c.PathName()]; !ok || c2 != c {
continue
}
s.doConverterClose(c)
s.doRemuxerClose(c)
case <-s.ctx.Done():
break outer
@ -129,7 +129,7 @@ outer:
s.ctxCancel()
for _, c := range s.converters {
s.doConverterClose(c)
s.doRemuxerClose(c)
}
hs.Shutdown(context.Background())
@ -173,7 +173,7 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dir = strings.TrimSuffix(dir, "/")
cres := make(chan io.Reader)
hreq := hlsConverterRequest{
hreq := hlsRemuxerRequest{
Dir: dir,
File: fname,
Req: r,
@ -206,12 +206,12 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func (s *hlsServer) doConverterClose(c *hlsConverter) {
func (s *hlsServer) doRemuxerClose(c *hlsRemuxer) {
delete(s.converters, c.PathName())
c.ParentClose()
}
func (s *hlsServer) OnConverterClose(c *hlsConverter) {
func (s *hlsServer) OnRemuxerClose(c *hlsRemuxer) {
select {
case s.connClose <- c:
case <-s.ctx.Done():