diff --git a/client.go b/client.go index 0e542bf5..73f9fe3e 100644 --- a/client.go +++ b/client.go @@ -115,10 +115,9 @@ type client struct { describe chan describeRes tcpFrame chan *base.InterleavedFrame terminate chan struct{} - done chan struct{} } -func newClient(p *program, nconn net.Conn) *client { +func newClient(p *program, nconn net.Conn) { c := &client{ p: p, conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{ @@ -132,11 +131,14 @@ func newClient(p *program, nconn net.Conn) *client { describe: make(chan describeRes), tcpFrame: make(chan *base.InterleavedFrame), terminate: make(chan struct{}), - done: make(chan struct{}), } + p.clients[c] = struct{}{} + atomic.AddInt64(p.countClients, 1) + c.log("connected") + + p.clientsWg.Add(1) go c.run() - return c } func (c *client) close() { @@ -194,6 +196,8 @@ var errRunPlay = errors.New("play") var errRunRecord = errors.New("record") func (c *client) run() { + defer c.p.clientsWg.Done() + var onConnectCmd *externalcmd.ExternalCmd if c.p.conf.RunOnConnect != "" { var err error @@ -215,7 +219,6 @@ func (c *client) run() { close(c.describe) close(c.tcpFrame) - close(c.done) } func (c *client) writeResError(cseq base.HeaderValue, code base.StatusCode, err error) { diff --git a/main.go b/main.go index d045bb79..484f03a9 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "log" "net" "os" + "sync" "sync/atomic" "time" @@ -31,6 +32,7 @@ type program struct { serverUdpRtcp *serverUDP serverTcp *serverTCP clients map[*client]struct{} + clientsWg sync.WaitGroup udpPublishersMap *udpPublishersMap readersMap *readersMap // use pointers to avoid a crash on 32bit platforms @@ -186,6 +188,8 @@ func (p *program) log(format string, args ...interface{}) { } func (p *program) run() { + defer close(p.done) + if p.metrics != nil { go p.metrics.run() } @@ -220,10 +224,7 @@ outer: } case conn := <-p.clientNew: - c := newClient(p, conn) - p.clients[c] = struct{}{} - atomic.AddInt64(p.countClients, 1) - c.log("connected") + newClient(p, conn) case client := <-p.clientClose: if _, ok := p.clients[client]; !ok { @@ -352,7 +353,7 @@ outer: p.readersMap.clear() for _, p := range p.paths { - p.onClose(true) + p.onClose() } p.serverTcp.close() @@ -367,9 +368,10 @@ outer: for c := range p.clients { c.close() - <-c.done } + p.clientsWg.Wait() + if p.metrics != nil { p.metrics.close() } @@ -389,7 +391,6 @@ outer: close(p.clientRecord) close(p.sourceRtspReady) close(p.sourceRtspNotReady) - close(p.done) } func (p *program) close() { diff --git a/path.go b/path.go index f682732f..330a283f 100644 --- a/path.go +++ b/path.go @@ -77,7 +77,7 @@ func (pa *path) onInit() { } } -func (pa *path) onClose(wait bool) { +func (pa *path) onClose() { if source, ok := pa.source.(*sourceRtsp); ok { close(source.terminate) <-source.done @@ -105,10 +105,6 @@ func (pa *path) onClose(wait bool) { c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} } else { c.close() - - if wait { - <-c.done - } } } } @@ -193,7 +189,7 @@ func (pa *path) onCheck() { if pa.conf.Regexp != nil && pa.source == nil && !pa.hasClients() { - pa.onClose(false) + pa.onClose() delete(pa.p.paths, pa.name) } } diff --git a/servertcp.go b/servertcp.go index e8341b14..f0cef10e 100644 --- a/servertcp.go +++ b/servertcp.go @@ -34,6 +34,8 @@ func (l *serverTCP) log(format string, args ...interface{}) { } func (l *serverTCP) run() { + defer close(l.done) + for { conn, err := l.listener.AcceptTCP() if err != nil { @@ -42,8 +44,6 @@ func (l *serverTCP) run() { l.p.clientNew <- conn } - - close(l.done) } func (l *serverTCP) close() { diff --git a/serverudp.go b/serverudp.go index e58fee06..9882ea4a 100644 --- a/serverudp.go +++ b/serverudp.go @@ -60,6 +60,8 @@ func (l *serverUDP) log(format string, args ...interface{}) { } func (l *serverUDP) run() { + defer close(l.done) + writeDone := make(chan struct{}) go func() { defer close(writeDone) @@ -99,8 +101,6 @@ func (l *serverUDP) run() { close(l.writec) <-writeDone - - close(l.done) } func (l *serverUDP) close() { diff --git a/sourcertmp.go b/sourcertmp.go index 3b5e7853..08c6866c 100644 --- a/sourcertmp.go +++ b/sourcertmp.go @@ -62,6 +62,8 @@ func newSourceRtmp(p *program, path *path) *sourceRtmp { func (s *sourceRtmp) isSource() {} func (s *sourceRtmp) run(initialState sourceRtmpState) { + defer close(s.done) + s.applyState(initialState) outer: @@ -81,7 +83,6 @@ outer: } close(s.setState) - close(s.done) } func (s *sourceRtmp) applyState(state sourceRtmpState) { diff --git a/sourcertsp.go b/sourcertsp.go index a8e70f64..e9cd83df 100644 --- a/sourcertsp.go +++ b/sourcertsp.go @@ -57,6 +57,8 @@ func newSourceRtsp(p *program, path *path) *sourceRtsp { func (s *sourceRtsp) isSource() {} func (s *sourceRtsp) run(initialState sourceRtspState) { + defer close(s.done) + s.applyState(initialState) outer: @@ -76,7 +78,6 @@ outer: } close(s.setState) - close(s.done) } func (s *sourceRtsp) applyState(state sourceRtspState) {