1
0
Fork 0
forked from External/mediamtx

move SRT tests into internal/servers/srt (#3037)

This commit is contained in:
Alessandro Ros 2024-02-17 20:42:41 +01:00 committed by GitHub
parent ad58efe47d
commit 9e5de737f0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 336 additions and 276 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -23,23 +24,8 @@ func (testParent) Log(_ logger.Level, _ string, _ ...interface{}) {
func (testParent) APIConfigSet(_ *conf.Conf) {} func (testParent) APIConfigSet(_ *conf.Conf) {}
func writeTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
func tempConf(t *testing.T, cnt string) *conf.Conf { func tempConf(t *testing.T, cnt string) *conf.Conf {
fi, err := writeTempFile([]byte(cnt)) fi, err := test.CreateTempFile([]byte(cnt))
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(fi) defer os.Remove(fi)

View file

@ -15,7 +15,7 @@ import (
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
func writeTempFile(byts []byte) (string, error) { func createTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil { if err != nil {
return "", err return "", err
@ -32,7 +32,7 @@ func writeTempFile(byts []byte) (string, error) {
func TestConfFromFile(t *testing.T) { func TestConfFromFile(t *testing.T) {
func() { func() {
tmpf, err := writeTempFile([]byte("logLevel: debug\n" + tmpf, err := createTempFile([]byte("logLevel: debug\n" +
"paths:\n" + "paths:\n" +
" cam1:\n" + " cam1:\n" +
" runOnDemandStartTimeout: 5s\n")) " runOnDemandStartTimeout: 5s\n"))
@ -84,7 +84,7 @@ func TestConfFromFile(t *testing.T) {
}() }()
func() { func() {
tmpf, err := writeTempFile([]byte(``)) tmpf, err := createTempFile([]byte(``))
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(tmpf) defer os.Remove(tmpf)
@ -93,7 +93,7 @@ func TestConfFromFile(t *testing.T) {
}() }()
func() { func() {
tmpf, err := writeTempFile([]byte(`paths:`)) tmpf, err := createTempFile([]byte(`paths:`))
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(tmpf) defer os.Remove(tmpf)
@ -102,7 +102,7 @@ func TestConfFromFile(t *testing.T) {
}() }()
func() { func() {
tmpf, err := writeTempFile([]byte( tmpf, err := createTempFile([]byte(
"paths:\n" + "paths:\n" +
" mypath:\n")) " mypath:\n"))
require.NoError(t, err) require.NoError(t, err)
@ -126,7 +126,7 @@ func TestConfFromFileAndEnv(t *testing.T) {
// deprecated path parameter // deprecated path parameter
t.Setenv("MTX_PATHS_CAM2_DISABLEPUBLISHEROVERRIDE", "yes") t.Setenv("MTX_PATHS_CAM2_DISABLEPUBLISHEROVERRIDE", "yes")
tmpf, err := writeTempFile([]byte("{}")) tmpf, err := createTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(tmpf) defer os.Remove(tmpf)
@ -178,7 +178,7 @@ func TestConfEncryption(t *testing.T) {
t.Setenv("RTSP_CONFKEY", key) t.Setenv("RTSP_CONFKEY", key)
tmpf, err := writeTempFile([]byte(encryptedConf)) tmpf, err := createTempFile([]byte(encryptedConf))
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(tmpf) defer os.Remove(tmpf)
@ -295,7 +295,7 @@ func TestConfErrors(t *testing.T) {
}, },
} { } {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
tmpf, err := writeTempFile([]byte(ca.conf)) tmpf, err := createTempFile([]byte(ca.conf))
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(tmpf) defer os.Remove(tmpf)
@ -325,7 +325,7 @@ func TestSampleConfFile(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "../../mediamtx.yml", confPath1) require.Equal(t, "../../mediamtx.yml", confPath1)
tmpf, err := writeTempFile([]byte("paths:\n all_others:")) tmpf, err := createTempFile([]byte("paths:\n all_others:"))
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(tmpf) defer os.Remove(tmpf)

View file

@ -5,31 +5,17 @@ import (
"testing" "testing"
"time" "time"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func writeTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "confwatcher-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
func TestNoFile(t *testing.T) { func TestNoFile(t *testing.T) {
_, err := New("/nonexistent") _, err := New("/nonexistent")
require.Error(t, err) require.Error(t, err)
} }
func TestWrite(t *testing.T) { func TestWrite(t *testing.T) {
fpath, err := writeTempFile([]byte("{}")) fpath, err := test.CreateTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
w, err := New(fpath) w, err := New(fpath)
@ -54,7 +40,7 @@ func TestWrite(t *testing.T) {
} }
func TestWriteMultipleTimes(t *testing.T) { func TestWriteMultipleTimes(t *testing.T) {
fpath, err := writeTempFile([]byte("{}")) fpath, err := test.CreateTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
w, err := New(fpath) w, err := New(fpath)
@ -97,7 +83,7 @@ func TestWriteMultipleTimes(t *testing.T) {
} }
func TestDeleteCreate(t *testing.T) { func TestDeleteCreate(t *testing.T) {
fpath, err := writeTempFile([]byte("{}")) fpath, err := test.CreateTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
w, err := New(fpath) w, err := New(fpath)
@ -125,7 +111,7 @@ func TestDeleteCreate(t *testing.T) {
} }
func TestSymlinkDeleteCreate(t *testing.T) { func TestSymlinkDeleteCreate(t *testing.T) {
fpath, err := writeTempFile([]byte("{}")) fpath, err := test.CreateTempFile([]byte("{}"))
require.NoError(t, err) require.NoError(t, err)
err = os.Symlink(fpath, fpath+"-sym") err = os.Symlink(fpath, fpath+"-sym")

View file

@ -149,11 +149,11 @@ func TestAPIPathsList(t *testing.T) {
}) })
t.Run("rtsps session", func(t *testing.T) { t.Run("rtsps session", func(t *testing.T) {
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)
@ -340,11 +340,11 @@ func TestAPIPathsGet(t *testing.T) {
} }
func TestAPIProtocolListGet(t *testing.T) { func TestAPIProtocolListGet(t *testing.T) {
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)
@ -813,11 +813,11 @@ func TestAPIProtocolListGet(t *testing.T) {
} }
func TestAPIProtocolGetNotFound(t *testing.T) { func TestAPIProtocolGetNotFound(t *testing.T) {
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)
@ -913,11 +913,11 @@ func TestAPIProtocolGetNotFound(t *testing.T) {
} }
func TestAPIProtocolKick(t *testing.T) { func TestAPIProtocolKick(t *testing.T) {
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)
@ -1060,11 +1060,11 @@ func TestAPIProtocolKick(t *testing.T) {
} }
func TestAPIProtocolKickNotFound(t *testing.T) { func TestAPIProtocolKickNotFound(t *testing.T) {
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)

View file

@ -8,30 +8,16 @@ import (
"github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func writeTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
func newInstance(conf string) (*Core, bool) { func newInstance(conf string) (*Core, bool) {
if conf == "" { if conf == "" {
return New([]string{}) return New([]string{})
} }
tmpf, err := writeTempFile([]byte(conf)) tmpf, err := test.CreateTempFile([]byte(conf))
if err != nil { if err != nil {
return nil, false return nil, false
} }

View file

@ -42,11 +42,11 @@ func httpPullFile(t *testing.T, hc *http.Client, u string) []byte {
} }
func TestMetrics(t *testing.T) { func TestMetrics(t *testing.T) {
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)

View file

@ -1,129 +0,0 @@
package core
import (
"bufio"
"testing"
"time"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/bluenviron/mediamtx/internal/test"
srt "github.com/datarhei/gosrt"
"github.com/stretchr/testify/require"
)
func TestSRTServer(t *testing.T) {
for _, ca := range []string{
"no passphrase",
"publish passphrase",
"read passphrase",
} {
t.Run(ca, func(t *testing.T) {
conf := "paths:\n" +
" all_others:\n"
switch ca {
case "publish passphrase":
conf += " srtPublishPassphrase: 123456789abcde"
case "read passphrase":
conf += " srtReadPassphrase: 123456789abcde"
}
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
u := "srt://localhost:8890?streamid=publish:mypath"
if ca == "publish passphrase" {
u += "&passphrase=123456789abcde"
}
srtConf := srt.DefaultConfig()
address, err := srtConf.UnmarshalURL(u)
require.NoError(t, err)
err = srtConf.Validate()
require.NoError(t, err)
publisher, err := srt.Dial("srt", address, srtConf)
require.NoError(t, err)
defer publisher.Close()
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(publisher)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)
err = w.WriteH26x(track, 0, 0, true, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{0x05, 1}, // IDR
})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
u = "srt://localhost:8890?streamid=read:mypath"
if ca == "read passphrase" {
u += "&passphrase=123456789abcde"
}
srtConf = srt.DefaultConfig()
address, err = srtConf.UnmarshalURL(u)
require.NoError(t, err)
err = srtConf.Validate()
require.NoError(t, err)
reader, err := srt.Dial("srt", address, srtConf)
require.NoError(t, err)
defer reader.Close()
err = w.WriteH26x(track, 2*90000, 1*90000, true, [][]byte{
{ // IDR
0x05, 2,
},
})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
r, err := mpegts.NewReader(reader)
require.NoError(t, err)
require.Equal(t, []*mpegts.Track{{
PID: 256,
Codec: &mpegts.CodecH264{},
}}, r.Tracks())
received := false
r.OnDataH26x(r.Tracks()[0], func(pts int64, dts int64, au [][]byte) error {
require.Equal(t, int64(0), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{0x05, 1},
}, au) // IDR
received = true
return nil
})
for {
err = r.Read()
require.NoError(t, err)
if received {
break
}
}
})
}
}

View file

@ -55,11 +55,11 @@ func TestRTSPServerPublishRead(t *testing.T) {
proto = "rtsps" proto = "rtsps"
port = "8322" port = "8322"
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)

View file

@ -10,29 +10,15 @@ import (
"time" "time"
"github.com/bluenviron/mediamtx/internal/core" "github.com/bluenviron/mediamtx/internal/core"
"github.com/bluenviron/mediamtx/internal/test"
) )
func writeTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
func newInstance(conf string) (*core.Core, bool) { func newInstance(conf string) (*core.Core, bool) {
if conf == "" { if conf == "" {
return core.New([]string{}) return core.New([]string{})
} }
tmpf, err := writeTempFile([]byte(conf)) tmpf, err := test.CreateTempFile([]byte(conf))
if err != nil { if err != nil {
return nil, false return nil, false
} }

View file

@ -21,21 +21,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func writeTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
type dummyPath struct { type dummyPath struct {
stream *stream.Stream stream *stream.Stream
streamCreated chan struct{} streamCreated chan struct{}
@ -100,11 +85,11 @@ func TestServerPublish(t *testing.T) {
if encrypt == "tls" { if encrypt == "tls" {
var err error var err error
serverCertFpath, err = writeTempFile(test.TLSCertPub) serverCertFpath, err = test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err = writeTempFile(test.TLSCertKey) serverKeyFpath, err = test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)
} }
@ -178,9 +163,7 @@ func TestServerPublish(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
aw.Start() aw.Start()
<-recv <-recv
aw.Stop() aw.Stop()
}) })
} }
@ -197,11 +180,11 @@ func TestServerRead(t *testing.T) {
if encrypt == "tls" { if encrypt == "tls" {
var err error var err error
serverCertFpath, err = writeTempFile(test.TLSCertPub) serverCertFpath, err = test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err = writeTempFile(test.TLSCertKey) serverKeyFpath, err = test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)
} }

View file

@ -65,7 +65,7 @@ type conn struct {
runOnDisconnect string runOnDisconnect string
wg *sync.WaitGroup wg *sync.WaitGroup
externalCmdPool *externalcmd.Pool externalCmdPool *externalcmd.Pool
pathManager defs.PathManager pathManager serverPathManager
parent *Server parent *Server
ctx context.Context ctx context.Context

View file

@ -16,6 +16,7 @@ import (
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
) )
// ErrConnNotFound is returned when a connection is not found. // ErrConnNotFound is returned when a connection is not found.
@ -58,6 +59,11 @@ type serverAPIConnsKickReq struct {
res chan serverAPIConnsKickRes res chan serverAPIConnsKickRes
} }
type serverPathManager interface {
AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error)
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}
type serverParent interface { type serverParent interface {
logger.Writer logger.Writer
} }
@ -74,7 +80,7 @@ type Server struct {
RunOnConnectRestart bool RunOnConnectRestart bool
RunOnDisconnect string RunOnDisconnect string
ExternalCmdPool *externalcmd.Pool ExternalCmdPool *externalcmd.Pool
PathManager defs.PathManager PathManager serverPathManager
Parent serverParent Parent serverParent
ctx context.Context ctx context.Context

View file

@ -0,0 +1,267 @@
package srt
import (
"bufio"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/bluenviron/mediamtx/internal/unit"
srt "github.com/datarhei/gosrt"
"github.com/stretchr/testify/require"
)
type dummyPath struct {
stream *stream.Stream
streamCreated chan struct{}
}
func (p *dummyPath) Name() string {
return "teststream"
}
func (p *dummyPath) SafeConf() *conf.Path {
return &conf.Path{}
}
func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment {
return externalcmd.Environment{}
}
func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) {
var err error
p.stream, err = stream.New(
1460,
req.Desc,
true,
test.NilLogger{},
)
if err != nil {
return nil, err
}
close(p.streamCreated)
return p.stream, nil
}
func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) {
}
func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) {
}
func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) {
}
type dummyPathManager struct {
path *dummyPath
}
func (pm *dummyPathManager) AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) {
return pm.path, nil
}
func (pm *dummyPathManager) AddReader(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
return pm.path, pm.path.stream, nil
}
func TestServerPublish(t *testing.T) {
externalCmdPool := externalcmd.NewPool()
defer externalCmdPool.Close()
path := &dummyPath{
streamCreated: make(chan struct{}),
}
pathManager := &dummyPathManager{path: path}
s := &Server{
Address: "127.0.0.1:8890",
RTSPAddress: "",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
WriteQueueSize: 512,
UDPMaxPayloadSize: 1472,
RunOnConnect: "",
RunOnConnectRestart: false,
RunOnDisconnect: "string",
ExternalCmdPool: externalCmdPool,
PathManager: pathManager,
Parent: &test.NilLogger{},
}
err := s.Initialize()
require.NoError(t, err)
defer s.Close()
u := "srt://localhost:8890?streamid=publish:mypath"
srtConf := srt.DefaultConfig()
address, err := srtConf.UnmarshalURL(u)
require.NoError(t, err)
err = srtConf.Validate()
require.NoError(t, err)
publisher, err := srt.Dial("srt", address, srtConf)
require.NoError(t, err)
defer publisher.Close()
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(publisher)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)
err = w.WriteH26x(track, 0, 0, true, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{0x05, 1}, // IDR
})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
<-path.streamCreated
aw := asyncwriter.New(512, &test.NilLogger{})
recv := make(chan struct{})
path.stream.AddReader(aw,
path.stream.Desc().Medias[0],
path.stream.Desc().Medias[0].Formats[0],
func(u unit.Unit) error {
require.Equal(t, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{0x05, 1}, // IDR
}, u.(*unit.H264).AU)
close(recv)
return nil
})
err = w.WriteH26x(track, 0, 0, true, [][]byte{
{5, 2},
})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
aw.Start()
<-recv
aw.Stop()
}
func TestServerRead(t *testing.T) {
externalCmdPool := externalcmd.NewPool()
defer externalCmdPool.Close()
testMediaH264 := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{test.FormatH264},
}
desc := &description.Session{Medias: []*description.Media{testMediaH264}}
stream, err := stream.New(
1460,
desc,
true,
test.NilLogger{},
)
require.NoError(t, err)
path := &dummyPath{stream: stream}
pathManager := &dummyPathManager{path: path}
s := &Server{
Address: "127.0.0.1:8890",
RTSPAddress: "",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
WriteQueueSize: 512,
UDPMaxPayloadSize: 1472,
RunOnConnect: "",
RunOnConnectRestart: false,
RunOnDisconnect: "string",
ExternalCmdPool: externalCmdPool,
PathManager: pathManager,
Parent: &test.NilLogger{},
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
u := "srt://localhost:8890?streamid=read:mypath"
srtConf := srt.DefaultConfig()
address, err := srtConf.UnmarshalURL(u)
require.NoError(t, err)
err = srtConf.Validate()
require.NoError(t, err)
reader, err := srt.Dial("srt", address, srtConf)
require.NoError(t, err)
defer reader.Close()
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{
NTP: time.Time{},
},
AU: [][]byte{
{5, 1}, // IDR
},
})
r, err := mpegts.NewReader(reader)
require.NoError(t, err)
require.Equal(t, []*mpegts.Track{{
PID: 256,
Codec: &mpegts.CodecH264{},
}}, r.Tracks())
received := false
r.OnDataH26x(r.Tracks()[0], func(pts int64, dts int64, au [][]byte) error {
require.Equal(t, int64(0), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{0x05, 1},
}, au)
received = true
return nil
})
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{
NTP: time.Time{},
},
AU: [][]byte{
{5, 2},
},
})
for {
err = r.Read()
require.NoError(t, err)
if received {
break
}
}
}

View file

@ -15,21 +15,6 @@ import (
"github.com/bluenviron/mediamtx/internal/test" "github.com/bluenviron/mediamtx/internal/test"
) )
func writeTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
func TestSource(t *testing.T) { func TestSource(t *testing.T) {
for _, ca := range []string{ for _, ca := range []string{
"plain", "plain",
@ -41,11 +26,11 @@ func TestSource(t *testing.T) {
return net.Listen("tcp", "127.0.0.1:1935") return net.Listen("tcp", "127.0.0.1:1935")
} }
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)

View file

@ -19,21 +19,6 @@ import (
"github.com/bluenviron/mediamtx/internal/test" "github.com/bluenviron/mediamtx/internal/test"
) )
func writeTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
type testServer struct { type testServer struct {
onDescribe func(*gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) onDescribe func(*gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error)
onSetup func(*gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) onSetup func(*gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error)
@ -124,11 +109,11 @@ func TestSource(t *testing.T) {
s.UDPRTCPAddress = "127.0.0.1:8003" s.UDPRTCPAddress = "127.0.0.1:8003"
case "tls": case "tls":
serverCertFpath, err := writeTempFile(test.TLSCertPub) serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverCertFpath) defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(test.TLSCertKey) serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err) require.NoError(t, err)
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)

View file

@ -0,0 +1,19 @@
package test
import "os"
// CreateTempFile creates a temporary file with given content.
func CreateTempFile(byts []byte) (string, error) {
tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}