improve tests (#4581)
Some checks are pending
code_lint / golangci_lint (push) Waiting to run
code_lint / mod_tidy (push) Waiting to run
code_lint / api_docs (push) Waiting to run
code_test / test_64 (push) Waiting to run
code_test / test_32 (push) Waiting to run
code_test / test_e2e (push) Waiting to run

This commit is contained in:
Alessandro Ros 2025-05-29 20:37:44 +02:00 committed by GitHub
parent 402b8ff9c9
commit 189c99d5fc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -12,6 +12,7 @@ import (
"github.com/bluenviron/gohlslib/v2" "github.com/bluenviron/gohlslib/v2"
"github.com/bluenviron/gohlslib/v2/pkg/codecs" "github.com/bluenviron/gohlslib/v2/pkg/codecs"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
@ -69,7 +70,7 @@ func (pa *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) {
func (pa *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) { func (pa *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) {
} }
func TestPreflightRequest(t *testing.T) { func TestServerPreflightRequest(t *testing.T) {
s := &Server{ s := &Server{
Address: "127.0.0.1:8888", Address: "127.0.0.1:8888",
AllowOrigin: "*", AllowOrigin: "*",
@ -173,208 +174,240 @@ func TestServerNotFound(t *testing.T) {
} }
func TestServerRead(t *testing.T) { func TestServerRead(t *testing.T) {
t.Run("always remux off", func(t *testing.T) { for _, ca := range []string{
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} "always remux off",
"always remux on",
} {
t.Run(ca, func(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{
test.MediaH264,
test.MediaMPEG4Audio,
}}
strm := &stream.Stream{ strm := &stream.Stream{
WriteQueueSize: 512, WriteQueueSize: 512,
UDPMaxPayloadSize: 1472, UDPMaxPayloadSize: 1472,
Desc: desc, Desc: desc,
GenerateRTPPackets: true, GenerateRTPPackets: true,
Parent: test.NilLogger, Parent: test.NilLogger,
}
err := strm.Initialize()
require.NoError(t, err)
pm := &dummyPathManager{
findPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return &conf.Path{}, nil
},
addReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
return &dummyPath{}, strm, nil
},
}
s := &Server{
Address: "127.0.0.1:8888",
Encryption: false,
ServerKey: "",
ServerCert: "",
AlwaysRemux: false,
Variant: conf.HLSVariant(gohlslib.MuxerVariantMPEGTS),
SegmentCount: 7,
SegmentDuration: conf.Duration(1 * time.Second),
PartDuration: conf.Duration(200 * time.Millisecond),
SegmentMaxSize: 50 * 1024 * 1024,
AllowOrigin: "",
TrustedProxies: conf.IPNetworks{},
Directory: "",
ReadTimeout: conf.Duration(10 * time.Second),
PathManager: pm,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
c := &gohlslib.Client{
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value",
}
recv := make(chan struct{})
c.OnTracks = func(tracks []*gohlslib.Track) error {
require.Equal(t, []*gohlslib.Track{{
Codec: &codecs.H264{},
ClockRate: 90000,
}}, tracks)
c.OnDataH26x(tracks[0], func(pts, dts int64, au [][]byte) {
require.Equal(t, int64(0), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 1},
}, au)
close(recv)
})
return nil
}
err = c.Start()
require.NoError(t, err)
defer func() { <-c.Wait() }()
defer c.Close()
go func() {
time.Sleep(100 * time.Millisecond)
for i := 0; i < 4; i++ {
strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
Base: unit.Base{
NTP: time.Time{},
PTS: int64(i) * 90000,
},
AU: [][]byte{
{5, 1}, // IDR
},
})
} }
}() err := strm.Initialize()
require.NoError(t, err)
<-recv pm := &dummyPathManager{
}) findPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
t.Run("always remux on", func(t *testing.T) { require.Equal(t, "param=value", req.AccessRequest.Query)
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
strm := &stream.Stream{ return &conf.Path{}, nil
WriteQueueSize: 512,
UDPMaxPayloadSize: 1472,
Desc: desc,
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err := strm.Initialize()
require.NoError(t, err)
pm := &dummyPathManager{
findPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "param=value", req.AccessRequest.Query)
require.Equal(t, "myuser", req.AccessRequest.Credentials.User)
require.Equal(t, "mypass", req.AccessRequest.Credentials.Pass)
return &conf.Path{}, nil
},
addReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
require.Equal(t, "teststream", req.AccessRequest.Name)
require.Equal(t, "", req.AccessRequest.Query)
return &dummyPath{}, strm, nil
},
}
s := &Server{
Address: "127.0.0.1:8888",
Encryption: false,
ServerKey: "",
ServerCert: "",
AlwaysRemux: true,
Variant: conf.HLSVariant(gohlslib.MuxerVariantMPEGTS),
SegmentCount: 7,
SegmentDuration: conf.Duration(1 * time.Second),
PartDuration: conf.Duration(200 * time.Millisecond),
SegmentMaxSize: 50 * 1024 * 1024,
AllowOrigin: "",
TrustedProxies: conf.IPNetworks{},
Directory: "",
ReadTimeout: conf.Duration(10 * time.Second),
PathManager: pm,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
s.PathReady(&dummyPath{})
strm.WaitRunningReader()
for i := 0; i < 4; i++ {
strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
Base: unit.Base{
NTP: time.Time{},
PTS: int64(i) * 90000,
}, },
AU: [][]byte{ addReaderImpl: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
{5, 1}, // IDR require.Equal(t, "teststream", req.AccessRequest.Name)
if ca == "always remux off" {
require.Equal(t, "param=value", req.AccessRequest.Query)
} else {
require.Equal(t, "", req.AccessRequest.Query)
}
return &dummyPath{}, strm, nil
}, },
}) }
}
c := &gohlslib.Client{ switch ca {
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value", case "always remux off":
} s := &Server{
Address: "127.0.0.1:8888",
AlwaysRemux: false,
Variant: conf.HLSVariant(gohlslib.MuxerVariantMPEGTS),
SegmentCount: 7,
SegmentDuration: conf.Duration(1 * time.Second),
PartDuration: conf.Duration(200 * time.Millisecond),
SegmentMaxSize: 50 * 1024 * 1024,
TrustedProxies: conf.IPNetworks{},
ReadTimeout: conf.Duration(10 * time.Second),
PathManager: pm,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
recv := make(chan struct{}) c := &gohlslib.Client{
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value",
}
c.OnTracks = func(tracks []*gohlslib.Track) error { recv1 := make(chan struct{})
require.Equal(t, []*gohlslib.Track{{ recv2 := make(chan struct{})
Codec: &codecs.H264{},
ClockRate: 90000,
}}, tracks)
c.OnDataH26x(tracks[0], func(pts, dts int64, au [][]byte) { c.OnTracks = func(tracks []*gohlslib.Track) error { //nolint:dupl
require.Equal(t, int64(0), pts) require.Equal(t, []*gohlslib.Track{
require.Equal(t, int64(0), dts) {
require.Equal(t, [][]byte{ Codec: &codecs.H264{},
test.FormatH264.SPS, ClockRate: 90000,
test.FormatH264.PPS, },
{5, 1}, {
}, au) Codec: &codecs.MPEG4Audio{
close(recv) Config: mpeg4audio.AudioSpecificConfig{
}) Type: 2,
ChannelCount: 2,
SampleRate: 44100,
},
},
ClockRate: 90000,
},
}, tracks)
return nil c.OnDataH26x(tracks[0], func(pts, dts int64, au [][]byte) {
} require.Equal(t, int64(0), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 1},
}, au)
close(recv1)
})
err = c.Start() c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
require.NoError(t, err) require.Equal(t, int64(0), pts)
defer func() { <-c.Wait() }() require.Equal(t, [][]byte{{1, 2}}, aus)
defer c.Close() close(recv2)
})
<-recv return nil
}) }
err = c.Start()
require.NoError(t, err)
defer func() { <-c.Wait() }()
defer c.Close()
time.Sleep(100 * time.Millisecond)
for i := 0; i < 4; i++ {
strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
Base: unit.Base{
NTP: time.Time{},
PTS: int64(i) * 90000,
},
AU: [][]byte{
{5, 1}, // IDR
},
})
strm.WriteUnit(test.MediaMPEG4Audio, test.FormatMPEG4Audio, &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Time{},
PTS: int64(i) * 44100,
},
AUs: [][]byte{{1, 2}},
})
}
<-recv1
<-recv2
case "always remux on":
s := &Server{
Address: "127.0.0.1:8888",
AlwaysRemux: true,
Variant: conf.HLSVariant(gohlslib.MuxerVariantMPEGTS),
SegmentCount: 7,
SegmentDuration: conf.Duration(1 * time.Second),
PartDuration: conf.Duration(200 * time.Millisecond),
SegmentMaxSize: 50 * 1024 * 1024,
TrustedProxies: conf.IPNetworks{},
ReadTimeout: conf.Duration(10 * time.Second),
PathManager: pm,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
s.PathReady(&dummyPath{})
strm.WaitRunningReader()
for i := range 4 {
strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
Base: unit.Base{
NTP: time.Time{},
PTS: int64(i) * 90000,
},
AU: [][]byte{
{5, 1}, // IDR
},
})
strm.WriteUnit(test.MediaMPEG4Audio, test.FormatMPEG4Audio, &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Time{},
PTS: int64(i) * 44100,
},
AUs: [][]byte{{1, 2}},
})
}
time.Sleep(100 * time.Millisecond)
c := &gohlslib.Client{
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value",
}
recv1 := make(chan struct{})
recv2 := make(chan struct{})
c.OnTracks = func(tracks []*gohlslib.Track) error { //nolint:dupl
require.Equal(t, []*gohlslib.Track{
{
Codec: &codecs.H264{},
ClockRate: 90000,
},
{
Codec: &codecs.MPEG4Audio{
Config: mpeg4audio.AudioSpecificConfig{
Type: 2,
ChannelCount: 2,
SampleRate: 44100,
},
},
ClockRate: 90000,
},
}, tracks)
c.OnDataH26x(tracks[0], func(pts, dts int64, au [][]byte) {
require.Equal(t, int64(0), pts)
require.Equal(t, int64(0), dts)
require.Equal(t, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 1},
}, au)
close(recv1)
})
c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
require.Equal(t, int64(0), pts)
require.Equal(t, [][]byte{{1, 2}}, aus)
close(recv2)
})
return nil
}
err = c.Start()
require.NoError(t, err)
defer func() { <-c.Wait() }()
defer c.Close()
<-recv1
<-recv2
}
})
}
} }
func TestDirectory(t *testing.T) { func TestServerDirectory(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback") dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
@ -427,7 +460,7 @@ func TestDirectory(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestDynamicAlwaysRemux(t *testing.T) { func TestServerDynamicAlwaysRemux(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{test.MediaH264}} desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
strm := &stream.Stream{ strm := &stream.Stream{