tests: fix race condition (#5301)

This commit is contained in:
Alessandro Ros 2025-12-28 16:36:54 +01:00 committed by GitHub
parent f843df4ab8
commit 5c67b2f08b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 128 additions and 131 deletions

View file

@ -61,7 +61,12 @@ func TestServerPublish(t *testing.T) {
}
var strm *stream.Stream
streamCreated := make(chan struct{})
var reader *stream.Reader
defer func() {
strm.RemoveReader(reader)
}()
dataReceived := make(chan struct{})
n := 0
pathManager := &test.PathManager{
AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) {
@ -80,7 +85,32 @@ func TestServerPublish(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
reader = &stream.Reader{Parent: test.NilLogger}
reader.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
switch n {
case 0:
require.Equal(t, unit.PayloadH264(nil), u.Payload)
case 1:
require.Equal(t, unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 2, 3, 4},
}, u.Payload)
close(dataReceived)
default:
t.Errorf("should not happen")
}
n++
return nil
})
strm.AddReader(reader)
return &dummyPath{}, strm, nil
},
@ -141,36 +171,7 @@ func TestServerPublish(t *testing.T) {
})
require.NoError(t, err)
<-streamCreated
recv := make(chan struct{})
r := &stream.Reader{Parent: test.NilLogger}
r.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
require.Equal(t, unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 2, 3, 4},
}, u.Payload)
close(recv)
return nil
})
strm.AddReader(r)
defer strm.RemoveReader(r)
err = w.WriteH264(
test.FormatH264,
3*time.Second, 3*time.Second, [][]byte{
{5, 2, 3, 4},
})
require.NoError(t, err)
<-recv
<-dataReceived
})
}
}

View file

@ -47,7 +47,12 @@ func TestServerPublish(t *testing.T) {
for _, ca := range []string{"basic", "digest", "basic+digest"} {
t.Run(ca, func(t *testing.T) {
var strm *stream.Stream
streamCreated := make(chan struct{})
var reader *stream.Reader
defer func() {
strm.RemoveReader(reader)
}()
dataReceived := make(chan struct{})
n := 0
pathManager := &test.PathManager{
@ -91,7 +96,22 @@ func TestServerPublish(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
reader = &stream.Reader{Parent: test.NilLogger}
reader.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
require.Equal(t, unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 2, 3, 4},
}, u.Payload)
close(dataReceived)
return nil
})
strm.AddReader(reader)
return &dummyPath{}, strm, nil
},
@ -131,28 +151,6 @@ func TestServerPublish(t *testing.T) {
require.NoError(t, err)
defer source.Close()
<-streamCreated
r := &stream.Reader{Parent: test.NilLogger}
recv := make(chan struct{})
r.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
require.Equal(t, unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 2, 3, 4},
}, u.Payload)
close(recv)
return nil
})
strm.AddReader(r)
defer strm.RemoveReader(r)
err = source.WritePacketRTP(media0, &rtp.Packet{
Header: rtp.Header{
Version: 2,
@ -166,7 +164,7 @@ func TestServerPublish(t *testing.T) {
})
require.NoError(t, err)
<-recv
<-dataReceived
})
}
}

View file

@ -44,7 +44,13 @@ func TestServerPublish(t *testing.T) {
defer externalCmdPool.Close()
var strm *stream.Stream
streamCreated := make(chan struct{})
var reader *stream.Reader
defer func() {
strm.RemoveReader(reader)
}()
dataReceived := make(chan struct{})
dataReceived2 := make(chan struct{})
n := 0
pathManager := &test.PathManager{
FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
@ -69,7 +75,37 @@ func TestServerPublish(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
reader = &stream.Reader{Parent: test.NilLogger}
reader.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
switch n {
case 0:
require.Equal(t, unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 1},
}, u.Payload)
close(dataReceived)
case 1:
require.Equal(t, unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{5, 2},
}, u.Payload)
close(dataReceived2)
default:
t.Errorf("should not happen")
}
n++
return nil
})
strm.AddReader(reader)
return &dummyPath{}, strm, nil
},
@ -103,7 +139,6 @@ func TestServerPublish(t *testing.T) {
publisher, err := srt.Dial("srt", address, srtConf)
require.NoError(t, err)
defer publisher.Close()
track := &mpegts.Track{
Codec: &tscodecs.H264{},
@ -114,47 +149,28 @@ func TestServerPublish(t *testing.T) {
err = w.Initialize()
require.NoError(t, err)
// the MPEG-TS muxer needs two PES packets in order to write the first one
err = w.WriteH264(track, 0, 0, [][]byte{
test.FormatH264.SPS,
test.FormatH264.PPS,
{0x05, 1}, // IDR
{5, 1}, // IDR
})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
<-streamCreated
r := &stream.Reader{Parent: test.NilLogger}
recv := make(chan struct{})
r.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
require.Equal(t, unit.PayloadH264{
test.FormatH264.SPS,
test.FormatH264.PPS,
{0x05, 1}, // IDR
}, u.Payload)
close(recv)
return nil
})
strm.AddReader(r)
defer strm.RemoveReader(r)
err = w.WriteH264(track, 0, 0, [][]byte{
{5, 2},
{5, 2}, // IDR
})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
<-recv
<-dataReceived
// the second PES is written after writer is closed
publisher.Close()
<-dataReceived2
}
func TestServerRead(t *testing.T) {

View file

@ -197,7 +197,11 @@ func TestServerOptionsICEServer(t *testing.T) {
func TestServerPublish(t *testing.T) {
var strm *stream.Stream
streamCreated := make(chan struct{})
var reader *stream.Reader
defer func() {
strm.RemoveReader(reader)
}()
dataReceived := make(chan struct{})
pathManager := &test.PathManager{
FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
@ -222,7 +226,25 @@ func TestServerPublish(t *testing.T) {
err := strm.Initialize()
require.NoError(t, err)
close(streamCreated)
reader = &stream.Reader{Parent: test.NilLogger}
reader.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
/* select {
case <-recv:
return nil
default:
} */
require.Equal(t, unit.PayloadH264{
{1},
}, u.Payload)
close(dataReceived)
return nil
})
strm.AddReader(reader)
return &dummyPath{}, strm, nil
},
@ -289,47 +311,7 @@ func TestServerPublish(t *testing.T) {
})
require.NoError(t, err)
<-streamCreated
r := &stream.Reader{Parent: test.NilLogger}
recv := make(chan struct{})
r.OnData(
strm.Desc.Medias[0],
strm.Desc.Medias[0].Formats[0],
func(u *unit.Unit) error {
select {
case <-recv:
return nil
default:
}
require.Equal(t, unit.PayloadH264{
{1},
}, u.Payload)
close(recv)
return nil
})
strm.AddReader(r)
defer strm.RemoveReader(r)
err = track.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
<-recv
<-dataReceived
}
func TestServerRead(t *testing.T) {