From 59e754feb16e3d248927fb0cda7be10495a6f35d Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 31 Jan 2021 23:16:44 +0100 Subject: [PATCH] support runOnConnect and runOnDemand with RTMP clients --- internal/clientman/clientman.go | 3 ++ internal/clientrtmp/client.go | 82 +++++++++++++++++++++++---------- 2 files changed, 60 insertions(+), 25 deletions(-) diff --git a/internal/clientman/clientman.go b/internal/clientman/clientman.go index a5334cf5..780dee06 100644 --- a/internal/clientman/clientman.go +++ b/internal/clientman/clientman.go @@ -160,7 +160,10 @@ outer: case conn := <-rtmpAccept: c := clientrtmp.New( + cm.rtspPort, cm.readTimeout, + cm.runOnConnect, + cm.runOnConnectRestart, &cm.wg, cm.stats, conn, diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index 81509c3d..caa7aa62 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -5,6 +5,7 @@ import ( "io" "net" "net/url" + "strconv" "strings" "sync" "sync/atomic" @@ -19,6 +20,7 @@ import ( "github.com/notedit/rtmp/codec/h264" "github.com/aler9/rtsp-simple-server/internal/client" + "github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/rtmputils" "github.com/aler9/rtsp-simple-server/internal/stats" @@ -54,11 +56,14 @@ type Parent interface { // Client is a RTMP client. type Client struct { - readTimeout time.Duration - stats *stats.Stats - wg *sync.WaitGroup - conn rtmputils.ConnPair - parent Parent + rtspPort int + readTimeout time.Duration + runOnConnect string + runOnConnectRestart bool + stats *stats.Stats + wg *sync.WaitGroup + conn rtmputils.ConnPair + parent Parent path client.Path @@ -68,19 +73,25 @@ type Client struct { // New allocates a Client. func New( + rtspPort int, readTimeout time.Duration, + runOnConnect string, + runOnConnectRestart bool, wg *sync.WaitGroup, stats *stats.Stats, conn rtmputils.ConnPair, parent Parent) *Client { c := &Client{ - readTimeout: readTimeout, - wg: wg, - stats: stats, - conn: conn, - parent: parent, - terminate: make(chan struct{}), + rtspPort: rtspPort, + readTimeout: readTimeout, + runOnConnect: runOnConnect, + runOnConnectRestart: runOnConnectRestart, + wg: wg, + stats: stats, + conn: conn, + parent: parent, + terminate: make(chan struct{}), } atomic.AddInt64(c.stats.CountClients, 1) @@ -116,6 +127,14 @@ func (c *Client) run() { defer c.wg.Done() defer c.log(logger.Info, "disconnected") + if c.runOnConnect != "" { + onConnectCmd := externalcmd.New(c.runOnConnect, c.runOnConnectRestart, externalcmd.Environment{ + Path: "", + Port: strconv.FormatInt(int64(c.rtspPort), 10), + }) + defer onConnectCmd.Close() + } + if !c.conn.RConn.Publishing { c.conn.NConn.Close() c.log(logger.Info, "ERR: client is not publishing") @@ -220,21 +239,34 @@ func (c *Client) run() { return } - func() { - resc := make(chan struct{}) - c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet - <-resc + resc := make(chan struct{}) + c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet + <-resc - c.log(logger.Info, "is publishing to path '%s', %d %s", - c.path.Name(), - len(tracks), - func() string { - if len(tracks) == 1 { - return "track" - } - return "tracks" - }()) - }() + c.log(logger.Info, "is publishing to path '%s', %d %s", + c.path.Name(), + len(tracks), + func() string { + if len(tracks) == 1 { + return "track" + } + return "tracks" + }()) + + var onPublishCmd *externalcmd.Cmd + if c.path.Conf().RunOnPublish != "" { + onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, + c.path.Conf().RunOnPublishRestart, externalcmd.Environment{ + Path: c.path.Name(), + Port: strconv.FormatInt(int64(c.rtspPort), 10), + }) + } + + defer func(path client.Path) { + if path.Conf().RunOnPublish != "" { + onPublishCmd.Close() + } + }(c.path) readerDone := make(chan error) go func() {