diff --git a/internal/path/path.go b/internal/path/path.go index 04aa21ad..317bb693 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -432,6 +432,7 @@ func (pa *Path) startExternalSource() { } else if strings.HasPrefix(pa.conf.Source, "rtmp://") { pa.source = sourcertmp.New( pa.conf.Source, + pa.readTimeout, &pa.sourceWg, pa.stats, pa) diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index 7480e55a..5bee9c78 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -37,10 +37,11 @@ type Parent interface { // Source is a RTMP source. type Source struct { - ur string - wg *sync.WaitGroup - stats *stats.Stats - parent Parent + ur string + readTimeout time.Duration + wg *sync.WaitGroup + stats *stats.Stats + parent Parent // in terminate chan struct{} @@ -48,15 +49,17 @@ type Source struct { // New allocates a Source. func New(ur string, + readTimeout time.Duration, wg *sync.WaitGroup, stats *stats.Stats, parent Parent) *Source { s := &Source{ - ur: ur, - wg: wg, - stats: stats, - parent: parent, - terminate: make(chan struct{}), + ur: ur, + readTimeout: readTimeout, + wg: wg, + stats: stats, + parent: parent, + terminate: make(chan struct{}), } atomic.AddInt64(s.stats.CountSourcesRtmp, +1) @@ -170,6 +173,9 @@ func (s *Source) runInner() bool { var audioRTCPSender *rtcpsender.RTCPSender var aacEncoder *rtpaac.Encoder + // configuration must be completed within readTimeout + nconn.SetReadDeadline(time.Now().Add(s.readTimeout)) + confDone := make(chan error) go func() { confDone <- func() error { @@ -331,6 +337,7 @@ func (s *Source) runInner() bool { go func() { readerDone <- func() error { for { + nconn.SetReadDeadline(time.Now().Add(s.readTimeout)) pkt, err := conn.ReadPacket() if err != nil { return err