From 2b2f8fcdf33430ac039c41750823deda467db721 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 3 Aug 2020 17:54:23 +0200 Subject: [PATCH] speed up routing of udp frames --- main.go | 63 ++++++++++++++++++++++++++------------------------------ utils.go | 14 +++++++++++++ 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/main.go b/main.go index 1a86ea5d..af1ab82f 100644 --- a/main.go +++ b/main.go @@ -163,18 +163,19 @@ type programEventTerminate struct{} func (programEventTerminate) isProgramEvent() {} type program struct { - conf *conf - logFile *os.File - metrics *metrics - serverRtsp *serverTcp - serverRtp *serverUdp - serverRtcp *serverUdp - sources []*source - clients map[*client]struct{} - paths map[string]*path - cmds []*exec.Cmd - publisherCount int - readerCount int + conf *conf + logFile *os.File + metrics *metrics + serverRtsp *serverTcp + serverRtp *serverUdp + serverRtcp *serverUdp + sources []*source + clients map[*client]struct{} + udpClientPublishers map[ipKey]*client + paths map[string]*path + cmds []*exec.Cmd + publisherCount int + readerCount int events chan programEvent done chan struct{} @@ -200,11 +201,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { } p := &program{ - conf: conf, - clients: make(map[*client]struct{}), - paths: make(map[string]*path), - events: make(chan programEvent), - done: make(chan struct{}), + conf: conf, + clients: make(map[*client]struct{}), + udpClientPublishers: make(map[ipKey]*client), + paths: make(map[string]*path), + events: make(chan programEvent), + done: make(chan struct{}), } if _, ok := p.conf.logDestinationsParsed[logDestinationFile]; ok { @@ -419,17 +421,19 @@ outer: case programEventClientRecord: p.publisherCount += 1 evt.client.state = clientStateRecord + p.udpClientPublishers[makeIpKey(evt.client.ip())] = evt.client p.paths[evt.client.pathId].publisherSetReady() close(evt.done) case programEventClientRecordStop: p.publisherCount -= 1 evt.client.state = clientStatePreRecord + delete(p.udpClientPublishers, makeIpKey(evt.client.ip())) p.paths[evt.client.pathId].publisherSetNotReady() close(evt.done) case programEventClientFrameUdp: - client, trackId := p.findClientPublisher(evt.addr, evt.streamType) + client, trackId := p.findUdpClientPublisher(evt.addr, evt.streamType) if client == nil { continue } @@ -544,31 +548,22 @@ func (p *program) findConfForPath(path string) *confPath { return nil } -func (p *program) findClientPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*client, int) { - for _, path := range p.paths { - cl, ok := path.publisher.(*client) - if !ok { - continue - } - - if cl.streamProtocol != gortsplib.StreamProtocolUdp || - cl.state != clientStateRecord || - !cl.ip().Equal(addr.IP) { - continue - } - - for i, t := range cl.streamTracks { +func (p *program) findUdpClientPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*client, int) { + c, ok := p.udpClientPublishers[makeIpKey(addr.IP)] + if ok { + for i, t := range c.streamTracks { if streamType == gortsplib.StreamTypeRtp { if t.rtpPort == addr.Port { - return cl, i + return c, i } } else { if t.rtcpPort == addr.Port { - return cl, i + return c, i } } } } + return nil, -1 } diff --git a/utils.go b/utils.go index dde19d83..8d384a74 100644 --- a/utils.go +++ b/utils.go @@ -147,3 +147,17 @@ func splitPath(path string) (string, string, error) { return comps[0], comps[1], nil } + +// use a fixed-size array for ip comparison +type ipKey [net.IPv6len]byte + +func makeIpKey(ip net.IP) ipKey { + var ret ipKey + if len(ip) == net.IPv4len { + copy(ret[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix + copy(ret[12:], ip) + } else { + copy(ret[:], ip) + } + return ret +}