diff --git a/main.go b/main.go index 31fc1588..e347b281 100644 --- a/main.go +++ b/main.go @@ -44,8 +44,8 @@ type program struct { clientSetupPlay chan clientSetupPlayReq clientPlay chan *client clientRecord chan *client - sourceReady chan *source - sourceNotReady chan *source + proxyReady chan *proxy + proxyNotReady chan *proxy terminate chan struct{} done chan struct{} } @@ -88,8 +88,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { clientSetupPlay: make(chan clientSetupPlayReq), clientPlay: make(chan *client), clientRecord: make(chan *client), - sourceReady: make(chan *source), - sourceNotReady: make(chan *source), + proxyReady: make(chan *proxy), + proxyNotReady: make(chan *proxy), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -266,13 +266,13 @@ outer: client.path.onPublisherSetReady() - case source := <-p.sourceReady: - source.path.log("source ready") - source.path.onPublisherSetReady() + case proxy := <-p.proxyReady: + proxy.path.log("proxy ready") + proxy.path.onPublisherSetReady() - case source := <-p.sourceNotReady: - source.path.log("source not ready") - source.path.onPublisherSetNotReady() + case proxy := <-p.proxyNotReady: + proxy.path.log("proxy not ready") + proxy.path.onPublisherSetNotReady() case <-p.terminate: break outer @@ -298,8 +298,8 @@ outer: case <-p.clientPlay: case <-p.clientRecord: - case <-p.sourceReady: - case <-p.sourceNotReady: + case <-p.proxyReady: + case <-p.proxyNotReady: } } }() @@ -343,8 +343,8 @@ outer: close(p.clientSetupPlay) close(p.clientPlay) close(p.clientRecord) - close(p.sourceReady) - close(p.sourceNotReady) + close(p.proxyReady) + close(p.proxyNotReady) close(p.done) } diff --git a/path.go b/path.go index 0c0af7e9..88b3bf17 100644 --- a/path.go +++ b/path.go @@ -8,11 +8,11 @@ import ( const ( describeTimeout = 5 * time.Second - sourceStopAfterDescribeSecs = 10 * time.Second + proxyStopAfterDescribeSecs = 10 * time.Second onDemandCmdStopAfterDescribeSecs = 10 * time.Second ) -// a publisher is either a client or a source +// a publisher is either a client or a proxy type publisher interface { isPublisher() } @@ -21,7 +21,7 @@ type path struct { p *program name string conf *pathConf - source *source + proxy *proxy publisher publisher publisherReady bool publisherTrackCount int @@ -40,8 +40,8 @@ func newPath(p *program, name string, conf *pathConf) *path { } if conf.Source != "record" { - s := newSource(p, pa, conf) - pa.source = s + s := newProxy(p, pa, conf) + pa.proxy = s pa.publisher = s } @@ -53,8 +53,8 @@ func (pa *path) log(format string, args ...interface{}) { } func (pa *path) onInit() { - if pa.source != nil { - go pa.source.run(pa.source.state) + if pa.proxy != nil { + go pa.proxy.run(pa.proxy.state) } if pa.conf.RunOnInit != "" { @@ -69,9 +69,9 @@ func (pa *path) onInit() { } func (pa *path) onClose(wait bool) { - if pa.source != nil { - close(pa.source.terminate) - <-pa.source.done + if pa.proxy != nil { + close(pa.proxy.terminate) + <-pa.proxy.done } if pa.onInitCmd != nil { @@ -142,16 +142,16 @@ func (pa *path) onCheck() { } } - // stop on demand source if needed - if pa.source != nil && + // stop on demand proxy if needed + if pa.proxy != nil && pa.conf.SourceOnDemand && - pa.source.state == sourceStateRunning && + pa.proxy.state == proxyStateRunning && !pa.hasClients() && - time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs { - pa.log("stopping on demand source (not requested anymore)") + time.Since(pa.lastDescribeReq) >= proxyStopAfterDescribeSecs { + pa.log("stopping on demand proxy (not requested anymore)") atomic.AddInt64(&pa.p.countProxiesRunning, -1) - pa.source.state = sourceStateStopped - pa.source.setState <- pa.source.state + pa.proxy.state = proxyStateStopped + pa.proxy.setState <- pa.proxy.state } // stop on demand command if needed @@ -240,12 +240,12 @@ func (pa *path) onDescribe(client *client) { // publisher was found but is not ready: put the client on hold } else if !pa.publisherReady { - if pa.source != nil && pa.source.state == sourceStateStopped { // start if needed - pa.log("starting on demand source") + if pa.proxy != nil && pa.proxy.state == proxyStateStopped { // start if needed + pa.log("starting on demand proxy") pa.lastDescribeActivation = time.Now() atomic.AddInt64(&pa.p.countProxiesRunning, +1) - pa.source.state = sourceStateRunning - pa.source.setState <- pa.source.state + pa.proxy.state = proxyStateRunning + pa.proxy.setState <- pa.proxy.state } client.path = pa diff --git a/source.go b/proxy.go similarity index 76% rename from source.go rename to proxy.go index 65cf7bfa..745d84e7 100644 --- a/source.go +++ b/proxy.go @@ -10,39 +10,39 @@ import ( ) const ( - sourceRetryInterval = 5 * time.Second - sourceUDPReadBufferSize = 2048 - sourceTCPReadBufferSize = 128 * 1024 + proxyRetryInterval = 5 * time.Second + proxyUDPReadBufferSize = 2048 + proxyTCPReadBufferSize = 128 * 1024 ) -type sourceState int +type proxyState int const ( - sourceStateStopped sourceState = iota - sourceStateRunning + proxyStateStopped proxyState = iota + proxyStateRunning ) -type source struct { +type proxy struct { p *program path *path pathConf *pathConf - state sourceState + state proxyState tracks []*gortsplib.Track innerRunning bool innerTerminate chan struct{} innerDone chan struct{} - setState chan sourceState + setState chan proxyState terminate chan struct{} done chan struct{} } -func newSource(p *program, path *path, pathConf *pathConf) *source { - s := &source{ +func newProxy(p *program, path *path, pathConf *pathConf) *proxy { + s := &proxy{ p: p, path: path, pathConf: pathConf, - setState: make(chan sourceState), + setState: make(chan proxyState), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -50,18 +50,18 @@ func newSource(p *program, path *path, pathConf *pathConf) *source { atomic.AddInt64(&p.countProxies, +1) if pathConf.SourceOnDemand { - s.state = sourceStateStopped + s.state = proxyStateStopped } else { - s.state = sourceStateRunning + s.state = proxyStateRunning atomic.AddInt64(&p.countProxiesRunning, +1) } return s } -func (s *source) isPublisher() {} +func (s *proxy) isPublisher() {} -func (s *source) run(initialState sourceState) { +func (s *proxy) run(initialState proxyState) { s.applyState(initialState) outer: @@ -84,10 +84,10 @@ outer: close(s.done) } -func (s *source) applyState(state sourceState) { - if state == sourceStateRunning { +func (s *proxy) applyState(state proxyState) { + if state == proxyStateRunning { if !s.innerRunning { - s.path.log("source started") + s.path.log("proxy started") s.innerRunning = true s.innerTerminate = make(chan struct{}) s.innerDone = make(chan struct{}) @@ -98,12 +98,12 @@ func (s *source) applyState(state sourceState) { close(s.innerTerminate) <-s.innerDone s.innerRunning = false - s.path.log("source stopped") + s.path.log("proxy stopped") } } } -func (s *source) runInner() { +func (s *proxy) runInner() { defer close(s.innerDone) for { @@ -113,7 +113,7 @@ func (s *source) runInner() { return false } - t := time.NewTimer(sourceRetryInterval) + t := time.NewTimer(proxyRetryInterval) defer t.Stop() select { @@ -130,8 +130,8 @@ func (s *source) runInner() { } } -func (s *source) runInnerInner() bool { - s.path.log("source connecting") +func (s *proxy) runInnerInner() bool { + s.path.log("proxy connecting") var conn *gortsplib.ConnClient var err error @@ -152,21 +152,21 @@ func (s *source) runInnerInner() bool { } if err != nil { - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) return true } _, err = conn.Options(s.pathConf.sourceUrl) if err != nil { conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) return true } tracks, _, err := conn.Describe(s.pathConf.sourceUrl) if err != nil { conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) return true } @@ -184,7 +184,7 @@ func (s *source) runInnerInner() bool { } } -func (s *source) runUDP(conn *gortsplib.ConnClient) bool { +func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool { var rtpReads []gortsplib.UDPReadFunc var rtcpReads []gortsplib.UDPReadFunc @@ -202,7 +202,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool { } conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) return true } @@ -215,11 +215,11 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool { _, err := conn.Play(s.pathConf.sourceUrl) if err != nil { conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) return true } - s.p.sourceReady <- s + s.p.proxyReady <- s var wg sync.WaitGroup @@ -229,7 +229,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool { go func(trackId int, rtpRead gortsplib.UDPReadFunc) { defer wg.Done() - multiBuf := newMultiBuffer(2, sourceUDPReadBufferSize) + multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize) for { buf := multiBuf.next() @@ -250,7 +250,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool { go func(trackId int, rtcpRead gortsplib.UDPReadFunc) { defer wg.Done() - multiBuf := newMultiBuffer(2, sourceUDPReadBufferSize) + multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize) for { buf := multiBuf.next() @@ -283,7 +283,7 @@ outer: case err := <-tcpConnDone: conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) ret = true break outer } @@ -291,17 +291,17 @@ outer: wg.Wait() - s.p.sourceNotReady <- s + s.p.proxyNotReady <- s return ret } -func (s *source) runTCP(conn *gortsplib.ConnClient) bool { +func (s *proxy) runTCP(conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { _, err := conn.SetupTCP(s.pathConf.sourceUrl, track) if err != nil { conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) return true } } @@ -309,14 +309,14 @@ func (s *source) runTCP(conn *gortsplib.ConnClient) bool { _, err := conn.Play(s.pathConf.sourceUrl) if err != nil { conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) return true } - s.p.sourceReady <- s + s.p.proxyReady <- s frame := &gortsplib.InterleavedFrame{} - multiBuf := newMultiBuffer(2, sourceTCPReadBufferSize) + multiBuf := newMultiBuffer(2, proxyTCPReadBufferSize) tcpConnDone := make(chan error) go func() { @@ -347,13 +347,13 @@ outer: case err := <-tcpConnDone: conn.Close() - s.path.log("source ERR: %s", err) + s.path.log("proxy ERR: %s", err) ret = true break outer } } - s.p.sourceNotReady <- s + s.p.proxyNotReady <- s return ret } diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 0d6bd854..8719d492 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -39,7 +39,7 @@ paths: all: # source of the stream - this can be: # * record -> the stream is provided by a client through the RECORD command (like ffmpeg) - # * rtsp://original-url -> the stream is pulled from another RTSP server + # * rtsp://original-url -> the stream is pulled from another RTSP server (proxy mode) source: record # if the source is an RTSP url, this is the protocol that will be used to pull the stream sourceProtocol: udp