diff --git a/go.mod b/go.mod index a8a3b648..b68b3013 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210508145957-4beda10c06f9 + github.com/aler9/gortsplib v0.0.0-20210508200522-b98fa90057b4 github.com/asticode/go-astits v0.0.0-00010101000000-000000000000 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 diff --git a/go.sum b/go.sum index 2d50c2f0..72c8eb53 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ= -github.com/aler9/gortsplib v0.0.0-20210508145957-4beda10c06f9 h1:v9QqgvTa2mCcsEFvwJigabE/fNIIOXC40fGneIPm89Y= -github.com/aler9/gortsplib v0.0.0-20210508145957-4beda10c06f9/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= +github.com/aler9/gortsplib v0.0.0-20210508200522-b98fa90057b4 h1:mL4yLi09Bpph+cLlXdU6CJAsIox9a5grz4ik4M3Q9ik= +github.com/aler9/gortsplib v0.0.0-20210508200522-b98fa90057b4/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index ce1aaeda..4c731e04 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -138,7 +138,7 @@ func (c *Client) IsReadPublisher() {} func (c *Client) IsSource() {} func (c *Client) log(level logger.Level, format string, args ...interface{}) { - c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...) + c.parent.Log(level, "[client %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) } func (c *Client) ip() net.IP { diff --git a/internal/clientrtsp/client.go b/internal/clientrtsp/client.go index 747de8b6..7c79395c 100644 --- a/internal/clientrtsp/client.go +++ b/internal/clientrtsp/client.go @@ -110,7 +110,7 @@ func (c *Client) Close(err error) { } func (c *Client) log(level logger.Level, format string, args ...interface{}) { - c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...) + c.parent.Log(level, "[client %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) } // Conn returns the RTSP connection. diff --git a/internal/serverhls/server.go b/internal/serverhls/server.go index 90853b6e..f87e6997 100644 --- a/internal/serverhls/server.go +++ b/internal/serverhls/server.go @@ -156,7 +156,7 @@ outer: // ServeHTTP implements http.Handler. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.Log(logger.Info, "[client %s] %s %s", r.RemoteAddr, r.Method, r.URL.Path) + s.Log(logger.Info, "[client %v] %s %s", r.RemoteAddr, r.Method, r.URL.Path) // remove leading prefix path := r.URL.Path[1:] diff --git a/internal/serverrtsp/server.go b/internal/serverrtsp/server.go index 73858327..bed21e80 100644 --- a/internal/serverrtsp/server.go +++ b/internal/serverrtsp/server.go @@ -1,7 +1,10 @@ package serverrtsp import ( + "crypto/rand" "crypto/tls" + "encoding/binary" + "strconv" "sync" "time" @@ -15,6 +18,30 @@ import ( "github.com/aler9/rtsp-simple-server/internal/stats" ) +func newSessionVisualID(sessions map[*gortsplib.ServerSession]*sessionrtsp.Session) (string, error) { + for { + b := make([]byte, 4) + _, err := rand.Read(b) + if err != nil { + return "", err + } + + id := strconv.FormatUint(uint64(binary.LittleEndian.Uint32(b)), 10) + + alreadyPresent := func() bool { + for _, s := range sessions { + if s.VisualID() == id { + return true + } + } + return false + }() + if !alreadyPresent { + return id, nil + } + } +} + // Parent is implemented by program. type Parent interface { Log(logger.Level, string, ...interface{}) @@ -170,8 +197,8 @@ outer: close(serverErr) } -// OnConnOpen implements gortsplib.ServerHandlerOnConnOpenCtx. -func (s *Server) OnConnOpen(sc *gortsplib.ServerConn) { +// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen. +func (s *Server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { c := clientrtsp.New( s.rtspAddress, s.readTimeout, @@ -179,25 +206,25 @@ func (s *Server) OnConnOpen(sc *gortsplib.ServerConn) { s.runOnConnectRestart, s.pathMan, s.stats, - sc, + ctx.Conn, s) s.mutex.Lock() - s.clients[sc] = c + s.clients[ctx.Conn] = c s.mutex.Unlock() } -// OnConnClose implements gortsplib.ServerHandlerOnConnCloseCtx. -func (s *Server) OnConnClose(sc *gortsplib.ServerConn, err error) { +// OnConnClose implements gortsplib.ServerHandlerOnConnClose. +func (s *Server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { s.mutex.Lock() - c := s.clients[sc] - delete(s.clients, sc) + c := s.clients[ctx.Conn] + delete(s.clients, ctx.Conn) s.mutex.Unlock() - c.Close(err) + c.Close(ctx.Error) } -// OnRequest implements gortsplib.ServerHandlerOnRequestCtx. +// OnRequest implements gortsplib.ServerHandlerOnRequest. func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) { s.mutex.Lock() c := s.clients[sc] @@ -206,7 +233,7 @@ func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) { c.OnRequest(req) } -// OnResponse implements gortsplib.ServerHandlerOnResponseCtx. +// OnResponse implements gortsplib.ServerHandlerOnResponse. func (s *Server) OnResponse(sc *gortsplib.ServerConn, res *base.Response) { s.mutex.Lock() c := s.clients[sc] @@ -215,31 +242,38 @@ func (s *Server) OnResponse(sc *gortsplib.ServerConn, res *base.Response) { c.OnResponse(res) } -// OnSessionOpen implements gortsplib.ServerHandlerOnSessionOpenCtx. -func (s *Server) OnSessionOpen(ss *gortsplib.ServerSession) { +// OnSessionOpen implements gortsplib.ServerHandlerOnSessionOpen. +func (s *Server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { + s.mutex.Lock() + + // do not use ss.ID() in logs, since it allows to take ownership of a session + // use a new random ID + visualID, _ := newSessionVisualID(s.sessions) + se := sessionrtsp.New( s.rtspAddress, s.protocols, - ss, + visualID, + ctx.Session, + ctx.Conn, s.pathMan, s) - s.mutex.Lock() - s.sessions[ss] = se + s.sessions[ctx.Session] = se s.mutex.Unlock() } -// OnSessionClose implements gortsplib.ServerHandlerOnSessionCloseCtx. -func (s *Server) OnSessionClose(ss *gortsplib.ServerSession, err error) { +// OnSessionClose implements gortsplib.ServerHandlerOnSessionClose. +func (s *Server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { s.mutex.Lock() - se := s.sessions[ss] - delete(s.sessions, ss) + se := s.sessions[ctx.Session] + delete(s.sessions, ctx.Session) s.mutex.Unlock() se.Close() } -// OnDescribe implements gortsplib.ServerHandlerOnDescribeCtx. +// OnDescribe implements gortsplib.ServerHandlerOnDescribe. func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) { s.mutex.RLock() c := s.clients[ctx.Conn] @@ -247,7 +281,7 @@ func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Re return c.OnDescribe(ctx) } -// OnAnnounce implements gortsplib.ServerHandlerOnAnnounceCtx. +// OnAnnounce implements gortsplib.ServerHandlerOnAnnounce. func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { s.mutex.RLock() c := s.clients[ctx.Conn] @@ -256,7 +290,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re return se.OnAnnounce(c, ctx) } -// OnSetup implements gortsplib.ServerHandlerOnSetupCtx. +// OnSetup implements gortsplib.ServerHandlerOnSetup. func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, error) { s.mutex.RLock() c := s.clients[ctx.Conn] @@ -265,7 +299,7 @@ func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response return se.OnSetup(c, ctx) } -// OnPlay implements gortsplib.ServerHandlerOnPlayCtx. +// OnPlay implements gortsplib.ServerHandlerOnPlay. func (s *Server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { s.mutex.RLock() se := s.sessions[ctx.Session] @@ -273,7 +307,7 @@ func (s *Server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, return se.OnPlay(ctx) } -// OnRecord implements gortsplib.ServerHandlerOnRecordCtx. +// OnRecord implements gortsplib.ServerHandlerOnRecord. func (s *Server) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { s.mutex.RLock() se := s.sessions[ctx.Session] @@ -281,7 +315,7 @@ func (s *Server) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Respon return se.OnRecord(ctx) } -// OnPause implements gortsplib.ServerHandlerOnPauseCtx. +// OnPause implements gortsplib.ServerHandlerOnPause. func (s *Server) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) { s.mutex.RLock() se := s.sessions[ctx.Session] @@ -289,7 +323,7 @@ func (s *Server) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response return se.OnPause(ctx) } -// OnFrame implements gortsplib.ServerHandlerOnFrameCtx. +// OnFrame implements gortsplib.ServerHandlerOnFrame. func (s *Server) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { s.mutex.RLock() se := s.sessions[ctx.Session] diff --git a/internal/sessionrtsp/session.go b/internal/sessionrtsp/session.go index 28733b98..ea9bbd10 100644 --- a/internal/sessionrtsp/session.go +++ b/internal/sessionrtsp/session.go @@ -39,6 +39,7 @@ type Parent interface { type Session struct { rtspAddress string protocols map[gortsplib.StreamProtocol]struct{} + visualID string ss *gortsplib.ServerSession pathMan PathMan parent Parent @@ -54,19 +55,22 @@ type Session struct { func New( rtspAddress string, protocols map[gortsplib.StreamProtocol]struct{}, + visualID string, ss *gortsplib.ServerSession, + sc *gortsplib.ServerConn, pathMan PathMan, parent Parent) *Session { s := &Session{ rtspAddress: rtspAddress, protocols: protocols, + visualID: visualID, ss: ss, pathMan: pathMan, parent: parent, } - s.log(logger.Info, "created") + s.log(logger.Info, "created by %v", sc.NetConn().RemoteAddr()) return s } @@ -106,8 +110,13 @@ func (s *Session) IsReadPublisher() {} // IsSource implements source.Source. func (s *Session) IsSource() {} +// VisualID returns the visual ID of the session. +func (s *Session) VisualID() string { + return s.visualID +} + func (s *Session) log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[session %s] "+format, append([]interface{}{"TODO"}, args...)...) + s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.visualID}, args...)...) } // OnAnnounce is called by serverrtsp.Server.