diff --git a/client.go b/client.go index 62bd51c8..e5c137fb 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,10 @@ import ( "time" "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/auth" + "github.com/aler9/gortsplib/base" + "github.com/aler9/gortsplib/headers" + "github.com/aler9/gortsplib/rtcpreceiver" ) const ( @@ -41,7 +45,7 @@ type clientSetupPlayReq struct { } type readRequestPair struct { - req *gortsplib.Request + req *base.Request res chan error } @@ -96,17 +100,17 @@ type client struct { path *path authUser string authPass string - authHelper *gortsplib.AuthServer + authHelper *auth.Server authFailures int streamProtocol gortsplib.StreamProtocol streamTracks map[int]*clientTrack - rtcpReceivers []*gortsplib.RtcpReceiver + rtcpReceivers []*rtcpreceiver.RtcpReceiver udpLastFrameTimes []*int64 - describeCSeq gortsplib.HeaderValue + describeCSeq base.HeaderValue describeUrl string describe chan describeRes - tcpFrame chan *gortsplib.InterleavedFrame + tcpFrame chan *base.InterleavedFrame terminate chan struct{} done chan struct{} } @@ -123,7 +127,7 @@ func newClient(p *program, nconn net.Conn) *client { state: clientStateInitial, streamTracks: make(map[int]*clientTrack), describe: make(chan describeRes), - tcpFrame: make(chan *gortsplib.InterleavedFrame), + tcpFrame: make(chan *base.InterleavedFrame), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -211,12 +215,12 @@ func (c *client) run() { close(c.done) } -func (c *client) writeResError(cseq gortsplib.HeaderValue, code gortsplib.StatusCode, err error) { +func (c *client) writeResError(cseq base.HeaderValue, code base.StatusCode, err error) { c.log("ERR: %s", err) - c.conn.WriteResponse(&gortsplib.Response{ + c.conn.WriteResponse(&base.Response{ StatusCode: code, - Header: gortsplib.Header{ + Header: base.Header{ "CSeq": cseq, }, }) @@ -225,7 +229,7 @@ func (c *client) writeResError(cseq gortsplib.HeaderValue, code gortsplib.Status var errAuthCritical = errors.New("auth critical") var errAuthNotCritical = errors.New("auth not critical") -func (c *client) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error { +func (c *client) authenticate(ips []interface{}, user string, pass string, req *base.Request) error { // validate ip err := func() error { if ips == nil { @@ -254,7 +258,7 @@ func (c *client) authenticate(ips []interface{}, user string, pass string, req * if c.authHelper == nil || c.authUser != user || c.authPass != pass { c.authUser = user c.authPass = pass - c.authHelper = gortsplib.NewAuthServer(user, pass, c.p.conf.authMethodsParsed) + c.authHelper = auth.NewServer(user, pass, c.p.conf.authMethodsParsed) } err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.Url) @@ -280,9 +284,9 @@ func (c *client) authenticate(ips []interface{}, user string, pass string, req * retErr = errAuthNotCritical } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusUnauthorized, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusUnauthorized, + Header: base.Header{ "CSeq": req.Header["CSeq"], "WWW-Authenticate": c.authHelper.GenerateHeader(), }, @@ -303,18 +307,18 @@ func (c *client) authenticate(ips []interface{}, user string, pass string, req * return nil } -func (c *client) handleRequest(req *gortsplib.Request) error { +func (c *client) handleRequest(req *base.Request) error { c.log(string(req.Method)) cseq, ok := req.Header["CSeq"] if !ok || len(cseq) != 1 { - c.writeResError(nil, gortsplib.StatusBadRequest, fmt.Errorf("cseq missing")) + c.writeResError(nil, base.StatusBadRequest, fmt.Errorf("cseq missing")) return errRunTerminate } pathName := req.Url.Path if len(pathName) < 1 || pathName[0] != '/' { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path must begin with a slash")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("path must begin with a slash")) return errRunTerminate } pathName = pathName[1:] // strip leading slash @@ -326,39 +330,39 @@ func (c *client) handleRequest(req *gortsplib.Request) error { } switch req.Method { - case gortsplib.OPTIONS: - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + case base.OPTIONS: + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, - "Public": gortsplib.HeaderValue{strings.Join([]string{ - string(gortsplib.GET_PARAMETER), - string(gortsplib.DESCRIBE), - string(gortsplib.ANNOUNCE), - string(gortsplib.SETUP), - string(gortsplib.PLAY), - string(gortsplib.RECORD), - string(gortsplib.TEARDOWN), + "Public": base.HeaderValue{strings.Join([]string{ + string(base.GET_PARAMETER), + string(base.DESCRIBE), + string(base.ANNOUNCE), + string(base.SETUP), + string(base.PLAY), + string(base.RECORD), + string(base.TEARDOWN), }, ", ")}, }, }) return nil - // GET_PARAMETER is used like a ping - case gortsplib.GET_PARAMETER: - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + // GET_PARAMETER is used like a ping + case base.GET_PARAMETER: + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, - "Content-Type": gortsplib.HeaderValue{"text/parameters"}, + "Content-Type": base.HeaderValue{"text/parameters"}, }, Content: []byte("\n"), }) return nil - case gortsplib.DESCRIBE: + case base.DESCRIBE: if c.state != clientStateInitial { - c.writeResError(cseq, gortsplib.StatusBadRequest, + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial)) return errRunTerminate } @@ -367,7 +371,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { pathConf, err := c.p.conf.checkPathNameAndFindConf(pathName) if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + c.writeResError(cseq, base.StatusBadRequest, err) return errRunTerminate } @@ -386,9 +390,9 @@ func (c *client) handleRequest(req *gortsplib.Request) error { return errRunWaitDescription - case gortsplib.ANNOUNCE: + case base.ANNOUNCE: if c.state != clientStateInitial { - c.writeResError(cseq, gortsplib.StatusBadRequest, + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial)) return errRunTerminate } @@ -397,7 +401,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { pathConf, err := c.p.conf.checkPathNameAndFindConf(pathName) if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + c.writeResError(cseq, base.StatusBadRequest, err) return errRunTerminate } @@ -411,23 +415,23 @@ func (c *client) handleRequest(req *gortsplib.Request) error { ct, ok := req.Header["Content-Type"] if !ok || len(ct) != 1 { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("Content-Type header missing")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("Content-Type header missing")) return errRunTerminate } if ct[0] != "application/sdp" { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("unsupported Content-Type '%s'", ct)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("unsupported Content-Type '%s'", ct)) return errRunTerminate } tracks, err := gortsplib.ReadTracks(req.Content) if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid SDP: %s", err)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("invalid SDP: %s", err)) return errRunTerminate } if len(tracks) == 0 { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("no tracks defined")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("no tracks defined")) return errRunTerminate } @@ -437,33 +441,33 @@ func (c *client) handleRequest(req *gortsplib.Request) error { c.p.clientAnnounce <- clientAnnounceReq{res, c, pathName, pathConf, len(tracks), sdp} err = <-res if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + c.writeResError(cseq, base.StatusBadRequest, err) return errRunTerminate } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, }, }) return nil - case gortsplib.SETUP: - th, err := gortsplib.ReadHeaderTransport(req.Header["Transport"]) + case base.SETUP: + th, err := headers.ReadTransport(req.Header["Transport"]) if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header: %s", err)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("transport header: %s", err)) return errRunTerminate } if th.Cast != nil && *th.Cast == gortsplib.StreamMulticast { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("multicast is not supported")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("multicast is not supported")) return errRunTerminate } basePath, controlPath, err := splitPath(pathName) if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + c.writeResError(cseq, base.StatusBadRequest, err) return errRunTerminate } @@ -472,14 +476,14 @@ func (c *client) handleRequest(req *gortsplib.Request) error { switch c.state { // play case clientStateInitial, clientStatePrePlay: - if th.Mode != nil && *th.Mode != "play" { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header must contain mode=play or not contain a mode")) + if th.Mode != nil && *th.Mode != gortsplib.TransportModePlay { + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("transport header must contain mode=play or not contain a mode")) return errRunTerminate } pathConf, err := c.p.conf.checkPathNameAndFindConf(basePath) if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + c.writeResError(cseq, base.StatusBadRequest, err) return errRunTerminate } @@ -492,41 +496,41 @@ func (c *client) handleRequest(req *gortsplib.Request) error { } if c.path != nil && basePath != c.path.name { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) return errRunTerminate } if !strings.HasPrefix(controlPath, "trackID=") { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid control path (%s)", controlPath)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("invalid control path (%s)", controlPath)) return errRunTerminate } tmp, err := strconv.ParseInt(controlPath[len("trackID="):], 10, 64) if err != nil || tmp < 0 { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid track id (%s)", controlPath)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("invalid track id (%s)", controlPath)) return errRunTerminate } trackId := int(tmp) if _, ok := c.streamTracks[trackId]; ok { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("track %d has already been setup", trackId)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("track %d has already been setup", trackId)) return errRunTerminate } // play with UDP if th.Protocol == gortsplib.StreamProtocolUDP { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUDP]; !ok { - c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) + c.writeResError(cseq, base.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUDP { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return errRunTerminate } if th.ClientPorts == nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"])) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"])) return errRunTerminate } @@ -534,7 +538,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { c.p.clientSetupPlay <- clientSetupPlayReq{res, c, basePath, trackId} err = <-res if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + c.writeResError(cseq, base.StatusBadRequest, err) return errRunTerminate } @@ -544,7 +548,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { rtcpPort: (*th.ClientPorts)[1], } - th := &gortsplib.HeaderTransport{ + th := &headers.Transport{ Protocol: gortsplib.StreamProtocolUDP, Cast: func() *gortsplib.StreamCast { v := gortsplib.StreamUnicast @@ -554,12 +558,12 @@ func (c *client) handleRequest(req *gortsplib.Request) error { ServerPorts: &[2]int{c.p.conf.RtpPort, c.p.conf.RtcpPort}, } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, "Transport": th.Write(), - "Session": gortsplib.HeaderValue{"12345678"}, + "Session": base.HeaderValue{"12345678"}, }, }) return nil @@ -567,12 +571,12 @@ func (c *client) handleRequest(req *gortsplib.Request) error { // play with TCP } else { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTCP]; !ok { - c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) + c.writeResError(cseq, base.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTCP { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return errRunTerminate } @@ -580,7 +584,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { c.p.clientSetupPlay <- clientSetupPlayReq{res, c, basePath, trackId} err = <-res if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) + c.writeResError(cseq, base.StatusBadRequest, err) return errRunTerminate } @@ -592,17 +596,17 @@ func (c *client) handleRequest(req *gortsplib.Request) error { interleavedIds := [2]int{trackId * 2, (trackId * 2) + 1} - th := &gortsplib.HeaderTransport{ + th := &headers.Transport{ Protocol: gortsplib.StreamProtocolTCP, InterleavedIds: &interleavedIds, } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, "Transport": th.Write(), - "Session": gortsplib.HeaderValue{"12345678"}, + "Session": base.HeaderValue{"12345678"}, }, }) return nil @@ -610,36 +614,36 @@ func (c *client) handleRequest(req *gortsplib.Request) error { // record case clientStatePreRecord: - if th.Mode == nil || *th.Mode != "record" { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record")) + if th.Mode == nil || *th.Mode != gortsplib.TransportModeRecord { + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record")) return errRunTerminate } // after ANNOUNCE, c.path is already set if basePath != c.path.name { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) return errRunTerminate } // record with UDP if th.Protocol == gortsplib.StreamProtocolUDP { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUDP]; !ok { - c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) + c.writeResError(cseq, base.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUDP { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) return errRunTerminate } if th.ClientPorts == nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"])) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"])) return errRunTerminate } if len(c.streamTracks) >= c.path.publisherTrackCount { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return errRunTerminate } @@ -649,7 +653,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error { rtcpPort: (*th.ClientPorts)[1], } - th := &gortsplib.HeaderTransport{ + th := &headers.Transport{ Protocol: gortsplib.StreamProtocolUDP, Cast: func() *gortsplib.StreamCast { v := gortsplib.StreamUnicast @@ -659,12 +663,12 @@ func (c *client) handleRequest(req *gortsplib.Request) error { ServerPorts: &[2]int{c.p.conf.RtpPort, c.p.conf.RtcpPort}, } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, "Transport": th.Write(), - "Session": gortsplib.HeaderValue{"12345678"}, + "Session": base.HeaderValue{"12345678"}, }, }) return nil @@ -672,29 +676,29 @@ func (c *client) handleRequest(req *gortsplib.Request) error { // record with TCP } else { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTCP]; !ok { - c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) + c.writeResError(cseq, base.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTCP { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) return errRunTerminate } interleavedIds := [2]int{len(c.streamTracks) * 2, 1 + len(c.streamTracks)*2} if th.InterleavedIds == nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain the interleaved field")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("transport header does not contain the interleaved field")) return errRunTerminate } if (*th.InterleavedIds)[0] != interleavedIds[0] || (*th.InterleavedIds)[1] != interleavedIds[1] { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("wrong interleaved ids, expected %v, got %v", interleavedIds, *th.InterleavedIds)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("wrong interleaved ids, expected %v, got %v", interleavedIds, *th.InterleavedIds)) return errRunTerminate } if len(c.streamTracks) >= c.path.publisherTrackCount { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return errRunTerminate } @@ -704,30 +708,30 @@ func (c *client) handleRequest(req *gortsplib.Request) error { rtcpPort: 0, } - ht := &gortsplib.HeaderTransport{ + ht := &headers.Transport{ Protocol: gortsplib.StreamProtocolTCP, InterleavedIds: &interleavedIds, } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, "Transport": ht.Write(), - "Session": gortsplib.HeaderValue{"12345678"}, + "Session": base.HeaderValue{"12345678"}, }, }) return nil } default: - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s'", c.state)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("client is in state '%s'", c.state)) return errRunTerminate } - case gortsplib.PLAY: + case base.PLAY: if c.state != clientStatePrePlay { - c.writeResError(cseq, gortsplib.StatusBadRequest, + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePrePlay)) return errRunTerminate } @@ -738,31 +742,31 @@ func (c *client) handleRequest(req *gortsplib.Request) error { pathName = strings.TrimSuffix(pathName, "/") if pathName != c.path.name { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, pathName)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, pathName)) return errRunTerminate } if len(c.streamTracks) == 0 { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("no tracks have been setup")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("no tracks have been setup")) return errRunTerminate } // write response before setting state // otherwise, in case of TCP connections, RTP packets could be sent // before the response - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, - "Session": gortsplib.HeaderValue{"12345678"}, + "Session": base.HeaderValue{"12345678"}, }, }) return errRunPlay - case gortsplib.RECORD: + case base.RECORD: if c.state != clientStatePreRecord { - c.writeResError(cseq, gortsplib.StatusBadRequest, + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePreRecord)) return errRunTerminate } @@ -773,31 +777,31 @@ func (c *client) handleRequest(req *gortsplib.Request) error { pathName = strings.TrimSuffix(pathName, "/") if pathName != c.path.name { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, pathName)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, pathName)) return errRunTerminate } if len(c.streamTracks) != c.path.publisherTrackCount { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup")) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("not all tracks have been setup")) return errRunTerminate } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": cseq, - "Session": gortsplib.HeaderValue{"12345678"}, + "Session": base.HeaderValue{"12345678"}, }, }) return errRunRecord - case gortsplib.TEARDOWN: + case base.TEARDOWN: // close connection silently return errRunTerminate default: - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("unhandled method '%s'", req.Method)) + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("unhandled method '%s'", req.Method)) return errRunTerminate } } @@ -853,16 +857,16 @@ func (c *client) runWaitDescription() bool { select { case res := <-c.describe: if res.err != nil { - c.writeResError(c.describeCSeq, gortsplib.StatusNotFound, res.err) + c.writeResError(c.describeCSeq, base.StatusNotFound, res.err) return true } - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ + c.conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ "CSeq": c.describeCSeq, - "Content-Base": gortsplib.HeaderValue{c.describeUrl + "/"}, - "Content-Type": gortsplib.HeaderValue{"application/sdp"}, + "Content-Base": base.HeaderValue{c.describeUrl + "/"}, + "Content-Type": base.HeaderValue{"application/sdp"}, }, Content: res.sdp, }) @@ -956,10 +960,10 @@ func (c *client) runPlayTCP() { } switch recvt := recv.(type) { - case *gortsplib.InterleavedFrame: + case *base.InterleavedFrame: // rtcp feedback is handled by gortsplib - case *gortsplib.Request: + case *base.Request: res := make(chan error) readRequest <- readRequestPair{recvt, res} err := <-res @@ -991,7 +995,7 @@ func (c *client) runPlayTCP() { return case frame := <-c.tcpFrame: - c.conn.WriteFrameTCP(frame) + c.conn.WriteFrameTCP(frame.TrackId, frame.StreamType, frame.Content) case <-c.terminate: go func() { @@ -1007,9 +1011,9 @@ func (c *client) runPlayTCP() { } func (c *client) runRecord() bool { - c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) + c.rtcpReceivers = make([]*rtcpreceiver.RtcpReceiver, len(c.streamTracks)) for trackId := range c.streamTracks { - c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() + c.rtcpReceivers[trackId] = rtcpreceiver.New() } if c.streamProtocol == gortsplib.StreamProtocolUDP { @@ -1153,7 +1157,7 @@ func (c *client) runRecordTCP() { } switch recvt := recv.(type) { - case *gortsplib.InterleavedFrame: + case *base.InterleavedFrame: if recvt.TrackId >= len(c.streamTracks) { readDone <- fmt.Errorf("invalid track id '%d'", recvt.TrackId) break @@ -1163,7 +1167,7 @@ func (c *client) runRecordTCP() { c.p.readersMap.forwardFrame(c.path, recvt.TrackId, recvt.StreamType, recvt.Content) - case *gortsplib.Request: + case *base.Request: err := c.handleRequest(recvt) if err != nil { readDone <- err @@ -1194,11 +1198,7 @@ func (c *client) runRecordTCP() { case <-receiverReportTicker.C: for trackId := range c.streamTracks { frame := c.rtcpReceivers[trackId].Report() - c.conn.WriteFrameTCP(&gortsplib.InterleavedFrame{ - TrackId: trackId, - StreamType: gortsplib.StreamTypeRtcp, - Content: frame, - }) + c.conn.WriteFrameTCP(trackId, gortsplib.StreamTypeRtcp, frame) } case <-c.terminate: diff --git a/conf.go b/conf.go index 68bf1e5c..e8b17da3 100644 --- a/conf.go +++ b/conf.go @@ -10,6 +10,7 @@ import ( "time" "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/headers" "gopkg.in/yaml.v2" ) @@ -44,7 +45,7 @@ type conf struct { ReadTimeout time.Duration `yaml:"readTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"` AuthMethods []string `yaml:"authMethods"` - authMethodsParsed []gortsplib.AuthMethod `` + authMethodsParsed []headers.AuthMethod `` Metrics bool `yaml:"metrics"` Pprof bool `yaml:"pprof"` LogDestinations []string `yaml:"logDestinations"` @@ -140,10 +141,10 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { for _, method := range conf.AuthMethods { switch method { case "basic": - conf.authMethodsParsed = append(conf.authMethodsParsed, gortsplib.Basic) + conf.authMethodsParsed = append(conf.authMethodsParsed, headers.AuthBasic) case "digest": - conf.authMethodsParsed = append(conf.authMethodsParsed, gortsplib.Digest) + conf.authMethodsParsed = append(conf.authMethodsParsed, headers.AuthDigest) default: return nil, fmt.Errorf("unsupported authentication method: %s", method) diff --git a/go.mod b/go.mod index 252e20a3..2bdd23c7 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,10 @@ go 1.12 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20201003114231-8660eaf8974f + github.com/aler9/gortsplib v0.0.0-20201005185315-235585df65a0 github.com/davecgh/go-spew v1.1.1 // indirect github.com/notedit/rtmp v0.0.2 + github.com/pion/rtp v1.6.1 // indirect github.com/stretchr/testify v1.6.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.2.8 diff --git a/go.sum b/go.sum index 27d3c7af..2ccd1671 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20201003114231-8660eaf8974f h1:orjd0t6Aa7rySwueDyDFlQF89xIQCcZR1cHkCiYM2IA= -github.com/aler9/gortsplib v0.0.0-20201003114231-8660eaf8974f/go.mod h1:QjIqmMY0GHQ2Wan9jIS1amsNn0ncDnufvWD/r5IkDQI= +github.com/aler9/gortsplib v0.0.0-20201005185315-235585df65a0 h1:I5414yPgTl2FtykWQPeKapx+mauDl7imBUwZpVNjHnQ= +github.com/aler9/gortsplib v0.0.0-20201005185315-235585df65a0/go.mod h1:8mpBfMEJIZn2C5fMM6vRYHgGH49WX0EH8gP1SDxv0Uw= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 h1:A3upkpYzceQTuBPvVleu1zd6R8jInhg5ifimSO7ku/o= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= @@ -19,6 +19,8 @@ github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I= github.com/pion/rtp v1.6.0 h1:4Ssnl/T5W2LzxHj9ssYpGVEQh3YYhQFNVmSWO88MMwk= github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI= +github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk= +github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main_test.go b/main_test.go index df583ddb..84fe3857 100644 --- a/main_test.go +++ b/main_test.go @@ -130,7 +130,7 @@ func TestPublish(t *testing.T) { default: cnt1, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! queue ! video/x-h264 ! rtspclientsink " + + "filesrc location=emptyvideo.ts ! tsdemux ! queue ! video/x-h264 ! h264parse config-interval=1 ! rtspclientsink " + "location=rtsp://" + ownDockerIp + ":8554/teststream protocols=" + conf.publishProto + " latency=0", }) require.NoError(t, err) diff --git a/serverudp.go b/serverudp.go index d2e05b5e..e58fee06 100644 --- a/serverudp.go +++ b/serverudp.go @@ -6,6 +6,7 @@ import ( "time" "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/multibuffer" ) const ( @@ -21,7 +22,7 @@ type serverUDP struct { p *program pc *net.UDPConn streamType gortsplib.StreamType - readBuf *gortsplib.MultiBuffer + readBuf *multibuffer.MultiBuffer writec chan udpBufAddrPair done chan struct{} @@ -39,7 +40,7 @@ func newServerUDP(p *program, port int, streamType gortsplib.StreamType) (*serve p: p, pc: pc, streamType: streamType, - readBuf: gortsplib.NewMultiBuffer(2, udpReadBufferSize), + readBuf: multibuffer.New(2, udpReadBufferSize), writec: make(chan udpBufAddrPair), done: make(chan struct{}), } diff --git a/sourcertmp.go b/sourcertmp.go index 4da9094d..f92cfcfd 100644 --- a/sourcertmp.go +++ b/sourcertmp.go @@ -2,146 +2,22 @@ package main import ( "fmt" - "math/rand" "net" "sync/atomic" "time" "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/rtpaac" + "github.com/aler9/gortsplib/rtph264" "github.com/notedit/rtmp/av" "github.com/notedit/rtmp/codec/h264" "github.com/notedit/rtmp/format/rtmp" - "github.com/pion/rtp" ) const ( sourceRtmpRetryInterval = 5 * time.Second - rtpPayloadMaxSize = 1460 // 1500 - ip header - udp header - rtp header ) -type rtpH264Encoder struct { - seqnum uint16 - ssrc uint32 - initialTs uint32 - started time.Duration -} - -func newRtpH264Encoder() *rtpH264Encoder { - return &rtpH264Encoder{ - seqnum: uint16(0), - ssrc: rand.Uint32(), - initialTs: rand.Uint32(), - } -} - -func (e *rtpH264Encoder) Encode(nalus [][]byte, timestamp time.Duration) ([][]byte, error) { - var frames [][]byte - - if e.started == time.Duration(0) { - e.started = timestamp - } - - // rtp/h264 uses a 90khz clock - rtpTs := e.initialTs + uint32((timestamp-e.started).Seconds()*90000) - - for i, nalu := range nalus { - naluFrames, err := e.encodeNalu(nalu, rtpTs, (i == len(nalus)-1)) - if err != nil { - return nil, err - } - frames = append(frames, naluFrames...) - } - - return frames, nil -} - -func (e *rtpH264Encoder) encodeNalu(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { - // if the NALU fits into the RTP packet, use a single NALU packet - if len(nalu) < rtpPayloadMaxSize { - rpkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 0x02, - PayloadType: 96, - SequenceNumber: e.seqnum, - Timestamp: rtpTs, - SSRC: e.ssrc, - }, - Payload: nalu, - } - e.seqnum++ - - if isFinal { - rpkt.Header.Marker = true - } - - frame, err := rpkt.Marshal() - if err != nil { - return nil, err - } - - return [][]byte{frame}, nil - } - - // otherwise, use fragmentation units - // use only FU-A, not FU-B, since we always use non-interleaved mode - // (set with packetization-mode=1) - - frameCount := (len(nalu) - 1) / (rtpPayloadMaxSize - 2) - lastFrameSize := (len(nalu) - 1) % (rtpPayloadMaxSize - 2) - if lastFrameSize > 0 { - frameCount++ - } - frames := make([][]byte, frameCount) - - nri := (nalu[0] >> 5) & 0x03 - typ := nalu[0] & 0x1F - nalu = nalu[1:] // remove header - - for i := 0; i < frameCount; i++ { - indicator := 0 | (nri << 5) | 28 // FU-A - - start := uint8(0) - if i == 0 { - start = 1 - } - end := uint8(0) - le := rtpPayloadMaxSize - 2 - if i == (len(frames) - 1) { - end = 1 - le = lastFrameSize - } - header := (start << 7) | (end << 6) | typ - - data := append([]byte{indicator, header}, nalu[:le]...) - nalu = nalu[le:] - - rpkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 0x02, - PayloadType: 96, - SequenceNumber: e.seqnum, - Timestamp: rtpTs, - SSRC: e.ssrc, - }, - Payload: data, - } - e.seqnum++ - - if isFinal && i == (len(frames)-1) { - rpkt.Header.Marker = true - } - - frame, err := rpkt.Marshal() - if err != nil { - return nil, err - } - - frames[i] = frame - } - - return frames, nil -} - type sourceRtmpState int const ( @@ -271,31 +147,107 @@ func (s *sourceRtmp) runInnerInner() bool { return true } - // wait for SPS and PPS - sps, pps, err := func() ([]byte, []byte, error) { + // gather video and audio features + var h264Sps []byte + var h264Pps []byte + var aacConfig []byte + confDone := make(chan struct{}) + confClose := uint32(0) + go func() { + defer close(confDone) + for { - pkt, err := conn.ReadPacket() + var pkt av.Packet + pkt, err = conn.ReadPacket() if err != nil { - return nil, nil, err + return } - if pkt.Type == av.H264DecoderConfig { + if atomic.LoadUint32(&confClose) > 0 { + return + } + + switch pkt.Type { + case av.H264DecoderConfig: codec, err := h264.FromDecoderConfig(pkt.Data) if err != nil { panic(err) } - return codec.SPS[0], codec.PPS[0], nil + h264Sps = codec.SPS[0] + h264Pps = codec.PPS[0] + + if aacConfig != nil { + return + } + + case av.AACDecoderConfig: + aacConfig = pkt.Data + + if h264Sps != nil { + return + } } } }() + + timer := time.NewTimer(5 * time.Second) + + select { + case <-confDone: + case <-timer.C: + atomic.StoreUint32(&confClose, 1) + <-confDone + } + if err != nil { s.path.log("rtmp source ERR: %s", err) return true } - track := gortsplib.NewTrackH264(0, sps, pps) - tracks := gortsplib.Tracks{track} + var tracks gortsplib.Tracks + var videoTrack *gortsplib.Track + var audioTrack *gortsplib.Track + var h264Encoder *rtph264.Encoder + var aacEncoder *rtpaac.Encoder + + if h264Sps != nil { + videoTrack, err = gortsplib.NewTrackH264(len(tracks), h264Sps, h264Pps) + if err != nil { + s.path.log("rtmp source ERR: %s", err) + return true + } + + h264Encoder, err = rtph264.NewEncoder(uint8(len(tracks))) + if err != nil { + s.path.log("rtmp source ERR: %s", err) + return true + } + + tracks = append(tracks, videoTrack) + } + + if aacConfig != nil { + audioTrack, err = gortsplib.NewTrackAac(len(tracks), aacConfig) + if err != nil { + s.path.log("rtmp source ERR: %s", err) + return true + } + + aacEncoder, err = rtpaac.NewEncoder(uint8(len(tracks)), aacConfig) + if err != nil { + s.path.log("rtmp source ERR: %s", err) + return true + } + + tracks = append(tracks, audioTrack) + } + + if len(tracks) == 0 { + s.path.log("rtmp source ERR: no tracks found") + return true + } + s.path.publisherSdp = tracks.Write() s.path.publisherTrackCount = len(tracks) @@ -304,7 +256,6 @@ func (s *sourceRtmp) runInnerInner() bool { readDone := make(chan error) go func() { - encoder := newRtpH264Encoder() for { pkt, err := conn.ReadPacket() @@ -313,7 +264,13 @@ func (s *sourceRtmp) runInnerInner() bool { return } - if pkt.Type == av.H264 { + switch pkt.Type { + case av.H264: + if h264Sps == nil { + readDone <- fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up") + return + } + // decode from AVCC format nalus, typ := h264.SplitNALUs(pkt.Data) if typ != h264.NALU_AVCC { @@ -322,15 +279,35 @@ func (s *sourceRtmp) runInnerInner() bool { } // encode into RTP/H264 format - frames, err := encoder.Encode(nalus, pkt.Time) + frames, err := h264Encoder.Write(nalus, pkt.Time) if err != nil { readDone <- err return } for _, f := range frames { - s.p.readersMap.forwardFrame(s.path, 0, gortsplib.StreamTypeRtp, f) + s.p.readersMap.forwardFrame(s.path, videoTrack.Id, gortsplib.StreamTypeRtp, f) } + + case av.AAC: + if aacConfig == nil { + readDone <- fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up") + return + } + + frames, err := aacEncoder.Write(pkt.Data, pkt.Time) + if err != nil { + readDone <- err + return + } + + for _, f := range frames { + s.p.readersMap.forwardFrame(s.path, audioTrack.Id, gortsplib.StreamTypeRtp, f) + } + + default: + readDone <- fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type) + return } } }() diff --git a/sourcertsp.go b/sourcertsp.go index 42ac6f5e..11f7ae3b 100644 --- a/sourcertsp.go +++ b/sourcertsp.go @@ -174,7 +174,7 @@ func (s *sourceRtsp) runInnerInner() bool { func (s *sourceRtsp) runUDP(conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { - _, err := conn.SetupUDP(s.path.conf.sourceUrl, gortsplib.SetupModePlay, track, 0, 0) + _, err := conn.SetupUDP(s.path.conf.sourceUrl, gortsplib.TransportModePlay, track, 0, 0) if err != nil { conn.Close() s.path.log("rtsp source ERR: %s", err) @@ -195,44 +195,44 @@ func (s *sourceRtsp) runUDP(conn *gortsplib.ConnClient) bool { var wg sync.WaitGroup // receive RTP packets - for _, track := range s.tracks { + for trackId := range s.tracks { wg.Add(1) - go func(track *gortsplib.Track) { + go func(trackId int) { defer wg.Done() for { - buf, err := conn.ReadFrameUDP(track, gortsplib.StreamTypeRtp) + buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtp) if err != nil { break } - s.p.readersMap.forwardFrame(s.path, track.Id, + s.p.readersMap.forwardFrame(s.path, trackId, gortsplib.StreamTypeRtp, buf) } - }(track) + }(trackId) } // receive RTCP packets - for _, track := range s.tracks { + for trackId := range s.tracks { wg.Add(1) - go func(track *gortsplib.Track) { + go func(trackId int) { defer wg.Done() for { - buf, err := conn.ReadFrameUDP(track, gortsplib.StreamTypeRtcp) + buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtcp) if err != nil { break } - s.p.readersMap.forwardFrame(s.path, track.Id, + s.p.readersMap.forwardFrame(s.path, trackId, gortsplib.StreamTypeRtcp, buf) } - }(track) + }(trackId) } tcpConnDone := make(chan error) go func() { - tcpConnDone <- conn.LoopUDP(s.path.conf.sourceUrl) + tcpConnDone <- conn.LoopUDP() }() var ret bool @@ -264,7 +264,7 @@ outer: func (s *sourceRtsp) runTCP(conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { - _, err := conn.SetupTCP(s.path.conf.sourceUrl, gortsplib.SetupModePlay, track) + _, err := conn.SetupTCP(s.path.conf.sourceUrl, gortsplib.TransportModePlay, track) if err != nil { conn.Close() s.path.log("rtsp source ERR: %s", err) @@ -285,13 +285,13 @@ func (s *sourceRtsp) runTCP(conn *gortsplib.ConnClient) bool { tcpConnDone := make(chan error) go func() { for { - frame, err := conn.ReadFrameTCP() + trackId, streamType, content, err := conn.ReadFrameTCP() if err != nil { tcpConnDone <- err return } - s.p.readersMap.forwardFrame(s.path, frame.TrackId, frame.StreamType, frame.Content) + s.p.readersMap.forwardFrame(s.path, trackId, streamType, content) } }() diff --git a/utils.go b/utils.go index 25639894..2b0e5b64 100644 --- a/utils.go +++ b/utils.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/base" ) func parseIpCidrList(in []string) ([]interface{}, error) { @@ -241,7 +242,7 @@ func (m *readersMap) forwardFrame(path *path, trackId int, streamType gortsplib. } } else { - c.tcpFrame <- &gortsplib.InterleavedFrame{ + c.tcpFrame <- &base.InterleavedFrame{ TrackId: trackId, StreamType: streamType, Content: frame,