diff --git a/README.md b/README.md index da5693e9..08b6635d 100644 --- a/README.md +++ b/README.md @@ -207,9 +207,9 @@ There are multiple ways to monitor the server usage over time: ``` Obtaining: ``` - clients 23 1596122687740 - publishers 15 1596122687740 - readers 8 1596122687740 + rtsp_clients{state="idle"} 2 1596122687740 + rtsp_clients{state="publishing"} 15 1596122687740 + rtsp_clients{state="reading"} 8 1596122687740 ``` * A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like: diff --git a/client.go b/client.go index d3b2c93c..75e0d353 100644 --- a/client.go +++ b/client.go @@ -138,15 +138,15 @@ func newClient(p *program, nconn net.Conn) *client { func (c *client) close() { delete(c.p.clients, c) - atomic.AddInt64(&c.p.countClient, -1) + atomic.AddInt64(&c.p.countClients, -1) switch c.state { case clientStatePlay: - atomic.AddInt64(&c.p.countReader, -1) + atomic.AddInt64(&c.p.countReaders, -1) c.p.readersMap.remove(c) case clientStateRecord: - atomic.AddInt64(&c.p.countPublisher, -1) + atomic.AddInt64(&c.p.countPublishers, -1) if c.streamProtocol == gortsplib.StreamProtocolUDP { for _, track := range c.streamTracks { diff --git a/main.go b/main.go index 46eeecd8..c87f6d36 100644 --- a/main.go +++ b/main.go @@ -31,11 +31,10 @@ type program struct { clients map[*client]struct{} udpPublishersMap *udpPublishersMap readersMap *readersMap - countClient int64 - countPublisher int64 - countReader int64 + countClients int64 + countPublishers int64 + countReaders int64 - metricsGather chan metricsGatherReq clientNew chan net.Conn clientClose chan *client clientDescribe chan clientDescribeReq @@ -80,7 +79,6 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { clients: make(map[*client]struct{}), udpPublishersMap: newUdpPublisherMap(), readersMap: newReadersMap(), - metricsGather: make(chan metricsGatherReq), clientNew: make(chan net.Conn), clientClose: make(chan *client), clientDescribe: make(chan clientDescribeReq), @@ -139,12 +137,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { } func (p *program) log(format string, args ...interface{}) { - countClient := atomic.LoadInt64(&p.countClient) - countPublisher := atomic.LoadInt64(&p.countPublisher) - countReader := atomic.LoadInt64(&p.countReader) + countClients := atomic.LoadInt64(&p.countClients) + countPublishers := atomic.LoadInt64(&p.countPublishers) + countReaders := atomic.LoadInt64(&p.countReaders) - log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClient, - countPublisher, countReader}, args...)...)) + log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClients, + countPublishers, countReaders}, args...)...)) } func (p *program) run() { @@ -181,17 +179,10 @@ outer: path.onCheck() } - case req := <-p.metricsGather: - req.res <- &metricsData{ - countClient: p.countClient, - countPublisher: p.countPublisher, - countReader: p.countReader, - } - case conn := <-p.clientNew: c := newClient(p, conn) p.clients[c] = struct{}{} - atomic.AddInt64(&p.countClient, 1) + atomic.AddInt64(&p.countClients, 1) c.log("connected") case client := <-p.clientClose: @@ -245,12 +236,12 @@ outer: req.res <- nil case client := <-p.clientPlay: - atomic.AddInt64(&p.countReader, 1) + atomic.AddInt64(&p.countReaders, 1) client.state = clientStatePlay p.readersMap.add(client) case client := <-p.clientRecord: - atomic.AddInt64(&p.countPublisher, 1) + atomic.AddInt64(&p.countPublishers, 1) client.state = clientStateRecord if client.streamProtocol == gortsplib.StreamProtocolUDP { @@ -289,13 +280,11 @@ outer: go func() { for { select { - case req, ok := <-p.metricsGather: + case _, ok := <-p.clientNew: if !ok { return } - req.res <- nil - case <-p.clientNew: case <-p.clientClose: case <-p.clientDescribe: @@ -345,7 +334,6 @@ outer: p.logHandler.close() - close(p.metricsGather) close(p.clientNew) close(p.clientClose) close(p.clientDescribe) diff --git a/metrics.go b/metrics.go index 09ab92e5..be1c47fc 100644 --- a/metrics.go +++ b/metrics.go @@ -6,6 +6,7 @@ import ( "io" "net" "net/http" + "sync/atomic" "time" ) @@ -13,16 +14,6 @@ const ( metricsAddress = ":9998" ) -type metricsData struct { - countClient int64 - countPublisher int64 - countReader int64 -} - -type metricsGatherReq struct { - res chan *metricsData -} - type metrics struct { p *program listener net.Listener @@ -64,21 +55,19 @@ func (m *metrics) close() { } func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { - res := make(chan *metricsData) - m.p.metricsGather <- metricsGatherReq{res} - data := <-res - - if data == nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - out := "" now := time.Now().UnixNano() / 1000000 - out += fmt.Sprintf("clients %d %v\n", data.countClient, now) - out += fmt.Sprintf("publishers %d %v\n", data.countPublisher, now) - out += fmt.Sprintf("readers %d %v\n", data.countReader, now) + countClients := atomic.LoadInt64(&m.p.countClients) + countPublishers := atomic.LoadInt64(&m.p.countPublishers) + countReaders := atomic.LoadInt64(&m.p.countReaders) + + out := "" + out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n", + countClients-countPublishers-countReaders, now) + out += fmt.Sprintf("rtsp_clients{state=\"publishing\"} %d %v\n", + countPublishers, now) + out += fmt.Sprintf("rtsp_clients{state=\"reading\"} %d %v\n", + countReaders, now) w.WriteHeader(http.StatusOK) io.WriteString(w, out)