mirror of
https://github.com/bluenviron/mediamtx.git
synced 2025-12-25 04:22:00 -08:00
rename client into conn
This commit is contained in:
parent
51843fc577
commit
ccd65a08c1
8 changed files with 123 additions and 125 deletions
|
|
@ -1,4 +1,4 @@
|
|||
package clientrtmp
|
||||
package connrtmp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
|
@ -56,11 +56,11 @@ type PathMan interface {
|
|||
// Parent is implemented by serverrtmp.Server.
|
||||
type Parent interface {
|
||||
Log(logger.Level, string, ...interface{})
|
||||
OnClientClose(*Client)
|
||||
OnConnClose(*Conn)
|
||||
}
|
||||
|
||||
// Client is a RTMP client.
|
||||
type Client struct {
|
||||
// Conn is a server-side RTMP connection.
|
||||
type Conn struct {
|
||||
rtspAddress string
|
||||
readTimeout time.Duration
|
||||
writeTimeout time.Duration
|
||||
|
|
@ -80,7 +80,7 @@ type Client struct {
|
|||
terminate chan struct{}
|
||||
}
|
||||
|
||||
// New allocates a Client.
|
||||
// New allocates a Conn.
|
||||
func New(
|
||||
rtspAddress string,
|
||||
readTimeout time.Duration,
|
||||
|
|
@ -92,9 +92,9 @@ func New(
|
|||
stats *stats.Stats,
|
||||
nconn net.Conn,
|
||||
pathMan PathMan,
|
||||
parent Parent) *Client {
|
||||
parent Parent) *Conn {
|
||||
|
||||
c := &Client{
|
||||
c := &Conn{
|
||||
rtspAddress: rtspAddress,
|
||||
readTimeout: readTimeout,
|
||||
writeTimeout: writeTimeout,
|
||||
|
|
@ -109,7 +109,7 @@ func New(
|
|||
terminate: make(chan struct{}),
|
||||
}
|
||||
|
||||
c.log(logger.Info, "connected")
|
||||
c.log(logger.Info, "opened")
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.run()
|
||||
|
|
@ -117,32 +117,32 @@ func New(
|
|||
return c
|
||||
}
|
||||
|
||||
// Close closes a Client.
|
||||
func (c *Client) Close() {
|
||||
c.log(logger.Info, "disconnected")
|
||||
// Close closes a Conn.
|
||||
func (c *Conn) Close() {
|
||||
c.log(logger.Info, "closed")
|
||||
close(c.terminate)
|
||||
}
|
||||
|
||||
// RequestClose closes a Client.
|
||||
func (c *Client) RequestClose() {
|
||||
c.parent.OnClientClose(c)
|
||||
// RequestClose closes a Conn.
|
||||
func (c *Conn) RequestClose() {
|
||||
c.parent.OnConnClose(c)
|
||||
}
|
||||
|
||||
// IsReadPublisher implements readpublisher.ReadPublisher.
|
||||
func (c *Client) IsReadPublisher() {}
|
||||
func (c *Conn) IsReadPublisher() {}
|
||||
|
||||
// IsSource implements source.Source.
|
||||
func (c *Client) IsSource() {}
|
||||
func (c *Conn) IsSource() {}
|
||||
|
||||
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
|
||||
func (c *Conn) log(level logger.Level, format string, args ...interface{}) {
|
||||
c.parent.Log(level, "[client %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
|
||||
}
|
||||
|
||||
func (c *Client) ip() net.IP {
|
||||
func (c *Conn) ip() net.IP {
|
||||
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
|
||||
}
|
||||
|
||||
func (c *Client) run() {
|
||||
func (c *Conn) run() {
|
||||
defer c.wg.Done()
|
||||
|
||||
if c.runOnConnect != "" {
|
||||
|
|
@ -161,7 +161,7 @@ func (c *Client) run() {
|
|||
c.log(logger.Info, "ERR: %s", err)
|
||||
c.conn.NetConn().Close()
|
||||
|
||||
c.parent.OnClientClose(c)
|
||||
c.parent.OnConnClose(c)
|
||||
<-c.terminate
|
||||
return
|
||||
}
|
||||
|
|
@ -173,7 +173,7 @@ func (c *Client) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Client) runRead() {
|
||||
func (c *Conn) runRead() {
|
||||
var path readpublisher.Path
|
||||
var videoTrack *gortsplib.Track
|
||||
var h264Decoder *rtph264.Decoder
|
||||
|
|
@ -247,7 +247,7 @@ func (c *Client) runRead() {
|
|||
<-res
|
||||
}
|
||||
|
||||
c.parent.OnClientClose(c)
|
||||
c.parent.OnConnClose(c)
|
||||
<-c.terminate
|
||||
return
|
||||
}
|
||||
|
|
@ -359,7 +359,7 @@ func (c *Client) runRead() {
|
|||
path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
|
||||
<-res
|
||||
|
||||
c.parent.OnClientClose(c)
|
||||
c.parent.OnConnClose(c)
|
||||
<-c.terminate
|
||||
|
||||
case <-c.terminate:
|
||||
|
|
@ -373,7 +373,7 @@ func (c *Client) runRead() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Client) runPublish() {
|
||||
func (c *Conn) runPublish() {
|
||||
var videoTrack *gortsplib.Track
|
||||
var audioTrack *gortsplib.Track
|
||||
var err error
|
||||
|
|
@ -449,7 +449,7 @@ func (c *Client) runPublish() {
|
|||
c.conn.NetConn().Close()
|
||||
c.log(logger.Info, "ERR: %s", err)
|
||||
|
||||
c.parent.OnClientClose(c)
|
||||
c.parent.OnConnClose(c)
|
||||
<-c.terminate
|
||||
return
|
||||
}
|
||||
|
|
@ -580,7 +580,7 @@ func (c *Client) runPublish() {
|
|||
<-res
|
||||
path = nil
|
||||
|
||||
c.parent.OnClientClose(c)
|
||||
c.parent.OnConnClose(c)
|
||||
<-c.terminate
|
||||
|
||||
case <-c.terminate:
|
||||
|
|
@ -594,7 +594,7 @@ func (c *Client) runPublish() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Client) validateCredentials(
|
||||
func (c *Conn) validateCredentials(
|
||||
pathUser string,
|
||||
pathPass string,
|
||||
query url.Values,
|
||||
|
|
@ -609,7 +609,7 @@ func (c *Client) validateCredentials(
|
|||
}
|
||||
|
||||
// OnFrame implements path.Reader.
|
||||
func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
|
||||
func (c *Conn) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
|
||||
if streamType == gortsplib.StreamTypeRTP {
|
||||
c.ringBuffer.Push(trackIDPayloadPair{trackID, payload})
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package clientrtsp
|
||||
package connrtsp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
|
@ -42,8 +42,8 @@ type Parent interface {
|
|||
Log(logger.Level, string, ...interface{})
|
||||
}
|
||||
|
||||
// Client is a RTSP client.
|
||||
type Client struct {
|
||||
// Conn is a RTSP server-side connection.
|
||||
type Conn struct {
|
||||
rtspAddress string
|
||||
readTimeout time.Duration
|
||||
runOnConnect string
|
||||
|
|
@ -60,7 +60,7 @@ type Client struct {
|
|||
authFailures int
|
||||
}
|
||||
|
||||
// New allocates a Client.
|
||||
// New allocates a Conn.
|
||||
func New(
|
||||
rtspAddress string,
|
||||
readTimeout time.Duration,
|
||||
|
|
@ -69,9 +69,9 @@ func New(
|
|||
pathMan PathMan,
|
||||
stats *stats.Stats,
|
||||
conn *gortsplib.ServerConn,
|
||||
parent Parent) *Client {
|
||||
parent Parent) *Conn {
|
||||
|
||||
c := &Client{
|
||||
c := &Conn{
|
||||
rtspAddress: rtspAddress,
|
||||
readTimeout: readTimeout,
|
||||
runOnConnect: runOnConnect,
|
||||
|
|
@ -82,7 +82,7 @@ func New(
|
|||
parent: parent,
|
||||
}
|
||||
|
||||
c.log(logger.Info, "connected")
|
||||
c.log(logger.Info, "opened")
|
||||
|
||||
if c.runOnConnect != "" {
|
||||
_, port, _ := net.SplitHostPort(c.rtspAddress)
|
||||
|
|
@ -95,44 +95,44 @@ func New(
|
|||
return c
|
||||
}
|
||||
|
||||
// Close closes a Client.
|
||||
func (c *Client) Close(err error) {
|
||||
// Close closes a Conn.
|
||||
func (c *Conn) Close(err error) {
|
||||
if err != io.EOF && !isTeardownErr(err) && !isTerminatedErr(err) {
|
||||
c.log(logger.Info, "ERR: %v", err)
|
||||
}
|
||||
|
||||
c.log(logger.Info, "disconnected")
|
||||
c.log(logger.Info, "closed")
|
||||
|
||||
if c.onConnectCmd != nil {
|
||||
c.onConnectCmd.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
|
||||
c.parent.Log(level, "[client %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
|
||||
func (c *Conn) log(level logger.Level, format string, args ...interface{}) {
|
||||
c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
|
||||
}
|
||||
|
||||
// Conn returns the RTSP connection.
|
||||
func (c *Client) Conn() *gortsplib.ServerConn {
|
||||
func (c *Conn) Conn() *gortsplib.ServerConn {
|
||||
return c.conn
|
||||
}
|
||||
|
||||
func (c *Client) ip() net.IP {
|
||||
func (c *Conn) ip() net.IP {
|
||||
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
|
||||
}
|
||||
|
||||
// OnRequest is called by serverrtsp.Server.
|
||||
func (c *Client) OnRequest(req *base.Request) {
|
||||
func (c *Conn) OnRequest(req *base.Request) {
|
||||
c.log(logger.Debug, "[c->s] %v", req)
|
||||
}
|
||||
|
||||
// OnResponse is called by serverrtsp.Server.
|
||||
func (c *Client) OnResponse(res *base.Response) {
|
||||
func (c *Conn) OnResponse(res *base.Response) {
|
||||
c.log(logger.Debug, "[s->c] %v", res)
|
||||
}
|
||||
|
||||
// OnDescribe is called by serverrtsp.Server.
|
||||
func (c *Client) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) {
|
||||
func (c *Conn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) {
|
||||
resc := make(chan readpublisher.DescribeRes)
|
||||
c.pathMan.OnReadPublisherDescribe(readpublisher.DescribeReq{
|
||||
PathName: ctx.Path,
|
||||
|
|
@ -183,7 +183,7 @@ func (c *Client) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Re
|
|||
}
|
||||
|
||||
// ValidateCredentials allows to validate the credentials of a path.
|
||||
func (c *Client) ValidateCredentials(
|
||||
func (c *Conn) ValidateCredentials(
|
||||
authMethods []headers.AuthMethod,
|
||||
pathUser string,
|
||||
pathPass string,
|
||||
|
|
@ -174,7 +174,7 @@ func New(
|
|||
terminate: make(chan struct{}),
|
||||
}
|
||||
|
||||
c.log(logger.Info, "created")
|
||||
c.log(logger.Info, "opened")
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.run()
|
||||
|
|
@ -184,7 +184,7 @@ func New(
|
|||
|
||||
// Close closes a Converter.
|
||||
func (c *Converter) Close() {
|
||||
c.log(logger.Info, "destroyed")
|
||||
c.log(logger.Info, "closed")
|
||||
close(c.terminate)
|
||||
}
|
||||
|
||||
|
|
@ -322,8 +322,6 @@ func (c *Converter) run() {
|
|||
c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, resc}) //nolint:govet
|
||||
<-resc
|
||||
|
||||
c.log(logger.Info, "is reading")
|
||||
|
||||
writerDone := make(chan error)
|
||||
go func() {
|
||||
writerDone <- func() error {
|
||||
|
|
|
|||
|
|
@ -54,12 +54,12 @@ type PathManager struct {
|
|||
wg sync.WaitGroup
|
||||
|
||||
// in
|
||||
confReload chan map[string]*conf.PathConf
|
||||
pathClose chan *path.Path
|
||||
clientDescribe chan readpublisher.DescribeReq
|
||||
clientSetupPlay chan readpublisher.SetupPlayReq
|
||||
clientAnnounce chan readpublisher.AnnounceReq
|
||||
terminate chan struct{}
|
||||
confReload chan map[string]*conf.PathConf
|
||||
pathClose chan *path.Path
|
||||
rpDescribe chan readpublisher.DescribeReq
|
||||
rpSetupPlay chan readpublisher.SetupPlayReq
|
||||
rpAnnounce chan readpublisher.AnnounceReq
|
||||
terminate chan struct{}
|
||||
|
||||
// out
|
||||
done chan struct{}
|
||||
|
|
@ -90,9 +90,9 @@ func New(
|
|||
paths: make(map[string]*path.Path),
|
||||
confReload: make(chan map[string]*conf.PathConf),
|
||||
pathClose: make(chan *path.Path),
|
||||
clientDescribe: make(chan readpublisher.DescribeReq),
|
||||
clientSetupPlay: make(chan readpublisher.SetupPlayReq),
|
||||
clientAnnounce: make(chan readpublisher.AnnounceReq),
|
||||
rpDescribe: make(chan readpublisher.DescribeReq),
|
||||
rpSetupPlay: make(chan readpublisher.SetupPlayReq),
|
||||
rpAnnounce: make(chan readpublisher.AnnounceReq),
|
||||
terminate: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
|
@ -161,7 +161,7 @@ outer:
|
|||
delete(pm.paths, pa.Name())
|
||||
pa.Close()
|
||||
|
||||
case req := <-pm.clientDescribe:
|
||||
case req := <-pm.rpDescribe:
|
||||
pathName, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet
|
||||
|
|
@ -188,7 +188,7 @@ outer:
|
|||
|
||||
pm.paths[req.PathName].OnPathManDescribe(req)
|
||||
|
||||
case req := <-pm.clientSetupPlay:
|
||||
case req := <-pm.rpSetupPlay:
|
||||
pathName, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet
|
||||
|
|
@ -215,7 +215,7 @@ outer:
|
|||
|
||||
pm.paths[req.PathName].OnPathManSetupPlay(req)
|
||||
|
||||
case req := <-pm.clientAnnounce:
|
||||
case req := <-pm.rpAnnounce:
|
||||
pathName, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet
|
||||
|
|
@ -260,19 +260,19 @@ outer:
|
|||
return
|
||||
}
|
||||
|
||||
case req, ok := <-pm.clientDescribe:
|
||||
case req, ok := <-pm.rpDescribe:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req, ok := <-pm.clientSetupPlay:
|
||||
case req, ok := <-pm.rpSetupPlay:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req, ok := <-pm.clientAnnounce:
|
||||
case req, ok := <-pm.rpAnnounce:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
|
@ -288,9 +288,9 @@ outer:
|
|||
|
||||
close(pm.confReload)
|
||||
close(pm.pathClose)
|
||||
close(pm.clientDescribe)
|
||||
close(pm.clientSetupPlay)
|
||||
close(pm.clientAnnounce)
|
||||
close(pm.rpDescribe)
|
||||
close(pm.rpSetupPlay)
|
||||
close(pm.rpAnnounce)
|
||||
}
|
||||
|
||||
func (pm *PathManager) createPath(confName string, conf *conf.PathConf, name string) {
|
||||
|
|
@ -347,19 +347,19 @@ func (pm *PathManager) OnPathClose(pa *path.Path) {
|
|||
pm.pathClose <- pa
|
||||
}
|
||||
|
||||
// OnReadPublisherDescribe is called by clientman.ClientMan.
|
||||
// OnReadPublisherDescribe is called by a ReadPublisher.
|
||||
func (pm *PathManager) OnReadPublisherDescribe(req readpublisher.DescribeReq) {
|
||||
pm.clientDescribe <- req
|
||||
pm.rpDescribe <- req
|
||||
}
|
||||
|
||||
// OnReadPublisherAnnounce is called by clientman.ClientMan.
|
||||
// OnReadPublisherAnnounce is called by a ReadPublisher.
|
||||
func (pm *PathManager) OnReadPublisherAnnounce(req readpublisher.AnnounceReq) {
|
||||
pm.clientAnnounce <- req
|
||||
pm.rpAnnounce <- req
|
||||
}
|
||||
|
||||
// OnReadPublisherSetupPlay is called by clientman.ClientMan.
|
||||
// OnReadPublisherSetupPlay is called by a ReadPublisher.
|
||||
func (pm *PathManager) OnReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
|
||||
pm.clientSetupPlay <- req
|
||||
pm.rpSetupPlay <- req
|
||||
}
|
||||
|
||||
func (pm *PathManager) authenticate(
|
||||
|
|
|
|||
|
|
@ -34,9 +34,9 @@ type Server struct {
|
|||
converters map[string]*converterhls.Converter
|
||||
|
||||
// in
|
||||
request chan converterhls.Request
|
||||
clientClose chan *converterhls.Converter
|
||||
terminate chan struct{}
|
||||
request chan converterhls.Request
|
||||
connClose chan *converterhls.Converter
|
||||
terminate chan struct{}
|
||||
|
||||
// out
|
||||
done chan struct{}
|
||||
|
|
@ -68,7 +68,7 @@ func New(
|
|||
ln: ln,
|
||||
converters: make(map[string]*converterhls.Converter),
|
||||
request: make(chan converterhls.Request),
|
||||
clientClose: make(chan *converterhls.Converter),
|
||||
connClose: make(chan *converterhls.Converter),
|
||||
terminate: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
|
@ -116,7 +116,7 @@ outer:
|
|||
}
|
||||
c.OnRequest(req)
|
||||
|
||||
case c := <-s.clientClose:
|
||||
case c := <-s.connClose:
|
||||
if c2, ok := s.converters[c.PathName()]; !ok || c2 != c {
|
||||
continue
|
||||
}
|
||||
|
|
@ -136,7 +136,7 @@ outer:
|
|||
}
|
||||
req.Res <- nil
|
||||
|
||||
case _, ok := <-s.clientClose:
|
||||
case _, ok := <-s.connClose:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
|
@ -151,12 +151,12 @@ outer:
|
|||
hs.Shutdown(context.Background())
|
||||
|
||||
close(s.request)
|
||||
close(s.clientClose)
|
||||
close(s.connClose)
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler.
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
s.Log(logger.Info, "[client %v] %s %s", r.RemoteAddr, r.Method, r.URL.Path)
|
||||
s.Log(logger.Info, "[conn %v] %s %s", r.RemoteAddr, r.Method, r.URL.Path)
|
||||
|
||||
// remove leading prefix
|
||||
path := r.URL.Path[1:]
|
||||
|
|
@ -208,5 +208,5 @@ func (s *Server) doConverterClose(c *converterhls.Converter) {
|
|||
|
||||
// OnConverterClose is called by converterhls.Converter.
|
||||
func (s *Server) OnConverterClose(c *converterhls.Converter) {
|
||||
s.clientClose <- c
|
||||
s.connClose <- c
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/clientrtmp"
|
||||
"github.com/aler9/rtsp-simple-server/internal/connrtmp"
|
||||
"github.com/aler9/rtsp-simple-server/internal/logger"
|
||||
"github.com/aler9/rtsp-simple-server/internal/pathman"
|
||||
"github.com/aler9/rtsp-simple-server/internal/stats"
|
||||
|
|
@ -16,7 +16,7 @@ type Parent interface {
|
|||
Log(logger.Level, string, ...interface{})
|
||||
}
|
||||
|
||||
// Server is a RTMP listener.
|
||||
// Server is a RTMP server.
|
||||
type Server struct {
|
||||
readTimeout time.Duration
|
||||
writeTimeout time.Duration
|
||||
|
|
@ -28,13 +28,13 @@ type Server struct {
|
|||
pathMan *pathman.PathManager
|
||||
parent Parent
|
||||
|
||||
l net.Listener
|
||||
wg sync.WaitGroup
|
||||
clients map[*clientrtmp.Client]struct{}
|
||||
l net.Listener
|
||||
wg sync.WaitGroup
|
||||
conns map[*connrtmp.Conn]struct{}
|
||||
|
||||
// in
|
||||
clientClose chan *clientrtmp.Client
|
||||
terminate chan struct{}
|
||||
connClose chan *connrtmp.Conn
|
||||
terminate chan struct{}
|
||||
|
||||
// out
|
||||
done chan struct{}
|
||||
|
|
@ -69,8 +69,8 @@ func New(
|
|||
pathMan: pathMan,
|
||||
parent: parent,
|
||||
l: l,
|
||||
clients: make(map[*clientrtmp.Client]struct{}),
|
||||
clientClose: make(chan *clientrtmp.Client),
|
||||
conns: make(map[*connrtmp.Conn]struct{}),
|
||||
connClose: make(chan *connrtmp.Conn),
|
||||
terminate: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
|
@ -121,7 +121,7 @@ outer:
|
|||
break outer
|
||||
|
||||
case nconn := <-connNew:
|
||||
c := clientrtmp.New(
|
||||
c := connrtmp.New(
|
||||
s.rtspAddress,
|
||||
s.readTimeout,
|
||||
s.writeTimeout,
|
||||
|
|
@ -133,13 +133,13 @@ outer:
|
|||
nconn,
|
||||
s.pathMan,
|
||||
s)
|
||||
s.clients[c] = struct{}{}
|
||||
s.conns[c] = struct{}{}
|
||||
|
||||
case c := <-s.clientClose:
|
||||
if _, ok := s.clients[c]; !ok {
|
||||
case c := <-s.connClose:
|
||||
if _, ok := s.conns[c]; !ok {
|
||||
continue
|
||||
}
|
||||
s.doClientClose(c)
|
||||
s.doConnClose(c)
|
||||
|
||||
case <-s.terminate:
|
||||
break outer
|
||||
|
|
@ -160,7 +160,7 @@ outer:
|
|||
}
|
||||
conn.Close()
|
||||
|
||||
case _, ok := <-s.clientClose:
|
||||
case _, ok := <-s.connClose:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
|
@ -170,23 +170,23 @@ outer:
|
|||
|
||||
s.l.Close()
|
||||
|
||||
for c := range s.clients {
|
||||
s.doClientClose(c)
|
||||
for c := range s.conns {
|
||||
s.doConnClose(c)
|
||||
}
|
||||
|
||||
s.wg.Wait()
|
||||
|
||||
close(acceptErr)
|
||||
close(connNew)
|
||||
close(s.clientClose)
|
||||
close(s.connClose)
|
||||
}
|
||||
|
||||
func (s *Server) doClientClose(c *clientrtmp.Client) {
|
||||
delete(s.clients, c)
|
||||
func (s *Server) doConnClose(c *connrtmp.Conn) {
|
||||
delete(s.conns, c)
|
||||
c.Close()
|
||||
}
|
||||
|
||||
// OnClientClose is called by clientrtmp.Client.
|
||||
func (s *Server) OnClientClose(c *clientrtmp.Client) {
|
||||
s.clientClose <- c
|
||||
// OnConnClose is called by connrtmp.Conn.
|
||||
func (s *Server) OnConnClose(c *connrtmp.Conn) {
|
||||
s.connClose <- c
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
"github.com/aler9/gortsplib"
|
||||
"github.com/aler9/gortsplib/pkg/base"
|
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
|
||||
"github.com/aler9/rtsp-simple-server/internal/connrtsp"
|
||||
"github.com/aler9/rtsp-simple-server/internal/logger"
|
||||
"github.com/aler9/rtsp-simple-server/internal/pathman"
|
||||
"github.com/aler9/rtsp-simple-server/internal/sessionrtsp"
|
||||
|
|
@ -47,7 +47,7 @@ type Parent interface {
|
|||
Log(logger.Level, string, ...interface{})
|
||||
}
|
||||
|
||||
// Server is a RTSP listener.
|
||||
// Server is a RTSP server.
|
||||
type Server struct {
|
||||
readTimeout time.Duration
|
||||
isTLS bool
|
||||
|
|
@ -61,7 +61,7 @@ type Server struct {
|
|||
|
||||
srv *gortsplib.Server
|
||||
mutex sync.RWMutex
|
||||
clients map[*gortsplib.ServerConn]*clientrtsp.Client
|
||||
conns map[*gortsplib.ServerConn]*connrtsp.Conn
|
||||
sessions map[*gortsplib.ServerSession]*sessionrtsp.Session
|
||||
|
||||
// in
|
||||
|
|
@ -100,7 +100,7 @@ func New(
|
|||
stats: stats,
|
||||
pathMan: pathMan,
|
||||
parent: parent,
|
||||
clients: make(map[*gortsplib.ServerConn]*clientrtsp.Client),
|
||||
conns: make(map[*gortsplib.ServerConn]*connrtsp.Conn),
|
||||
sessions: make(map[*gortsplib.ServerSession]*sessionrtsp.Session),
|
||||
terminate: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
|
|
@ -199,7 +199,7 @@ outer:
|
|||
|
||||
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen.
|
||||
func (s *Server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
|
||||
c := clientrtsp.New(
|
||||
c := connrtsp.New(
|
||||
s.rtspAddress,
|
||||
s.readTimeout,
|
||||
s.runOnConnect,
|
||||
|
|
@ -210,15 +210,15 @@ func (s *Server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
|
|||
s)
|
||||
|
||||
s.mutex.Lock()
|
||||
s.clients[ctx.Conn] = c
|
||||
s.conns[ctx.Conn] = c
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
// OnConnClose implements gortsplib.ServerHandlerOnConnClose.
|
||||
func (s *Server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
|
||||
s.mutex.Lock()
|
||||
c := s.clients[ctx.Conn]
|
||||
delete(s.clients, ctx.Conn)
|
||||
c := s.conns[ctx.Conn]
|
||||
delete(s.conns, ctx.Conn)
|
||||
s.mutex.Unlock()
|
||||
|
||||
c.Close(ctx.Error)
|
||||
|
|
@ -227,7 +227,7 @@ func (s *Server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
|
|||
// OnRequest implements gortsplib.ServerHandlerOnRequest.
|
||||
func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) {
|
||||
s.mutex.Lock()
|
||||
c := s.clients[sc]
|
||||
c := s.conns[sc]
|
||||
s.mutex.Unlock()
|
||||
|
||||
c.OnRequest(req)
|
||||
|
|
@ -236,7 +236,7 @@ func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) {
|
|||
// OnResponse implements gortsplib.ServerHandlerOnResponse.
|
||||
func (s *Server) OnResponse(sc *gortsplib.ServerConn, res *base.Response) {
|
||||
s.mutex.Lock()
|
||||
c := s.clients[sc]
|
||||
c := s.conns[sc]
|
||||
s.mutex.Unlock()
|
||||
|
||||
c.OnResponse(res)
|
||||
|
|
@ -276,7 +276,7 @@ func (s *Server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
|
|||
// OnDescribe implements gortsplib.ServerHandlerOnDescribe.
|
||||
func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) {
|
||||
s.mutex.RLock()
|
||||
c := s.clients[ctx.Conn]
|
||||
c := s.conns[ctx.Conn]
|
||||
s.mutex.RUnlock()
|
||||
return c.OnDescribe(ctx)
|
||||
}
|
||||
|
|
@ -284,7 +284,7 @@ func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Re
|
|||
// OnAnnounce implements gortsplib.ServerHandlerOnAnnounce.
|
||||
func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
|
||||
s.mutex.RLock()
|
||||
c := s.clients[ctx.Conn]
|
||||
c := s.conns[ctx.Conn]
|
||||
se := s.sessions[ctx.Session]
|
||||
s.mutex.RUnlock()
|
||||
return se.OnAnnounce(c, ctx)
|
||||
|
|
@ -293,7 +293,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re
|
|||
// OnSetup implements gortsplib.ServerHandlerOnSetup.
|
||||
func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, error) {
|
||||
s.mutex.RLock()
|
||||
c := s.clients[ctx.Conn]
|
||||
c := s.conns[ctx.Conn]
|
||||
se := s.sessions[ctx.Session]
|
||||
s.mutex.RUnlock()
|
||||
return se.OnSetup(c, ctx)
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
"github.com/aler9/gortsplib/pkg/base"
|
||||
"github.com/aler9/gortsplib/pkg/headers"
|
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
|
||||
"github.com/aler9/rtsp-simple-server/internal/connrtsp"
|
||||
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
|
||||
"github.com/aler9/rtsp-simple-server/internal/logger"
|
||||
"github.com/aler9/rtsp-simple-server/internal/readpublisher"
|
||||
|
|
@ -35,7 +35,7 @@ type Parent interface {
|
|||
Log(logger.Level, string, ...interface{})
|
||||
}
|
||||
|
||||
// Session is a RTSP session.
|
||||
// Session is a RTSP server-side session.
|
||||
type Session struct {
|
||||
rtspAddress string
|
||||
protocols map[gortsplib.StreamProtocol]struct{}
|
||||
|
|
@ -70,7 +70,7 @@ func New(
|
|||
parent: parent,
|
||||
}
|
||||
|
||||
s.log(logger.Info, "created by %v", sc.NetConn().RemoteAddr())
|
||||
s.log(logger.Info, "opened by %v", sc.NetConn().RemoteAddr())
|
||||
|
||||
return s
|
||||
}
|
||||
|
|
@ -96,7 +96,7 @@ func (s *Session) Close() {
|
|||
s.path = nil
|
||||
}
|
||||
|
||||
s.log(logger.Info, "destroyed")
|
||||
s.log(logger.Info, "closed")
|
||||
}
|
||||
|
||||
// RequestClose closes a Session.
|
||||
|
|
@ -120,7 +120,7 @@ func (s *Session) log(level logger.Level, format string, args ...interface{}) {
|
|||
}
|
||||
|
||||
// OnAnnounce is called by serverrtsp.Server.
|
||||
func (s *Session) OnAnnounce(c *clientrtsp.Client, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
|
||||
func (s *Session) OnAnnounce(c *connrtsp.Conn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
|
||||
resc := make(chan readpublisher.AnnounceRes)
|
||||
s.pathMan.OnReadPublisherAnnounce(readpublisher.AnnounceReq{
|
||||
Author: s,
|
||||
|
|
@ -160,7 +160,7 @@ func (s *Session) OnAnnounce(c *clientrtsp.Client, ctx *gortsplib.ServerHandlerO
|
|||
}
|
||||
|
||||
// OnSetup is called by serverrtsp.Server.
|
||||
func (s *Session) OnSetup(c *clientrtsp.Client, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, error) {
|
||||
func (s *Session) OnSetup(c *connrtsp.Conn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, error) {
|
||||
if ctx.Transport.Protocol == gortsplib.StreamProtocolUDP {
|
||||
if _, ok := s.protocols[gortsplib.StreamProtocolUDP]; !ok {
|
||||
return &base.Response{
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue