From f5936d67edaa7410e69e2b56b084821a5b758f3a Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 19 Sep 2020 00:08:05 +0200 Subject: [PATCH] drastically improve performance when publishing or proxying streams --- client.go | 56 +++------------------- main.go | 92 +++++------------------------------ server-udp.go | 23 +++++++-- source.go | 15 ++---- utils.go | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 172 insertions(+), 144 deletions(-) diff --git a/client.go b/client.go index 957220d7..ac4910b2 100644 --- a/client.go +++ b/client.go @@ -44,50 +44,11 @@ type clientSetupPlayReq struct { trackId int } -type clientFrameUDPReq struct { - addr *net.UDPAddr - streamType gortsplib.StreamType - buf []byte -} - -type clientFrameTCPReq struct { - path *path - trackId int - streamType gortsplib.StreamType - buf []byte -} - type readRequestPair struct { req *gortsplib.Request res chan error } -type udpClient struct { - client *client - trackId int - streamType gortsplib.StreamType -} - -type udpClientAddr struct { - ip [net.IPv6len]byte // use a fixed-size array to enable the equality operator - port int -} - -func makeUDPClientAddr(ip net.IP, port int) udpClientAddr { - ret := udpClientAddr{ - port: port, - } - - if len(ip) == net.IPv4len { - copy(ret.ip[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix - copy(ret.ip[12:], ip) - } else { - copy(ret.ip[:], ip) - } - - return ret -} - type clientTrack struct { rtpPort int rtcpPort int @@ -182,17 +143,18 @@ func (c *client) close() { switch c.state { case clientStatePlay: atomic.AddInt64(&c.p.countReader, -1) + c.p.readersMap.remove(c) case clientStateRecord: atomic.AddInt64(&c.p.countPublisher, -1) if c.streamProtocol == gortsplib.StreamProtocolUDP { for _, track := range c.streamTracks { - key := makeUDPClientAddr(c.ip(), track.rtpPort) - delete(c.p.udpClientsByAddr, key) + addr := makeUDPPublisherAddr(c.ip(), track.rtpPort) + c.p.udpPublishersMap.remove(addr) - key = makeUDPClientAddr(c.ip(), track.rtcpPort) - delete(c.p.udpClientsByAddr, key) + addr = makeUDPPublisherAddr(c.ip(), track.rtcpPort) + c.p.udpPublishersMap.remove(addr) } } @@ -1225,12 +1187,8 @@ func (c *client) runRecordTCP() { } c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) - c.p.clientFrameTCP <- clientFrameTCPReq{ - c.path, - frame.TrackId, - frame.StreamType, - frame.Content, - } + + c.p.readersMap.forwardFrame(c.path, frame.TrackId, frame.StreamType, frame.Content) case *gortsplib.Request: err := c.handleRequest(recvt) diff --git a/main.go b/main.go index 2cdb8184..4c567673 100644 --- a/main.go +++ b/main.go @@ -42,7 +42,8 @@ type program struct { serverRtcp *serverUDP serverRtsp *serverTCP clients map[*client]struct{} - udpClientsByAddr map[udpClientAddr]*udpClient + udpPublishersMap *udpPublishersMap + readersMap *readersMap countClient int64 countPublisher int64 countReader int64 @@ -55,11 +56,8 @@ type program struct { clientSetupPlay chan clientSetupPlayReq clientPlay chan *client clientRecord chan *client - clientFrameUDP chan clientFrameUDPReq - clientFrameTCP chan clientFrameTCPReq sourceReady chan *source sourceNotReady chan *source - sourceFrame chan sourceFrameReq terminate chan struct{} done chan struct{} } @@ -87,7 +85,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { conf: conf, paths: make(map[string]*path), clients: make(map[*client]struct{}), - udpClientsByAddr: make(map[udpClientAddr]*udpClient), + udpPublishersMap: newUdpPublisherMap(), + readersMap: newReadersMap(), metricsGather: make(chan metricsGatherReq), clientNew: make(chan net.Conn), clientClose: make(chan *client), @@ -96,11 +95,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { clientSetupPlay: make(chan clientSetupPlayReq), clientPlay: make(chan *client), clientRecord: make(chan *client), - clientFrameUDP: make(chan clientFrameUDPReq), - clientFrameTCP: make(chan clientFrameTCPReq), sourceReady: make(chan *source), sourceNotReady: make(chan *source), - sourceFrame: make(chan sourceFrameReq), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -280,6 +276,7 @@ outer: case client := <-p.clientPlay: atomic.AddInt64(&p.countReader, 1) client.state = clientStatePlay + p.readersMap.add(client) case client := <-p.clientRecord: atomic.AddInt64(&p.countPublisher, 1) @@ -287,44 +284,24 @@ outer: if client.streamProtocol == gortsplib.StreamProtocolUDP { for trackId, track := range client.streamTracks { - key := makeUDPClientAddr(client.ip(), track.rtpPort) - p.udpClientsByAddr[key] = &udpClient{ + addr := makeUDPPublisherAddr(client.ip(), track.rtpPort) + p.udpPublishersMap.add(addr, &udpPublisher{ client: client, trackId: trackId, streamType: gortsplib.StreamTypeRtp, - } + }) - key = makeUDPClientAddr(client.ip(), track.rtcpPort) - p.udpClientsByAddr[key] = &udpClient{ + addr = makeUDPPublisherAddr(client.ip(), track.rtcpPort) + p.udpPublishersMap.add(addr, &udpPublisher{ client: client, trackId: trackId, streamType: gortsplib.StreamTypeRtcp, - } + }) } } client.path.onPublisherSetReady() - case req := <-p.clientFrameUDP: - pub, ok := p.udpClientsByAddr[makeUDPClientAddr(req.addr.IP, req.addr.Port)] - if !ok { - continue - } - - // client sent RTP on RTCP port or vice-versa - if pub.streamType != req.streamType { - continue - } - - atomic.StoreInt64(pub.client.udpLastFrameTimes[pub.trackId], time.Now().Unix()) - - pub.client.rtcpReceivers[pub.trackId].OnFrame(req.streamType, req.buf) - - p.forwardFrame(pub.client.path, pub.trackId, req.streamType, req.buf) - - case req := <-p.clientFrameTCP: - p.forwardFrame(req.path, req.trackId, req.streamType, req.buf) - case source := <-p.sourceReady: source.path.log("source ready") source.path.onPublisherSetReady() @@ -333,9 +310,6 @@ outer: source.path.log("source not ready") source.path.onPublisherSetNotReady() - case req := <-p.sourceFrame: - p.forwardFrame(req.source.path, req.trackId, req.streamType, req.buf) - case <-p.terminate: break outer } @@ -362,11 +336,8 @@ outer: case <-p.clientPlay: case <-p.clientRecord: - case <-p.clientFrameUDP: - case <-p.clientFrameTCP: case <-p.sourceReady: case <-p.sourceNotReady: - case <-p.sourceFrame: } } }() @@ -410,11 +381,8 @@ outer: close(p.clientSetupPlay) close(p.clientPlay) close(p.clientRecord) - close(p.clientFrameUDP) - close(p.clientFrameTCP) close(p.sourceReady) close(p.sourceNotReady) - close(p.sourceFrame) close(p.done) } @@ -435,44 +403,6 @@ func (p *program) findConfForPathName(name string) *confPath { return nil } -func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) { - for c := range p.clients { - if c.path != path || - c.state != clientStatePlay { - continue - } - - track, ok := c.streamTracks[trackId] - if !ok { - continue - } - - if c.streamProtocol == gortsplib.StreamProtocolUDP { - if streamType == gortsplib.StreamTypeRtp { - p.serverRtp.write(frame, &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: track.rtpPort, - }) - - } else { - p.serverRtcp.write(frame, &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: track.rtcpPort, - }) - } - - } else { - c.tcpFrame <- &gortsplib.InterleavedFrame{ - TrackId: trackId, - StreamType: streamType, - Content: frame, - } - } - } -} - func main() { _, err := newProgram(os.Args[1:], os.Stdin) if err != nil { diff --git a/server-udp.go b/server-udp.go index a5cf7066..4b43e711 100644 --- a/server-udp.go +++ b/server-udp.go @@ -2,6 +2,7 @@ package main import ( "net" + "sync/atomic" "time" "github.com/aler9/gortsplib" @@ -70,11 +71,25 @@ func (l *serverUDP) run() { break } - l.p.clientFrameUDP <- clientFrameUDPReq{ - addr, - l.streamType, - buf[:n], + pub := l.p.udpPublishersMap.get(makeUDPPublisherAddr(addr.IP, addr.Port)) + if pub == nil { + continue } + + // client sent RTP on RTCP port or vice-versa + if pub.streamType != l.streamType { + continue + } + + atomic.StoreInt64(pub.client.udpLastFrameTimes[pub.trackId], time.Now().Unix()) + + pub.client.rtcpReceivers[pub.trackId].OnFrame(l.streamType, buf[:n]) + + l.p.readersMap.forwardFrame(pub.client.path, + pub.trackId, + l.streamType, + buf[:n]) + } close(l.writec) diff --git a/source.go b/source.go index baf2dfea..719a5b8b 100644 --- a/source.go +++ b/source.go @@ -14,13 +14,6 @@ const ( sourceTCPReadBufferSize = 128 * 1024 ) -type sourceFrameReq struct { - source *source - trackId int - streamType gortsplib.StreamType - buf []byte -} - type sourceState int const ( @@ -241,7 +234,8 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool { break } - s.p.sourceFrame <- sourceFrameReq{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} + s.p.readersMap.forwardFrame(s.path, trackId, + gortsplib.StreamTypeRtp, buf[:n]) } }(trackId, rtpRead) } @@ -261,7 +255,8 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool { break } - s.p.sourceFrame <- sourceFrameReq{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} + s.p.readersMap.forwardFrame(s.path, trackId, + gortsplib.StreamTypeRtcp, buf[:n]) } }(trackId, rtcpRead) } @@ -331,7 +326,7 @@ func (s *source) runTCP(conn *gortsplib.ConnClient) bool { return } - s.p.sourceFrame <- sourceFrameReq{s, frame.TrackId, frame.StreamType, frame.Content} + s.p.readersMap.forwardFrame(s.path, frame.TrackId, frame.StreamType, frame.Content) } }() diff --git a/utils.go b/utils.go index 0db857ed..bc15bbe5 100644 --- a/utils.go +++ b/utils.go @@ -8,6 +8,9 @@ import ( "regexp" "runtime" "strings" + "sync" + + "github.com/aler9/gortsplib" ) func parseIpCidrList(in []string) ([]interface{}, error) { @@ -170,3 +173,130 @@ func isBindError(err error) bool { } return false } + +type udpPublisherAddr struct { + ip [net.IPv6len]byte // use a fixed-size array to enable the equality operator + port int +} + +func makeUDPPublisherAddr(ip net.IP, port int) udpPublisherAddr { + ret := udpPublisherAddr{ + port: port, + } + + if len(ip) == net.IPv4len { + copy(ret.ip[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix + copy(ret.ip[12:], ip) + } else { + copy(ret.ip[:], ip) + } + + return ret +} + +type udpPublisher struct { + client *client + trackId int + streamType gortsplib.StreamType +} + +type udpPublishersMap struct { + mutex sync.RWMutex + ma map[udpPublisherAddr]*udpPublisher +} + +func newUdpPublisherMap() *udpPublishersMap { + return &udpPublishersMap{ + ma: make(map[udpPublisherAddr]*udpPublisher), + } +} + +func (m *udpPublishersMap) get(addr udpPublisherAddr) *udpPublisher { + m.mutex.RLock() + defer m.mutex.RUnlock() + + el, ok := m.ma[addr] + if !ok { + return nil + } + return el +} + +func (m *udpPublishersMap) add(addr udpPublisherAddr, pub *udpPublisher) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.ma[addr] = pub +} + +func (m *udpPublishersMap) remove(addr udpPublisherAddr) { + m.mutex.Lock() + defer m.mutex.Unlock() + + delete(m.ma, addr) +} + +type readersMap struct { + mutex sync.RWMutex + ma map[*client]struct{} +} + +func newReadersMap() *readersMap { + return &readersMap{ + ma: make(map[*client]struct{}), + } +} + +func (m *readersMap) add(reader *client) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.ma[reader] = struct{}{} +} + +func (m *readersMap) remove(reader *client) { + m.mutex.Lock() + defer m.mutex.Unlock() + + delete(m.ma, reader) +} + +func (m *readersMap) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) { + m.mutex.RLock() + defer m.mutex.RUnlock() + + for c := range m.ma { + if c.path != path { + continue + } + + track, ok := c.streamTracks[trackId] + if !ok { + continue + } + + if c.streamProtocol == gortsplib.StreamProtocolUDP { + if streamType == gortsplib.StreamTypeRtp { + c.p.serverRtp.write(frame, &net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: track.rtpPort, + }) + + } else { + c.p.serverRtcp.write(frame, &net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: track.rtcpPort, + }) + } + + } else { + c.tcpFrame <- &gortsplib.InterleavedFrame{ + TrackId: trackId, + StreamType: streamType, + Content: frame, + } + } + } +}