diff --git a/conf.go b/conf.go index e351fdaf..d279bfe8 100644 --- a/conf.go +++ b/conf.go @@ -28,7 +28,7 @@ type ConfPath struct { type conf struct { Protocols []string `yaml:"protocols"` - protocolsParsed map[streamProtocol]struct{} + protocolsParsed map[gortsplib.StreamProtocol]struct{} RtspPort int `yaml:"rtspPort"` RtpPort int `yaml:"rtpPort"` RtcpPort int `yaml:"rtcpPort"` @@ -83,14 +83,14 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { if len(conf.Protocols) == 0 { conf.Protocols = []string{"udp", "tcp"} } - conf.protocolsParsed = make(map[streamProtocol]struct{}) + conf.protocolsParsed = make(map[gortsplib.StreamProtocol]struct{}) for _, proto := range conf.Protocols { switch proto { case "udp": - conf.protocolsParsed[streamProtocolUdp] = struct{}{} + conf.protocolsParsed[gortsplib.StreamProtocolUdp] = struct{}{} case "tcp": - conf.protocolsParsed[streamProtocolTcp] = struct{}{} + conf.protocolsParsed[gortsplib.StreamProtocolTcp] = struct{}{} default: return nil, fmt.Errorf("unsupported protocol: %s", proto) diff --git a/go.mod b/go.mod index cedecbaf..7be3b3e1 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200719094715-6806ec79c031 + github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644 github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 github.com/stretchr/testify v1.6.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index e960564f..329b627c 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200719094715-6806ec79c031 h1:Kb+H1mkbmzbAIcX0++A8kHwdhpQiNe6reFcWNUATcVk= -github.com/aler9/gortsplib v0.0.0-20200719094715-6806ec79c031/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= +github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644 h1:14g114ATdvGCrnKOHRLklmPwtchF2LQAjdyORVBEzoQ= +github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/main.go b/main.go index c9842a41..0f833e02 100644 --- a/main.go +++ b/main.go @@ -16,25 +16,6 @@ import ( var Version = "v0.0.0" -type track struct { - rtpPort int - rtcpPort int -} - -type streamProtocol int - -const ( - streamProtocolUdp streamProtocol = iota - streamProtocolTcp -) - -func (s streamProtocol) String() string { - if s == streamProtocolUdp { - return "udp" - } - return "tcp" -} - type programEvent interface { isProgramEvent() } @@ -71,7 +52,7 @@ type programEventClientSetupPlay struct { res chan error client *serverClient path string - protocol streamProtocol + protocol gortsplib.StreamProtocol rtpPort int rtcpPort int } @@ -81,7 +62,7 @@ func (programEventClientSetupPlay) isProgramEvent() {} type programEventClientSetupRecord struct { res chan error client *serverClient - protocol streamProtocol + protocol gortsplib.StreamProtocol rtpPort int rtcpPort int } @@ -335,7 +316,7 @@ outer: evt.client.path = evt.path evt.client.streamProtocol = evt.protocol - evt.client.streamTracks = append(evt.client.streamTracks, &track{ + evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{ rtpPort: evt.rtpPort, rtcpPort: evt.rtcpPort, }) @@ -344,7 +325,7 @@ outer: case programEventClientSetupRecord: evt.client.streamProtocol = evt.protocol - evt.client.streamTracks = append(evt.client.streamTracks, &track{ + evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{ rtpPort: evt.rtpPort, rtcpPort: evt.rtcpPort, }) @@ -401,7 +382,7 @@ outer: continue } - client.RtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf) + client.rtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf) p.forwardFrame(client.path, trackId, evt.streamType, evt.buf) case programEventClientFrameTcp: @@ -496,7 +477,7 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy continue } - if cl.streamProtocol != streamProtocolUdp || + if cl.streamProtocol != gortsplib.StreamProtocolUdp || cl.state != clientStateRecord || !cl.ip().Equal(addr.IP) { continue @@ -520,7 +501,7 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) { for client := range p.clients { if client.path == path && client.state == clientStatePlay { - if client.streamProtocol == streamProtocolUdp { + if client.streamProtocol == gortsplib.StreamProtocolUdp { if streamType == gortsplib.StreamTypeRtp { p.rtpl.write(&udpAddrBufPair{ addr: &net.UDPAddr{ diff --git a/server-client.go b/server-client.go index 0424d8bb..c46a7c87 100644 --- a/server-client.go +++ b/server-client.go @@ -19,6 +19,11 @@ const ( clientReceiverReportInterval = 10 * time.Second ) +type serverClientTrack struct { + rtpPort int + rtcpPort int +} + type serverClientEvent interface { isServerClientEvent() } @@ -74,13 +79,13 @@ type serverClient struct { authFailures int streamSdpText []byte // only if publisher streamSdpParsed *sdp.SessionDescription // only if publisher - streamProtocol streamProtocol - streamTracks []*track - RtcpReceivers []*gortsplib.RtcpReceiver + streamProtocol gortsplib.StreamProtocol + streamTracks []*serverClientTrack + rtcpReceivers []*gortsplib.RtcpReceiver readBuf *doubleBuffer writeBuf *doubleBuffer - events chan serverClientEvent // only if state = Play and streamProtocol = TCP + events chan serverClientEvent // only if state = Play and gortsplib.StreamProtocol = TCP done chan struct{} } @@ -486,7 +491,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { } return false }() { - if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok { + if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) return false } @@ -502,13 +507,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp { + if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return false } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolUdp, rtpPort, rtcpPort} + c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -532,7 +537,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { // play via TCP } else if _, ok := th["RTP/AVP/TCP"]; ok { - if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok { + if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) return false } @@ -542,13 +547,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp { + if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return false } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolTcp, 0, 0} + c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolTcp, 0, 0} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -601,7 +606,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { } return false }() { - if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok { + if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) return false } @@ -612,7 +617,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp { + if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) return false } @@ -623,7 +628,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { } res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c, streamProtocolUdp, rtpPort, rtcpPort} + c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort} err := <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -647,12 +652,12 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { // record via TCP } else if _, ok := th["RTP/AVP/TCP"]; ok { - if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok { + if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) return false } - if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp { + if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) return false } @@ -675,7 +680,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { } res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c, streamProtocolTcp, 0, 0} + c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolTcp, 0, 0} err := <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -782,7 +787,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { func (c *serverClient) runPlay(path string) { pconf := c.findConfForPath(path) - if c.streamProtocol == streamProtocolTcp { + if c.streamProtocol == gortsplib.StreamProtocolTcp { c.writeBuf = newDoubleBuffer(2048) c.events = make(chan serverClientEvent) } @@ -809,7 +814,7 @@ func (c *serverClient) runPlay(path string) { } } - if c.streamProtocol == streamProtocolTcp { + if c.streamProtocol == gortsplib.StreamProtocolTcp { readDone := make(chan error) go func() { buf := make([]byte, 2048) @@ -880,9 +885,9 @@ func (c *serverClient) runPlay(path string) { func (c *serverClient) runRecord(path string) { pconf := c.findConfForPath(path) - c.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) + c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) for trackId := range c.streamTracks { - c.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() + c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } done := make(chan struct{}) @@ -907,7 +912,7 @@ func (c *serverClient) runRecord(path string) { } } - if c.streamProtocol == streamProtocolTcp { + if c.streamProtocol == gortsplib.StreamProtocolTcp { frame := &gortsplib.InterleavedFrame{} readDone := make(chan error) @@ -929,7 +934,7 @@ func (c *serverClient) runRecord(path string) { break } - c.RtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) + c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.p.events <- programEventClientFrameTcp{ c.path, frame.TrackId, @@ -961,7 +966,7 @@ func (c *serverClient) runRecord(path string) { case <-checkStreamTicker.C: for trackId := range c.streamTracks { - if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter { + if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter { c.log("ERR: stream is dead") c.conn.NetConn().Close() <-readDone @@ -971,7 +976,7 @@ func (c *serverClient) runRecord(path string) { case <-receiverReportTicker.C: for trackId := range c.streamTracks { - frame := c.RtcpReceivers[trackId].Report() + frame := c.rtcpReceivers[trackId].Report() c.conn.WriteFrame(&gortsplib.InterleavedFrame{ TrackId: trackId, StreamType: gortsplib.StreamTypeRtcp, @@ -1016,7 +1021,7 @@ func (c *serverClient) runRecord(path string) { case <-checkStreamTicker.C: for trackId := range c.streamTracks { - if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter { + if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter { c.log("ERR: stream is dead") c.conn.NetConn().Close() <-readDone @@ -1026,7 +1031,7 @@ func (c *serverClient) runRecord(path string) { case <-receiverReportTicker.C: for trackId := range c.streamTracks { - frame := c.RtcpReceivers[trackId].Report() + frame := c.rtcpReceivers[trackId].Report() c.p.rtcpl.writeChan <- &udpAddrBufPair{ addr: &net.UDPAddr{ IP: c.ip(), @@ -1048,7 +1053,7 @@ func (c *serverClient) runRecord(path string) { <-done for trackId := range c.streamTracks { - c.RtcpReceivers[trackId].Close() + c.rtcpReceivers[trackId].Close() } if runOnPublishCmd != nil { diff --git a/source.go b/source.go index 9e6b5d3f..5f104654 100644 --- a/source.go +++ b/source.go @@ -27,9 +27,9 @@ type source struct { p *program path string u *url.URL - proto streamProtocol + proto gortsplib.StreamProtocol ready bool - clientTracks []*gortsplib.Track + tracks []*gortsplib.Track serverSdpText []byte serverSdpParsed *sdp.SessionDescription rtcpReceivers []*gortsplib.RtcpReceiver @@ -59,15 +59,15 @@ func newSource(p *program, path string, sourceStr string, sourceProtocol string) } } - proto, err := func() (streamProtocol, error) { + proto, err := func() (gortsplib.StreamProtocol, error) { switch sourceProtocol { case "udp": - return streamProtocolUdp, nil + return gortsplib.StreamProtocolUdp, nil case "tcp": - return streamProtocolTcp, nil + return gortsplib.StreamProtocolTcp, nil } - return streamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol) + return gortsplib.StreamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol) }() if err != nil { return nil, err @@ -155,20 +155,20 @@ func (s *source) do() bool { return true } - clientTracks, _, err := conn.Describe(s.u) + tracks, _, err := conn.Describe(s.u) if err != nil { s.log("ERR: %s", err) return true } // create a filtered SDP that is used by the server (not by the client) - serverSdpParsed, serverSdpText := sdpForServer(clientTracks) + serverSdpParsed, serverSdpText := sdpForServer(tracks) - s.clientTracks = clientTracks + s.tracks = tracks s.serverSdpText = serverSdpText s.serverSdpParsed = serverSdpParsed - if s.proto == streamProtocolUdp { + if s.proto == gortsplib.StreamProtocolUdp { return s.runUdp(conn) } else { return s.runTcp(conn) @@ -187,7 +187,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { } }() - for i, track := range s.clientTracks { + for i, track := range s.tracks { var rtpPort int var rtcpPort int var rtpl *sourceUdpListener @@ -240,8 +240,8 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { return true } - s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientTracks)) - for trackId := range s.clientTracks { + s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks)) + for trackId := range s.tracks { s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } @@ -250,12 +250,16 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { pair.rtcpl.start() } - sendKeepaliveTicker := time.NewTicker(sourceKeepaliveInterval) + s.p.events <- programEventStreamerReady{s} + + tcpConnDone := make(chan error, 1) + go func() { + tcpConnDone <- conn.LoopUDP(s.u) + }() + checkStreamTicker := time.NewTicker(sourceCheckStreamInterval) receiverReportTicker := time.NewTicker(sourceReceiverReportInterval) - s.p.events <- programEventStreamerReady{s} - var ret bool outer: @@ -265,16 +269,13 @@ outer: ret = false break outer - case <-sendKeepaliveTicker.C: - _, err := conn.Options(s.u) - if err != nil { - s.log("ERR: %s", err) - ret = true - break outer - } + case err := <-tcpConnDone: + s.log("ERR: %s", err) + ret = true + break outer case <-checkStreamTicker.C: - for trackId := range s.clientTracks { + for trackId := range s.tracks { if time.Since(s.rtcpReceivers[trackId].LastFrameTime()) >= s.p.conf.StreamDeadAfter { s.log("ERR: stream is dead") ret = true @@ -283,7 +284,7 @@ outer: } case <-receiverReportTicker.C: - for trackId := range s.clientTracks { + for trackId := range s.tracks { frame := s.rtcpReceivers[trackId].Report() sourceUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{ addr: &net.UDPAddr{ @@ -297,7 +298,6 @@ outer: } } - sendKeepaliveTicker.Stop() checkStreamTicker.Stop() receiverReportTicker.Stop() @@ -308,7 +308,7 @@ outer: pair.rtcpl.stop() } - for trackId := range s.clientTracks { + for trackId := range s.tracks { s.rtcpReceivers[trackId].Close() } @@ -316,7 +316,7 @@ outer: } func (s *source) runTcp(conn *gortsplib.ConnClient) bool { - for _, track := range s.clientTracks { + for _, track := range s.tracks { _, err := conn.SetupTcp(s.u, track) if err != nil { s.log("ERR: %s", err) @@ -330,8 +330,8 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { return true } - s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientTracks)) - for trackId := range s.clientTracks { + s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks)) + for trackId := range s.tracks { s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } @@ -375,7 +375,7 @@ outer: break outer case <-receiverReportTicker.C: - for trackId := range s.clientTracks { + for trackId := range s.tracks { frame := s.rtcpReceivers[trackId].Report() conn.WriteFrame(&gortsplib.InterleavedFrame{ @@ -391,7 +391,7 @@ outer: s.p.events <- programEventStreamerNotReady{s} - for trackId := range s.clientTracks { + for trackId := range s.tracks { s.rtcpReceivers[trackId].Close() }