diff --git a/Makefile b/Makefile index 42e69cf5..bfc4e98f 100644 --- a/Makefile +++ b/Makefile @@ -68,6 +68,7 @@ define CONFIG_RUN #rtspPort: 8555 #rtpPort: 8002 #rtcpPort: 8003 +metrics: yes paths: all: diff --git a/README.md b/README.md index 9ea56853..4acc4368 100644 --- a/README.md +++ b/README.md @@ -188,14 +188,32 @@ systemctl enable rtsp-simple-server systemctl start rtsp-simple-server ``` -#### Client count +#### Monitoring -The current number of clients, publishers and readers is printed in each log line; for instance, the line: -``` -2020/01/01 00:00:00 [2/1/1] [client 127.0.0.1:44428] OPTION -``` +There are multiple ways to monitor the server usage over time: +* The current number of clients, publishers and readers is printed in each log line; for instance, the line: + ``` + 2020/01/01 00:00:00 [2/1/1] [client 127.0.0.1:44428] OPTION + ``` + means that there are 2 clients, 1 publisher and 1 receiver. -means that there are 2 clients, 1 publisher and 1 receiver. +* A metrics exporter, compatible with Prometheus, can be enabled with the option `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request: + ``` + wget -qO- localhost:9998 + ``` + Obtaining: + ``` + clients 23 1596122687740 + publishers 15 1596122687740 + readers 8 1596122687740 + ``` + +* A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like: + ``` + docker run --rm -it --network=host golang:1.14-alpine3.12 go tool pprof -text http://localhost:9999/debug/pprof/goroutine + docker run --rm -it --network=host golang:1.14-alpine3.12 go tool pprof -text http://localhost:9999/debug/pprof/heap + docker run --rm -it --network=host golang:1.14-alpine3.12 go tool pprof -text http://localhost:9999/debug/pprof/profile?seconds=30 + ``` #### Full command-line usage diff --git a/client.go b/client.go index f55f98d5..ba51c713 100644 --- a/client.go +++ b/client.go @@ -932,7 +932,7 @@ func (c *client) runRecord(path string) { case <-receiverReportTicker.C: for trackId := range c.streamTracks { frame := c.rtcpReceivers[trackId].Report() - c.p.rtcpl.writeChan <- &udpAddrBufPair{ + c.p.serverRtcp.writeChan <- &udpAddrBufPair{ addr: &net.UDPAddr{ IP: c.ip(), Zone: c.zone(), diff --git a/conf.go b/conf.go index ad5398cb..c13c7ed2 100644 --- a/conf.go +++ b/conf.go @@ -42,6 +42,7 @@ type conf struct { WriteTimeout time.Duration `yaml:"writeTimeout"` AuthMethods []string `yaml:"authMethods"` authMethodsParsed []gortsplib.AuthMethod `` + Metrics bool `yaml:"metrics"` Pprof bool `yaml:"pprof"` Paths map[string]*confPath `yaml:"paths"` } diff --git a/go.mod b/go.mod index bc3e2a7e..206e19d7 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,10 @@ go 1.12 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200728150440-efac0eb60921 + github.com/aler9/gortsplib v0.0.0-20200730132448-3b14ef755b05 github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 + github.com/davecgh/go-spew v1.1.1 // indirect github.com/stretchr/testify v1.6.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 - gopkg.in/yaml.v2 v2.2.2 + gopkg.in/yaml.v2 v2.2.8 ) diff --git a/go.sum b/go.sum index 91318072..0423c21b 100644 --- a/go.sum +++ b/go.sum @@ -2,12 +2,14 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200728150440-efac0eb60921 h1:GkF8VqCVRDK1jlOwnorAh2uLEhzZoqNJ7et/uYBkHI4= -github.com/aler9/gortsplib v0.0.0-20200728150440-efac0eb60921/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= +github.com/aler9/gortsplib v0.0.0-20200730132448-3b14ef755b05 h1:DWxXmtYvLYt3FNAZYKENS8NmxtkpBZJIbtZyOZ8Xu+c= +github.com/aler9/gortsplib v0.0.0-20200730132448-3b14ef755b05/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= @@ -27,5 +29,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 45780c37..9de9bde2 100644 --- a/main.go +++ b/main.go @@ -17,10 +17,20 @@ import ( var Version = "v0.0.0" +const ( + pprofAddress = ":9999" +) + type programEvent interface { isProgramEvent() } +type programEventMetrics struct { + res chan *metricsData +} + +func (programEventMetrics) isProgramEvent() {} + type programEventClientNew struct { nconn net.Conn } @@ -151,9 +161,10 @@ func (programEventTerminate) isProgramEvent() {} type program struct { conf *conf - rtspl *serverTcp - rtpl *serverUdp - rtcpl *serverUdp + metrics *metrics + serverRtsp *serverTcp + serverRtp *serverUdp + serverRtcp *serverUdp sources []*source clients map[*client]struct{} paths map[string]*path @@ -196,7 +207,7 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { continue } - newPath(p, path, confp, true) + p.paths[path] = newPath(p, path, confp, true) if confp.Source != "record" { s := newSource(p, path, confp) @@ -207,36 +218,42 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { p.log("rtsp-simple-server %s", Version) + if conf.Metrics { + p.metrics = newMetrics(p) + } + if conf.Pprof { go func(mux *http.ServeMux) { - server := &http.Server{ - Addr: ":9999", + p.log("[pprof] opened on " + pprofAddress) + panic((&http.Server{ + Addr: pprofAddress, Handler: mux, - } - p.log("pprof is available on :9999") - panic(server.ListenAndServe()) + }).ListenAndServe()) }(http.DefaultServeMux) http.DefaultServeMux = http.NewServeMux() } - p.rtpl, err = newServerUdp(p, conf.RtpPort, gortsplib.StreamTypeRtp) + p.serverRtp, err = newServerUdp(p, conf.RtpPort, gortsplib.StreamTypeRtp) if err != nil { return nil, err } - p.rtcpl, err = newServerUdp(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) + p.serverRtcp, err = newServerUdp(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) if err != nil { return nil, err } - p.rtspl, err = newServerTcp(p) + p.serverRtsp, err = newServerTcp(p) if err != nil { return nil, err } - go p.rtpl.run() - go p.rtcpl.run() - go p.rtspl.run() + if p.metrics != nil { + go p.metrics.run() + } + go p.serverRtp.run() + go p.serverRtcp.run() + go p.serverRtsp.run() for _, s := range p.sources { go s.run() } @@ -264,6 +281,13 @@ outer: case rawEvt := <-p.events: switch evt := rawEvt.(type) { + case programEventMetrics: + evt.res <- &metricsData{ + clientCount: len(p.clients), + publisherCount: p.publisherCount, + readerCount: p.readerCount, + } + case programEventClientNew: c := newClient(p, evt.nconn) p.clients[c] = struct{}{} @@ -304,7 +328,7 @@ outer: } } else { - newPath(p, evt.path, p.findConfForPath(evt.path), false) + p.paths[evt.path] = newPath(p, evt.path, p.findConfForPath(evt.path), false) } p.paths[evt.path].publisher = evt.client @@ -413,6 +437,9 @@ outer: go func() { for rawEvt := range p.events { switch evt := rawEvt.(type) { + case programEventMetrics: + evt.res <- nil + case programEventClientClose: close(evt.done) @@ -451,15 +478,19 @@ outer: <-s.done } - p.rtspl.close() - p.rtcpl.close() - p.rtpl.close() + p.serverRtsp.close() + p.serverRtcp.close() + p.serverRtp.close() for c := range p.clients { c.conn.NetConn().Close() <-c.done } + if p.metrics != nil { + p.metrics.close() + } + close(p.events) close(p.done) } @@ -514,7 +545,7 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St if c.pathId == path && c.state == clientStatePlay { if c.streamProtocol == gortsplib.StreamProtocolUdp { if streamType == gortsplib.StreamTypeRtp { - p.rtpl.write(&udpAddrBufPair{ + p.serverRtp.write(&udpAddrBufPair{ addr: &net.UDPAddr{ IP: c.ip(), Zone: c.zone(), @@ -523,7 +554,7 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St buf: frame, }) } else { - p.rtcpl.write(&udpAddrBufPair{ + p.serverRtcp.write(&udpAddrBufPair{ addr: &net.UDPAddr{ IP: c.ip(), Zone: c.zone(), diff --git a/metrics.go b/metrics.go new file mode 100644 index 00000000..a57bed5b --- /dev/null +++ b/metrics.go @@ -0,0 +1,77 @@ +package main + +import ( + "context" + "fmt" + "io" + "net/http" + "time" +) + +const ( + metricsAddress = ":9998" +) + +type metricsData struct { + clientCount int + publisherCount int + readerCount int +} + +type metrics struct { + p *program + mux *http.ServeMux + server *http.Server +} + +func newMetrics(p *program) *metrics { + m := &metrics{ + p: p, + } + + m.mux = http.NewServeMux() + m.mux.HandleFunc("/metrics", m.onMetrics) + m.server = &http.Server{ + Addr: metricsAddress, + Handler: m.mux, + } + m.log("opened on " + metricsAddress) + + return m +} + +func (m *metrics) log(format string, args ...interface{}) { + m.p.log("[metrics] "+format, args...) +} + +func (m *metrics) run() { + err := m.server.ListenAndServe() + if err != http.ErrServerClosed { + panic(err) + } +} + +func (m *metrics) close() { + m.server.Shutdown(context.Background()) +} + +func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { + res := make(chan *metricsData) + m.p.events <- programEventMetrics{res} + data := <-res + + if data == nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + out := "" + now := time.Now().UnixNano() / 1000000 + + out += fmt.Sprintf("clients %d %v\n", data.clientCount, now) + out += fmt.Sprintf("publishers %d %v\n", data.publisherCount, now) + out += fmt.Sprintf("readers %d %v\n", data.readerCount, now) + + w.WriteHeader(http.StatusOK) + io.WriteString(w, out) +} diff --git a/path.go b/path.go index 4ac50917..9e2d277e 100644 --- a/path.go +++ b/path.go @@ -28,7 +28,7 @@ type path struct { onDemandCmd *exec.Cmd } -func newPath(p *program, id string, confp *confPath, permanent bool) { +func newPath(p *program, id string, confp *confPath, permanent bool) *path { pa := &path{ p: p, id: id, @@ -36,7 +36,7 @@ func newPath(p *program, id string, confp *confPath, permanent bool) { permanent: permanent, } - p.paths[id] = pa + return pa } func (pa *path) check() { diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 6d53b9f6..fd587330 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -17,6 +17,8 @@ writeTimeout: 5s # supported authentication methods # WARNING: both methods are insecure, use RTSP inside a VPN to enforce security. authMethods: [basic, digest] +# enable Prometheus-compatible metrics on port 9998 +metrics: false # enable pprof on port 9999 to monitor performances pprof: false diff --git a/source.go b/source.go index 9aa7e7d0..1c3f3e64 100644 --- a/source.go +++ b/source.go @@ -200,14 +200,14 @@ func (s *source) doInner(terminate chan struct{}) bool { func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { type trackListenerPair struct { - rtpl *gortsplib.ConnClientUdpListener - rtcpl *gortsplib.ConnClientUdpListener + serverRtp *gortsplib.ConnClientUdpListener + serverRtcp *gortsplib.ConnClientUdpListener } var listeners []*trackListenerPair for _, track := range s.tracks { - var rtpl *gortsplib.ConnClientUdpListener - var rtcpl *gortsplib.ConnClientUdpListener + var serverRtp *gortsplib.ConnClientUdpListener + var serverRtcp *gortsplib.ConnClientUdpListener var err error for { @@ -216,7 +216,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000 rtcpPort := rtpPort + 1 - rtpl, rtcpl, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort) + serverRtp, serverRtcp, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort) if err != nil { // retry if it's a bind error if nerr, ok := err.(*net.OpError); ok { @@ -235,8 +235,8 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo } listeners = append(listeners, &trackListenerPair{ - rtpl: rtpl, - rtcpl: rtcpl, + serverRtp: serverRtp, + serverRtcp: serverRtcp, }) } @@ -268,7 +268,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} } - }(trackId, lp.rtpl) + }(trackId, lp.serverRtp) // receive RTCP packets go func(trackId int, l *gortsplib.ConnClientUdpListener) { @@ -285,7 +285,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} } - }(trackId, lp.rtcpl) + }(trackId, lp.serverRtcp) } tcpConnDone := make(chan error) @@ -314,8 +314,8 @@ outer: s.p.events <- programEventSourceNotReady{s} for _, lp := range listeners { - lp.rtpl.Close() - lp.rtcpl.Close() + lp.serverRtp.Close() + lp.serverRtcp.Close() } wg.Wait()