forked from External/mediamtx
cleanup
This commit is contained in:
parent
49ac52ff67
commit
40f2d5cd09
4 changed files with 218 additions and 217 deletions
|
|
@ -27,6 +27,74 @@ const (
|
|||
pauseAfterAuthError = 2 * time.Second
|
||||
)
|
||||
|
||||
// DescribeRes is a client describe response.
|
||||
type DescribeRes struct {
|
||||
Path Path
|
||||
Err error
|
||||
}
|
||||
|
||||
// DescribeReq is a client describe request.
|
||||
type DescribeReq struct {
|
||||
Client *Client
|
||||
PathName string
|
||||
Req *base.Request
|
||||
Res chan DescribeRes
|
||||
}
|
||||
|
||||
// AnnounceRes is a client announce response.
|
||||
type AnnounceRes struct {
|
||||
Path Path
|
||||
Err error
|
||||
}
|
||||
|
||||
// AnnounceReq is a client announce request.
|
||||
type AnnounceReq struct {
|
||||
Client *Client
|
||||
PathName string
|
||||
Tracks gortsplib.Tracks
|
||||
Req *base.Request
|
||||
Res chan AnnounceRes
|
||||
}
|
||||
|
||||
// SetupPlayRes is a setup/play response.
|
||||
type SetupPlayRes struct {
|
||||
Path Path
|
||||
Err error
|
||||
}
|
||||
|
||||
// SetupPlayReq is a setup/play request.
|
||||
type SetupPlayReq struct {
|
||||
Client *Client
|
||||
PathName string
|
||||
TrackID int
|
||||
Req *base.Request
|
||||
Res chan SetupPlayRes
|
||||
}
|
||||
|
||||
// RemoveReq is a remove request.
|
||||
type RemoveReq struct {
|
||||
Client *Client
|
||||
Res chan struct{}
|
||||
}
|
||||
|
||||
// PlayReq is a play request.
|
||||
type PlayReq struct {
|
||||
Client *Client
|
||||
Res chan struct{}
|
||||
}
|
||||
|
||||
// RecordReq is a record request.
|
||||
type RecordReq struct {
|
||||
Client *Client
|
||||
Res chan struct{}
|
||||
}
|
||||
|
||||
// PauseReq is a pause request.
|
||||
type PauseReq struct {
|
||||
Client *Client
|
||||
Res chan struct{}
|
||||
}
|
||||
|
||||
type describeData struct {
|
||||
sdp []byte
|
||||
redirect string
|
||||
|
|
@ -38,10 +106,10 @@ type Path interface {
|
|||
Name() string
|
||||
SourceTrackCount() int
|
||||
Conf() *conf.PathConf
|
||||
OnClientRemove(*Client)
|
||||
OnClientPlay(*Client)
|
||||
OnClientRecord(*Client)
|
||||
OnClientPause(*Client)
|
||||
OnClientRemove(RemoveReq)
|
||||
OnClientPlay(PlayReq)
|
||||
OnClientRecord(RecordReq)
|
||||
OnClientPause(PauseReq)
|
||||
OnFrame(int, gortsplib.StreamType, []byte)
|
||||
}
|
||||
|
||||
|
|
@ -49,12 +117,12 @@ type Path interface {
|
|||
type Parent interface {
|
||||
Log(logger.Level, string, ...interface{})
|
||||
OnClientClose(*Client)
|
||||
OnClientDescribe(*Client, string, *base.Request) (Path, error)
|
||||
OnClientAnnounce(*Client, string, gortsplib.Tracks, *base.Request) (Path, error)
|
||||
OnClientSetupPlay(*Client, string, int, *base.Request) (Path, error)
|
||||
OnClientDescribe(DescribeReq)
|
||||
OnClientAnnounce(AnnounceReq)
|
||||
OnClientSetupPlay(SetupPlayReq)
|
||||
}
|
||||
|
||||
// Client is a RTSP client.
|
||||
// Client is a RTSP
|
||||
type Client struct {
|
||||
rtspPort int
|
||||
readTimeout time.Duration
|
||||
|
|
@ -167,35 +235,37 @@ func (c *Client) run() {
|
|||
|
||||
c.describeData = make(chan describeData)
|
||||
|
||||
path, err := c.parent.OnClientDescribe(c, reqPath, req)
|
||||
if err != nil {
|
||||
switch terr := err.(type) {
|
||||
resc := make(chan DescribeRes)
|
||||
c.parent.OnClientDescribe(DescribeReq{c, reqPath, req, resc})
|
||||
res := <-resc
|
||||
|
||||
if res.Err != nil {
|
||||
switch terr := res.Err.(type) {
|
||||
case errAuthNotCritical:
|
||||
return terr.Response, nil
|
||||
|
||||
case errAuthCritical:
|
||||
// wait some seconds to stop brute force attacks
|
||||
t := time.NewTimer(pauseAfterAuthError)
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-t.C:
|
||||
case <-time.After(pauseAfterAuthError):
|
||||
case <-c.terminate:
|
||||
}
|
||||
|
||||
return terr.Response, errTerminated
|
||||
|
||||
default:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
}, err
|
||||
}, res.Err
|
||||
}
|
||||
}
|
||||
|
||||
c.path = path
|
||||
c.path = res.Path
|
||||
|
||||
select {
|
||||
case res := <-c.describeData:
|
||||
c.path.OnClientRemove(c)
|
||||
resc := make(chan struct{})
|
||||
c.path.OnClientRemove(RemoveReq{c, resc})
|
||||
<-resc
|
||||
c.path = nil
|
||||
|
||||
if res.err != nil {
|
||||
|
|
@ -230,7 +300,9 @@ func (c *Client) run() {
|
|||
}
|
||||
}()
|
||||
|
||||
c.path.OnClientRemove(c)
|
||||
resc := make(chan struct{})
|
||||
c.path.OnClientRemove(RemoveReq{c, resc})
|
||||
<-resc
|
||||
c.path = nil
|
||||
|
||||
close(c.describeData)
|
||||
|
|
@ -249,31 +321,31 @@ func (c *Client) run() {
|
|||
}, fmt.Errorf("invalid path (%s)", req.URL)
|
||||
}
|
||||
|
||||
path, err := c.parent.OnClientAnnounce(c, reqPath, tracks, req)
|
||||
if err != nil {
|
||||
switch terr := err.(type) {
|
||||
resc := make(chan AnnounceRes)
|
||||
c.parent.OnClientAnnounce(AnnounceReq{c, reqPath, tracks, req, resc})
|
||||
res := <-resc
|
||||
|
||||
if res.Err != nil {
|
||||
switch terr := res.Err.(type) {
|
||||
case errAuthNotCritical:
|
||||
return terr.Response, nil
|
||||
|
||||
case errAuthCritical:
|
||||
// wait some seconds to stop brute force attacks
|
||||
t := time.NewTimer(pauseAfterAuthError)
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-t.C:
|
||||
case <-time.After(pauseAfterAuthError):
|
||||
case <-c.terminate:
|
||||
}
|
||||
|
||||
return terr.Response, errTerminated
|
||||
|
||||
default:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
}, err
|
||||
}, res.Err
|
||||
}
|
||||
}
|
||||
|
||||
c.path = path
|
||||
c.path = res.Path
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
|
|
@ -313,31 +385,31 @@ func (c *Client) run() {
|
|||
}, nil
|
||||
}
|
||||
|
||||
path, err := c.parent.OnClientSetupPlay(c, reqPath, trackID, req)
|
||||
if err != nil {
|
||||
switch terr := err.(type) {
|
||||
resc := make(chan SetupPlayRes)
|
||||
c.parent.OnClientSetupPlay(SetupPlayReq{c, reqPath, trackID, req, resc})
|
||||
res := <-resc
|
||||
|
||||
if res.Err != nil {
|
||||
switch terr := res.Err.(type) {
|
||||
case errAuthNotCritical:
|
||||
return terr.Response, nil
|
||||
|
||||
case errAuthCritical:
|
||||
// wait some seconds to stop brute force attacks
|
||||
t := time.NewTimer(pauseAfterAuthError)
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-t.C:
|
||||
case <-time.After(pauseAfterAuthError):
|
||||
case <-c.terminate:
|
||||
}
|
||||
|
||||
return terr.Response, errTerminated
|
||||
|
||||
default:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
}, err
|
||||
}, res.Err
|
||||
}
|
||||
}
|
||||
|
||||
c.path = path
|
||||
c.path = res.Path
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
|
|
@ -355,31 +427,31 @@ func (c *Client) run() {
|
|||
}, nil
|
||||
}
|
||||
|
||||
path, err := c.parent.OnClientSetupPlay(c, reqPath, trackID, req)
|
||||
if err != nil {
|
||||
switch terr := err.(type) {
|
||||
resc := make(chan SetupPlayRes)
|
||||
c.parent.OnClientSetupPlay(SetupPlayReq{c, reqPath, trackID, req, resc})
|
||||
res := <-resc
|
||||
|
||||
if res.Err != nil {
|
||||
switch terr := res.Err.(type) {
|
||||
case errAuthNotCritical:
|
||||
return terr.Response, nil
|
||||
|
||||
case errAuthCritical:
|
||||
// wait some seconds to stop brute force attacks
|
||||
t := time.NewTimer(pauseAfterAuthError)
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-t.C:
|
||||
case <-time.After(pauseAfterAuthError):
|
||||
case <-c.terminate:
|
||||
}
|
||||
|
||||
return terr.Response, errTerminated
|
||||
|
||||
default:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
}, err
|
||||
}, res.Err
|
||||
}
|
||||
}
|
||||
|
||||
c.path = path
|
||||
c.path = res.Path
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
|
|
@ -496,11 +568,15 @@ func (c *Client) run() {
|
|||
switch c.conn.State() {
|
||||
case gortsplib.ServerConnStatePlay:
|
||||
c.stopPlay()
|
||||
c.path.OnClientPause(c)
|
||||
res := make(chan struct{})
|
||||
c.path.OnClientPause(PauseReq{c, res})
|
||||
<-res
|
||||
|
||||
case gortsplib.ServerConnStateRecord:
|
||||
c.stopRecord()
|
||||
c.path.OnClientPause(c)
|
||||
res := make(chan struct{})
|
||||
c.path.OnClientPause(PauseReq{c, res})
|
||||
<-res
|
||||
}
|
||||
|
||||
return &base.Response{
|
||||
|
|
@ -547,7 +623,9 @@ func (c *Client) run() {
|
|||
}
|
||||
|
||||
if c.path != nil {
|
||||
c.path.OnClientRemove(c)
|
||||
res := make(chan struct{})
|
||||
c.path.OnClientRemove(RemoveReq{c, res})
|
||||
<-res
|
||||
c.path = nil
|
||||
}
|
||||
|
||||
|
|
@ -567,7 +645,9 @@ func (c *Client) run() {
|
|||
}
|
||||
|
||||
if c.path != nil {
|
||||
c.path.OnClientRemove(c)
|
||||
res := make(chan struct{})
|
||||
c.path.OnClientRemove(RemoveReq{c, res})
|
||||
<-res
|
||||
c.path = nil
|
||||
}
|
||||
}
|
||||
|
|
@ -656,7 +736,9 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
|
|||
}
|
||||
|
||||
func (c *Client) startPlay() {
|
||||
c.path.OnClientPlay(c)
|
||||
res := make(chan struct{})
|
||||
c.path.OnClientPlay(PlayReq{c, res})
|
||||
<-res
|
||||
|
||||
c.log(logger.Info, "is reading from path '%s', %d %s with %s", c.path.Name(),
|
||||
c.conn.SetuppedTracksLen(),
|
||||
|
|
@ -683,7 +765,9 @@ func (c *Client) stopPlay() {
|
|||
}
|
||||
|
||||
func (c *Client) startRecord() {
|
||||
c.path.OnClientRecord(c)
|
||||
res := make(chan struct{})
|
||||
c.path.OnClientRecord(RecordReq{c, res})
|
||||
<-res
|
||||
|
||||
c.log(logger.Info, "is publishing to path '%s', %d %s with %s", c.path.Name(),
|
||||
c.conn.SetuppedTracksLen(),
|
||||
|
|
|
|||
|
|
@ -173,16 +173,16 @@ func (cm *ClientManager) OnClientClose(c *client.Client) {
|
|||
}
|
||||
|
||||
// OnClientDescribe is called by client.Client.
|
||||
func (cm *ClientManager) OnClientDescribe(c *client.Client, pathName string, req *base.Request) (client.Path, error) {
|
||||
return cm.pathMan.OnClientDescribe(c, pathName, req)
|
||||
func (cm *ClientManager) OnClientDescribe(req client.DescribeReq) {
|
||||
cm.pathMan.OnClientDescribe(req)
|
||||
}
|
||||
|
||||
// OnClientAnnounce is called by client.Client.
|
||||
func (cm *ClientManager) OnClientAnnounce(c *client.Client, pathName string, tracks gortsplib.Tracks, req *base.Request) (client.Path, error) {
|
||||
return cm.pathMan.OnClientAnnounce(c, pathName, tracks, req)
|
||||
func (cm *ClientManager) OnClientAnnounce(req client.AnnounceReq) {
|
||||
cm.pathMan.OnClientAnnounce(req)
|
||||
}
|
||||
|
||||
// OnClientSetupPlay is called by client.Client.
|
||||
func (cm *ClientManager) OnClientSetupPlay(c *client.Client, pathName string, trackID int, req *base.Request) (client.Path, error) {
|
||||
return cm.pathMan.OnClientSetupPlay(c, pathName, trackID, req)
|
||||
func (cm *ClientManager) OnClientSetupPlay(req client.SetupPlayReq) {
|
||||
cm.pathMan.OnClientSetupPlay(req)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/aler9/gortsplib"
|
||||
"github.com/aler9/gortsplib/pkg/base"
|
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/client"
|
||||
"github.com/aler9/rtsp-simple-server/internal/conf"
|
||||
|
|
@ -55,70 +54,6 @@ type sourceRedirect struct{}
|
|||
|
||||
func (*sourceRedirect) IsSource() {}
|
||||
|
||||
// ClientDescribeRes is a client describe response.
|
||||
type ClientDescribeRes struct {
|
||||
Path client.Path
|
||||
Err error
|
||||
}
|
||||
|
||||
// ClientDescribeReq is a client describe request.
|
||||
type ClientDescribeReq struct {
|
||||
Res chan ClientDescribeRes
|
||||
Client *client.Client
|
||||
PathName string
|
||||
Req *base.Request
|
||||
}
|
||||
|
||||
// ClientAnnounceRes is a client announce response.
|
||||
type ClientAnnounceRes struct {
|
||||
Path client.Path
|
||||
Err error
|
||||
}
|
||||
|
||||
// ClientAnnounceReq is a client announce request.
|
||||
type ClientAnnounceReq struct {
|
||||
Res chan ClientAnnounceRes
|
||||
Client *client.Client
|
||||
PathName string
|
||||
Tracks gortsplib.Tracks
|
||||
Req *base.Request
|
||||
}
|
||||
|
||||
// ClientSetupPlayRes is a setup/play response.
|
||||
type ClientSetupPlayRes struct {
|
||||
Path client.Path
|
||||
Err error
|
||||
}
|
||||
|
||||
// ClientSetupPlayReq is a setup/play request.
|
||||
type ClientSetupPlayReq struct {
|
||||
Res chan ClientSetupPlayRes
|
||||
Client *client.Client
|
||||
PathName string
|
||||
TrackID int
|
||||
Req *base.Request
|
||||
}
|
||||
|
||||
type clientRemoveReq struct {
|
||||
res chan struct{}
|
||||
client *client.Client
|
||||
}
|
||||
|
||||
type clientPlayReq struct {
|
||||
res chan struct{}
|
||||
client *client.Client
|
||||
}
|
||||
|
||||
type clientRecordReq struct {
|
||||
res chan struct{}
|
||||
client *client.Client
|
||||
}
|
||||
|
||||
type clientPauseReq struct {
|
||||
res chan struct{}
|
||||
client *client.Client
|
||||
}
|
||||
|
||||
type clientState int
|
||||
|
||||
const (
|
||||
|
|
@ -169,15 +104,15 @@ type Path struct {
|
|||
closeTimerStarted bool
|
||||
|
||||
// in
|
||||
sourceSetReady chan struct{} // from source
|
||||
sourceSetNotReady chan struct{} // from source
|
||||
clientDescribe chan ClientDescribeReq // from program
|
||||
clientAnnounce chan ClientAnnounceReq // from program
|
||||
clientSetupPlay chan ClientSetupPlayReq // from program
|
||||
clientPlay chan clientPlayReq // from client
|
||||
clientRecord chan clientRecordReq // from client
|
||||
clientPause chan clientPauseReq // from client
|
||||
clientRemove chan clientRemoveReq // from client
|
||||
sourceSetReady chan struct{} // from source
|
||||
sourceSetNotReady chan struct{} // from source
|
||||
clientDescribe chan client.DescribeReq // from program
|
||||
clientAnnounce chan client.AnnounceReq // from program
|
||||
clientSetupPlay chan client.SetupPlayReq // from program
|
||||
clientPlay chan client.PlayReq // from client
|
||||
clientRecord chan client.RecordReq // from client
|
||||
clientPause chan client.PauseReq // from client
|
||||
clientRemove chan client.RemoveReq // from client
|
||||
terminate chan struct{}
|
||||
}
|
||||
|
||||
|
|
@ -213,13 +148,13 @@ func New(
|
|||
closeTimer: newEmptyTimer(),
|
||||
sourceSetReady: make(chan struct{}),
|
||||
sourceSetNotReady: make(chan struct{}),
|
||||
clientDescribe: make(chan ClientDescribeReq),
|
||||
clientAnnounce: make(chan ClientAnnounceReq),
|
||||
clientSetupPlay: make(chan ClientSetupPlayReq),
|
||||
clientPlay: make(chan clientPlayReq),
|
||||
clientRecord: make(chan clientRecordReq),
|
||||
clientPause: make(chan clientPauseReq),
|
||||
clientRemove: make(chan clientRemoveReq),
|
||||
clientDescribe: make(chan client.DescribeReq),
|
||||
clientAnnounce: make(chan client.AnnounceReq),
|
||||
clientSetupPlay: make(chan client.SetupPlayReq),
|
||||
clientPlay: make(chan client.PlayReq),
|
||||
clientRecord: make(chan client.RecordReq),
|
||||
clientPause: make(chan client.PauseReq),
|
||||
clientRemove: make(chan client.RemoveReq),
|
||||
terminate: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
|
@ -304,57 +239,57 @@ outer:
|
|||
|
||||
case req := <-pa.clientDescribe:
|
||||
if _, ok := pa.clients[req.Client]; ok {
|
||||
req.Res <- ClientDescribeRes{nil, fmt.Errorf("already subscribed")}
|
||||
req.Res <- client.DescribeRes{nil, fmt.Errorf("already subscribed")} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
// reply immediately
|
||||
req.Res <- ClientDescribeRes{pa, nil}
|
||||
req.Res <- client.DescribeRes{pa, nil} //nolint:govet
|
||||
|
||||
pa.onClientDescribe(req.Client)
|
||||
|
||||
case req := <-pa.clientSetupPlay:
|
||||
err := pa.onClientSetupPlay(req.Client, req.TrackID)
|
||||
if err != nil {
|
||||
req.Res <- ClientSetupPlayRes{nil, err}
|
||||
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
req.Res <- ClientSetupPlayRes{pa, nil}
|
||||
req.Res <- client.SetupPlayRes{pa, nil} //nolint:govet
|
||||
|
||||
case req := <-pa.clientPlay:
|
||||
pa.onClientPlay(req.client)
|
||||
close(req.res)
|
||||
pa.onClientPlay(req.Client)
|
||||
close(req.Res)
|
||||
|
||||
case req := <-pa.clientAnnounce:
|
||||
err := pa.onClientAnnounce(req.Client, req.Tracks)
|
||||
if err != nil {
|
||||
req.Res <- ClientAnnounceRes{nil, err}
|
||||
req.Res <- client.AnnounceRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
req.Res <- ClientAnnounceRes{pa, nil}
|
||||
req.Res <- client.AnnounceRes{pa, nil} //nolint:govet
|
||||
|
||||
case req := <-pa.clientRecord:
|
||||
pa.onClientRecord(req.client)
|
||||
close(req.res)
|
||||
pa.onClientRecord(req.Client)
|
||||
close(req.Res)
|
||||
|
||||
case req := <-pa.clientPause:
|
||||
pa.onClientPause(req.client)
|
||||
close(req.res)
|
||||
pa.onClientPause(req.Client)
|
||||
close(req.Res)
|
||||
|
||||
case req := <-pa.clientRemove:
|
||||
if _, ok := pa.clients[req.client]; !ok {
|
||||
close(req.res)
|
||||
if _, ok := pa.clients[req.Client]; !ok {
|
||||
close(req.Res)
|
||||
continue
|
||||
}
|
||||
|
||||
if pa.clients[req.client] != clientStatePreRemove {
|
||||
pa.removeClient(req.client)
|
||||
if pa.clients[req.Client] != clientStatePreRemove {
|
||||
pa.removeClient(req.Client)
|
||||
}
|
||||
|
||||
delete(pa.clients, req.client)
|
||||
delete(pa.clients, req.Client)
|
||||
pa.clientsWg.Done()
|
||||
|
||||
close(req.res)
|
||||
close(req.Res)
|
||||
|
||||
case <-pa.terminate:
|
||||
pa.exhaustChannels()
|
||||
|
|
@ -426,51 +361,51 @@ func (pa *Path) exhaustChannels() {
|
|||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- ClientDescribeRes{nil, fmt.Errorf("terminated")}
|
||||
req.Res <- client.DescribeRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req, ok := <-pa.clientAnnounce:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- ClientAnnounceRes{nil, fmt.Errorf("terminated")}
|
||||
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req, ok := <-pa.clientSetupPlay:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- ClientSetupPlayRes{nil, fmt.Errorf("terminated")}
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req, ok := <-pa.clientPlay:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
close(req.res)
|
||||
close(req.Res)
|
||||
|
||||
case req, ok := <-pa.clientRecord:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
close(req.res)
|
||||
close(req.Res)
|
||||
|
||||
case req, ok := <-pa.clientPause:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
close(req.res)
|
||||
close(req.Res)
|
||||
|
||||
case req, ok := <-pa.clientRemove:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := pa.clients[req.client]; !ok {
|
||||
close(req.res)
|
||||
if _, ok := pa.clients[req.Client]; !ok {
|
||||
close(req.Res)
|
||||
continue
|
||||
}
|
||||
|
||||
pa.clientsWg.Done()
|
||||
|
||||
close(req.res)
|
||||
close(req.Res)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
@ -847,46 +782,38 @@ func (pa *Path) OnSourceSetNotReady() {
|
|||
}
|
||||
|
||||
// OnPathManDescribe is called by pathman.PathMan.
|
||||
func (pa *Path) OnPathManDescribe(req ClientDescribeReq) {
|
||||
func (pa *Path) OnPathManDescribe(req client.DescribeReq) {
|
||||
pa.clientDescribe <- req
|
||||
}
|
||||
|
||||
// OnPathManSetupPlay is called by pathman.PathMan.
|
||||
func (pa *Path) OnPathManSetupPlay(req ClientSetupPlayReq) {
|
||||
func (pa *Path) OnPathManSetupPlay(req client.SetupPlayReq) {
|
||||
pa.clientSetupPlay <- req
|
||||
}
|
||||
|
||||
// OnPathManAnnounce is called by pathman.PathMan.
|
||||
func (pa *Path) OnPathManAnnounce(req ClientAnnounceReq) {
|
||||
func (pa *Path) OnPathManAnnounce(req client.AnnounceReq) {
|
||||
pa.clientAnnounce <- req
|
||||
}
|
||||
|
||||
// OnClientRemove is called by client.Client.
|
||||
func (pa *Path) OnClientRemove(c *client.Client) {
|
||||
res := make(chan struct{})
|
||||
pa.clientRemove <- clientRemoveReq{res, c}
|
||||
<-res
|
||||
func (pa *Path) OnClientRemove(req client.RemoveReq) {
|
||||
pa.clientRemove <- req
|
||||
}
|
||||
|
||||
// OnClientPlay is called by client.Client.
|
||||
func (pa *Path) OnClientPlay(c *client.Client) {
|
||||
res := make(chan struct{})
|
||||
pa.clientPlay <- clientPlayReq{res, c}
|
||||
<-res
|
||||
func (pa *Path) OnClientPlay(req client.PlayReq) {
|
||||
pa.clientPlay <- req
|
||||
}
|
||||
|
||||
// OnClientRecord is called by client.Client.
|
||||
func (pa *Path) OnClientRecord(c *client.Client) {
|
||||
res := make(chan struct{})
|
||||
pa.clientRecord <- clientRecordReq{res, c}
|
||||
<-res
|
||||
func (pa *Path) OnClientRecord(req client.RecordReq) {
|
||||
pa.clientRecord <- req
|
||||
}
|
||||
|
||||
// OnClientPause is called by client.Client.
|
||||
func (pa *Path) OnClientPause(c *client.Client) {
|
||||
res := make(chan struct{})
|
||||
pa.clientPause <- clientPauseReq{res, c}
|
||||
<-res
|
||||
func (pa *Path) OnClientPause(req client.PauseReq) {
|
||||
pa.clientPause <- req
|
||||
}
|
||||
|
||||
// OnFrame is called by a source or by a client.Client.
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aler9/gortsplib"
|
||||
"github.com/aler9/gortsplib/pkg/base"
|
||||
"github.com/aler9/gortsplib/pkg/headers"
|
||||
|
||||
|
|
@ -38,9 +37,9 @@ type PathManager struct {
|
|||
// in
|
||||
confReload chan map[string]*conf.PathConf
|
||||
pathClose chan *path.Path
|
||||
clientDescribe chan path.ClientDescribeReq
|
||||
clientAnnounce chan path.ClientAnnounceReq
|
||||
clientSetupPlay chan path.ClientSetupPlayReq
|
||||
clientDescribe chan client.DescribeReq
|
||||
clientAnnounce chan client.AnnounceReq
|
||||
clientSetupPlay chan client.SetupPlayReq
|
||||
terminate chan struct{}
|
||||
|
||||
// out
|
||||
|
|
@ -71,9 +70,9 @@ func New(
|
|||
paths: make(map[string]*path.Path),
|
||||
confReload: make(chan map[string]*conf.PathConf),
|
||||
pathClose: make(chan *path.Path),
|
||||
clientDescribe: make(chan path.ClientDescribeReq),
|
||||
clientAnnounce: make(chan path.ClientAnnounceReq),
|
||||
clientSetupPlay: make(chan path.ClientSetupPlayReq),
|
||||
clientDescribe: make(chan client.DescribeReq),
|
||||
clientAnnounce: make(chan client.AnnounceReq),
|
||||
clientSetupPlay: make(chan client.SetupPlayReq),
|
||||
terminate: make(chan struct{}),
|
||||
clientClose: make(chan *client.Client),
|
||||
done: make(chan struct{}),
|
||||
|
|
@ -150,14 +149,14 @@ outer:
|
|||
case req := <-pm.clientDescribe:
|
||||
pathName, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- path.ClientDescribeRes{nil, err} //nolint:govet
|
||||
req.Res <- client.DescribeRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
err = req.Client.Authenticate(pm.authMethods, pathConf.ReadIpsParsed,
|
||||
pathConf.ReadUser, pathConf.ReadPass, req.Req, nil)
|
||||
if err != nil {
|
||||
req.Res <- path.ClientDescribeRes{nil, err} //nolint:govet
|
||||
req.Res <- client.DescribeRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -182,7 +181,7 @@ outer:
|
|||
case req := <-pm.clientAnnounce:
|
||||
pathName, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- path.ClientAnnounceRes{nil, err} //nolint:govet
|
||||
req.Res <- client.AnnounceRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -190,7 +189,7 @@ outer:
|
|||
pathConf.PublishIpsParsed, pathConf.PublishUser,
|
||||
pathConf.PublishPass, req.Req, nil)
|
||||
if err != nil {
|
||||
req.Res <- path.ClientAnnounceRes{nil, err} //nolint:govet
|
||||
req.Res <- client.AnnounceRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -214,13 +213,13 @@ outer:
|
|||
|
||||
case req := <-pm.clientSetupPlay:
|
||||
if _, ok := pm.paths[req.PathName]; !ok {
|
||||
req.Res <- path.ClientSetupPlayRes{nil, fmt.Errorf("no one is publishing to path '%s'", req.PathName)} //nolint:govet
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("no one is publishing to path '%s'", req.PathName)} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
_, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- path.ClientSetupPlayRes{nil, err} //nolint:govet
|
||||
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -236,7 +235,7 @@ outer:
|
|||
pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass,
|
||||
req.Req, altURL)
|
||||
if err != nil {
|
||||
req.Res <- path.ClientSetupPlayRes{nil, err} //nolint:govet
|
||||
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -261,13 +260,13 @@ outer:
|
|||
}
|
||||
|
||||
case req := <-pm.clientDescribe:
|
||||
req.Res <- path.ClientDescribeRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
req.Res <- client.DescribeRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req := <-pm.clientAnnounce:
|
||||
req.Res <- path.ClientAnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req := <-pm.clientSetupPlay:
|
||||
req.Res <- path.ClientSetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
@ -341,27 +340,18 @@ func (pm *PathManager) OnPathClientClose(c *client.Client) {
|
|||
}
|
||||
|
||||
// OnClientDescribe is called by client.Client.
|
||||
func (pm *PathManager) OnClientDescribe(c *client.Client, pathName string, req *base.Request) (client.Path, error) {
|
||||
res := make(chan path.ClientDescribeRes)
|
||||
pm.clientDescribe <- path.ClientDescribeReq{res, c, pathName, req} //nolint:govet
|
||||
re := <-res
|
||||
return re.Path, re.Err
|
||||
func (pm *PathManager) OnClientDescribe(req client.DescribeReq) {
|
||||
pm.clientDescribe <- req
|
||||
}
|
||||
|
||||
// OnClientAnnounce is called by client.Client.
|
||||
func (pm *PathManager) OnClientAnnounce(c *client.Client, pathName string, tracks gortsplib.Tracks, req *base.Request) (client.Path, error) {
|
||||
res := make(chan path.ClientAnnounceRes)
|
||||
pm.clientAnnounce <- path.ClientAnnounceReq{res, c, pathName, tracks, req} //nolint:govet
|
||||
re := <-res
|
||||
return re.Path, re.Err
|
||||
func (pm *PathManager) OnClientAnnounce(req client.AnnounceReq) {
|
||||
pm.clientAnnounce <- req
|
||||
}
|
||||
|
||||
// OnClientSetupPlay is called by client.Client.
|
||||
func (pm *PathManager) OnClientSetupPlay(c *client.Client, pathName string, trackID int, req *base.Request) (client.Path, error) {
|
||||
res := make(chan path.ClientSetupPlayRes)
|
||||
pm.clientSetupPlay <- path.ClientSetupPlayReq{res, c, pathName, trackID, req} //nolint:govet
|
||||
re := <-res
|
||||
return re.Path, re.Err
|
||||
func (pm *PathManager) OnClientSetupPlay(req client.SetupPlayReq) {
|
||||
pm.clientSetupPlay <- req
|
||||
}
|
||||
|
||||
// ClientClose is called by client.Client.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue