forked from External/mediamtx
split handling of on-demand sources and on-demand publishers
This commit is contained in:
parent
98b3538289
commit
58e3fa358e
2 changed files with 274 additions and 178 deletions
|
|
@ -68,15 +68,6 @@ type pathRTSPSession interface {
|
|||
IsRTSPSession()
|
||||
}
|
||||
|
||||
type sourceRedirect struct{}
|
||||
|
||||
// onSourceAPIDescribe implements source.
|
||||
func (*sourceRedirect) onSourceAPIDescribe() interface{} {
|
||||
return struct {
|
||||
Type string `json:"type"`
|
||||
}{"redirect"}
|
||||
}
|
||||
|
||||
type pathReaderState int
|
||||
|
||||
const (
|
||||
|
|
@ -224,20 +215,23 @@ type path struct {
|
|||
externalCmdPool *externalcmd.Pool
|
||||
parent pathParent
|
||||
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
source source
|
||||
sourceReady bool
|
||||
sourceStaticWg sync.WaitGroup
|
||||
readers map[reader]pathReaderState
|
||||
describeRequests []pathDescribeReq
|
||||
setupPlayRequests []pathReaderSetupPlayReq
|
||||
stream *stream
|
||||
onDemandCmd *externalcmd.Cmd
|
||||
onReadyCmd *externalcmd.Cmd
|
||||
onDemandReadyTimer *time.Timer
|
||||
onDemandCloseTimer *time.Timer
|
||||
onDemandState pathOnDemandState
|
||||
ctx context.Context
|
||||
ctxCancel func()
|
||||
source source
|
||||
sourceReady bool
|
||||
sourceStaticWg sync.WaitGroup
|
||||
stream *stream
|
||||
readers map[reader]pathReaderState
|
||||
describeRequestsOnHold []pathDescribeReq
|
||||
setupPlayRequestsOnHold []pathReaderSetupPlayReq
|
||||
onDemandCmd *externalcmd.Cmd
|
||||
onReadyCmd *externalcmd.Cmd
|
||||
onDemandStaticSourceState pathOnDemandState
|
||||
onDemandStaticSourceReadyTimer *time.Timer
|
||||
onDemandStaticSourceCloseTimer *time.Timer
|
||||
onDemandPublisherState pathOnDemandState
|
||||
onDemandPublisherReadyTimer *time.Timer
|
||||
onDemandPublisherCloseTimer *time.Timer
|
||||
|
||||
// in
|
||||
sourceStaticSetReady chan pathSourceStaticSetReadyReq
|
||||
|
|
@ -271,34 +265,36 @@ func newPath(
|
|||
ctx, ctxCancel := context.WithCancel(parentCtx)
|
||||
|
||||
pa := &path{
|
||||
rtspAddress: rtspAddress,
|
||||
readTimeout: readTimeout,
|
||||
writeTimeout: writeTimeout,
|
||||
readBufferCount: readBufferCount,
|
||||
confName: confName,
|
||||
conf: conf,
|
||||
name: name,
|
||||
matches: matches,
|
||||
wg: wg,
|
||||
externalCmdPool: externalCmdPool,
|
||||
parent: parent,
|
||||
ctx: ctx,
|
||||
ctxCancel: ctxCancel,
|
||||
readers: make(map[reader]pathReaderState),
|
||||
onDemandReadyTimer: newEmptyTimer(),
|
||||
onDemandCloseTimer: newEmptyTimer(),
|
||||
sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
|
||||
sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
|
||||
describe: make(chan pathDescribeReq),
|
||||
publisherRemove: make(chan pathPublisherRemoveReq),
|
||||
publisherAnnounce: make(chan pathPublisherAnnounceReq),
|
||||
publisherRecord: make(chan pathPublisherRecordReq),
|
||||
publisherPause: make(chan pathPublisherPauseReq),
|
||||
readerRemove: make(chan pathReaderRemoveReq),
|
||||
readerSetupPlay: make(chan pathReaderSetupPlayReq),
|
||||
readerPlay: make(chan pathReaderPlayReq),
|
||||
readerPause: make(chan pathReaderPauseReq),
|
||||
apiPathsList: make(chan pathAPIPathsListSubReq),
|
||||
rtspAddress: rtspAddress,
|
||||
readTimeout: readTimeout,
|
||||
writeTimeout: writeTimeout,
|
||||
readBufferCount: readBufferCount,
|
||||
confName: confName,
|
||||
conf: conf,
|
||||
name: name,
|
||||
matches: matches,
|
||||
wg: wg,
|
||||
externalCmdPool: externalCmdPool,
|
||||
parent: parent,
|
||||
ctx: ctx,
|
||||
ctxCancel: ctxCancel,
|
||||
readers: make(map[reader]pathReaderState),
|
||||
onDemandStaticSourceReadyTimer: newEmptyTimer(),
|
||||
onDemandStaticSourceCloseTimer: newEmptyTimer(),
|
||||
onDemandPublisherReadyTimer: newEmptyTimer(),
|
||||
onDemandPublisherCloseTimer: newEmptyTimer(),
|
||||
sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
|
||||
sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
|
||||
describe: make(chan pathDescribeReq),
|
||||
publisherRemove: make(chan pathPublisherRemoveReq),
|
||||
publisherAnnounce: make(chan pathPublisherAnnounceReq),
|
||||
publisherRecord: make(chan pathPublisherRecordReq),
|
||||
publisherPause: make(chan pathPublisherPauseReq),
|
||||
readerRemove: make(chan pathReaderRemoveReq),
|
||||
readerSetupPlay: make(chan pathReaderSetupPlayReq),
|
||||
readerPlay: make(chan pathReaderPlayReq),
|
||||
readerPause: make(chan pathReaderPauseReq),
|
||||
apiPathsList: make(chan pathAPIPathsListSubReq),
|
||||
}
|
||||
|
||||
pa.log(logger.Debug, "created")
|
||||
|
|
@ -333,6 +329,22 @@ func (pa *path) Name() string {
|
|||
return pa.name
|
||||
}
|
||||
|
||||
func (pa *path) hasStaticSource() bool {
|
||||
return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "rtsps://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "rtmp://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "http://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "https://")
|
||||
}
|
||||
|
||||
func (pa *path) hasOnDemandStaticSource() bool {
|
||||
return pa.hasStaticSource() && pa.conf.SourceOnDemand
|
||||
}
|
||||
|
||||
func (pa *path) hasOnDemandPublisher() bool {
|
||||
return pa.conf.RunOnDemand != ""
|
||||
}
|
||||
|
||||
func (pa *path) run() {
|
||||
defer pa.wg.Done()
|
||||
|
||||
|
|
@ -358,25 +370,50 @@ func (pa *path) run() {
|
|||
err := func() error {
|
||||
for {
|
||||
select {
|
||||
case <-pa.onDemandReadyTimer.C:
|
||||
for _, req := range pa.describeRequests {
|
||||
case <-pa.onDemandStaticSourceReadyTimer.C:
|
||||
for _, req := range pa.describeRequestsOnHold {
|
||||
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
||||
}
|
||||
pa.describeRequests = nil
|
||||
pa.describeRequestsOnHold = nil
|
||||
|
||||
for _, req := range pa.setupPlayRequests {
|
||||
for _, req := range pa.setupPlayRequestsOnHold {
|
||||
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
||||
}
|
||||
pa.setupPlayRequests = nil
|
||||
pa.setupPlayRequestsOnHold = nil
|
||||
|
||||
pa.onDemandCloseSource()
|
||||
pa.onDemandStaticSourceStop()
|
||||
|
||||
if pa.shouldClose() {
|
||||
return fmt.Errorf("not in use")
|
||||
}
|
||||
|
||||
case <-pa.onDemandCloseTimer.C:
|
||||
pa.onDemandCloseSource()
|
||||
case <-pa.onDemandStaticSourceCloseTimer.C:
|
||||
pa.sourceSetNotReady()
|
||||
pa.onDemandStaticSourceStop()
|
||||
|
||||
if pa.shouldClose() {
|
||||
return fmt.Errorf("not in use")
|
||||
}
|
||||
|
||||
case <-pa.onDemandPublisherReadyTimer.C:
|
||||
for _, req := range pa.describeRequestsOnHold {
|
||||
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
||||
}
|
||||
pa.describeRequestsOnHold = nil
|
||||
|
||||
for _, req := range pa.setupPlayRequestsOnHold {
|
||||
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
|
||||
}
|
||||
pa.setupPlayRequestsOnHold = nil
|
||||
|
||||
pa.onDemandPublisherStop()
|
||||
|
||||
if pa.shouldClose() {
|
||||
return fmt.Errorf("not in use")
|
||||
}
|
||||
|
||||
case <-pa.onDemandPublisherCloseTimer.C:
|
||||
pa.onDemandPublisherStop()
|
||||
|
||||
if pa.shouldClose() {
|
||||
return fmt.Errorf("not in use")
|
||||
|
|
@ -385,6 +422,30 @@ func (pa *path) run() {
|
|||
case req := <-pa.sourceStaticSetReady:
|
||||
if req.source == pa.source {
|
||||
pa.sourceSetReady(req.tracks)
|
||||
|
||||
if pa.hasOnDemandStaticSource() {
|
||||
pa.onDemandStaticSourceReadyTimer.Stop()
|
||||
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
|
||||
|
||||
for _, req := range pa.describeRequestsOnHold {
|
||||
req.res <- pathDescribeRes{
|
||||
stream: pa.stream,
|
||||
}
|
||||
}
|
||||
pa.describeRequestsOnHold = nil
|
||||
|
||||
for _, req := range pa.setupPlayRequestsOnHold {
|
||||
pa.handleReaderSetupPlayPost(req)
|
||||
}
|
||||
pa.setupPlayRequestsOnHold = nil
|
||||
|
||||
if len(pa.readers) > 0 {
|
||||
pa.onDemandStaticSourceState = pathOnDemandStateReady
|
||||
} else {
|
||||
pa.onDemandStaticSourceScheduleClose()
|
||||
}
|
||||
}
|
||||
|
||||
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
|
||||
} else {
|
||||
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
|
||||
|
|
@ -392,10 +453,9 @@ func (pa *path) run() {
|
|||
|
||||
case req := <-pa.sourceStaticSetNotReady:
|
||||
if req.source == pa.source {
|
||||
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
|
||||
pa.onDemandCloseSource()
|
||||
} else {
|
||||
pa.sourceSetNotReady()
|
||||
pa.sourceSetNotReady()
|
||||
if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
|
||||
pa.onDemandStaticSourceStop()
|
||||
}
|
||||
}
|
||||
close(req.res)
|
||||
|
|
@ -458,19 +518,21 @@ func (pa *path) run() {
|
|||
|
||||
pa.ctxCancel()
|
||||
|
||||
pa.onDemandReadyTimer.Stop()
|
||||
pa.onDemandCloseTimer.Stop()
|
||||
pa.onDemandStaticSourceReadyTimer.Stop()
|
||||
pa.onDemandStaticSourceCloseTimer.Stop()
|
||||
pa.onDemandPublisherReadyTimer.Stop()
|
||||
pa.onDemandPublisherCloseTimer.Stop()
|
||||
|
||||
if onInitCmd != nil {
|
||||
onInitCmd.Close()
|
||||
pa.log(logger.Info, "runOnInit command stopped")
|
||||
}
|
||||
|
||||
for _, req := range pa.describeRequests {
|
||||
for _, req := range pa.describeRequestsOnHold {
|
||||
req.res <- pathDescribeRes{err: fmt.Errorf("terminated")}
|
||||
}
|
||||
|
||||
for _, req := range pa.setupPlayRequests {
|
||||
for _, req := range pa.setupPlayRequestsOnHold {
|
||||
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
|
||||
}
|
||||
|
||||
|
|
@ -499,20 +561,8 @@ func (pa *path) shouldClose() bool {
|
|||
return pa.conf.Regexp != nil &&
|
||||
pa.source == nil &&
|
||||
len(pa.readers) == 0 &&
|
||||
len(pa.describeRequests) == 0 &&
|
||||
len(pa.setupPlayRequests) == 0
|
||||
}
|
||||
|
||||
func (pa *path) hasStaticSource() bool {
|
||||
return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "rtsps://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "rtmp://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "http://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "https://")
|
||||
}
|
||||
|
||||
func (pa *path) isOnDemand() bool {
|
||||
return (pa.hasStaticSource() && pa.conf.SourceOnDemand) || pa.conf.RunOnDemand != ""
|
||||
len(pa.describeRequestsOnHold) == 0 &&
|
||||
len(pa.setupPlayRequestsOnHold) == 0
|
||||
}
|
||||
|
||||
func (pa *path) externalCmdEnv() externalcmd.Environment {
|
||||
|
|
@ -531,64 +581,76 @@ func (pa *path) externalCmdEnv() externalcmd.Environment {
|
|||
return env
|
||||
}
|
||||
|
||||
func (pa *path) onDemandStartSource() {
|
||||
pa.onDemandReadyTimer.Stop()
|
||||
if pa.hasStaticSource() {
|
||||
pa.staticSourceCreate()
|
||||
pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
|
||||
} else {
|
||||
pa.log(logger.Info, "runOnDemand command started")
|
||||
pa.onDemandCmd = externalcmd.NewCmd(
|
||||
pa.externalCmdPool,
|
||||
pa.conf.RunOnDemand,
|
||||
pa.conf.RunOnDemandRestart,
|
||||
pa.externalCmdEnv(),
|
||||
func(co int) {
|
||||
pa.log(logger.Info, "runOnDemand command exited with code %d", co)
|
||||
})
|
||||
pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
|
||||
}
|
||||
func (pa *path) onDemandStaticSourceStart() {
|
||||
pa.staticSourceCreate()
|
||||
|
||||
pa.onDemandState = pathOnDemandStateWaitingReady
|
||||
pa.onDemandStaticSourceReadyTimer.Stop()
|
||||
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
|
||||
|
||||
pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady
|
||||
}
|
||||
|
||||
func (pa *path) onDemandScheduleClose() {
|
||||
pa.onDemandCloseTimer.Stop()
|
||||
if pa.hasStaticSource() {
|
||||
pa.onDemandCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter))
|
||||
} else {
|
||||
pa.onDemandCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter))
|
||||
}
|
||||
func (pa *path) onDemandStaticSourceScheduleClose() {
|
||||
pa.onDemandStaticSourceCloseTimer.Stop()
|
||||
pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter))
|
||||
|
||||
pa.onDemandState = pathOnDemandStateClosing
|
||||
pa.onDemandStaticSourceState = pathOnDemandStateClosing
|
||||
}
|
||||
|
||||
func (pa *path) onDemandCloseSource() {
|
||||
if pa.onDemandState == pathOnDemandStateClosing {
|
||||
pa.onDemandCloseTimer.Stop()
|
||||
pa.onDemandCloseTimer = newEmptyTimer()
|
||||
func (pa *path) onDemandStaticSourceStop() {
|
||||
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
|
||||
pa.onDemandStaticSourceCloseTimer.Stop()
|
||||
pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
|
||||
}
|
||||
|
||||
pa.onDemandStaticSourceState = pathOnDemandStateInitial
|
||||
|
||||
pa.source.(sourceStatic).close()
|
||||
pa.source = nil
|
||||
}
|
||||
|
||||
func (pa *path) onDemandPublisherStart() {
|
||||
pa.log(logger.Info, "runOnDemand command started")
|
||||
pa.onDemandCmd = externalcmd.NewCmd(
|
||||
pa.externalCmdPool,
|
||||
pa.conf.RunOnDemand,
|
||||
pa.conf.RunOnDemandRestart,
|
||||
pa.externalCmdEnv(),
|
||||
func(co int) {
|
||||
pa.log(logger.Info, "runOnDemand command exited with code %d", co)
|
||||
})
|
||||
|
||||
pa.onDemandPublisherReadyTimer.Stop()
|
||||
pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
|
||||
|
||||
pa.onDemandPublisherState = pathOnDemandStateWaitingReady
|
||||
}
|
||||
|
||||
func (pa *path) onDemandPublisherScheduleClose() {
|
||||
pa.onDemandPublisherCloseTimer.Stop()
|
||||
pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter))
|
||||
|
||||
pa.onDemandPublisherState = pathOnDemandStateClosing
|
||||
}
|
||||
|
||||
func (pa *path) onDemandPublisherStop() {
|
||||
if pa.onDemandPublisherState == pathOnDemandStateClosing {
|
||||
pa.onDemandPublisherCloseTimer.Stop()
|
||||
pa.onDemandPublisherCloseTimer = newEmptyTimer()
|
||||
}
|
||||
|
||||
// set state before doPublisherRemove()
|
||||
pa.onDemandState = pathOnDemandStateInitial
|
||||
pa.onDemandPublisherState = pathOnDemandStateInitial
|
||||
|
||||
if pa.hasStaticSource() {
|
||||
if pa.sourceReady {
|
||||
pa.sourceSetNotReady()
|
||||
}
|
||||
pa.source.(sourceStatic).close()
|
||||
pa.source = nil
|
||||
} else {
|
||||
if pa.source != nil {
|
||||
pa.source.(publisher).close()
|
||||
pa.doPublisherRemove()
|
||||
}
|
||||
if pa.source != nil {
|
||||
pa.source.(publisher).close()
|
||||
pa.doPublisherRemove()
|
||||
}
|
||||
|
||||
if pa.onDemandCmd != nil {
|
||||
pa.onDemandCmd.Close()
|
||||
pa.onDemandCmd = nil
|
||||
pa.log(logger.Info, "runOnDemand command stopped")
|
||||
}
|
||||
if pa.onDemandCmd != nil {
|
||||
pa.onDemandCmd.Close()
|
||||
pa.onDemandCmd = nil
|
||||
pa.log(logger.Info, "runOnDemand command stopped")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -596,29 +658,6 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
|
|||
pa.sourceReady = true
|
||||
pa.stream = newStream(tracks)
|
||||
|
||||
if pa.isOnDemand() {
|
||||
pa.onDemandReadyTimer.Stop()
|
||||
pa.onDemandReadyTimer = newEmptyTimer()
|
||||
|
||||
for _, req := range pa.describeRequests {
|
||||
req.res <- pathDescribeRes{
|
||||
stream: pa.stream,
|
||||
}
|
||||
}
|
||||
pa.describeRequests = nil
|
||||
|
||||
for _, req := range pa.setupPlayRequests {
|
||||
pa.handleReaderSetupPlayPost(req)
|
||||
}
|
||||
pa.setupPlayRequests = nil
|
||||
|
||||
if len(pa.readers) > 0 {
|
||||
pa.onDemandState = pathOnDemandStateReady
|
||||
} else {
|
||||
pa.onDemandScheduleClose()
|
||||
}
|
||||
}
|
||||
|
||||
pa.parent.onPathSourceReady(pa)
|
||||
|
||||
if pa.conf.RunOnReady != "" {
|
||||
|
|
@ -669,6 +708,7 @@ func (pa *path) staticSourceCreate() {
|
|||
pa.readBufferCount,
|
||||
&pa.sourceStaticWg,
|
||||
pa)
|
||||
|
||||
case strings.HasPrefix(pa.conf.Source, "rtmp://"):
|
||||
pa.source = newRTMPSource(
|
||||
pa.ctx,
|
||||
|
|
@ -677,6 +717,7 @@ func (pa *path) staticSourceCreate() {
|
|||
pa.writeTimeout,
|
||||
&pa.sourceStaticWg,
|
||||
pa)
|
||||
|
||||
case strings.HasPrefix(pa.conf.Source, "http://") ||
|
||||
strings.HasPrefix(pa.conf.Source, "https://"):
|
||||
pa.source = newHLSSource(
|
||||
|
|
@ -700,8 +741,8 @@ func (pa *path) doReaderRemove(r reader) {
|
|||
|
||||
func (pa *path) doPublisherRemove() {
|
||||
if pa.sourceReady {
|
||||
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
|
||||
pa.onDemandCloseSource()
|
||||
if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
|
||||
pa.onDemandPublisherStop()
|
||||
} else {
|
||||
pa.sourceSetNotReady()
|
||||
}
|
||||
|
|
@ -725,11 +766,19 @@ func (pa *path) handleDescribe(req pathDescribeReq) {
|
|||
return
|
||||
}
|
||||
|
||||
if pa.isOnDemand() {
|
||||
if pa.onDemandState == pathOnDemandStateInitial {
|
||||
pa.onDemandStartSource()
|
||||
if pa.hasOnDemandStaticSource() {
|
||||
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
|
||||
pa.onDemandStaticSourceStart()
|
||||
}
|
||||
pa.describeRequests = append(pa.describeRequests, req)
|
||||
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
|
||||
return
|
||||
}
|
||||
|
||||
if pa.hasOnDemandPublisher() {
|
||||
if pa.onDemandPublisherState == pathOnDemandStateInitial {
|
||||
pa.onDemandPublisherStart()
|
||||
}
|
||||
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -761,14 +810,14 @@ func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) {
|
|||
}
|
||||
|
||||
func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
|
||||
if pa.source != nil {
|
||||
if pa.conf.Source != "publisher" {
|
||||
req.res <- pathPublisherAnnounceRes{
|
||||
err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name),
|
||||
}
|
||||
return
|
||||
if pa.conf.Source != "publisher" {
|
||||
req.res <- pathPublisherAnnounceRes{
|
||||
err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if pa.source != nil {
|
||||
if pa.conf.DisablePublisherOverride {
|
||||
req.res <- pathPublisherAnnounceRes{err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)}
|
||||
return
|
||||
|
|
@ -794,13 +843,36 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
|
|||
|
||||
pa.sourceSetReady(req.tracks)
|
||||
|
||||
if pa.hasOnDemandPublisher() {
|
||||
pa.onDemandPublisherReadyTimer.Stop()
|
||||
pa.onDemandPublisherReadyTimer = newEmptyTimer()
|
||||
|
||||
for _, req := range pa.describeRequestsOnHold {
|
||||
req.res <- pathDescribeRes{
|
||||
stream: pa.stream,
|
||||
}
|
||||
}
|
||||
pa.describeRequestsOnHold = nil
|
||||
|
||||
for _, req := range pa.setupPlayRequestsOnHold {
|
||||
pa.handleReaderSetupPlayPost(req)
|
||||
}
|
||||
pa.setupPlayRequestsOnHold = nil
|
||||
|
||||
if len(pa.readers) > 0 {
|
||||
pa.onDemandPublisherState = pathOnDemandStateReady
|
||||
} else {
|
||||
pa.onDemandPublisherScheduleClose()
|
||||
}
|
||||
}
|
||||
|
||||
req.res <- pathPublisherRecordRes{stream: pa.stream}
|
||||
}
|
||||
|
||||
func (pa *path) handlePublisherPause(req pathPublisherPauseReq) {
|
||||
if req.author == pa.source && pa.sourceReady {
|
||||
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
|
||||
pa.onDemandCloseSource()
|
||||
if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
|
||||
pa.onDemandPublisherStop()
|
||||
} else {
|
||||
pa.sourceSetNotReady()
|
||||
}
|
||||
|
|
@ -814,10 +886,12 @@ func (pa *path) handleReaderRemove(req pathReaderRemoveReq) {
|
|||
}
|
||||
close(req.res)
|
||||
|
||||
if pa.isOnDemand() &&
|
||||
len(pa.readers) == 0 &&
|
||||
pa.onDemandState == pathOnDemandStateReady {
|
||||
pa.onDemandScheduleClose()
|
||||
if len(pa.readers) == 0 {
|
||||
if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState == pathOnDemandStateReady {
|
||||
pa.onDemandStaticSourceScheduleClose()
|
||||
} else if pa.hasOnDemandPublisher() && pa.onDemandPublisherState == pathOnDemandStateReady {
|
||||
pa.onDemandPublisherScheduleClose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -827,11 +901,19 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
|
|||
return
|
||||
}
|
||||
|
||||
if pa.isOnDemand() {
|
||||
if pa.onDemandState == pathOnDemandStateInitial {
|
||||
pa.onDemandStartSource()
|
||||
if pa.hasOnDemandStaticSource() {
|
||||
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
|
||||
pa.onDemandStaticSourceStart()
|
||||
}
|
||||
pa.setupPlayRequests = append(pa.setupPlayRequests, req)
|
||||
pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req)
|
||||
return
|
||||
}
|
||||
|
||||
if pa.hasOnDemandPublisher() {
|
||||
if pa.onDemandPublisherState == pathOnDemandStateInitial {
|
||||
pa.onDemandPublisherStart()
|
||||
}
|
||||
pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -841,10 +923,10 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
|
|||
func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
|
||||
pa.readers[req.author] = pathReaderStatePrePlay
|
||||
|
||||
if pa.isOnDemand() && pa.onDemandState == pathOnDemandStateClosing {
|
||||
pa.onDemandState = pathOnDemandStateReady
|
||||
pa.onDemandCloseTimer.Stop()
|
||||
pa.onDemandCloseTimer = newEmptyTimer()
|
||||
if pa.hasOnDemandPublisher() && pa.onDemandPublisherState == pathOnDemandStateClosing {
|
||||
pa.onDemandPublisherState = pathOnDemandStateReady
|
||||
pa.onDemandPublisherCloseTimer.Stop()
|
||||
pa.onDemandPublisherCloseTimer = newEmptyTimer()
|
||||
}
|
||||
|
||||
req.res <- pathReaderSetupPlayRes{
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
package core
|
||||
|
||||
// source is an entity that can provide a stream, statically or dynamically.
|
||||
// source is an entity that can provide a stream.
|
||||
// it can be:
|
||||
// - a publisher
|
||||
// - a static source
|
||||
// - a redirect source
|
||||
type source interface {
|
||||
onSourceAPIDescribe() interface{}
|
||||
}
|
||||
|
|
@ -10,3 +14,13 @@ type sourceStatic interface {
|
|||
source
|
||||
close()
|
||||
}
|
||||
|
||||
// sourceRedirect is a source that redirects to another one.
|
||||
type sourceRedirect struct{}
|
||||
|
||||
// onSourceAPIDescribe implements source.
|
||||
func (*sourceRedirect) onSourceAPIDescribe() interface{} {
|
||||
return struct {
|
||||
Type string `json:"type"`
|
||||
}{"redirect"}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue