diff --git a/internal/core/path.go b/internal/core/path.go index d79e9062..6249babf 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -68,15 +68,6 @@ type pathRTSPSession interface { IsRTSPSession() } -type sourceRedirect struct{} - -// onSourceAPIDescribe implements source. -func (*sourceRedirect) onSourceAPIDescribe() interface{} { - return struct { - Type string `json:"type"` - }{"redirect"} -} - type pathReaderState int const ( @@ -224,20 +215,23 @@ type path struct { externalCmdPool *externalcmd.Pool parent pathParent - ctx context.Context - ctxCancel func() - source source - sourceReady bool - sourceStaticWg sync.WaitGroup - readers map[reader]pathReaderState - describeRequests []pathDescribeReq - setupPlayRequests []pathReaderSetupPlayReq - stream *stream - onDemandCmd *externalcmd.Cmd - onReadyCmd *externalcmd.Cmd - onDemandReadyTimer *time.Timer - onDemandCloseTimer *time.Timer - onDemandState pathOnDemandState + ctx context.Context + ctxCancel func() + source source + sourceReady bool + sourceStaticWg sync.WaitGroup + stream *stream + readers map[reader]pathReaderState + describeRequestsOnHold []pathDescribeReq + setupPlayRequestsOnHold []pathReaderSetupPlayReq + onDemandCmd *externalcmd.Cmd + onReadyCmd *externalcmd.Cmd + onDemandStaticSourceState pathOnDemandState + onDemandStaticSourceReadyTimer *time.Timer + onDemandStaticSourceCloseTimer *time.Timer + onDemandPublisherState pathOnDemandState + onDemandPublisherReadyTimer *time.Timer + onDemandPublisherCloseTimer *time.Timer // in sourceStaticSetReady chan pathSourceStaticSetReadyReq @@ -271,34 +265,36 @@ func newPath( ctx, ctxCancel := context.WithCancel(parentCtx) pa := &path{ - rtspAddress: rtspAddress, - readTimeout: readTimeout, - writeTimeout: writeTimeout, - readBufferCount: readBufferCount, - confName: confName, - conf: conf, - name: name, - matches: matches, - wg: wg, - externalCmdPool: externalCmdPool, - parent: parent, - ctx: ctx, - ctxCancel: ctxCancel, - readers: make(map[reader]pathReaderState), - onDemandReadyTimer: newEmptyTimer(), - onDemandCloseTimer: newEmptyTimer(), - sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq), - sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq), - describe: make(chan pathDescribeReq), - publisherRemove: make(chan pathPublisherRemoveReq), - publisherAnnounce: make(chan pathPublisherAnnounceReq), - publisherRecord: make(chan pathPublisherRecordReq), - publisherPause: make(chan pathPublisherPauseReq), - readerRemove: make(chan pathReaderRemoveReq), - readerSetupPlay: make(chan pathReaderSetupPlayReq), - readerPlay: make(chan pathReaderPlayReq), - readerPause: make(chan pathReaderPauseReq), - apiPathsList: make(chan pathAPIPathsListSubReq), + rtspAddress: rtspAddress, + readTimeout: readTimeout, + writeTimeout: writeTimeout, + readBufferCount: readBufferCount, + confName: confName, + conf: conf, + name: name, + matches: matches, + wg: wg, + externalCmdPool: externalCmdPool, + parent: parent, + ctx: ctx, + ctxCancel: ctxCancel, + readers: make(map[reader]pathReaderState), + onDemandStaticSourceReadyTimer: newEmptyTimer(), + onDemandStaticSourceCloseTimer: newEmptyTimer(), + onDemandPublisherReadyTimer: newEmptyTimer(), + onDemandPublisherCloseTimer: newEmptyTimer(), + sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq), + sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq), + describe: make(chan pathDescribeReq), + publisherRemove: make(chan pathPublisherRemoveReq), + publisherAnnounce: make(chan pathPublisherAnnounceReq), + publisherRecord: make(chan pathPublisherRecordReq), + publisherPause: make(chan pathPublisherPauseReq), + readerRemove: make(chan pathReaderRemoveReq), + readerSetupPlay: make(chan pathReaderSetupPlayReq), + readerPlay: make(chan pathReaderPlayReq), + readerPause: make(chan pathReaderPauseReq), + apiPathsList: make(chan pathAPIPathsListSubReq), } pa.log(logger.Debug, "created") @@ -333,6 +329,22 @@ func (pa *path) Name() string { return pa.name } +func (pa *path) hasStaticSource() bool { + return strings.HasPrefix(pa.conf.Source, "rtsp://") || + strings.HasPrefix(pa.conf.Source, "rtsps://") || + strings.HasPrefix(pa.conf.Source, "rtmp://") || + strings.HasPrefix(pa.conf.Source, "http://") || + strings.HasPrefix(pa.conf.Source, "https://") +} + +func (pa *path) hasOnDemandStaticSource() bool { + return pa.hasStaticSource() && pa.conf.SourceOnDemand +} + +func (pa *path) hasOnDemandPublisher() bool { + return pa.conf.RunOnDemand != "" +} + func (pa *path) run() { defer pa.wg.Done() @@ -358,25 +370,50 @@ func (pa *path) run() { err := func() error { for { select { - case <-pa.onDemandReadyTimer.C: - for _, req := range pa.describeRequests { + case <-pa.onDemandStaticSourceReadyTimer.C: + for _, req := range pa.describeRequestsOnHold { req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } - pa.describeRequests = nil + pa.describeRequestsOnHold = nil - for _, req := range pa.setupPlayRequests { + for _, req := range pa.setupPlayRequestsOnHold { req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } - pa.setupPlayRequests = nil + pa.setupPlayRequestsOnHold = nil - pa.onDemandCloseSource() + pa.onDemandStaticSourceStop() if pa.shouldClose() { return fmt.Errorf("not in use") } - case <-pa.onDemandCloseTimer.C: - pa.onDemandCloseSource() + case <-pa.onDemandStaticSourceCloseTimer.C: + pa.sourceSetNotReady() + pa.onDemandStaticSourceStop() + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case <-pa.onDemandPublisherReadyTimer.C: + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.describeRequestsOnHold = nil + + for _, req := range pa.setupPlayRequestsOnHold { + req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} + } + pa.setupPlayRequestsOnHold = nil + + pa.onDemandPublisherStop() + + if pa.shouldClose() { + return fmt.Errorf("not in use") + } + + case <-pa.onDemandPublisherCloseTimer.C: + pa.onDemandPublisherStop() if pa.shouldClose() { return fmt.Errorf("not in use") @@ -385,6 +422,30 @@ func (pa *path) run() { case req := <-pa.sourceStaticSetReady: if req.source == pa.source { pa.sourceSetReady(req.tracks) + + if pa.hasOnDemandStaticSource() { + pa.onDemandStaticSourceReadyTimer.Stop() + pa.onDemandStaticSourceReadyTimer = newEmptyTimer() + + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{ + stream: pa.stream, + } + } + pa.describeRequestsOnHold = nil + + for _, req := range pa.setupPlayRequestsOnHold { + pa.handleReaderSetupPlayPost(req) + } + pa.setupPlayRequestsOnHold = nil + + if len(pa.readers) > 0 { + pa.onDemandStaticSourceState = pathOnDemandStateReady + } else { + pa.onDemandStaticSourceScheduleClose() + } + } + req.res <- pathSourceStaticSetReadyRes{stream: pa.stream} } else { req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")} @@ -392,10 +453,9 @@ func (pa *path) run() { case req := <-pa.sourceStaticSetNotReady: if req.source == pa.source { - if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial { - pa.onDemandCloseSource() - } else { - pa.sourceSetNotReady() + pa.sourceSetNotReady() + if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial { + pa.onDemandStaticSourceStop() } } close(req.res) @@ -458,19 +518,21 @@ func (pa *path) run() { pa.ctxCancel() - pa.onDemandReadyTimer.Stop() - pa.onDemandCloseTimer.Stop() + pa.onDemandStaticSourceReadyTimer.Stop() + pa.onDemandStaticSourceCloseTimer.Stop() + pa.onDemandPublisherReadyTimer.Stop() + pa.onDemandPublisherCloseTimer.Stop() if onInitCmd != nil { onInitCmd.Close() pa.log(logger.Info, "runOnInit command stopped") } - for _, req := range pa.describeRequests { + for _, req := range pa.describeRequestsOnHold { req.res <- pathDescribeRes{err: fmt.Errorf("terminated")} } - for _, req := range pa.setupPlayRequests { + for _, req := range pa.setupPlayRequestsOnHold { req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")} } @@ -499,20 +561,8 @@ func (pa *path) shouldClose() bool { return pa.conf.Regexp != nil && pa.source == nil && len(pa.readers) == 0 && - len(pa.describeRequests) == 0 && - len(pa.setupPlayRequests) == 0 -} - -func (pa *path) hasStaticSource() bool { - return strings.HasPrefix(pa.conf.Source, "rtsp://") || - strings.HasPrefix(pa.conf.Source, "rtsps://") || - strings.HasPrefix(pa.conf.Source, "rtmp://") || - strings.HasPrefix(pa.conf.Source, "http://") || - strings.HasPrefix(pa.conf.Source, "https://") -} - -func (pa *path) isOnDemand() bool { - return (pa.hasStaticSource() && pa.conf.SourceOnDemand) || pa.conf.RunOnDemand != "" + len(pa.describeRequestsOnHold) == 0 && + len(pa.setupPlayRequestsOnHold) == 0 } func (pa *path) externalCmdEnv() externalcmd.Environment { @@ -531,64 +581,76 @@ func (pa *path) externalCmdEnv() externalcmd.Environment { return env } -func (pa *path) onDemandStartSource() { - pa.onDemandReadyTimer.Stop() - if pa.hasStaticSource() { - pa.staticSourceCreate() - pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout)) - } else { - pa.log(logger.Info, "runOnDemand command started") - pa.onDemandCmd = externalcmd.NewCmd( - pa.externalCmdPool, - pa.conf.RunOnDemand, - pa.conf.RunOnDemandRestart, - pa.externalCmdEnv(), - func(co int) { - pa.log(logger.Info, "runOnDemand command exited with code %d", co) - }) - pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout)) - } +func (pa *path) onDemandStaticSourceStart() { + pa.staticSourceCreate() - pa.onDemandState = pathOnDemandStateWaitingReady + pa.onDemandStaticSourceReadyTimer.Stop() + pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout)) + + pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady } -func (pa *path) onDemandScheduleClose() { - pa.onDemandCloseTimer.Stop() - if pa.hasStaticSource() { - pa.onDemandCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter)) - } else { - pa.onDemandCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter)) - } +func (pa *path) onDemandStaticSourceScheduleClose() { + pa.onDemandStaticSourceCloseTimer.Stop() + pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter)) - pa.onDemandState = pathOnDemandStateClosing + pa.onDemandStaticSourceState = pathOnDemandStateClosing } -func (pa *path) onDemandCloseSource() { - if pa.onDemandState == pathOnDemandStateClosing { - pa.onDemandCloseTimer.Stop() - pa.onDemandCloseTimer = newEmptyTimer() +func (pa *path) onDemandStaticSourceStop() { + if pa.onDemandStaticSourceState == pathOnDemandStateClosing { + pa.onDemandStaticSourceCloseTimer.Stop() + pa.onDemandStaticSourceCloseTimer = newEmptyTimer() + } + + pa.onDemandStaticSourceState = pathOnDemandStateInitial + + pa.source.(sourceStatic).close() + pa.source = nil +} + +func (pa *path) onDemandPublisherStart() { + pa.log(logger.Info, "runOnDemand command started") + pa.onDemandCmd = externalcmd.NewCmd( + pa.externalCmdPool, + pa.conf.RunOnDemand, + pa.conf.RunOnDemandRestart, + pa.externalCmdEnv(), + func(co int) { + pa.log(logger.Info, "runOnDemand command exited with code %d", co) + }) + + pa.onDemandPublisherReadyTimer.Stop() + pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout)) + + pa.onDemandPublisherState = pathOnDemandStateWaitingReady +} + +func (pa *path) onDemandPublisherScheduleClose() { + pa.onDemandPublisherCloseTimer.Stop() + pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter)) + + pa.onDemandPublisherState = pathOnDemandStateClosing +} + +func (pa *path) onDemandPublisherStop() { + if pa.onDemandPublisherState == pathOnDemandStateClosing { + pa.onDemandPublisherCloseTimer.Stop() + pa.onDemandPublisherCloseTimer = newEmptyTimer() } // set state before doPublisherRemove() - pa.onDemandState = pathOnDemandStateInitial + pa.onDemandPublisherState = pathOnDemandStateInitial - if pa.hasStaticSource() { - if pa.sourceReady { - pa.sourceSetNotReady() - } - pa.source.(sourceStatic).close() - pa.source = nil - } else { - if pa.source != nil { - pa.source.(publisher).close() - pa.doPublisherRemove() - } + if pa.source != nil { + pa.source.(publisher).close() + pa.doPublisherRemove() + } - if pa.onDemandCmd != nil { - pa.onDemandCmd.Close() - pa.onDemandCmd = nil - pa.log(logger.Info, "runOnDemand command stopped") - } + if pa.onDemandCmd != nil { + pa.onDemandCmd.Close() + pa.onDemandCmd = nil + pa.log(logger.Info, "runOnDemand command stopped") } } @@ -596,29 +658,6 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) { pa.sourceReady = true pa.stream = newStream(tracks) - if pa.isOnDemand() { - pa.onDemandReadyTimer.Stop() - pa.onDemandReadyTimer = newEmptyTimer() - - for _, req := range pa.describeRequests { - req.res <- pathDescribeRes{ - stream: pa.stream, - } - } - pa.describeRequests = nil - - for _, req := range pa.setupPlayRequests { - pa.handleReaderSetupPlayPost(req) - } - pa.setupPlayRequests = nil - - if len(pa.readers) > 0 { - pa.onDemandState = pathOnDemandStateReady - } else { - pa.onDemandScheduleClose() - } - } - pa.parent.onPathSourceReady(pa) if pa.conf.RunOnReady != "" { @@ -669,6 +708,7 @@ func (pa *path) staticSourceCreate() { pa.readBufferCount, &pa.sourceStaticWg, pa) + case strings.HasPrefix(pa.conf.Source, "rtmp://"): pa.source = newRTMPSource( pa.ctx, @@ -677,6 +717,7 @@ func (pa *path) staticSourceCreate() { pa.writeTimeout, &pa.sourceStaticWg, pa) + case strings.HasPrefix(pa.conf.Source, "http://") || strings.HasPrefix(pa.conf.Source, "https://"): pa.source = newHLSSource( @@ -700,8 +741,8 @@ func (pa *path) doReaderRemove(r reader) { func (pa *path) doPublisherRemove() { if pa.sourceReady { - if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial { - pa.onDemandCloseSource() + if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial { + pa.onDemandPublisherStop() } else { pa.sourceSetNotReady() } @@ -725,11 +766,19 @@ func (pa *path) handleDescribe(req pathDescribeReq) { return } - if pa.isOnDemand() { - if pa.onDemandState == pathOnDemandStateInitial { - pa.onDemandStartSource() + if pa.hasOnDemandStaticSource() { + if pa.onDemandStaticSourceState == pathOnDemandStateInitial { + pa.onDemandStaticSourceStart() } - pa.describeRequests = append(pa.describeRequests, req) + pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) + return + } + + if pa.hasOnDemandPublisher() { + if pa.onDemandPublisherState == pathOnDemandStateInitial { + pa.onDemandPublisherStart() + } + pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) return } @@ -761,14 +810,14 @@ func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) { } func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) { - if pa.source != nil { - if pa.conf.Source != "publisher" { - req.res <- pathPublisherAnnounceRes{ - err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name), - } - return + if pa.conf.Source != "publisher" { + req.res <- pathPublisherAnnounceRes{ + err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name), } + return + } + if pa.source != nil { if pa.conf.DisablePublisherOverride { req.res <- pathPublisherAnnounceRes{err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)} return @@ -794,13 +843,36 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) { pa.sourceSetReady(req.tracks) + if pa.hasOnDemandPublisher() { + pa.onDemandPublisherReadyTimer.Stop() + pa.onDemandPublisherReadyTimer = newEmptyTimer() + + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{ + stream: pa.stream, + } + } + pa.describeRequestsOnHold = nil + + for _, req := range pa.setupPlayRequestsOnHold { + pa.handleReaderSetupPlayPost(req) + } + pa.setupPlayRequestsOnHold = nil + + if len(pa.readers) > 0 { + pa.onDemandPublisherState = pathOnDemandStateReady + } else { + pa.onDemandPublisherScheduleClose() + } + } + req.res <- pathPublisherRecordRes{stream: pa.stream} } func (pa *path) handlePublisherPause(req pathPublisherPauseReq) { if req.author == pa.source && pa.sourceReady { - if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial { - pa.onDemandCloseSource() + if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial { + pa.onDemandPublisherStop() } else { pa.sourceSetNotReady() } @@ -814,10 +886,12 @@ func (pa *path) handleReaderRemove(req pathReaderRemoveReq) { } close(req.res) - if pa.isOnDemand() && - len(pa.readers) == 0 && - pa.onDemandState == pathOnDemandStateReady { - pa.onDemandScheduleClose() + if len(pa.readers) == 0 { + if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState == pathOnDemandStateReady { + pa.onDemandStaticSourceScheduleClose() + } else if pa.hasOnDemandPublisher() && pa.onDemandPublisherState == pathOnDemandStateReady { + pa.onDemandPublisherScheduleClose() + } } } @@ -827,11 +901,19 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) { return } - if pa.isOnDemand() { - if pa.onDemandState == pathOnDemandStateInitial { - pa.onDemandStartSource() + if pa.hasOnDemandStaticSource() { + if pa.onDemandStaticSourceState == pathOnDemandStateInitial { + pa.onDemandStaticSourceStart() } - pa.setupPlayRequests = append(pa.setupPlayRequests, req) + pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req) + return + } + + if pa.hasOnDemandPublisher() { + if pa.onDemandPublisherState == pathOnDemandStateInitial { + pa.onDemandPublisherStart() + } + pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req) return } @@ -841,10 +923,10 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) { func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) { pa.readers[req.author] = pathReaderStatePrePlay - if pa.isOnDemand() && pa.onDemandState == pathOnDemandStateClosing { - pa.onDemandState = pathOnDemandStateReady - pa.onDemandCloseTimer.Stop() - pa.onDemandCloseTimer = newEmptyTimer() + if pa.hasOnDemandPublisher() && pa.onDemandPublisherState == pathOnDemandStateClosing { + pa.onDemandPublisherState = pathOnDemandStateReady + pa.onDemandPublisherCloseTimer.Stop() + pa.onDemandPublisherCloseTimer = newEmptyTimer() } req.res <- pathReaderSetupPlayRes{ diff --git a/internal/core/source.go b/internal/core/source.go index 3b26e0bc..0549c458 100644 --- a/internal/core/source.go +++ b/internal/core/source.go @@ -1,6 +1,10 @@ package core -// source is an entity that can provide a stream, statically or dynamically. +// source is an entity that can provide a stream. +// it can be: +// - a publisher +// - a static source +// - a redirect source type source interface { onSourceAPIDescribe() interface{} } @@ -10,3 +14,13 @@ type sourceStatic interface { source close() } + +// sourceRedirect is a source that redirects to another one. +type sourceRedirect struct{} + +// onSourceAPIDescribe implements source. +func (*sourceRedirect) onSourceAPIDescribe() interface{} { + return struct { + Type string `json:"type"` + }{"redirect"} +}