rtmp: support connecting to sources that require standard credentials (#4530)
Some checks are pending
code_lint / golangci_lint (push) Waiting to run
code_lint / mod_tidy (push) Waiting to run
code_lint / api_docs (push) Waiting to run
code_test / test_64 (push) Waiting to run
code_test / test_32 (push) Waiting to run
code_test / test_e2e (push) Waiting to run

This commit is contained in:
Alessandro Ros 2025-05-15 14:23:03 +02:00 committed by GitHub
parent b48a0098d3
commit 1b9dfbd367
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 2408 additions and 1404 deletions

View file

@ -3,7 +3,6 @@ package rtmp
import (
"context"
ctls "crypto/tls"
"fmt"
"net"
"net/url"
@ -50,53 +49,38 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}
}
nconn, err := func() (net.Conn, error) {
ctx2, cancel2 := context.WithTimeout(params.Context, time.Duration(s.ReadTimeout))
defer cancel2()
if u.Scheme == "rtmp" {
return (&net.Dialer{}).DialContext(ctx2, "tcp", u.Host)
}
return (&ctls.Dialer{
Config: tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
}).DialContext(ctx2, "tcp", u.Host)
}()
if err != nil {
return err
}
ctx, ctxCancel := context.WithCancel(context.Background())
readDone := make(chan error)
go func() {
readDone <- s.runReader(u, nconn)
readDone <- s.runReader(ctx, u, params.Conf.SourceFingerprint)
}()
for {
select {
case err := <-readDone:
nconn.Close()
ctxCancel()
return err
case <-params.ReloadConf:
case <-params.Context.Done():
nconn.Close()
ctxCancel()
<-readDone
return nil
}
}
}
func (s *Source) runReader(u *url.URL, nconn net.Conn) error {
nconn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
nconn.SetWriteDeadline(time.Now().Add(time.Duration(s.WriteTimeout)))
conn := &rtmp.Conn{
RW: nconn,
Client: true,
URL: u,
Publish: false,
func (s *Source) runReader(ctx context.Context, u *url.URL, fingerprint string) error {
connectCtx, connectCtxCancel := context.WithTimeout(ctx, time.Duration(s.ReadTimeout))
conn := &rtmp.Client{
URL: u,
TLSConfig: tls.ConfigForFingerprint(fingerprint),
Publish: false,
}
err := conn.Initialize()
err := conn.Initialize(connectCtx)
connectCtxCancel()
if err != nil {
return err
}
@ -106,6 +90,7 @@ func (s *Source) runReader(u *url.URL, nconn net.Conn) error {
}
err = r.Initialize()
if err != nil {
conn.Close()
return err
}
@ -113,10 +98,12 @@ func (s *Source) runReader(u *url.URL, nconn net.Conn) error {
medias, err := rtmp.ToStream(r, &stream)
if err != nil {
conn.Close()
return err
}
if len(medias) == 0 {
conn.Close()
return fmt.Errorf("no supported tracks found")
}
@ -125,6 +112,7 @@ func (s *Source) runReader(u *url.URL, nconn net.Conn) error {
GenerateRTPPackets: true,
})
if res.Err != nil {
conn.Close()
return res.Err
}
@ -132,15 +120,26 @@ func (s *Source) runReader(u *url.URL, nconn net.Conn) error {
stream = res.Stream
// disable write deadline to allow outgoing acknowledges
nconn.SetWriteDeadline(time.Time{})
for {
nconn.SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
err := r.Read()
if err != nil {
return err
readerErr := make(chan error)
go func() {
for {
conn.NetConn().SetReadDeadline(time.Now().Add(time.Duration(s.ReadTimeout)))
err := r.Read()
if err != nil {
readerErr <- err
return
}
}
}()
select {
case <-ctx.Done():
conn.Close()
<-readerErr
return fmt.Errorf("terminated")
case err := <-readerErr:
return err
}
}