diff --git a/client.go b/client.go index e5c137fb..7f1b6564 100644 --- a/client.go +++ b/client.go @@ -159,11 +159,11 @@ func (c *client) close() { } } - c.path.onPublisherSetNotReady() + c.path.onSourceSetNotReady() } - if c.path != nil && c.path.publisher == c { - c.path.onPublisherRemove() + if c.path != nil && c.path.source == c { + c.path.onSourceRemove() } close(c.terminate) @@ -175,7 +175,7 @@ func (c *client) log(format string, args ...interface{}) { c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...) } -func (c *client) isPublisher() {} +func (c *client) isSource() {} func (c *client) ip() net.IP { return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP @@ -642,7 +642,7 @@ func (c *client) handleRequest(req *base.Request) error { return errRunTerminate } - if len(c.streamTracks) >= c.path.publisherTrackCount { + if len(c.streamTracks) >= c.path.sourceTrackCount { c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return errRunTerminate } @@ -697,7 +697,7 @@ func (c *client) handleRequest(req *base.Request) error { return errRunTerminate } - if len(c.streamTracks) >= c.path.publisherTrackCount { + if len(c.streamTracks) >= c.path.sourceTrackCount { c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return errRunTerminate } @@ -781,7 +781,7 @@ func (c *client) handleRequest(req *base.Request) error { return errRunTerminate } - if len(c.streamTracks) != c.path.publisherTrackCount { + if len(c.streamTracks) != c.path.sourceTrackCount { c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("not all tracks have been setup")) return errRunTerminate } diff --git a/main.go b/main.go index b871f605..4f4c2e7b 100644 --- a/main.go +++ b/main.go @@ -243,15 +243,15 @@ outer: p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf) } else { - if path.publisher != nil { + if path.source != nil { req.res <- fmt.Errorf("someone is already publishing on path '%s'", req.pathName) continue } } - p.paths[req.pathName].publisher = req.client - p.paths[req.pathName].publisherTrackCount = req.trackCount - p.paths[req.pathName].publisherSdp = req.sdp + p.paths[req.pathName].source = req.client + p.paths[req.pathName].sourceTrackCount = req.trackCount + p.paths[req.pathName].sourceSdp = req.sdp req.client.path = p.paths[req.pathName] req.client.state = clientStatePreRecord @@ -259,12 +259,12 @@ outer: case req := <-p.clientSetupPlay: path, ok := p.paths[req.pathName] - if !ok || !path.publisherReady { + if !ok || !path.sourceReady { req.res <- fmt.Errorf("no one is publishing on path '%s'", req.pathName) continue } - if req.trackId >= path.publisherTrackCount { + if req.trackId >= path.sourceTrackCount { req.res <- fmt.Errorf("track %d does not exist", req.trackId) continue } @@ -300,19 +300,19 @@ outer: } } - client.path.onPublisherSetReady() + client.path.onSourceSetReady() case s := <-p.sourceRtspReady: - s.path.onPublisherSetReady() + s.path.onSourceSetReady() case s := <-p.sourceRtspNotReady: - s.path.onPublisherSetNotReady() + s.path.onSourceSetNotReady() case s := <-p.sourceRtmpReady: - s.path.onPublisherSetReady() + s.path.onSourceSetReady() case s := <-p.sourceRtmpNotReady: - s.path.onPublisherSetNotReady() + s.path.onSourceSetNotReady() case <-p.terminate: break outer diff --git a/path.go b/path.go index c04717cb..8b18cb54 100644 --- a/path.go +++ b/path.go @@ -13,19 +13,19 @@ const ( onDemandCmdStopAfterDescribePeriod = 10 * time.Second ) -// a publisher can be a client, a sourceRtsp or a sourceRtmp -type publisher interface { - isPublisher() +// a source can be a client, a sourceRtsp or a sourceRtmp +type source interface { + isSource() } type path struct { p *program name string conf *pathConf - publisher publisher - publisherReady bool - publisherTrackCount int - publisherSdp []byte + source source + sourceReady bool + sourceTrackCount int + sourceSdp []byte lastDescribeReq time.Time lastDescribeActivation time.Time onInitCmd *externalCmd @@ -41,11 +41,11 @@ func newPath(p *program, name string, conf *pathConf) *path { if strings.HasPrefix(conf.Source, "rtsp://") { s := newSourceRtsp(p, pa) - pa.publisher = s + pa.source = s } else if strings.HasPrefix(conf.Source, "rtmp://") { s := newSourceRtmp(p, pa) - pa.publisher = s + pa.source = s } return pa @@ -56,10 +56,10 @@ func (pa *path) log(format string, args ...interface{}) { } func (pa *path) onInit() { - if source, ok := pa.publisher.(*sourceRtsp); ok { + if source, ok := pa.source.(*sourceRtsp); ok { go source.run(source.state) - } else if source, ok := pa.publisher.(*sourceRtmp); ok { + } else if source, ok := pa.source.(*sourceRtmp); ok { go source.run(source.state) } @@ -75,11 +75,11 @@ func (pa *path) onInit() { } func (pa *path) onClose(wait bool) { - if source, ok := pa.publisher.(*sourceRtsp); ok { + if source, ok := pa.source.(*sourceRtsp); ok { close(source.terminate) <-source.done - } else if source, ok := pa.publisher.(*sourceRtmp); ok { + } else if source, ok := pa.source.(*sourceRtmp); ok { close(source.terminate) <-source.done } @@ -131,7 +131,7 @@ func (pa *path) hasClientsWaitingDescribe() bool { func (pa *path) hasClientReaders() bool { for c := range pa.p.clients { - if c.path == pa && c != pa.publisher { + if c.path == pa && c != pa.source { return true } } @@ -153,7 +153,7 @@ func (pa *path) onCheck() { } // stop on demand rtsp source if needed - if source, ok := pa.publisher.(*sourceRtsp); ok { + if source, ok := pa.source.(*sourceRtsp); ok { if pa.conf.SourceOnDemand && source.state == sourceRtspStateRunning && !pa.hasClients() && @@ -165,7 +165,7 @@ func (pa *path) onCheck() { } // stop on demand rtmp source if needed - } else if source, ok := pa.publisher.(*sourceRtmp); ok { + } else if source, ok := pa.source.(*sourceRtmp); ok { if pa.conf.SourceOnDemand && source.state == sourceRtmpStateRunning && !pa.hasClients() && @@ -188,28 +188,28 @@ func (pa *path) onCheck() { // remove regular expression paths if pa.conf.regexp != nil && - pa.publisher == nil && + pa.source == nil && !pa.hasClients() { pa.onClose(false) delete(pa.p.paths, pa.name) } } -func (pa *path) onPublisherRemove() { - pa.publisher = nil +func (pa *path) onSourceRemove() { + pa.source = nil // close all clients that are reading or waiting for reading for c := range pa.p.clients { if c.path == pa && c.state != clientStateWaitDescription && - c != pa.publisher { + c != pa.source { c.close() } } } -func (pa *path) onPublisherSetReady() { - pa.publisherReady = true +func (pa *path) onSourceSetReady() { + pa.sourceReady = true // reply to all clients that are waiting for a description for c := range pa.p.clients { @@ -217,19 +217,19 @@ func (pa *path) onPublisherSetReady() { c.path == pa { c.path = nil c.state = clientStateInitial - c.describe <- describeRes{pa.publisherSdp, nil} + c.describe <- describeRes{pa.sourceSdp, nil} } } } -func (pa *path) onPublisherSetNotReady() { - pa.publisherReady = false +func (pa *path) onSourceSetNotReady() { + pa.sourceReady = false // close all clients that are reading or waiting for reading for c := range pa.p.clients { if c.path == pa && c.state != clientStateWaitDescription && - c != pa.publisher { + c != pa.source { c.close() } } @@ -239,7 +239,7 @@ func (pa *path) onDescribe(client *client) { pa.lastDescribeReq = time.Now() // publisher not found - if pa.publisher == nil { + if pa.source == nil { // on demand command is available: put the client on hold if pa.conf.RunOnDemand != "" { if pa.onDemandCmd == nil { // start if needed @@ -262,9 +262,9 @@ func (pa *path) onDescribe(client *client) { } // publisher was found but is not ready: put the client on hold - } else if !pa.publisherReady { + } else if !pa.sourceReady { // start rtsp source if needed - if source, ok := pa.publisher.(*sourceRtsp); ok { + if source, ok := pa.source.(*sourceRtsp); ok { if source.state == sourceRtspStateStopped { pa.log("starting on demand rtsp source") pa.lastDescribeActivation = time.Now() @@ -274,7 +274,7 @@ func (pa *path) onDescribe(client *client) { } // start rtmp source if needed - } else if source, ok := pa.publisher.(*sourceRtmp); ok { + } else if source, ok := pa.source.(*sourceRtmp); ok { if source.state == sourceRtmpStateStopped { pa.log("starting on demand rtmp source") pa.lastDescribeActivation = time.Now() @@ -289,6 +289,6 @@ func (pa *path) onDescribe(client *client) { // publisher was found and is ready } else { - client.describe <- describeRes{pa.publisherSdp, nil} + client.describe <- describeRes{pa.sourceSdp, nil} } } diff --git a/sourcertmp.go b/sourcertmp.go index e8afa0e9..3b5e7853 100644 --- a/sourcertmp.go +++ b/sourcertmp.go @@ -59,7 +59,7 @@ func newSourceRtmp(p *program, path *path) *sourceRtmp { return s } -func (s *sourceRtmp) isPublisher() {} +func (s *sourceRtmp) isSource() {} func (s *sourceRtmp) run(initialState sourceRtmpState) { s.applyState(initialState) @@ -248,8 +248,8 @@ func (s *sourceRtmp) runInnerInner() bool { return true } - s.path.publisherSdp = tracks.Write() - s.path.publisherTrackCount = len(tracks) + s.path.sourceSdp = tracks.Write() + s.path.sourceTrackCount = len(tracks) s.p.sourceRtmpReady <- s s.path.log("rtmp source ready") diff --git a/sourcertsp.go b/sourcertsp.go index 11f7ae3b..412a4810 100644 --- a/sourcertsp.go +++ b/sourcertsp.go @@ -54,7 +54,7 @@ func newSourceRtsp(p *program, path *path) *sourceRtsp { return s } -func (s *sourceRtsp) isPublisher() {} +func (s *sourceRtsp) isSource() {} func (s *sourceRtsp) run(initialState sourceRtspState) { s.applyState(initialState) @@ -161,8 +161,8 @@ func (s *sourceRtsp) runInnerInner() bool { } // create a filtered SDP that is used by the server (not by the client) - s.path.publisherSdp = tracks.Write() - s.path.publisherTrackCount = len(tracks) + s.path.sourceSdp = tracks.Write() + s.path.sourceTrackCount = len(tracks) s.tracks = tracks if s.path.conf.sourceProtocolParsed == gortsplib.StreamProtocolUDP {