From f03ff73ef3413cdc9a87a88213b25676e328ce35 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 31 Jan 2021 16:24:58 +0100 Subject: [PATCH] add read timeout to RTMP sources --- internal/path/path.go | 1 + internal/sourcertmp/source.go | 25 ++++++++++++++++--------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/internal/path/path.go b/internal/path/path.go index 04aa21ad..317bb693 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -432,6 +432,7 @@ func (pa *Path) startExternalSource() { } else if strings.HasPrefix(pa.conf.Source, "rtmp://") { pa.source = sourcertmp.New( pa.conf.Source, + pa.readTimeout, &pa.sourceWg, pa.stats, pa) diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index 7480e55a..5bee9c78 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -37,10 +37,11 @@ type Parent interface { // Source is a RTMP source. type Source struct { - ur string - wg *sync.WaitGroup - stats *stats.Stats - parent Parent + ur string + readTimeout time.Duration + wg *sync.WaitGroup + stats *stats.Stats + parent Parent // in terminate chan struct{} @@ -48,15 +49,17 @@ type Source struct { // New allocates a Source. func New(ur string, + readTimeout time.Duration, wg *sync.WaitGroup, stats *stats.Stats, parent Parent) *Source { s := &Source{ - ur: ur, - wg: wg, - stats: stats, - parent: parent, - terminate: make(chan struct{}), + ur: ur, + readTimeout: readTimeout, + wg: wg, + stats: stats, + parent: parent, + terminate: make(chan struct{}), } atomic.AddInt64(s.stats.CountSourcesRtmp, +1) @@ -170,6 +173,9 @@ func (s *Source) runInner() bool { var audioRTCPSender *rtcpsender.RTCPSender var aacEncoder *rtpaac.Encoder + // configuration must be completed within readTimeout + nconn.SetReadDeadline(time.Now().Add(s.readTimeout)) + confDone := make(chan error) go func() { confDone <- func() error { @@ -331,6 +337,7 @@ func (s *Source) runInner() bool { go func() { readerDone <- func() error { for { + nconn.SetReadDeadline(time.Now().Add(s.readTimeout)) pkt, err := conn.ReadPacket() if err != nil { return err