diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go index 7cf5a4e1..2ae60f5a 100644 --- a/internal/servers/rtmp/server_test.go +++ b/internal/servers/rtmp/server_test.go @@ -61,7 +61,12 @@ func TestServerPublish(t *testing.T) { } var strm *stream.Stream - streamCreated := make(chan struct{}) + var reader *stream.Reader + defer func() { + strm.RemoveReader(reader) + }() + dataReceived := make(chan struct{}) + n := 0 pathManager := &test.PathManager{ AddPublisherImpl: func(req defs.PathAddPublisherReq) (defs.Path, *stream.Stream, error) { @@ -80,7 +85,32 @@ func TestServerPublish(t *testing.T) { err := strm.Initialize() require.NoError(t, err) - close(streamCreated) + reader = &stream.Reader{Parent: test.NilLogger} + + reader.OnData( + strm.Desc.Medias[0], + strm.Desc.Medias[0].Formats[0], + func(u *unit.Unit) error { + switch n { + case 0: + require.Equal(t, unit.PayloadH264(nil), u.Payload) + + case 1: + require.Equal(t, unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5, 2, 3, 4}, + }, u.Payload) + close(dataReceived) + + default: + t.Errorf("should not happen") + } + n++ + return nil + }) + + strm.AddReader(reader) return &dummyPath{}, strm, nil }, @@ -141,36 +171,7 @@ func TestServerPublish(t *testing.T) { }) require.NoError(t, err) - <-streamCreated - - recv := make(chan struct{}) - - r := &stream.Reader{Parent: test.NilLogger} - - r.OnData( - strm.Desc.Medias[0], - strm.Desc.Medias[0].Formats[0], - func(u *unit.Unit) error { - require.Equal(t, unit.PayloadH264{ - test.FormatH264.SPS, - test.FormatH264.PPS, - {5, 2, 3, 4}, - }, u.Payload) - close(recv) - return nil - }) - - strm.AddReader(r) - defer strm.RemoveReader(r) - - err = w.WriteH264( - test.FormatH264, - 3*time.Second, 3*time.Second, [][]byte{ - {5, 2, 3, 4}, - }) - require.NoError(t, err) - - <-recv + <-dataReceived }) } } diff --git a/internal/servers/rtsp/server_test.go b/internal/servers/rtsp/server_test.go index ef0e4743..71123f12 100644 --- a/internal/servers/rtsp/server_test.go +++ b/internal/servers/rtsp/server_test.go @@ -47,7 +47,12 @@ func TestServerPublish(t *testing.T) { for _, ca := range []string{"basic", "digest", "basic+digest"} { t.Run(ca, func(t *testing.T) { var strm *stream.Stream - streamCreated := make(chan struct{}) + var reader *stream.Reader + defer func() { + strm.RemoveReader(reader) + }() + dataReceived := make(chan struct{}) + n := 0 pathManager := &test.PathManager{ @@ -91,7 +96,22 @@ func TestServerPublish(t *testing.T) { err := strm.Initialize() require.NoError(t, err) - close(streamCreated) + reader = &stream.Reader{Parent: test.NilLogger} + + reader.OnData( + strm.Desc.Medias[0], + strm.Desc.Medias[0].Formats[0], + func(u *unit.Unit) error { + require.Equal(t, unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5, 2, 3, 4}, + }, u.Payload) + close(dataReceived) + return nil + }) + + strm.AddReader(reader) return &dummyPath{}, strm, nil }, @@ -131,28 +151,6 @@ func TestServerPublish(t *testing.T) { require.NoError(t, err) defer source.Close() - <-streamCreated - - r := &stream.Reader{Parent: test.NilLogger} - - recv := make(chan struct{}) - - r.OnData( - strm.Desc.Medias[0], - strm.Desc.Medias[0].Formats[0], - func(u *unit.Unit) error { - require.Equal(t, unit.PayloadH264{ - test.FormatH264.SPS, - test.FormatH264.PPS, - {5, 2, 3, 4}, - }, u.Payload) - close(recv) - return nil - }) - - strm.AddReader(r) - defer strm.RemoveReader(r) - err = source.WritePacketRTP(media0, &rtp.Packet{ Header: rtp.Header{ Version: 2, @@ -166,7 +164,7 @@ func TestServerPublish(t *testing.T) { }) require.NoError(t, err) - <-recv + <-dataReceived }) } } diff --git a/internal/servers/srt/server_test.go b/internal/servers/srt/server_test.go index 5b669554..e2ae3c7f 100644 --- a/internal/servers/srt/server_test.go +++ b/internal/servers/srt/server_test.go @@ -44,7 +44,13 @@ func TestServerPublish(t *testing.T) { defer externalCmdPool.Close() var strm *stream.Stream - streamCreated := make(chan struct{}) + var reader *stream.Reader + defer func() { + strm.RemoveReader(reader) + }() + dataReceived := make(chan struct{}) + dataReceived2 := make(chan struct{}) + n := 0 pathManager := &test.PathManager{ FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { @@ -69,7 +75,37 @@ func TestServerPublish(t *testing.T) { err := strm.Initialize() require.NoError(t, err) - close(streamCreated) + reader = &stream.Reader{Parent: test.NilLogger} + + reader.OnData( + strm.Desc.Medias[0], + strm.Desc.Medias[0].Formats[0], + func(u *unit.Unit) error { + switch n { + case 0: + require.Equal(t, unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5, 1}, + }, u.Payload) + close(dataReceived) + + case 1: + require.Equal(t, unit.PayloadH264{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5, 2}, + }, u.Payload) + close(dataReceived2) + + default: + t.Errorf("should not happen") + } + n++ + return nil + }) + + strm.AddReader(reader) return &dummyPath{}, strm, nil }, @@ -103,7 +139,6 @@ func TestServerPublish(t *testing.T) { publisher, err := srt.Dial("srt", address, srtConf) require.NoError(t, err) - defer publisher.Close() track := &mpegts.Track{ Codec: &tscodecs.H264{}, @@ -114,47 +149,28 @@ func TestServerPublish(t *testing.T) { err = w.Initialize() require.NoError(t, err) + // the MPEG-TS muxer needs two PES packets in order to write the first one + err = w.WriteH264(track, 0, 0, [][]byte{ test.FormatH264.SPS, test.FormatH264.PPS, - {0x05, 1}, // IDR + {5, 1}, // IDR }) require.NoError(t, err) - err = bw.Flush() - require.NoError(t, err) - - <-streamCreated - - r := &stream.Reader{Parent: test.NilLogger} - - recv := make(chan struct{}) - - r.OnData( - strm.Desc.Medias[0], - strm.Desc.Medias[0].Formats[0], - func(u *unit.Unit) error { - require.Equal(t, unit.PayloadH264{ - test.FormatH264.SPS, - test.FormatH264.PPS, - {0x05, 1}, // IDR - }, u.Payload) - close(recv) - return nil - }) - - strm.AddReader(r) - defer strm.RemoveReader(r) - err = w.WriteH264(track, 0, 0, [][]byte{ - {5, 2}, + {5, 2}, // IDR }) require.NoError(t, err) err = bw.Flush() require.NoError(t, err) - <-recv + <-dataReceived + + // the second PES is written after writer is closed + publisher.Close() + <-dataReceived2 } func TestServerRead(t *testing.T) { diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index 0f12949c..2b1d217a 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -197,7 +197,11 @@ func TestServerOptionsICEServer(t *testing.T) { func TestServerPublish(t *testing.T) { var strm *stream.Stream - streamCreated := make(chan struct{}) + var reader *stream.Reader + defer func() { + strm.RemoveReader(reader) + }() + dataReceived := make(chan struct{}) pathManager := &test.PathManager{ FindPathConfImpl: func(req defs.PathFindPathConfReq) (*conf.Path, error) { @@ -222,7 +226,25 @@ func TestServerPublish(t *testing.T) { err := strm.Initialize() require.NoError(t, err) - close(streamCreated) + reader = &stream.Reader{Parent: test.NilLogger} + + reader.OnData( + strm.Desc.Medias[0], + strm.Desc.Medias[0].Formats[0], + func(u *unit.Unit) error { + /* select { + case <-recv: + return nil + default: + } */ + require.Equal(t, unit.PayloadH264{ + {1}, + }, u.Payload) + close(dataReceived) + return nil + }) + + strm.AddReader(reader) return &dummyPath{}, strm, nil }, @@ -289,47 +311,7 @@ func TestServerPublish(t *testing.T) { }) require.NoError(t, err) - <-streamCreated - - r := &stream.Reader{Parent: test.NilLogger} - - recv := make(chan struct{}) - - r.OnData( - strm.Desc.Medias[0], - strm.Desc.Medias[0].Formats[0], - func(u *unit.Unit) error { - select { - case <-recv: - return nil - default: - } - - require.Equal(t, unit.PayloadH264{ - {1}, - }, u.Payload) - close(recv) - - return nil - }) - - strm.AddReader(r) - defer strm.RemoveReader(r) - - err = track.WriteRTP(&rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 124, - Timestamp: 45343, - SSRC: 563423, - }, - Payload: []byte{1}, - }) - require.NoError(t, err) - - <-recv + <-dataReceived } func TestServerRead(t *testing.T) {