update gortsplib

This commit is contained in:
aler9 2020-07-19 17:54:31 +02:00
parent 406b4f391b
commit da5e001206
6 changed files with 78 additions and 92 deletions

View file

@ -28,7 +28,7 @@ type ConfPath struct {
type conf struct {
Protocols []string `yaml:"protocols"`
protocolsParsed map[streamProtocol]struct{}
protocolsParsed map[gortsplib.StreamProtocol]struct{}
RtspPort int `yaml:"rtspPort"`
RtpPort int `yaml:"rtpPort"`
RtcpPort int `yaml:"rtcpPort"`
@ -83,14 +83,14 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
if len(conf.Protocols) == 0 {
conf.Protocols = []string{"udp", "tcp"}
}
conf.protocolsParsed = make(map[streamProtocol]struct{})
conf.protocolsParsed = make(map[gortsplib.StreamProtocol]struct{})
for _, proto := range conf.Protocols {
switch proto {
case "udp":
conf.protocolsParsed[streamProtocolUdp] = struct{}{}
conf.protocolsParsed[gortsplib.StreamProtocolUdp] = struct{}{}
case "tcp":
conf.protocolsParsed[streamProtocolTcp] = struct{}{}
conf.protocolsParsed[gortsplib.StreamProtocolTcp] = struct{}{}
default:
return nil, fmt.Errorf("unsupported protocol: %s", proto)

2
go.mod
View file

@ -5,7 +5,7 @@ go 1.13
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200719094715-6806ec79c031
github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436
github.com/stretchr/testify v1.6.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6

4
go.sum
View file

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200719094715-6806ec79c031 h1:Kb+H1mkbmzbAIcX0++A8kHwdhpQiNe6reFcWNUATcVk=
github.com/aler9/gortsplib v0.0.0-20200719094715-6806ec79c031/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY=
github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644 h1:14g114ATdvGCrnKOHRLklmPwtchF2LQAjdyORVBEzoQ=
github.com/aler9/gortsplib v0.0.0-20200719162513-a119764b9644/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

33
main.go
View file

@ -16,25 +16,6 @@ import (
var Version = "v0.0.0"
type track struct {
rtpPort int
rtcpPort int
}
type streamProtocol int
const (
streamProtocolUdp streamProtocol = iota
streamProtocolTcp
)
func (s streamProtocol) String() string {
if s == streamProtocolUdp {
return "udp"
}
return "tcp"
}
type programEvent interface {
isProgramEvent()
}
@ -71,7 +52,7 @@ type programEventClientSetupPlay struct {
res chan error
client *serverClient
path string
protocol streamProtocol
protocol gortsplib.StreamProtocol
rtpPort int
rtcpPort int
}
@ -81,7 +62,7 @@ func (programEventClientSetupPlay) isProgramEvent() {}
type programEventClientSetupRecord struct {
res chan error
client *serverClient
protocol streamProtocol
protocol gortsplib.StreamProtocol
rtpPort int
rtcpPort int
}
@ -335,7 +316,7 @@ outer:
evt.client.path = evt.path
evt.client.streamProtocol = evt.protocol
evt.client.streamTracks = append(evt.client.streamTracks, &track{
evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{
rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort,
})
@ -344,7 +325,7 @@ outer:
case programEventClientSetupRecord:
evt.client.streamProtocol = evt.protocol
evt.client.streamTracks = append(evt.client.streamTracks, &track{
evt.client.streamTracks = append(evt.client.streamTracks, &serverClientTrack{
rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort,
})
@ -401,7 +382,7 @@ outer:
continue
}
client.RtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf)
client.rtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf)
p.forwardFrame(client.path, trackId, evt.streamType, evt.buf)
case programEventClientFrameTcp:
@ -496,7 +477,7 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy
continue
}
if cl.streamProtocol != streamProtocolUdp ||
if cl.streamProtocol != gortsplib.StreamProtocolUdp ||
cl.state != clientStateRecord ||
!cl.ip().Equal(addr.IP) {
continue
@ -520,7 +501,7 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy
func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) {
for client := range p.clients {
if client.path == path && client.state == clientStatePlay {
if client.streamProtocol == streamProtocolUdp {
if client.streamProtocol == gortsplib.StreamProtocolUdp {
if streamType == gortsplib.StreamTypeRtp {
p.rtpl.write(&udpAddrBufPair{
addr: &net.UDPAddr{

View file

@ -19,6 +19,11 @@ const (
clientReceiverReportInterval = 10 * time.Second
)
type serverClientTrack struct {
rtpPort int
rtcpPort int
}
type serverClientEvent interface {
isServerClientEvent()
}
@ -74,13 +79,13 @@ type serverClient struct {
authFailures int
streamSdpText []byte // only if publisher
streamSdpParsed *sdp.SessionDescription // only if publisher
streamProtocol streamProtocol
streamTracks []*track
RtcpReceivers []*gortsplib.RtcpReceiver
streamProtocol gortsplib.StreamProtocol
streamTracks []*serverClientTrack
rtcpReceivers []*gortsplib.RtcpReceiver
readBuf *doubleBuffer
writeBuf *doubleBuffer
events chan serverClientEvent // only if state = Play and streamProtocol = TCP
events chan serverClientEvent // only if state = Play and gortsplib.StreamProtocol = TCP
done chan struct{}
}
@ -486,7 +491,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
return false
}() {
if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return false
}
@ -502,13 +507,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolUdp, rtpPort, rtcpPort}
c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort}
err = <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -532,7 +537,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
// play via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return false
}
@ -542,13 +547,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolTcp, 0, 0}
c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolTcp, 0, 0}
err = <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -601,7 +606,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
return false
}() {
if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return false
}
@ -612,7 +617,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return false
}
@ -623,7 +628,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c, streamProtocolUdp, rtpPort, rtcpPort}
c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort}
err := <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -647,12 +652,12 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
// record via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp {
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return false
}
@ -675,7 +680,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c, streamProtocolTcp, 0, 0}
c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolTcp, 0, 0}
err := <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -782,7 +787,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
func (c *serverClient) runPlay(path string) {
pconf := c.findConfForPath(path)
if c.streamProtocol == streamProtocolTcp {
if c.streamProtocol == gortsplib.StreamProtocolTcp {
c.writeBuf = newDoubleBuffer(2048)
c.events = make(chan serverClientEvent)
}
@ -809,7 +814,7 @@ func (c *serverClient) runPlay(path string) {
}
}
if c.streamProtocol == streamProtocolTcp {
if c.streamProtocol == gortsplib.StreamProtocolTcp {
readDone := make(chan error)
go func() {
buf := make([]byte, 2048)
@ -880,9 +885,9 @@ func (c *serverClient) runPlay(path string) {
func (c *serverClient) runRecord(path string) {
pconf := c.findConfForPath(path)
c.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks {
c.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
}
done := make(chan struct{})
@ -907,7 +912,7 @@ func (c *serverClient) runRecord(path string) {
}
}
if c.streamProtocol == streamProtocolTcp {
if c.streamProtocol == gortsplib.StreamProtocolTcp {
frame := &gortsplib.InterleavedFrame{}
readDone := make(chan error)
@ -929,7 +934,7 @@ func (c *serverClient) runRecord(path string) {
break
}
c.RtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.events <- programEventClientFrameTcp{
c.path,
frame.TrackId,
@ -961,7 +966,7 @@ func (c *serverClient) runRecord(path string) {
case <-checkStreamTicker.C:
for trackId := range c.streamTracks {
if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter {
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
@ -971,7 +976,7 @@ func (c *serverClient) runRecord(path string) {
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.RtcpReceivers[trackId].Report()
frame := c.rtcpReceivers[trackId].Report()
c.conn.WriteFrame(&gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: gortsplib.StreamTypeRtcp,
@ -1016,7 +1021,7 @@ func (c *serverClient) runRecord(path string) {
case <-checkStreamTicker.C:
for trackId := range c.streamTracks {
if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter {
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
@ -1026,7 +1031,7 @@ func (c *serverClient) runRecord(path string) {
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.RtcpReceivers[trackId].Report()
frame := c.rtcpReceivers[trackId].Report()
c.p.rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{
IP: c.ip(),
@ -1048,7 +1053,7 @@ func (c *serverClient) runRecord(path string) {
<-done
for trackId := range c.streamTracks {
c.RtcpReceivers[trackId].Close()
c.rtcpReceivers[trackId].Close()
}
if runOnPublishCmd != nil {

View file

@ -27,9 +27,9 @@ type source struct {
p *program
path string
u *url.URL
proto streamProtocol
proto gortsplib.StreamProtocol
ready bool
clientTracks []*gortsplib.Track
tracks []*gortsplib.Track
serverSdpText []byte
serverSdpParsed *sdp.SessionDescription
rtcpReceivers []*gortsplib.RtcpReceiver
@ -59,15 +59,15 @@ func newSource(p *program, path string, sourceStr string, sourceProtocol string)
}
}
proto, err := func() (streamProtocol, error) {
proto, err := func() (gortsplib.StreamProtocol, error) {
switch sourceProtocol {
case "udp":
return streamProtocolUdp, nil
return gortsplib.StreamProtocolUdp, nil
case "tcp":
return streamProtocolTcp, nil
return gortsplib.StreamProtocolTcp, nil
}
return streamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol)
return gortsplib.StreamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol)
}()
if err != nil {
return nil, err
@ -155,20 +155,20 @@ func (s *source) do() bool {
return true
}
clientTracks, _, err := conn.Describe(s.u)
tracks, _, err := conn.Describe(s.u)
if err != nil {
s.log("ERR: %s", err)
return true
}
// create a filtered SDP that is used by the server (not by the client)
serverSdpParsed, serverSdpText := sdpForServer(clientTracks)
serverSdpParsed, serverSdpText := sdpForServer(tracks)
s.clientTracks = clientTracks
s.tracks = tracks
s.serverSdpText = serverSdpText
s.serverSdpParsed = serverSdpParsed
if s.proto == streamProtocolUdp {
if s.proto == gortsplib.StreamProtocolUdp {
return s.runUdp(conn)
} else {
return s.runTcp(conn)
@ -187,7 +187,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
}
}()
for i, track := range s.clientTracks {
for i, track := range s.tracks {
var rtpPort int
var rtcpPort int
var rtpl *sourceUdpListener
@ -240,8 +240,8 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
return true
}
s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientTracks))
for trackId := range s.clientTracks {
s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks))
for trackId := range s.tracks {
s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
}
@ -250,12 +250,16 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
pair.rtcpl.start()
}
sendKeepaliveTicker := time.NewTicker(sourceKeepaliveInterval)
s.p.events <- programEventStreamerReady{s}
tcpConnDone := make(chan error, 1)
go func() {
tcpConnDone <- conn.LoopUDP(s.u)
}()
checkStreamTicker := time.NewTicker(sourceCheckStreamInterval)
receiverReportTicker := time.NewTicker(sourceReceiverReportInterval)
s.p.events <- programEventStreamerReady{s}
var ret bool
outer:
@ -265,16 +269,13 @@ outer:
ret = false
break outer
case <-sendKeepaliveTicker.C:
_, err := conn.Options(s.u)
if err != nil {
s.log("ERR: %s", err)
ret = true
break outer
}
case err := <-tcpConnDone:
s.log("ERR: %s", err)
ret = true
break outer
case <-checkStreamTicker.C:
for trackId := range s.clientTracks {
for trackId := range s.tracks {
if time.Since(s.rtcpReceivers[trackId].LastFrameTime()) >= s.p.conf.StreamDeadAfter {
s.log("ERR: stream is dead")
ret = true
@ -283,7 +284,7 @@ outer:
}
case <-receiverReportTicker.C:
for trackId := range s.clientTracks {
for trackId := range s.tracks {
frame := s.rtcpReceivers[trackId].Report()
sourceUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{
@ -297,7 +298,6 @@ outer:
}
}
sendKeepaliveTicker.Stop()
checkStreamTicker.Stop()
receiverReportTicker.Stop()
@ -308,7 +308,7 @@ outer:
pair.rtcpl.stop()
}
for trackId := range s.clientTracks {
for trackId := range s.tracks {
s.rtcpReceivers[trackId].Close()
}
@ -316,7 +316,7 @@ outer:
}
func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
for _, track := range s.clientTracks {
for _, track := range s.tracks {
_, err := conn.SetupTcp(s.u, track)
if err != nil {
s.log("ERR: %s", err)
@ -330,8 +330,8 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
return true
}
s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientTracks))
for trackId := range s.clientTracks {
s.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.tracks))
for trackId := range s.tracks {
s.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
}
@ -375,7 +375,7 @@ outer:
break outer
case <-receiverReportTicker.C:
for trackId := range s.clientTracks {
for trackId := range s.tracks {
frame := s.rtcpReceivers[trackId].Report()
conn.WriteFrame(&gortsplib.InterleavedFrame{
@ -391,7 +391,7 @@ outer:
s.p.events <- programEventStreamerNotReady{s}
for trackId := range s.clientTracks {
for trackId := range s.tracks {
s.rtcpReceivers[trackId].Close()
}