1
0
Fork 0
forked from External/mediamtx

support runOnConnect and runOnDemand with RTMP clients

This commit is contained in:
aler9 2021-01-31 23:16:44 +01:00
parent 2025fa163d
commit 59e754feb1
2 changed files with 60 additions and 25 deletions

View file

@ -160,7 +160,10 @@ outer:
case conn := <-rtmpAccept:
c := clientrtmp.New(
cm.rtspPort,
cm.readTimeout,
cm.runOnConnect,
cm.runOnConnectRestart,
&cm.wg,
cm.stats,
conn,

View file

@ -5,6 +5,7 @@ import (
"io"
"net"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
@ -19,6 +20,7 @@ import (
"github.com/notedit/rtmp/codec/h264"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmputils"
"github.com/aler9/rtsp-simple-server/internal/stats"
@ -54,11 +56,14 @@ type Parent interface {
// Client is a RTMP client.
type Client struct {
readTimeout time.Duration
stats *stats.Stats
wg *sync.WaitGroup
conn rtmputils.ConnPair
parent Parent
rtspPort int
readTimeout time.Duration
runOnConnect string
runOnConnectRestart bool
stats *stats.Stats
wg *sync.WaitGroup
conn rtmputils.ConnPair
parent Parent
path client.Path
@ -68,19 +73,25 @@ type Client struct {
// New allocates a Client.
func New(
rtspPort int,
readTimeout time.Duration,
runOnConnect string,
runOnConnectRestart bool,
wg *sync.WaitGroup,
stats *stats.Stats,
conn rtmputils.ConnPair,
parent Parent) *Client {
c := &Client{
readTimeout: readTimeout,
wg: wg,
stats: stats,
conn: conn,
parent: parent,
terminate: make(chan struct{}),
rtspPort: rtspPort,
readTimeout: readTimeout,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
wg: wg,
stats: stats,
conn: conn,
parent: parent,
terminate: make(chan struct{}),
}
atomic.AddInt64(c.stats.CountClients, 1)
@ -116,6 +127,14 @@ func (c *Client) run() {
defer c.wg.Done()
defer c.log(logger.Info, "disconnected")
if c.runOnConnect != "" {
onConnectCmd := externalcmd.New(c.runOnConnect, c.runOnConnectRestart, externalcmd.Environment{
Path: "",
Port: strconv.FormatInt(int64(c.rtspPort), 10),
})
defer onConnectCmd.Close()
}
if !c.conn.RConn.Publishing {
c.conn.NConn.Close()
c.log(logger.Info, "ERR: client is not publishing")
@ -220,21 +239,34 @@ func (c *Client) run() {
return
}
func() {
resc := make(chan struct{})
c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet
<-resc
resc := make(chan struct{})
c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(),
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}())
}()
c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(),
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}())
var onPublishCmd *externalcmd.Cmd
if c.path.Conf().RunOnPublish != "" {
onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish,
c.path.Conf().RunOnPublishRestart, externalcmd.Environment{
Path: c.path.Name(),
Port: strconv.FormatInt(int64(c.rtspPort), 10),
})
}
defer func(path client.Path) {
if path.Conf().RunOnPublish != "" {
onPublishCmd.Close()
}
}(c.path)
readerDone := make(chan error)
go func() {