diff --git a/go.mod b/go.mod index fc619b06..7e2d46e2 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/abema/go-mp4 v1.4.1 github.com/alecthomas/kong v1.13.0 github.com/asticode/go-astits v1.14.0 - github.com/bluenviron/gohlslib/v2 v2.2.4 + github.com/bluenviron/gohlslib/v2 v2.2.5-0.20260117214804-b8c1ff42629d github.com/bluenviron/gortmplib v0.2.0 github.com/bluenviron/gortsplib/v5 v5.2.2 github.com/bluenviron/mediacommon/v2 v2.6.0 diff --git a/go.sum b/go.sum index f1e99753..cf8ce034 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/asticode/go-astits v1.14.0 h1:zkgnZzipx2XX5mWycqsSBeEyDH58+i4HtyF4j2R github.com/asticode/go-astits v1.14.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= -github.com/bluenviron/gohlslib/v2 v2.2.4 h1:5F/Ud2VuJrrLYCDV0Ham947UIIrd801/GyoCsPYMqiw= -github.com/bluenviron/gohlslib/v2 v2.2.4/go.mod h1:5J8Ry4xraLs6W0B2gVS0IqhCzcq+CDDJIisfkpoHyxM= +github.com/bluenviron/gohlslib/v2 v2.2.5-0.20260117214804-b8c1ff42629d h1:Mm56CDoDBNgVRI4vQHt6+JMQ+yiEX8Fljlh/o2NskaQ= +github.com/bluenviron/gohlslib/v2 v2.2.5-0.20260117214804-b8c1ff42629d/go.mod h1:iatp6XDIncpGlua8YU2wga01fU7pzp42nWG684GZhbM= github.com/bluenviron/gortmplib v0.2.0 h1:j15eeHrgVh6Avg9oAx+r4w0HugTqrIqLBsYnhs3D1dE= github.com/bluenviron/gortmplib v0.2.0/go.mod h1:yzobxBF8zusF2nKbEOF69zIIL429j0kaCWc/euNdvO4= github.com/bluenviron/gortsplib/v5 v5.2.2 h1:5q2viB8PGxWOSXNhVvj8buyr1wighLbHqRZ0U7MLM3o= diff --git a/internal/servers/hls/server_test.go b/internal/servers/hls/server_test.go index a68990c7..f824c7b3 100644 --- a/internal/servers/hls/server_test.go +++ b/internal/servers/hls/server_test.go @@ -235,7 +235,8 @@ func TestServerRead(t *testing.T) { defer s.Close() c := &gohlslib.Client{ - URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value", + URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value", + StartDistance: 1, } recv1 := make(chan struct{}) @@ -283,9 +284,9 @@ func TestServerRead(t *testing.T) { require.NoError(t, err) defer c.Close() - time.Sleep(100 * time.Millisecond) + strm.WaitForReaders() - for i := range 4 { + for i := range 2 { strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.Unit{ NTP: time.Time{}, PTS: int64(i) * 90000, @@ -324,9 +325,9 @@ func TestServerRead(t *testing.T) { s.PathReady(&dummyPath{}) - time.Sleep(500 * time.Millisecond) + strm.WaitForReaders() - for i := range 4 { + for i := range 2 { strm.WriteUnit(test.MediaH264, test.FormatH264, &unit.Unit{ NTP: time.Time{}, PTS: int64(i) * 90000, @@ -341,10 +342,9 @@ func TestServerRead(t *testing.T) { }) } - time.Sleep(100 * time.Millisecond) - c := &gohlslib.Client{ - URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value", + URI: "http://myuser:mypass@127.0.0.1:8888/teststream/index.m3u8?param=value", + StartDistance: 1, } recv1 := make(chan struct{}) diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go index e566521f..defa53c5 100644 --- a/internal/servers/rtmp/server_test.go +++ b/internal/servers/rtmp/server_test.go @@ -266,9 +266,9 @@ func TestServerRead(t *testing.T) { require.NoError(t, err) defer conn.Close() - go func() { - time.Sleep(500 * time.Millisecond) + strm.WaitForReaders() + go func() { strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ NTP: time.Time{}, Payload: unit.PayloadH264{ diff --git a/internal/servers/srt/server_test.go b/internal/servers/srt/server_test.go index f3756bb1..e3d6b2b1 100644 --- a/internal/servers/srt/server_test.go +++ b/internal/servers/srt/server_test.go @@ -230,7 +230,7 @@ func TestServerRead(t *testing.T) { require.NoError(t, err) defer reader.Close() - time.Sleep(500 * time.Millisecond) + strm.WaitForReaders() strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ NTP: time.Time{}, diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 467d6c30..bdd46dc0 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -34,6 +34,8 @@ type Stream struct { rtspsStream *gortsplib.ServerStream readers map[*Reader]struct{} processingErrors *errordumper.Dumper + + hasReaders chan struct{} } // Initialize initializes a Stream. @@ -42,6 +44,7 @@ func (s *Stream) Initialize() error { s.bytesSent = new(uint64) s.medias = make(map[*description.Media]*streamMedia) s.readers = make(map[*Reader]struct{}) + s.hasReaders = make(chan struct{}) s.processingErrors = &errordumper.Dumper{ OnReport: func(val uint64, last error) { @@ -166,6 +169,12 @@ func (s *Stream) AddReader(r *Reader) { r.queueSize = s.WriteQueueSize r.start() + + select { + case <-s.hasReaders: + default: + close(s.hasReaders) + } } // RemoveReader removes a reader. @@ -188,6 +197,11 @@ func (s *Stream) RemoveReader(r *Reader) { delete(s.readers, r) } +// WaitForReaders waits for the stream to have at least one reader. +func (s *Stream) WaitForReaders() { + <-s.hasReaders +} + // WriteUnit writes a Unit. func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u *unit.Unit) { sm := s.medias[medi]