From a374ba748dc1400bb6f64086d99e8c5d686ab502 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 3 Oct 2020 21:10:41 +0200 Subject: [PATCH] support RTMP sources (#88) --- README.md | 8 +- client.go | 16 +- conf.go | 26 ++- go.mod | 3 +- go.sum | 6 +- main.go | 134 +++++++----- main_test.go | 72 ++++-- metrics.go | 18 +- path.go | 96 +++++--- rtsp-simple-server.yml | 7 +- sourcertmp.go | 361 +++++++++++++++++++++++++++++++ proxy.go => sourcertsp.go | 142 ++++++------ testimages/nginx-rtmp/Dockerfile | 8 + testimages/nginx-rtmp/nginx.conf | 23 ++ utils.go | 4 +- 15 files changed, 719 insertions(+), 205 deletions(-) create mode 100644 sourcertmp.go rename proxy.go => sourcertsp.go (58%) create mode 100644 testimages/nginx-rtmp/Dockerfile create mode 100644 testimages/nginx-rtmp/nginx.conf diff --git a/README.md b/README.md index d3fb3c88..9248192c 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Features: * Read and publish live streams with UDP and TCP * Each stream can have multiple video and audio tracks, encoded in any format (including H264, H265, VP8, MP3, AAC, Opus, PCM) * Publish multiple streams at once, each in a separate path, that can be read by multiple users -* Pull and serve streams from other RTSP servers, always or on-demand (RTSP proxy) +* Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy) * Provide separate authentication for reading and publishing * Run custom commands when clients connect, disconnect, read or publish streams * Compatible with Linux, Windows and Mac, does not require any dependency or interpreter, it's a single executable @@ -216,8 +216,10 @@ There are multiple ways to monitor the server usage over time: * `rtsp_clients{state="idle"}` is the count of clients that are neither publishing nor reading * `rtsp_clients{state="publishing"}` is the count of clients that are publishing * `rtsp_clients{state="reading"}` is the count of clients that are reading - * `rtsp_proxies{state="idle"}` is the count of proxy sources that are not running - * `rtsp_proxies{state="running"}` is the count of proxy sources that are running + * `rtsp_sources{type="rtsp",state="idle"}` is the count of rtsp sources that are not running + * `rtsp_sources{type="rtsp",state="running"}` is the count of rtsp sources that are running + * `rtsp_sources{type="rtmp",state="idle"}` is the count of rtmp sources that are not running + * `rtsp_sources{type="rtmp",state="running"}` is the count of rtmp sources that are running * A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like: ``` diff --git a/client.go b/client.go index a0ca4f0c..62bd51c8 100644 --- a/client.go +++ b/client.go @@ -878,7 +878,7 @@ func (c *client) runPlay() bool { // start sending frames only after sending the response to the PLAY request c.p.clientPlay <- c - c.log("is receiving on path '%s', %d %s with %s", c.path.name, len(c.streamTracks), func() string { + c.log("is reading from path '%s', %d %s with %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } @@ -949,7 +949,7 @@ func (c *client) runPlayTCP() { readDone := make(chan error) go func() { for { - recv, err := c.conn.ReadFrameOrRequest(false) + recv, err := c.conn.ReadFrameTCPOrRequest(false) if err != nil { readDone <- err break @@ -1022,7 +1022,7 @@ func (c *client) runRecord() bool { c.p.clientRecord <- c - c.log("is publishing on path '%s', %d %s with %s", c.path.name, len(c.streamTracks), func() string { + c.log("is publishing to path '%s', %d %s with %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } @@ -1052,9 +1052,9 @@ func (c *client) runRecord() bool { } func (c *client) runRecordUDP() { - // open the firewall by sending packets to every channel + // open the firewall by sending packets to the counterpart for _, track := range c.streamTracks { - c.p.serverRtp.write( + c.p.serverUdpRtp.write( []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, &net.UDPAddr{ IP: c.ip(), @@ -1062,7 +1062,7 @@ func (c *client) runRecordUDP() { Port: track.rtpPort, }) - c.p.serverRtcp.write( + c.p.serverUdpRtcp.write( []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}, &net.UDPAddr{ IP: c.ip(), @@ -1124,7 +1124,7 @@ func (c *client) runRecordUDP() { case <-receiverReportTicker.C: for trackId := range c.streamTracks { frame := c.rtcpReceivers[trackId].Report() - c.p.serverRtcp.write(frame, &net.UDPAddr{ + c.p.serverUdpRtcp.write(frame, &net.UDPAddr{ IP: c.ip(), Zone: c.zone(), Port: c.streamTracks[trackId].rtcpPort, @@ -1146,7 +1146,7 @@ func (c *client) runRecordTCP() { readDone := make(chan error) go func() { for { - recv, err := c.conn.ReadFrameOrRequest(true) + recv, err := c.conn.ReadFrameTCPOrRequest(true) if err != nil { readDone <- err break diff --git a/conf.go b/conf.go index 3baa408a..68bf1e5c 100644 --- a/conf.go +++ b/conf.go @@ -6,6 +6,7 @@ import ( "net/url" "os" "regexp" + "strings" "time" "github.com/aler9/gortsplib" @@ -214,17 +215,14 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { pconf.Source = "record" } - if pconf.Source != "record" { + if strings.HasPrefix(pconf.Source, "rtsp://") { if pconf.regexp != nil { return nil, fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTSP source; use another path") } pconf.sourceUrl, err = url.Parse(pconf.Source) if err != nil { - return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source) - } - if pconf.sourceUrl.Scheme != "rtsp" { - return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source) + return nil, fmt.Errorf("'%s' is not a valid url", pconf.Source) } if pconf.sourceUrl.Port() == "" { pconf.sourceUrl.Host += ":554" @@ -251,6 +249,24 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { default: return nil, fmt.Errorf("unsupported protocol '%s'", pconf.SourceProtocol) } + + } else if strings.HasPrefix(pconf.Source, "rtmp://") { + if pconf.regexp != nil { + return nil, fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTMP source; use another path") + } + + pconf.sourceUrl, err = url.Parse(pconf.Source) + if err != nil { + return nil, fmt.Errorf("'%s' is not a valid url", pconf.Source) + } + if pconf.sourceUrl.Port() == "" { + pconf.sourceUrl.Host += ":1935" + } + + } else if pconf.Source == "record" { + + } else { + return nil, fmt.Errorf("unsupported source: '%s'", pconf.Source) } if pconf.PublishUser != "" { diff --git a/go.mod b/go.mod index aace4b38..252e20a3 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,9 @@ go 1.12 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200927133547-0a70915c8c0a + github.com/aler9/gortsplib v0.0.0-20201003114231-8660eaf8974f github.com/davecgh/go-spew v1.1.1 // indirect + github.com/notedit/rtmp v0.0.2 github.com/stretchr/testify v1.6.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.2.8 diff --git a/go.sum b/go.sum index 17f30503..27d3c7af 100644 --- a/go.sum +++ b/go.sum @@ -2,14 +2,16 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200927133547-0a70915c8c0a h1:s69UQblZtmygP9uYMS3ZOjQjsNhRExk+EFuJMT1S3h8= -github.com/aler9/gortsplib v0.0.0-20200927133547-0a70915c8c0a/go.mod h1:QjIqmMY0GHQ2Wan9jIS1amsNn0ncDnufvWD/r5IkDQI= +github.com/aler9/gortsplib v0.0.0-20201003114231-8660eaf8974f h1:orjd0t6Aa7rySwueDyDFlQF89xIQCcZR1cHkCiYM2IA= +github.com/aler9/gortsplib v0.0.0-20201003114231-8660eaf8974f/go.mod h1:QjIqmMY0GHQ2Wan9jIS1amsNn0ncDnufvWD/r5IkDQI= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 h1:A3upkpYzceQTuBPvVleu1zd6R8jInhg5ifimSO7ku/o= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/notedit/rtmp v0.0.2 h1:5+to4yezKATiJgnrcETu9LbV5G/QsWkOV9Ts2M/p33w= +github.com/notedit/rtmp v0.0.2/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/pion/randutil v0.0.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= diff --git a/main.go b/main.go index 1cc7639c..b871f605 100644 --- a/main.go +++ b/main.go @@ -25,31 +25,35 @@ type program struct { metrics *metrics pprof *pprof paths map[string]*path - serverRtp *serverUDP - serverRtcp *serverUDP - serverRtsp *serverTCP + serverUdpRtp *serverUDP + serverUdpRtcp *serverUDP + serverTcp *serverTCP clients map[*client]struct{} udpPublishersMap *udpPublishersMap readersMap *readersMap // use pointers to avoid a crash on 32bit platforms // https://github.com/golang/go/issues/9959 - countClients *int64 - countPublishers *int64 - countReaders *int64 - countProxies *int64 - countProxiesRunning *int64 + countClients *int64 + countPublishers *int64 + countReaders *int64 + countSourcesRtsp *int64 + countSourcesRtspRunning *int64 + countSourcesRtmp *int64 + countSourcesRtmpRunning *int64 - clientNew chan net.Conn - clientClose chan *client - clientDescribe chan clientDescribeReq - clientAnnounce chan clientAnnounceReq - clientSetupPlay chan clientSetupPlayReq - clientPlay chan *client - clientRecord chan *client - proxyReady chan *proxy - proxyNotReady chan *proxy - terminate chan struct{} - done chan struct{} + clientNew chan net.Conn + clientClose chan *client + clientDescribe chan clientDescribeReq + clientAnnounce chan clientAnnounceReq + clientSetupPlay chan clientSetupPlayReq + clientPlay chan *client + clientRecord chan *client + sourceRtspReady chan *sourceRtsp + sourceRtspNotReady chan *sourceRtsp + sourceRtmpReady chan *sourceRtmp + sourceRtmpNotReady chan *sourceRtmp + terminate chan struct{} + done chan struct{} } func newProgram(args []string, stdin io.Reader) (*program, error) { @@ -95,25 +99,35 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { v := int64(0) return &v }(), - countProxies: func() *int64 { + countSourcesRtsp: func() *int64 { v := int64(0) return &v }(), - countProxiesRunning: func() *int64 { + countSourcesRtspRunning: func() *int64 { v := int64(0) return &v }(), - clientNew: make(chan net.Conn), - clientClose: make(chan *client), - clientDescribe: make(chan clientDescribeReq), - clientAnnounce: make(chan clientAnnounceReq), - clientSetupPlay: make(chan clientSetupPlayReq), - clientPlay: make(chan *client), - clientRecord: make(chan *client), - proxyReady: make(chan *proxy), - proxyNotReady: make(chan *proxy), - terminate: make(chan struct{}), - done: make(chan struct{}), + countSourcesRtmp: func() *int64 { + v := int64(0) + return &v + }(), + countSourcesRtmpRunning: func() *int64 { + v := int64(0) + return &v + }(), + clientNew: make(chan net.Conn), + clientClose: make(chan *client), + clientDescribe: make(chan clientDescribeReq), + clientAnnounce: make(chan clientAnnounceReq), + clientSetupPlay: make(chan clientSetupPlayReq), + clientPlay: make(chan *client), + clientRecord: make(chan *client), + sourceRtspReady: make(chan *sourceRtsp), + sourceRtspNotReady: make(chan *sourceRtsp), + sourceRtmpReady: make(chan *sourceRtmp), + sourceRtmpNotReady: make(chan *sourceRtmp), + terminate: make(chan struct{}), + done: make(chan struct{}), } p.log("rtsp-simple-server %s", Version) @@ -139,18 +153,18 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { } if _, ok := conf.protocolsParsed[gortsplib.StreamProtocolUDP]; ok { - p.serverRtp, err = newServerUDP(p, conf.RtpPort, gortsplib.StreamTypeRtp) + p.serverUdpRtp, err = newServerUDP(p, conf.RtpPort, gortsplib.StreamTypeRtp) if err != nil { return nil, err } - p.serverRtcp, err = newServerUDP(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) + p.serverUdpRtcp, err = newServerUDP(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) if err != nil { return nil, err } } - p.serverRtsp, err = newServerTCP(p) + p.serverTcp, err = newServerTCP(p) if err != nil { return nil, err } @@ -178,15 +192,15 @@ func (p *program) run() { go p.pprof.run() } - if p.serverRtp != nil { - go p.serverRtp.run() + if p.serverUdpRtp != nil { + go p.serverUdpRtp.run() } - if p.serverRtcp != nil { - go p.serverRtcp.run() + if p.serverUdpRtcp != nil { + go p.serverUdpRtcp.run() } - go p.serverRtsp.run() + go p.serverTcp.run() for _, p := range p.paths { p.onInit() @@ -216,7 +230,7 @@ outer: client.close() case req := <-p.clientDescribe: - // create path if not exist + // create path if it doesn't exist if _, ok := p.paths[req.pathName]; !ok { p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf) } @@ -224,7 +238,7 @@ outer: p.paths[req.pathName].onDescribe(req.client) case req := <-p.clientAnnounce: - // create path if not exist + // create path if it doesn't exist if path, ok := p.paths[req.pathName]; !ok { p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf) @@ -288,13 +302,17 @@ outer: client.path.onPublisherSetReady() - case proxy := <-p.proxyReady: - proxy.path.log("proxy ready") - proxy.path.onPublisherSetReady() + case s := <-p.sourceRtspReady: + s.path.onPublisherSetReady() - case proxy := <-p.proxyNotReady: - proxy.path.log("proxy not ready") - proxy.path.onPublisherSetNotReady() + case s := <-p.sourceRtspNotReady: + s.path.onPublisherSetNotReady() + + case s := <-p.sourceRtmpReady: + s.path.onPublisherSetReady() + + case s := <-p.sourceRtmpNotReady: + s.path.onPublisherSetNotReady() case <-p.terminate: break outer @@ -320,8 +338,10 @@ outer: case <-p.clientPlay: case <-p.clientRecord: - case <-p.proxyReady: - case <-p.proxyNotReady: + case <-p.sourceRtspReady: + case <-p.sourceRtspNotReady: + case <-p.sourceRtmpReady: + case <-p.sourceRtmpNotReady: } } }() @@ -333,14 +353,14 @@ outer: p.onClose(true) } - p.serverRtsp.close() + p.serverTcp.close() - if p.serverRtcp != nil { - p.serverRtcp.close() + if p.serverUdpRtcp != nil { + p.serverUdpRtcp.close() } - if p.serverRtp != nil { - p.serverRtp.close() + if p.serverUdpRtp != nil { + p.serverUdpRtp.close() } for c := range p.clients { @@ -365,8 +385,8 @@ outer: close(p.clientSetupPlay) close(p.clientPlay) close(p.clientRecord) - close(p.proxyReady) - close(p.proxyNotReady) + close(p.sourceRtspReady) + close(p.sourceRtspNotReady) close(p.done) } diff --git a/main_test.go b/main_test.go index 713904db..df583ddb 100644 --- a/main_test.go +++ b/main_test.go @@ -86,11 +86,17 @@ func (c *container) close() { func (c *container) wait() int { exec.Command("docker", "wait", "rtsp-simple-server-test-"+c.name).Run() out, _ := exec.Command("docker", "inspect", "rtsp-simple-server-test-"+c.name, - "--format={{.State.ExitCode}}").Output() + "-f", "{{.State.ExitCode}}").Output() code, _ := strconv.ParseInt(string(out[:len(out)-1]), 10, 64) return int(code) } +func (c *container) ip() string { + out, _ := exec.Command("docker", "inspect", "rtsp-simple-server-test-"+c.name, + "-f", "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}").Output() + return string(out[:len(out)-1]) +} + func TestPublish(t *testing.T) { for _, conf := range []struct { publishSoft string @@ -214,8 +220,7 @@ func TestRead(t *testing.T) { } func TestTCPOnly(t *testing.T) { - stdin := []byte("\n" + - "protocols: [tcp]\n") + stdin := []byte("protocols: [tcp]\n") p, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin)) require.NoError(t, err) defer p.close() @@ -316,8 +321,7 @@ func TestPathWithQuery(t *testing.T) { func TestAuth(t *testing.T) { t.Run("publish", func(t *testing.T) { - stdin := []byte("\n" + - "paths:\n" + + stdin := []byte("paths:\n" + " all:\n" + " publishUser: testuser\n" + " publishPass: testpass\n" + @@ -361,8 +365,7 @@ func TestAuth(t *testing.T) { "vlc", } { t.Run("read_"+soft, func(t *testing.T) { - stdin := []byte("\n" + - "paths:\n" + + stdin := []byte("paths:\n" + " all:\n" + " readUser: testuser\n" + " readPass: testpass\n" + @@ -414,14 +417,13 @@ func TestAuth(t *testing.T) { } } -func TestProxy(t *testing.T) { +func TestSourceRtsp(t *testing.T) { for _, proto := range []string{ "udp", "tcp", } { t.Run(proto, func(t *testing.T) { - stdin := []byte("\n" + - "paths:\n" + + stdin := []byte("paths:\n" + " all:\n" + " readUser: testuser\n" + " readPass: testpass\n") @@ -445,8 +447,7 @@ func TestProxy(t *testing.T) { time.Sleep(1 * time.Second) - stdin = []byte("\n" + - "rtspPort: 8555\n" + + stdin = []byte("rtspPort: 8555\n" + "rtpPort: 8100\n" + "rtcpPort: 8101\n" + "\n" + @@ -477,9 +478,52 @@ func TestProxy(t *testing.T) { } } +func TestSourceRtmp(t *testing.T) { + cnt1, err := newContainer("nginx-rtmp", "rtmpserver", []string{}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + cnt2, err := newContainer("ffmpeg", "source", []string{ + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "flv", + "rtmp://" + cnt1.ip() + "/stream/test", + }) + require.NoError(t, err) + defer cnt2.close() + + time.Sleep(1 * time.Second) + + stdin := []byte("paths:\n" + + " proxied:\n" + + " source: rtmp://" + cnt1.ip() + "/stream/test\n" + + " sourceOnDemand: yes\n") + p, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin)) + require.NoError(t, err) + defer p.close() + + time.Sleep(1 * time.Second) + + cnt3, err := newContainer("ffmpeg", "dest", []string{ + "-rtsp_transport", "udp", + "-i", "rtsp://" + ownDockerIp + ":8554/proxied", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt3.close() + + code := cnt3.wait() + require.Equal(t, 0, code) +} + func TestRunOnDemand(t *testing.T) { - stdin := []byte("\n" + - "paths:\n" + + stdin := []byte("paths:\n" + " all:\n" + " runOnDemand: ffmpeg -hide_banner -loglevel error -re -i testimages/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:8554/$RTSP_SERVER_PATH\n") p1, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin)) diff --git a/metrics.go b/metrics.go index 83e64a27..467301b9 100644 --- a/metrics.go +++ b/metrics.go @@ -60,8 +60,10 @@ func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { countClients := atomic.LoadInt64(m.p.countClients) countPublishers := atomic.LoadInt64(m.p.countPublishers) countReaders := atomic.LoadInt64(m.p.countReaders) - countProxies := atomic.LoadInt64(m.p.countProxies) - countProxiesRunning := atomic.LoadInt64(m.p.countProxiesRunning) + countSourcesRtsp := atomic.LoadInt64(m.p.countSourcesRtsp) + countSourcesRtspRunning := atomic.LoadInt64(m.p.countSourcesRtspRunning) + countSourcesRtmp := atomic.LoadInt64(m.p.countSourcesRtmp) + countSourcesRtmpRunning := atomic.LoadInt64(m.p.countSourcesRtmpRunning) out := "" out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n", @@ -70,10 +72,14 @@ func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { countPublishers, now) out += fmt.Sprintf("rtsp_clients{state=\"reading\"} %d %v\n", countReaders, now) - out += fmt.Sprintf("rtsp_proxies{state=\"idle\"} %d %v\n", - countProxies-countProxiesRunning, now) - out += fmt.Sprintf("rtsp_proxies{state=\"running\"} %d %v\n", - countProxiesRunning, now) + out += fmt.Sprintf("rtsp_sources{type=\"rtsp\",state=\"idle\"} %d %v\n", + countSourcesRtsp-countSourcesRtspRunning, now) + out += fmt.Sprintf("rtsp_sources{type=\"rtsp\",state=\"running\"} %d %v\n", + countSourcesRtspRunning, now) + out += fmt.Sprintf("rtsp_sources{type=\"rtmp\",state=\"idle\"} %d %v\n", + countSourcesRtmp-countSourcesRtmpRunning, now) + out += fmt.Sprintf("rtsp_sources{type=\"rtmp\",state=\"running\"} %d %v\n", + countSourcesRtmpRunning, now) w.WriteHeader(http.StatusOK) io.WriteString(w, out) diff --git a/path.go b/path.go index f00cf16c..c04717cb 100644 --- a/path.go +++ b/path.go @@ -2,17 +2,18 @@ package main import ( "fmt" + "strings" "sync/atomic" "time" ) const ( - describeTimeout = 5 * time.Second - proxyStopAfterDescribeSecs = 10 * time.Second - onDemandCmdStopAfterDescribeSecs = 10 * time.Second + describeTimeout = 5 * time.Second + sourceStopAfterDescribePeriod = 10 * time.Second + onDemandCmdStopAfterDescribePeriod = 10 * time.Second ) -// a publisher is either a client or a proxy +// a publisher can be a client, a sourceRtsp or a sourceRtmp type publisher interface { isPublisher() } @@ -21,7 +22,6 @@ type path struct { p *program name string conf *pathConf - proxy *proxy publisher publisher publisherReady bool publisherTrackCount int @@ -39,9 +39,12 @@ func newPath(p *program, name string, conf *pathConf) *path { conf: conf, } - if conf.Source != "record" { - s := newProxy(p, pa, conf) - pa.proxy = s + if strings.HasPrefix(conf.Source, "rtsp://") { + s := newSourceRtsp(p, pa) + pa.publisher = s + + } else if strings.HasPrefix(conf.Source, "rtmp://") { + s := newSourceRtmp(p, pa) pa.publisher = s } @@ -53,8 +56,11 @@ func (pa *path) log(format string, args ...interface{}) { } func (pa *path) onInit() { - if pa.proxy != nil { - go pa.proxy.run(pa.proxy.state) + if source, ok := pa.publisher.(*sourceRtsp); ok { + go source.run(source.state) + + } else if source, ok := pa.publisher.(*sourceRtmp); ok { + go source.run(source.state) } if pa.conf.RunOnInit != "" { @@ -69,9 +75,13 @@ func (pa *path) onInit() { } func (pa *path) onClose(wait bool) { - if pa.proxy != nil { - close(pa.proxy.terminate) - <-pa.proxy.done + if source, ok := pa.publisher.(*sourceRtsp); ok { + close(source.terminate) + <-source.done + + } else if source, ok := pa.publisher.(*sourceRtmp); ok { + close(source.terminate) + <-source.done } if pa.onInitCmd != nil { @@ -142,22 +152,35 @@ func (pa *path) onCheck() { } } - // stop on demand proxy if needed - if pa.proxy != nil && - pa.conf.SourceOnDemand && - pa.proxy.state == proxyStateRunning && - !pa.hasClients() && - time.Since(pa.lastDescribeReq) >= proxyStopAfterDescribeSecs { - pa.log("stopping on demand proxy (not requested anymore)") - atomic.AddInt64(pa.p.countProxiesRunning, -1) - pa.proxy.state = proxyStateStopped - pa.proxy.setState <- pa.proxy.state + // stop on demand rtsp source if needed + if source, ok := pa.publisher.(*sourceRtsp); ok { + if pa.conf.SourceOnDemand && + source.state == sourceRtspStateRunning && + !pa.hasClients() && + time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod { + pa.log("stopping on demand rtsp source (not requested anymore)") + atomic.AddInt64(pa.p.countSourcesRtspRunning, -1) + source.state = sourceRtspStateStopped + source.setState <- source.state + } + + // stop on demand rtmp source if needed + } else if source, ok := pa.publisher.(*sourceRtmp); ok { + if pa.conf.SourceOnDemand && + source.state == sourceRtmpStateRunning && + !pa.hasClients() && + time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod { + pa.log("stopping on demand rtmp source (not requested anymore)") + atomic.AddInt64(pa.p.countSourcesRtmpRunning, -1) + source.state = sourceRtmpStateStopped + source.setState <- source.state + } } // stop on demand command if needed if pa.onDemandCmd != nil && !pa.hasClientReaders() && - time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribeSecs { + time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribePeriod { pa.log("stopping on demand command (not requested anymore)") pa.onDemandCmd.close() pa.onDemandCmd = nil @@ -240,12 +263,25 @@ func (pa *path) onDescribe(client *client) { // publisher was found but is not ready: put the client on hold } else if !pa.publisherReady { - if pa.proxy != nil && pa.proxy.state == proxyStateStopped { // start if needed - pa.log("starting on demand proxy") - pa.lastDescribeActivation = time.Now() - atomic.AddInt64(pa.p.countProxiesRunning, +1) - pa.proxy.state = proxyStateRunning - pa.proxy.setState <- pa.proxy.state + // start rtsp source if needed + if source, ok := pa.publisher.(*sourceRtsp); ok { + if source.state == sourceRtspStateStopped { + pa.log("starting on demand rtsp source") + pa.lastDescribeActivation = time.Now() + atomic.AddInt64(pa.p.countSourcesRtspRunning, +1) + source.state = sourceRtspStateRunning + source.setState <- source.state + } + + // start rtmp source if needed + } else if source, ok := pa.publisher.(*sourceRtmp); ok { + if source.state == sourceRtmpStateStopped { + pa.log("starting on demand rtmp source") + pa.lastDescribeActivation = time.Now() + atomic.AddInt64(pa.p.countSourcesRtmpRunning, +1) + source.state = sourceRtmpStateRunning + source.setState <- source.state + } } client.path = pa diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 8719d492..ca76721e 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -38,12 +38,13 @@ logFile: rtsp-simple-server.log paths: all: # source of the stream - this can be: - # * record -> the stream is provided by a client through the RECORD command (like ffmpeg) - # * rtsp://original-url -> the stream is pulled from another RTSP server (proxy mode) + # * record -> the stream is provided by a client with the RECORD command (like ffmpeg) + # * rtsp://existing-url -> the stream is pulled from another RTSP server (proxy mode) + # * rtmp://existing-url -> the stream is pulled from a RTMP server source: record # if the source is an RTSP url, this is the protocol that will be used to pull the stream sourceProtocol: udp - # if the source is an RTSP url, it will be pulled only when at least one reader + # if the source is an RTSP or RTMP url, it will be pulled only when at least one reader # is connected, saving bandwidth sourceOnDemand: no diff --git a/sourcertmp.go b/sourcertmp.go new file mode 100644 index 00000000..4da9094d --- /dev/null +++ b/sourcertmp.go @@ -0,0 +1,361 @@ +package main + +import ( + "fmt" + "math/rand" + "net" + "sync/atomic" + "time" + + "github.com/aler9/gortsplib" + "github.com/notedit/rtmp/av" + "github.com/notedit/rtmp/codec/h264" + "github.com/notedit/rtmp/format/rtmp" + "github.com/pion/rtp" +) + +const ( + sourceRtmpRetryInterval = 5 * time.Second + rtpPayloadMaxSize = 1460 // 1500 - ip header - udp header - rtp header +) + +type rtpH264Encoder struct { + seqnum uint16 + ssrc uint32 + initialTs uint32 + started time.Duration +} + +func newRtpH264Encoder() *rtpH264Encoder { + return &rtpH264Encoder{ + seqnum: uint16(0), + ssrc: rand.Uint32(), + initialTs: rand.Uint32(), + } +} + +func (e *rtpH264Encoder) Encode(nalus [][]byte, timestamp time.Duration) ([][]byte, error) { + var frames [][]byte + + if e.started == time.Duration(0) { + e.started = timestamp + } + + // rtp/h264 uses a 90khz clock + rtpTs := e.initialTs + uint32((timestamp-e.started).Seconds()*90000) + + for i, nalu := range nalus { + naluFrames, err := e.encodeNalu(nalu, rtpTs, (i == len(nalus)-1)) + if err != nil { + return nil, err + } + frames = append(frames, naluFrames...) + } + + return frames, nil +} + +func (e *rtpH264Encoder) encodeNalu(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { + // if the NALU fits into the RTP packet, use a single NALU packet + if len(nalu) < rtpPayloadMaxSize { + rpkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 0x02, + PayloadType: 96, + SequenceNumber: e.seqnum, + Timestamp: rtpTs, + SSRC: e.ssrc, + }, + Payload: nalu, + } + e.seqnum++ + + if isFinal { + rpkt.Header.Marker = true + } + + frame, err := rpkt.Marshal() + if err != nil { + return nil, err + } + + return [][]byte{frame}, nil + } + + // otherwise, use fragmentation units + // use only FU-A, not FU-B, since we always use non-interleaved mode + // (set with packetization-mode=1) + + frameCount := (len(nalu) - 1) / (rtpPayloadMaxSize - 2) + lastFrameSize := (len(nalu) - 1) % (rtpPayloadMaxSize - 2) + if lastFrameSize > 0 { + frameCount++ + } + frames := make([][]byte, frameCount) + + nri := (nalu[0] >> 5) & 0x03 + typ := nalu[0] & 0x1F + nalu = nalu[1:] // remove header + + for i := 0; i < frameCount; i++ { + indicator := 0 | (nri << 5) | 28 // FU-A + + start := uint8(0) + if i == 0 { + start = 1 + } + end := uint8(0) + le := rtpPayloadMaxSize - 2 + if i == (len(frames) - 1) { + end = 1 + le = lastFrameSize + } + header := (start << 7) | (end << 6) | typ + + data := append([]byte{indicator, header}, nalu[:le]...) + nalu = nalu[le:] + + rpkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 0x02, + PayloadType: 96, + SequenceNumber: e.seqnum, + Timestamp: rtpTs, + SSRC: e.ssrc, + }, + Payload: data, + } + e.seqnum++ + + if isFinal && i == (len(frames)-1) { + rpkt.Header.Marker = true + } + + frame, err := rpkt.Marshal() + if err != nil { + return nil, err + } + + frames[i] = frame + } + + return frames, nil +} + +type sourceRtmpState int + +const ( + sourceRtmpStateStopped sourceRtmpState = iota + sourceRtmpStateRunning +) + +type sourceRtmp struct { + p *program + path *path + state sourceRtmpState + innerRunning bool + + innerTerminate chan struct{} + innerDone chan struct{} + setState chan sourceRtmpState + terminate chan struct{} + done chan struct{} +} + +func newSourceRtmp(p *program, path *path) *sourceRtmp { + s := &sourceRtmp{ + p: p, + path: path, + setState: make(chan sourceRtmpState), + terminate: make(chan struct{}), + done: make(chan struct{}), + } + + atomic.AddInt64(p.countSourcesRtmp, +1) + + if path.conf.SourceOnDemand { + s.state = sourceRtmpStateStopped + } else { + s.state = sourceRtmpStateRunning + atomic.AddInt64(p.countSourcesRtmpRunning, +1) + } + + return s +} + +func (s *sourceRtmp) isPublisher() {} + +func (s *sourceRtmp) run(initialState sourceRtmpState) { + s.applyState(initialState) + +outer: + for { + select { + case state := <-s.setState: + s.applyState(state) + + case <-s.terminate: + break outer + } + } + + if s.innerRunning { + close(s.innerTerminate) + <-s.innerDone + } + + close(s.setState) + close(s.done) +} + +func (s *sourceRtmp) applyState(state sourceRtmpState) { + if state == sourceRtmpStateRunning { + if !s.innerRunning { + s.path.log("rtmp source started") + s.innerRunning = true + s.innerTerminate = make(chan struct{}) + s.innerDone = make(chan struct{}) + go s.runInner() + } + } else { + if s.innerRunning { + close(s.innerTerminate) + <-s.innerDone + s.innerRunning = false + s.path.log("rtmp source stopped") + } + } +} + +func (s *sourceRtmp) runInner() { + defer close(s.innerDone) + +outer: + for { + ok := s.runInnerInner() + if !ok { + break outer + } + + t := time.NewTimer(sourceRtmpRetryInterval) + defer t.Stop() + + select { + case <-s.innerTerminate: + break outer + case <-t.C: + } + } +} + +func (s *sourceRtmp) runInnerInner() bool { + s.path.log("connecting to rtmp source") + + var conn *rtmp.Conn + var nconn net.Conn + var err error + dialDone := make(chan struct{}, 1) + go func() { + defer close(dialDone) + conn, nconn, err = rtmp.NewClient().Dial(s.path.conf.Source, rtmp.PrepareReading) + }() + + select { + case <-s.innerTerminate: + return false + case <-dialDone: + } + + if err != nil { + s.path.log("rtmp source ERR: %s", err) + return true + } + + // wait for SPS and PPS + sps, pps, err := func() ([]byte, []byte, error) { + for { + pkt, err := conn.ReadPacket() + if err != nil { + return nil, nil, err + } + + if pkt.Type == av.H264DecoderConfig { + codec, err := h264.FromDecoderConfig(pkt.Data) + if err != nil { + panic(err) + } + + return codec.SPS[0], codec.PPS[0], nil + } + } + }() + if err != nil { + s.path.log("rtmp source ERR: %s", err) + return true + } + + track := gortsplib.NewTrackH264(0, sps, pps) + tracks := gortsplib.Tracks{track} + s.path.publisherSdp = tracks.Write() + s.path.publisherTrackCount = len(tracks) + + s.p.sourceRtmpReady <- s + s.path.log("rtmp source ready") + + readDone := make(chan error) + go func() { + encoder := newRtpH264Encoder() + + for { + pkt, err := conn.ReadPacket() + if err != nil { + readDone <- err + return + } + + if pkt.Type == av.H264 { + // decode from AVCC format + nalus, typ := h264.SplitNALUs(pkt.Data) + if typ != h264.NALU_AVCC { + readDone <- fmt.Errorf("invalid NALU format (%d)", typ) + return + } + + // encode into RTP/H264 format + frames, err := encoder.Encode(nalus, pkt.Time) + if err != nil { + readDone <- err + return + } + + for _, f := range frames { + s.p.readersMap.forwardFrame(s.path, 0, gortsplib.StreamTypeRtp, f) + } + } + } + }() + + var ret bool + +outer: + for { + select { + case <-s.innerTerminate: + nconn.Close() + <-readDone + ret = false + break outer + + case err := <-readDone: + nconn.Close() + s.path.log("rtmp source ERR: %s", err) + ret = true + break outer + } + } + + s.p.sourceRtmpNotReady <- s + s.path.log("rtmp source not ready") + + return ret +} diff --git a/proxy.go b/sourcertsp.go similarity index 58% rename from proxy.go rename to sourcertsp.go index dc634481..42ac6f5e 100644 --- a/proxy.go +++ b/sourcertsp.go @@ -9,56 +9,54 @@ import ( ) const ( - proxyRetryInterval = 5 * time.Second + sourceRtspRetryInterval = 5 * time.Second ) -type proxyState int +type sourceRtspState int const ( - proxyStateStopped proxyState = iota - proxyStateRunning + sourceRtspStateStopped sourceRtspState = iota + sourceRtspStateRunning ) -type proxy struct { +type sourceRtsp struct { p *program path *path - pathConf *pathConf - state proxyState + state sourceRtspState tracks []*gortsplib.Track innerRunning bool innerTerminate chan struct{} innerDone chan struct{} - setState chan proxyState + setState chan sourceRtspState terminate chan struct{} done chan struct{} } -func newProxy(p *program, path *path, pathConf *pathConf) *proxy { - s := &proxy{ +func newSourceRtsp(p *program, path *path) *sourceRtsp { + s := &sourceRtsp{ p: p, path: path, - pathConf: pathConf, - setState: make(chan proxyState), + setState: make(chan sourceRtspState), terminate: make(chan struct{}), done: make(chan struct{}), } - atomic.AddInt64(p.countProxies, +1) + atomic.AddInt64(p.countSourcesRtsp, +1) - if pathConf.SourceOnDemand { - s.state = proxyStateStopped + if path.conf.SourceOnDemand { + s.state = sourceRtspStateStopped } else { - s.state = proxyStateRunning - atomic.AddInt64(p.countProxiesRunning, +1) + s.state = sourceRtspStateRunning + atomic.AddInt64(p.countSourcesRtspRunning, +1) } return s } -func (s *proxy) isPublisher() {} +func (s *sourceRtsp) isPublisher() {} -func (s *proxy) run(initialState proxyState) { +func (s *sourceRtsp) run(initialState sourceRtspState) { s.applyState(initialState) outer: @@ -81,10 +79,10 @@ outer: close(s.done) } -func (s *proxy) applyState(state proxyState) { - if state == proxyStateRunning { +func (s *sourceRtsp) applyState(state sourceRtspState) { + if state == sourceRtspStateRunning { if !s.innerRunning { - s.path.log("proxy started") + s.path.log("rtsp source started") s.innerRunning = true s.innerTerminate = make(chan struct{}) s.innerDone = make(chan struct{}) @@ -95,52 +93,46 @@ func (s *proxy) applyState(state proxyState) { close(s.innerTerminate) <-s.innerDone s.innerRunning = false - s.path.log("proxy stopped") + s.path.log("rtsp source stopped") } } } -func (s *proxy) runInner() { +func (s *sourceRtsp) runInner() { defer close(s.innerDone) +outer: for { - ok := func() bool { - ok := s.runInnerInner() - if !ok { - return false - } - - t := time.NewTimer(proxyRetryInterval) - defer t.Stop() - - select { - case <-s.innerTerminate: - return false - case <-t.C: - } - - return true - }() + ok := s.runInnerInner() if !ok { - break + break outer + } + + t := time.NewTimer(sourceRtspRetryInterval) + defer t.Stop() + + select { + case <-s.innerTerminate: + break outer + case <-t.C: } } } -func (s *proxy) runInnerInner() bool { - s.path.log("proxy connecting") +func (s *sourceRtsp) runInnerInner() bool { + s.path.log("connecting to rtsp source") var conn *gortsplib.ConnClient var err error - dialDone := make(chan struct{}) + dialDone := make(chan struct{}, 1) go func() { + defer close(dialDone) conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{ - Host: s.pathConf.sourceUrl.Host, + Host: s.path.conf.sourceUrl.Host, ReadTimeout: s.p.conf.ReadTimeout, WriteTimeout: s.p.conf.WriteTimeout, ReadBufferCount: 2, }) - close(dialDone) }() select { @@ -150,56 +142,55 @@ func (s *proxy) runInnerInner() bool { } if err != nil { - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) return true } - _, err = conn.Options(s.pathConf.sourceUrl) + _, err = conn.Options(s.path.conf.sourceUrl) if err != nil { conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) return true } - tracks, _, err := conn.Describe(s.pathConf.sourceUrl) + tracks, _, err := conn.Describe(s.path.conf.sourceUrl) if err != nil { conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) return true } // create a filtered SDP that is used by the server (not by the client) - serverSdp := tracks.Write() - - s.tracks = tracks + s.path.publisherSdp = tracks.Write() s.path.publisherTrackCount = len(tracks) - s.path.publisherSdp = serverSdp + s.tracks = tracks - if s.pathConf.sourceProtocolParsed == gortsplib.StreamProtocolUDP { + if s.path.conf.sourceProtocolParsed == gortsplib.StreamProtocolUDP { return s.runUDP(conn) } else { return s.runTCP(conn) } } -func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool { +func (s *sourceRtsp) runUDP(conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { - _, err := conn.SetupUDP(s.pathConf.sourceUrl, gortsplib.SetupModePlay, track, 0, 0) + _, err := conn.SetupUDP(s.path.conf.sourceUrl, gortsplib.SetupModePlay, track, 0, 0) if err != nil { conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) return true } } - _, err := conn.Play(s.pathConf.sourceUrl) + _, err := conn.Play(s.path.conf.sourceUrl) if err != nil { conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) return true } - s.p.proxyReady <- s + s.p.sourceRtspReady <- s + s.path.log("rtsp source ready") var wg sync.WaitGroup @@ -241,7 +232,7 @@ func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool { tcpConnDone := make(chan error) go func() { - tcpConnDone <- conn.LoopUDP(s.pathConf.sourceUrl) + tcpConnDone <- conn.LoopUDP(s.path.conf.sourceUrl) }() var ret bool @@ -257,7 +248,7 @@ outer: case err := <-tcpConnDone: conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) ret = true break outer } @@ -265,29 +256,31 @@ outer: wg.Wait() - s.p.proxyNotReady <- s + s.p.sourceRtspNotReady <- s + s.path.log("rtsp source not ready") return ret } -func (s *proxy) runTCP(conn *gortsplib.ConnClient) bool { +func (s *sourceRtsp) runTCP(conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { - _, err := conn.SetupTCP(s.pathConf.sourceUrl, gortsplib.SetupModePlay, track) + _, err := conn.SetupTCP(s.path.conf.sourceUrl, gortsplib.SetupModePlay, track) if err != nil { conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) return true } } - _, err := conn.Play(s.pathConf.sourceUrl) + _, err := conn.Play(s.path.conf.sourceUrl) if err != nil { conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) return true } - s.p.proxyReady <- s + s.p.sourceRtspReady <- s + s.path.log("rtsp source ready") tcpConnDone := make(chan error) go func() { @@ -315,13 +308,14 @@ outer: case err := <-tcpConnDone: conn.Close() - s.path.log("proxy ERR: %s", err) + s.path.log("rtsp source ERR: %s", err) ret = true break outer } } - s.p.proxyNotReady <- s + s.p.sourceRtspNotReady <- s + s.path.log("rtsp source not ready") return ret } diff --git a/testimages/nginx-rtmp/Dockerfile b/testimages/nginx-rtmp/Dockerfile new file mode 100644 index 00000000..12b81580 --- /dev/null +++ b/testimages/nginx-rtmp/Dockerfile @@ -0,0 +1,8 @@ +FROM amd64/alpine:3.12 + +RUN apk add --no-cache \ + nginx-mod-rtmp + +COPY nginx.conf /etc/nginx/ + +ENTRYPOINT [ "nginx", "-g", "daemon off;" ] diff --git a/testimages/nginx-rtmp/nginx.conf b/testimages/nginx-rtmp/nginx.conf new file mode 100644 index 00000000..f7ff31d5 --- /dev/null +++ b/testimages/nginx-rtmp/nginx.conf @@ -0,0 +1,23 @@ + +pid /run/nginx.pid; +worker_processes auto; +pcre_jit on; + +error_log /dev/null; + +include /etc/nginx/modules/*.conf; + +events { + worker_connections 20000; +} + +rtmp { + server { + listen 1935; + access_log /dev/null; + + application stream { + live on; + } + } +} diff --git a/utils.go b/utils.go index 6c57eb6f..25639894 100644 --- a/utils.go +++ b/utils.go @@ -226,14 +226,14 @@ func (m *readersMap) forwardFrame(path *path, trackId int, streamType gortsplib. if c.streamProtocol == gortsplib.StreamProtocolUDP { if streamType == gortsplib.StreamTypeRtp { - c.p.serverRtp.write(frame, &net.UDPAddr{ + c.p.serverUdpRtp.write(frame, &net.UDPAddr{ IP: c.ip(), Zone: c.zone(), Port: track.rtpPort, }) } else { - c.p.serverRtcp.write(frame, &net.UDPAddr{ + c.p.serverUdpRtcp.write(frame, &net.UDPAddr{ IP: c.ip(), Zone: c.zone(), Port: track.rtcpPort,