forked from External/mediamtx
add serverrtmp
This commit is contained in:
parent
17c22577c9
commit
f9a7ad7eca
7 changed files with 172 additions and 9 deletions
|
|
@ -13,6 +13,7 @@ Features:
|
||||||
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM)
|
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM)
|
||||||
* Serve multiple streams at once in separate paths
|
* Serve multiple streams at once in separate paths
|
||||||
* Encrypt streams with TLS (RTSPS)
|
* Encrypt streams with TLS (RTSPS)
|
||||||
|
* Publish legacy RTMP streams
|
||||||
* Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy)
|
* Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy)
|
||||||
* Authenticate readers and publishers separately
|
* Authenticate readers and publishers separately
|
||||||
* Redirect to other RTSP servers (load balancing)
|
* Redirect to other RTSP servers (load balancing)
|
||||||
|
|
|
||||||
|
|
@ -213,7 +213,7 @@ func (conf *Conf) fillAndCheck() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if conf.RTMPPort == 0 {
|
if conf.RTMPPort == 0 {
|
||||||
conf.RTMPPort = 8888
|
conf.RTMPPort = 1935
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(conf.Paths) == 0 {
|
if len(conf.Paths) == 0 {
|
||||||
|
|
|
||||||
121
internal/serverrtmp/server.go
Normal file
121
internal/serverrtmp/server.go
Normal file
|
|
@ -0,0 +1,121 @@
|
||||||
|
package serverrtmp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/notedit/rtmp/av"
|
||||||
|
"github.com/notedit/rtmp/format/flv/flvio"
|
||||||
|
"github.com/notedit/rtmp/format/rtmp"
|
||||||
|
|
||||||
|
"github.com/aler9/rtsp-simple-server/internal/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
codecH264 = 7
|
||||||
|
codecAAC = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
func readMetadata(conn *rtmp.Conn) (flvio.AMFMap, error) {
|
||||||
|
pkt, err := conn.ReadPacket()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if pkt.Type != av.Metadata {
|
||||||
|
return nil, fmt.Errorf("first packet must be metadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
arr, err := flvio.ParseAMFVals(pkt.Data, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(arr) != 1 {
|
||||||
|
return nil, fmt.Errorf("invalid metadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
ma, ok := arr[0].(flvio.AMFMap)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid metadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
return ma, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parent is implemented by program.
|
||||||
|
type Parent interface {
|
||||||
|
Log(logger.Level, string, ...interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Server is a RTMP listener.
|
||||||
|
type Server struct {
|
||||||
|
l net.Listener
|
||||||
|
srv *rtmp.Server
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// New allocates a Server.
|
||||||
|
func New(
|
||||||
|
listenIP string,
|
||||||
|
port int,
|
||||||
|
parent Parent) (*Server, error) {
|
||||||
|
|
||||||
|
address := listenIP + ":" + strconv.FormatInt(int64(port), 10)
|
||||||
|
l, err := net.Listen("tcp", address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Server{
|
||||||
|
l: l,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.srv = rtmp.NewServer()
|
||||||
|
s.srv.HandleConn = s.innerHandleConn
|
||||||
|
|
||||||
|
parent.Log(logger.Info, "[RTMP listener] opened on %s", address)
|
||||||
|
|
||||||
|
s.wg.Add(1)
|
||||||
|
go s.run()
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes a Server.
|
||||||
|
func (s *Server) Close() {
|
||||||
|
s.l.Close()
|
||||||
|
s.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) run() {
|
||||||
|
defer s.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
nconn, err := s.l.Accept()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
s.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer s.wg.Done()
|
||||||
|
s.srv.HandleNetConn(nconn)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) innerHandleConn(rconn *rtmp.Conn, nconn net.Conn) {
|
||||||
|
defer nconn.Close()
|
||||||
|
|
||||||
|
err := func() error {
|
||||||
|
if !rconn.Publishing {
|
||||||
|
return fmt.Errorf("not publishing")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
fmt.Println("ERR", err)
|
||||||
|
}
|
||||||
|
|
@ -13,7 +13,7 @@ type Parent interface {
|
||||||
Log(logger.Level, string, ...interface{})
|
Log(logger.Level, string, ...interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server is a TCP/TLS/RTSPS listener.
|
// Server is a RTSP listener.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
parent Parent
|
parent Parent
|
||||||
|
|
||||||
|
|
@ -54,6 +54,7 @@ func New(
|
||||||
parent.Log(logger.Info, "[%s listener] opened on %s", label, address)
|
parent.Log(logger.Info, "[%s listener] opened on %s", label, address)
|
||||||
|
|
||||||
go s.run()
|
go s.run()
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
22
main.go
22
main.go
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/aler9/rtsp-simple-server/internal/metrics"
|
"github.com/aler9/rtsp-simple-server/internal/metrics"
|
||||||
"github.com/aler9/rtsp-simple-server/internal/pathman"
|
"github.com/aler9/rtsp-simple-server/internal/pathman"
|
||||||
"github.com/aler9/rtsp-simple-server/internal/pprof"
|
"github.com/aler9/rtsp-simple-server/internal/pprof"
|
||||||
|
"github.com/aler9/rtsp-simple-server/internal/serverrtmp"
|
||||||
"github.com/aler9/rtsp-simple-server/internal/serverrtsp"
|
"github.com/aler9/rtsp-simple-server/internal/serverrtsp"
|
||||||
"github.com/aler9/rtsp-simple-server/internal/serverudpl"
|
"github.com/aler9/rtsp-simple-server/internal/serverudpl"
|
||||||
"github.com/aler9/rtsp-simple-server/internal/stats"
|
"github.com/aler9/rtsp-simple-server/internal/stats"
|
||||||
|
|
@ -36,6 +37,7 @@ type program struct {
|
||||||
serverUDPRTCP *gortsplib.ServerUDPListener
|
serverUDPRTCP *gortsplib.ServerUDPListener
|
||||||
serverPlain *serverrtsp.Server
|
serverPlain *serverrtsp.Server
|
||||||
serverTLS *serverrtsp.Server
|
serverTLS *serverrtsp.Server
|
||||||
|
serverRTMP *serverrtmp.Server
|
||||||
pathMan *pathman.PathManager
|
pathMan *pathman.PathManager
|
||||||
clientMan *clientman.ClientManager
|
clientMan *clientman.ClientManager
|
||||||
confWatcher *confwatcher.ConfWatcher
|
confWatcher *confwatcher.ConfWatcher
|
||||||
|
|
@ -209,8 +211,8 @@ func (p *program) createResources(initial bool) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.serverPlain == nil {
|
if p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional {
|
||||||
if p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional {
|
if p.serverPlain == nil {
|
||||||
conf := gortsplib.ServerConf{
|
conf := gortsplib.ServerConf{
|
||||||
ReadTimeout: p.conf.ReadTimeout,
|
ReadTimeout: p.conf.ReadTimeout,
|
||||||
WriteTimeout: p.conf.WriteTimeout,
|
WriteTimeout: p.conf.WriteTimeout,
|
||||||
|
|
@ -230,8 +232,8 @@ func (p *program) createResources(initial bool) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.serverTLS == nil {
|
if p.conf.EncryptionParsed == conf.EncryptionStrict || p.conf.EncryptionParsed == conf.EncryptionOptional {
|
||||||
if p.conf.EncryptionParsed == conf.EncryptionStrict || p.conf.EncryptionParsed == conf.EncryptionOptional {
|
if p.serverTLS == nil {
|
||||||
cert, err := tls.LoadX509KeyPair(p.conf.ServerCert, p.conf.ServerKey)
|
cert, err := tls.LoadX509KeyPair(p.conf.ServerCert, p.conf.ServerKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -255,6 +257,18 @@ func (p *program) createResources(initial bool) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if p.conf.RTMPEnable {
|
||||||
|
if p.serverRTMP == nil {
|
||||||
|
p.serverRTMP, err = serverrtmp.New(
|
||||||
|
p.conf.ListenIP,
|
||||||
|
p.conf.RTMPPort,
|
||||||
|
p)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if p.pathMan == nil {
|
if p.pathMan == nil {
|
||||||
p.pathMan = pathman.New(
|
p.pathMan = pathman.New(
|
||||||
p.conf.RTSPPort,
|
p.conf.RTSPPort,
|
||||||
|
|
|
||||||
20
main_test.go
20
main_test.go
|
|
@ -849,6 +849,26 @@ func TestFallback(t *testing.T) {
|
||||||
require.Equal(t, 0, cnt2.wait())
|
require.Equal(t, 0, cnt2.wait())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRTMP(t *testing.T) {
|
||||||
|
p, ok := testProgram("paths:\n" +
|
||||||
|
"rtmpEnable: yes\n")
|
||||||
|
require.Equal(t, true, ok)
|
||||||
|
defer p.close()
|
||||||
|
|
||||||
|
cnt1, err := newContainer("ffmpeg", "source", []string{
|
||||||
|
"-re",
|
||||||
|
"-stream_loop", "-1",
|
||||||
|
"-i", "emptyvideo.ts",
|
||||||
|
"-c", "copy",
|
||||||
|
"-f", "flv",
|
||||||
|
"rtmp://test:tast@" + ownDockerIP + ":1935/test1/test2",
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer cnt1.close()
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
func TestRunOnDemand(t *testing.T) {
|
func TestRunOnDemand(t *testing.T) {
|
||||||
doneFile := filepath.Join(os.TempDir(), "ondemand_done")
|
doneFile := filepath.Join(os.TempDir(), "ondemand_done")
|
||||||
onDemandFile, err := writeTempFile([]byte(fmt.Sprintf(`#!/bin/sh -e
|
onDemandFile, err := writeTempFile([]byte(fmt.Sprintf(`#!/bin/sh -e
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,10 @@ logDestinations: [stdout]
|
||||||
# if "file" is in logDestinations, this is the file which will receive the logs.
|
# if "file" is in logDestinations, this is the file which will receive the logs.
|
||||||
logFile: rtsp-simple-server.log
|
logFile: rtsp-simple-server.log
|
||||||
|
|
||||||
# supported stream protocols.
|
# listen IP. If provided, all listeners will listen on this specific IP.
|
||||||
|
listenIP:
|
||||||
|
|
||||||
|
# supported RTSP stream protocols.
|
||||||
# UDP is the most performant, but can cause problems if there's a NAT between
|
# UDP is the most performant, but can cause problems if there's a NAT between
|
||||||
# server and clients, and doesn't support encryption.
|
# server and clients, and doesn't support encryption.
|
||||||
# TCP is the most versatile, and does support encryption.
|
# TCP is the most versatile, and does support encryption.
|
||||||
|
|
@ -15,8 +18,6 @@ protocols: [udp, tcp]
|
||||||
# encrypt handshake and TCP streams with TLS (RTSPS).
|
# encrypt handshake and TCP streams with TLS (RTSPS).
|
||||||
# available values are "no", "strict", "optional".
|
# available values are "no", "strict", "optional".
|
||||||
encryption: no
|
encryption: no
|
||||||
# listen IP. If provided, all listeners will listen on this specific IP.
|
|
||||||
listenIP:
|
|
||||||
# port of the TCP/RTSP listener. This is used only if encryption is "no" or "optional".
|
# port of the TCP/RTSP listener. This is used only if encryption is "no" or "optional".
|
||||||
rtspPort: 8554
|
rtspPort: 8554
|
||||||
# port of the TCP/TLS/RTSPS listener. This is used only if encryption is "strict" or "optional".
|
# port of the TCP/TLS/RTSPS listener. This is used only if encryption is "strict" or "optional".
|
||||||
|
|
@ -40,6 +41,11 @@ writeTimeout: 10s
|
||||||
# a lower number allows to save RAM.
|
# a lower number allows to save RAM.
|
||||||
readBufferCount: 512
|
readBufferCount: 512
|
||||||
|
|
||||||
|
# enable a RTMP listener that allows to receive streams with RTMP.
|
||||||
|
rtmpEnable: no
|
||||||
|
# port of the RTMP listener.
|
||||||
|
rtmpPort: 1935
|
||||||
|
|
||||||
# enable Prometheus-compatible metrics on port 9998.
|
# enable Prometheus-compatible metrics on port 9998.
|
||||||
metrics: no
|
metrics: no
|
||||||
# enable pprof on port 9999 to monitor performances.
|
# enable pprof on port 9999 to monitor performances.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue