tests: fix race condition (#5354)
Some checks failed
code_lint / go (push) Has been cancelled
code_lint / go_mod (push) Has been cancelled
code_lint / docs (push) Has been cancelled
code_lint / api_docs (push) Has been cancelled
code_test / test_64 (push) Has been cancelled
code_test / test_32 (push) Has been cancelled
code_test / test_e2e (push) Has been cancelled

This commit is contained in:
Alessandro Ros 2026-01-17 23:21:34 +01:00 committed by GitHub
parent 8901ac4e9c
commit 02fe2e2b42
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 28 additions and 14 deletions

2
go.mod
View file

@ -10,7 +10,7 @@ require (
github.com/abema/go-mp4 v1.4.1
github.com/alecthomas/kong v1.13.0
github.com/asticode/go-astits v1.14.0
github.com/bluenviron/gohlslib/v2 v2.2.4
github.com/bluenviron/gohlslib/v2 v2.2.5-0.20260117214804-b8c1ff42629d
github.com/bluenviron/gortmplib v0.2.0
github.com/bluenviron/gortsplib/v5 v5.2.2
github.com/bluenviron/mediacommon/v2 v2.6.0

4
go.sum
View file

@ -35,8 +35,8 @@ github.com/asticode/go-astits v1.14.0 h1:zkgnZzipx2XX5mWycqsSBeEyDH58+i4HtyF4j2R
github.com/asticode/go-astits v1.14.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4=
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI=
github.com/bluenviron/gohlslib/v2 v2.2.4 h1:5F/Ud2VuJrrLYCDV0Ham947UIIrd801/GyoCsPYMqiw=
github.com/bluenviron/gohlslib/v2 v2.2.4/go.mod h1:5J8Ry4xraLs6W0B2gVS0IqhCzcq+CDDJIisfkpoHyxM=
github.com/bluenviron/gohlslib/v2 v2.2.5-0.20260117214804-b8c1ff42629d h1:Mm56CDoDBNgVRI4vQHt6+JMQ+yiEX8Fljlh/o2NskaQ=
github.com/bluenviron/gohlslib/v2 v2.2.5-0.20260117214804-b8c1ff42629d/go.mod h1:iatp6XDIncpGlua8YU2wga01fU7pzp42nWG684GZhbM=
github.com/bluenviron/gortmplib v0.2.0 h1:j15eeHrgVh6Avg9oAx+r4w0HugTqrIqLBsYnhs3D1dE=
github.com/bluenviron/gortmplib v0.2.0/go.mod h1:yzobxBF8zusF2nKbEOF69zIIL429j0kaCWc/euNdvO4=
github.com/bluenviron/gortsplib/v5 v5.2.2 h1:5q2viB8PGxWOSXNhVvj8buyr1wighLbHqRZ0U7MLM3o=

View file

@ -235,7 +235,8 @@ func TestServerRead(t *testing.T) {
defer s.Close()
c := &gohlslib.Client{
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value",
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value",
StartDistance: 1,
}
recv1 := make(chan struct{})
@ -283,9 +284,9 @@ func TestServerRead(t *testing.T) {
require.NoError(t, err)
defer c.Close()
time.Sleep(100 * time.Millisecond)
strm.WaitForReaders()
for i := range 4 {
for i := range 2 {
strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.Unit{
NTP: time.Time{},
PTS: int64(i) * 90000,
@ -324,9 +325,9 @@ func TestServerRead(t *testing.T) {
s.PathReady(&dummyPath{})
time.Sleep(500 * time.Millisecond)
strm.WaitForReaders()
for i := range 4 {
for i := range 2 {
strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.Unit{
NTP: time.Time{},
PTS: int64(i) * 90000,
@ -341,10 +342,9 @@ func TestServerRead(t *testing.T) {
})
}
time.Sleep(100 * time.Millisecond)
c := &gohlslib.Client{
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value",
URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value",
StartDistance: 1,
}
recv1 := make(chan struct{})

View file

@ -266,9 +266,9 @@ func TestServerRead(t *testing.T) {
require.NoError(t, err)
defer conn.Close()
go func() {
time.Sleep(500 * time.Millisecond)
strm.WaitForReaders()
go func() {
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
NTP: time.Time{},
Payload: unit.PayloadH264{

View file

@ -230,7 +230,7 @@ func TestServerRead(t *testing.T) {
require.NoError(t, err)
defer reader.Close()
time.Sleep(500 * time.Millisecond)
strm.WaitForReaders()
strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{
NTP: time.Time{},

View file

@ -34,6 +34,8 @@ type Stream struct {
rtspsStream *gortsplib.ServerStream
readers map[*Reader]struct{}
processingErrors *errordumper.Dumper
hasReaders chan struct{}
}
// Initialize initializes a Stream.
@ -42,6 +44,7 @@ func (s *Stream) Initialize() error {
s.bytesSent = new(uint64)
s.medias = make(map[*description.Media]*streamMedia)
s.readers = make(map[*Reader]struct{})
s.hasReaders = make(chan struct{})
s.processingErrors = &errordumper.Dumper{
OnReport: func(val uint64, last error) {
@ -166,6 +169,12 @@ func (s *Stream) AddReader(r *Reader) {
r.queueSize = s.WriteQueueSize
r.start()
select {
case <-s.hasReaders:
default:
close(s.hasReaders)
}
}
// RemoveReader removes a reader.
@ -188,6 +197,11 @@ func (s *Stream) RemoveReader(r *Reader) {
delete(s.readers, r)
}
// WaitForReaders waits for the stream to have at least one reader.
func (s *Stream) WaitForReaders() {
<-s.hasReaders
}
// WriteUnit writes a Unit.
func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u *unit.Unit) {
sm := s.medias[medi]