RTSP server: support reading with multicast (#214) (#324)

This commit is contained in:
aler9 2021-06-15 22:15:51 +02:00
parent d21841c1b7
commit 75e1e3e4dc
26 changed files with 338 additions and 591 deletions

View file

@ -64,6 +64,7 @@ test-nodocker: test-internal test-root
test: test:
echo "$$DOCKERFILE_TEST" | docker build -q . -f - -t temp echo "$$DOCKERFILE_TEST" | docker build -q . -f - -t temp
docker run --rm \ docker run --rm \
--network=host \
-v /var/run/docker.sock:/var/run/docker.sock:ro \ -v /var/run/docker.sock:/var/run/docker.sock:ro \
temp \ temp \
make test-nodocker make test-nodocker

View file

@ -18,7 +18,7 @@ _rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP / RTMP /
Features: Features:
* Publish live streams with RTSP (UDP, TCP or TLS mode) or RTMP * Publish live streams with RTSP (UDP, TCP or TLS mode) or RTMP
* Read live streams with RTSP, RTMP or HLS * Read live streams with RTSP (UDP, UDP-multicast, TCP or TLS mode), RTMP or HLS
* Pull and serve streams from other RTSP or RTMP servers or cameras, always or on-demand (RTSP proxy) * Pull and serve streams from other RTSP or RTMP servers or cameras, always or on-demand (RTSP proxy)
* Streams are automatically converted from a protocol to another (for instance, it's possible to publish with RTSP and read with HLS) * Streams are automatically converted from a protocol to another (for instance, it's possible to publish with RTSP and read with HLS)
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM, JPEG) * Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM, JPEG)

2
go.mod
View file

@ -5,7 +5,7 @@ go 1.15
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210603214139-363871d65898 github.com/aler9/gortsplib v0.0.0-20210617144524-db28e87ecb0c
github.com/asticode/go-astits v0.0.0-00010101000000-000000000000 github.com/asticode/go-astits v0.0.0-00010101000000-000000000000
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify v1.4.9

18
go.sum
View file

@ -4,13 +4,12 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ= github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
github.com/aler9/gortsplib v0.0.0-20210603214139-363871d65898 h1:Qw3xa+fdWVF0eHhZ/ntET1q24y5uynLFIllYAWbkeTU= github.com/aler9/gortsplib v0.0.0-20210617144524-db28e87ecb0c h1:IqV2N1yifhnVPafY8SknenVL6k66gGa5jhrujcbjl5Q=
github.com/aler9/gortsplib v0.0.0-20210603214139-363871d65898/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY= github.com/aler9/gortsplib v0.0.0-20210617144524-db28e87ecb0c/go.mod h1:ozu0NvgZMhb4AT6VdyV6OfmgPviSiZImRkaTwW1nEKc=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -28,7 +27,6 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM= github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM=
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U= github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
@ -38,7 +36,6 @@ github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKq
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -48,20 +45,25 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b h1:k+E048sYJHyVnsr1GDrRZWQ32D2C7lWs9JRc0bel53A=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9 h1:L2auWcuQIvxz9xSEqzESnV/QN/gNRXNApHi3fYwl2w0=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View file

@ -7,7 +7,6 @@ import (
"os" "os"
"time" "time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"golang.org/x/crypto/nacl/secretbox" "golang.org/x/crypto/nacl/secretbox"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
@ -26,6 +25,16 @@ const (
EncryptionStrict EncryptionStrict
) )
// Protocol is a RTSP protocol
type Protocol int
// RTSP protocols.
const (
ProtocolUDP Protocol = iota
ProtocolMulticast
ProtocolTCP
)
func decrypt(key string, byts []byte) ([]byte, error) { func decrypt(key string, byts []byte) ([]byte, error) {
enc, err := base64.StdEncoding.DecodeString(string(byts)) enc, err := base64.StdEncoding.DecodeString(string(byts))
if err != nil { if err != nil {
@ -64,20 +73,20 @@ type Conf struct {
RunOnConnectRestart bool `yaml:"runOnConnectRestart"` RunOnConnectRestart bool `yaml:"runOnConnectRestart"`
// rtsp // rtsp
RTSPDisable bool `yaml:"rtspDisable"` RTSPDisable bool `yaml:"rtspDisable"`
Protocols []string `yaml:"protocols"` Protocols []string `yaml:"protocols"`
ProtocolsParsed map[gortsplib.StreamProtocol]struct{} `yaml:"-" json:"-"` ProtocolsParsed map[Protocol]struct{} `yaml:"-" json:"-"`
Encryption string `yaml:"encryption"` Encryption string `yaml:"encryption"`
EncryptionParsed Encryption `yaml:"-" json:"-"` EncryptionParsed Encryption `yaml:"-" json:"-"`
RTSPAddress string `yaml:"rtspAddress"` RTSPAddress string `yaml:"rtspAddress"`
RTSPSAddress string `yaml:"rtspsAddress"` RTSPSAddress string `yaml:"rtspsAddress"`
RTPAddress string `yaml:"rtpAddress"` RTPAddress string `yaml:"rtpAddress"`
RTCPAddress string `yaml:"rtcpAddress"` RTCPAddress string `yaml:"rtcpAddress"`
ServerKey string `yaml:"serverKey"` ServerKey string `yaml:"serverKey"`
ServerCert string `yaml:"serverCert"` ServerCert string `yaml:"serverCert"`
AuthMethods []string `yaml:"authMethods"` AuthMethods []string `yaml:"authMethods"`
AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"` AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"`
ReadBufferSize int `yaml:"readBufferSize"` ReadBufferSize int `yaml:"readBufferSize"`
// rtmp // rtmp
RTMPDisable bool `yaml:"rtmpDisable"` RTMPDisable bool `yaml:"rtmpDisable"`
@ -153,16 +162,19 @@ func (conf *Conf) fillAndCheck() error {
} }
if len(conf.Protocols) == 0 { if len(conf.Protocols) == 0 {
conf.Protocols = []string{"udp", "tcp"} conf.Protocols = []string{"udp", "multicast", "tcp"}
} }
conf.ProtocolsParsed = make(map[gortsplib.StreamProtocol]struct{}) conf.ProtocolsParsed = make(map[Protocol]struct{})
for _, proto := range conf.Protocols { for _, proto := range conf.Protocols {
switch proto { switch proto {
case "udp": case "udp":
conf.ProtocolsParsed[gortsplib.StreamProtocolUDP] = struct{}{} conf.ProtocolsParsed[ProtocolUDP] = struct{}{}
case "multicast":
conf.ProtocolsParsed[ProtocolMulticast] = struct{}{}
case "tcp": case "tcp":
conf.ProtocolsParsed[gortsplib.StreamProtocolTCP] = struct{}{} conf.ProtocolsParsed[ProtocolTCP] = struct{}{}
default: default:
return fmt.Errorf("unsupported protocol: %s", proto) return fmt.Errorf("unsupported protocol: %s", proto)
@ -185,7 +197,7 @@ func (conf *Conf) fillAndCheck() error {
case "strict", "yes", "true": case "strict", "yes", "true":
conf.EncryptionParsed = EncryptionStrict conf.EncryptionParsed = EncryptionStrict
if _, ok := conf.ProtocolsParsed[gortsplib.StreamProtocolUDP]; ok { if _, ok := conf.ProtocolsParsed[ProtocolUDP]; ok {
return fmt.Errorf("encryption can't be used with the UDP stream protocol") return fmt.Errorf("encryption can't be used with the UDP stream protocol")
} }

View file

@ -9,7 +9,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
) )
@ -69,17 +68,17 @@ type PathConf struct {
Regexp *regexp.Regexp `yaml:"-" json:"-"` Regexp *regexp.Regexp `yaml:"-" json:"-"`
// source // source
Source string `yaml:"source"` Source string `yaml:"source"`
SourceProtocol string `yaml:"sourceProtocol"` SourceProtocol string `yaml:"sourceProtocol"`
SourceProtocolParsed *gortsplib.StreamProtocol `yaml:"-" json:"-"` SourceProtocolParsed *base.StreamProtocol `yaml:"-" json:"-"`
SourceAnyPortEnable bool `yaml:"sourceAnyPortEnable"` SourceAnyPortEnable bool `yaml:"sourceAnyPortEnable"`
SourceFingerprint string `yaml:"sourceFingerprint" json:"sourceFingerprint"` SourceFingerprint string `yaml:"sourceFingerprint" json:"sourceFingerprint"`
SourceOnDemand bool `yaml:"sourceOnDemand"` SourceOnDemand bool `yaml:"sourceOnDemand"`
SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout"` SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout"`
SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter"` SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter"`
SourceRedirect string `yaml:"sourceRedirect"` SourceRedirect string `yaml:"sourceRedirect"`
DisablePublisherOverride bool `yaml:"disablePublisherOverride"` DisablePublisherOverride bool `yaml:"disablePublisherOverride"`
Fallback string `yaml:"fallback"` Fallback string `yaml:"fallback"`
// authentication // authentication
PublishUser string `yaml:"publishUser"` PublishUser string `yaml:"publishUser"`
@ -149,11 +148,11 @@ func (pconf *PathConf) fillAndCheck(name string) error {
switch pconf.SourceProtocol { switch pconf.SourceProtocol {
case "udp": case "udp":
v := gortsplib.StreamProtocolUDP v := base.StreamProtocolUDP
pconf.SourceProtocolParsed = &v pconf.SourceProtocolParsed = &v
case "tcp": case "tcp":
v := gortsplib.StreamProtocolTCP v := base.StreamProtocolTCP
pconf.SourceProtocolParsed = &v pconf.SourceProtocolParsed = &v
case "automatic": case "automatic":

View file

@ -267,7 +267,7 @@ func (c *Converter) runInner(innerCtx context.Context) error {
var aacConfig rtpaac.MPEG4AudioConfig var aacConfig rtpaac.MPEG4AudioConfig
var aacDecoder *rtpaac.Decoder var aacDecoder *rtpaac.Decoder
for i, t := range res.Tracks { for i, t := range res.Stream.Tracks() {
if t.IsH264() { if t.IsH264() {
if videoTrack != nil { if videoTrack != nil {
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1) return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)

View file

@ -20,7 +20,6 @@ import (
"github.com/aler9/rtsp-simple-server/internal/rtspsource" "github.com/aler9/rtsp-simple-server/internal/rtspsource"
"github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/source"
"github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/stats"
"github.com/aler9/rtsp-simple-server/internal/streamproc"
) )
func newEmptyTimer() *time.Timer { func newEmptyTimer() *time.Timer {
@ -35,6 +34,10 @@ type Parent interface {
OnPathClose(*Path) OnPathClose(*Path)
} }
type rtspSession interface {
IsRTSPSession()
}
type sourceRedirect struct{} type sourceRedirect struct{}
func (*sourceRedirect) IsSource() {} func (*sourceRedirect) IsSource() {}
@ -77,9 +80,8 @@ type Path struct {
describeRequests []readpublisher.DescribeReq describeRequests []readpublisher.DescribeReq
setupPlayRequests []readpublisher.SetupPlayReq setupPlayRequests []readpublisher.SetupPlayReq
source source.Source source source.Source
sourceTracks gortsplib.Tracks sourceStream *gortsplib.ServerStream
sp *streamproc.StreamProc nonRTSPReaders *readersMap
readers *readersMap
onDemandCmd *externalcmd.Cmd onDemandCmd *externalcmd.Cmd
describeTimer *time.Timer describeTimer *time.Timer
sourceCloseTimer *time.Timer sourceCloseTimer *time.Timer
@ -134,7 +136,7 @@ func New(
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
readPublishers: make(map[readpublisher.ReadPublisher]readPublisherState), readPublishers: make(map[readpublisher.ReadPublisher]readPublisherState),
readers: newReadersMap(), nonRTSPReaders: newReadersMap(),
describeTimer: newEmptyTimer(), describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(), sourceCloseTimer: newEmptyTimer(),
runOnDemandCloseTimer: newEmptyTimer(), runOnDemandCloseTimer: newEmptyTimer(),
@ -224,10 +226,9 @@ outer:
break outer break outer
case req := <-pa.extSourceSetReady: case req := <-pa.extSourceSetReady:
pa.sourceTracks = req.Tracks pa.sourceStream = gortsplib.NewServerStream(req.Tracks)
pa.sp = streamproc.New(pa, len(req.Tracks))
pa.onSourceSetReady() pa.onSourceSetReady()
req.Res <- source.ExtSetReadyRes{SP: pa.sp} req.Res <- source.ExtSetReadyRes{}
case req := <-pa.extSourceSetNotReady: case req := <-pa.extSourceSetNotReady:
pa.onSourceSetNotReady() pa.onSourceSetNotReady()
@ -299,17 +300,20 @@ outer:
req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")} req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")}
} }
for c, state := range pa.readPublishers { for rp, state := range pa.readPublishers {
if state != readPublisherStatePreRemove { if state != readPublisherStatePreRemove {
switch state { switch state {
case readPublisherStatePlay: case readPublisherStatePlay:
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readers.remove(c)
if _, ok := rp.(rtspSession); !ok {
pa.nonRTSPReaders.remove(rp)
}
case readPublisherStateRecord: case readPublisherStateRecord:
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
} }
c.Close() rp.Close()
} }
} }
@ -372,28 +376,33 @@ func (pa *Path) addReadPublisher(c readpublisher.ReadPublisher, state readPublis
pa.readPublishers[c] = state pa.readPublishers[c] = state
} }
func (pa *Path) removeReadPublisher(c readpublisher.ReadPublisher) { func (pa *Path) removeReadPublisher(rp readpublisher.ReadPublisher) {
state := pa.readPublishers[c] state := pa.readPublishers[rp]
pa.readPublishers[c] = readPublisherStatePreRemove pa.readPublishers[rp] = readPublisherStatePreRemove
switch state { switch state {
case readPublisherStatePlay: case readPublisherStatePlay:
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readers.remove(c)
if _, ok := rp.(rtspSession); !ok {
pa.nonRTSPReaders.remove(rp)
}
case readPublisherStateRecord: case readPublisherStateRecord:
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.onSourceSetNotReady() pa.onSourceSetNotReady()
} }
if pa.source == c { if pa.source == rp {
pa.source = nil pa.source = nil
pa.sourceStream.Close()
pa.sourceStream = nil
// close all readPublishers that are reading or waiting to read // close all readPublishers that are reading or waiting to read
for oc, state := range pa.readPublishers { for orp, state := range pa.readPublishers {
if state != readPublisherStatePreRemove { if state != readPublisherStatePreRemove {
pa.removeReadPublisher(oc) pa.removeReadPublisher(orp)
oc.Close() orp.Close()
} }
} }
} }
@ -412,7 +421,9 @@ func (pa *Path) onSourceSetReady() {
pa.sourceState = sourceStateReady pa.sourceState = sourceStateReady
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet req.Res <- readpublisher.DescribeRes{
Stream: pa.sourceStream,
}
} }
pa.describeRequests = nil pa.describeRequests = nil
@ -484,13 +495,17 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) {
pa.scheduleClose() pa.scheduleClose()
if _, ok := pa.source.(*sourceRedirect); ok { if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- readpublisher.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet req.Res <- readpublisher.DescribeRes{
Redirect: pa.conf.SourceRedirect,
}
return return
} }
switch pa.sourceState { switch pa.sourceState {
case sourceStateReady: case sourceStateReady:
req.Res <- readpublisher.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet req.Res <- readpublisher.DescribeRes{
Stream: pa.sourceStream,
}
return return
case sourceStateWaitingDescribe: case sourceStateWaitingDescribe:
@ -556,24 +571,21 @@ func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) {
pa.addReadPublisher(req.Author, readPublisherStatePrePlay) pa.addReadPublisher(req.Author, readPublisherStatePrePlay)
} }
var ti []streamproc.TrackInfo
if pa.sp != nil {
ti = pa.sp.TrackInfos()
}
req.Res <- readpublisher.SetupPlayRes{ req.Res <- readpublisher.SetupPlayRes{
Path: pa, Path: pa,
Tracks: pa.sourceTracks, Stream: pa.sourceStream,
TrackInfos: ti,
} }
} }
func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) { func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) {
atomic.AddInt64(pa.stats.CountReaders, 1) atomic.AddInt64(pa.stats.CountReaders, 1)
pa.readPublishers[req.Author] = readPublisherStatePlay pa.readPublishers[req.Author] = readPublisherStatePlay
pa.readers.add(req.Author)
req.Res <- readpublisher.PlayRes{TrackInfos: pa.sp.TrackInfos()} if _, ok := req.Author.(rtspSession); !ok {
pa.nonRTSPReaders.add(req.Author)
}
req.Res <- readpublisher.PlayRes{}
} }
func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) { func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) {
@ -609,7 +621,7 @@ func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) {
pa.addReadPublisher(req.Author, readPublisherStatePreRecord) pa.addReadPublisher(req.Author, readPublisherStatePreRecord)
pa.source = req.Author pa.source = req.Author
pa.sourceTracks = req.Tracks pa.sourceStream = gortsplib.NewServerStream(req.Tracks)
req.Res <- readpublisher.AnnounceRes{pa, nil} //nolint:govet req.Res <- readpublisher.AnnounceRes{pa, nil} //nolint:govet
} }
@ -623,9 +635,7 @@ func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) {
pa.readPublishers[req.Author] = readPublisherStateRecord pa.readPublishers[req.Author] = readPublisherStateRecord
pa.onSourceSetReady() pa.onSourceSetReady()
pa.sp = streamproc.New(pa, len(pa.sourceTracks)) req.Res <- readpublisher.RecordRes{}
req.Res <- readpublisher.RecordRes{SP: pa.sp, Err: nil}
} }
func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) { func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) {
@ -638,7 +648,10 @@ func (pa *Path) onReadPublisherPause(req readpublisher.PauseReq) {
if state == readPublisherStatePlay { if state == readPublisherStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readPublishers[req.Author] = readPublisherStatePrePlay pa.readPublishers[req.Author] = readPublisherStatePrePlay
pa.readers.remove(req.Author)
if _, ok := req.Author.(rtspSession); !ok {
pa.nonRTSPReaders.remove(req.Author)
}
} else if state == readPublisherStateRecord { } else if state == readPublisherStateRecord {
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
@ -792,7 +805,9 @@ func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) {
} }
} }
// OnSPFrame is called by streamproc.StreamProc. // OnFrame is called by a readpublisher
func (pa *Path) OnSPFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
pa.readers.forwardFrame(trackID, streamType, payload) pa.sourceStream.WriteFrame(trackID, streamType, payload)
pa.nonRTSPReaders.forwardFrame(trackID, streamType, payload)
} }

View file

@ -319,11 +319,7 @@ func (pm *PathManager) OnReadPublisherDescribe(req readpublisher.DescribeReq) {
select { select {
case pm.rpDescribe <- req: case pm.rpDescribe <- req:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
req.Res <- readpublisher.DescribeRes{ req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("terminated")}
SDP: nil,
Redirect: "",
Err: fmt.Errorf("terminated"),
}
} }
} }
@ -332,10 +328,7 @@ func (pm *PathManager) OnReadPublisherAnnounce(req readpublisher.AnnounceReq) {
select { select {
case pm.rpAnnounce <- req: case pm.rpAnnounce <- req:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
req.Res <- readpublisher.AnnounceRes{ req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("terminated")}
Path: nil,
Err: fmt.Errorf("terminated"),
}
} }
} }
@ -344,11 +337,7 @@ func (pm *PathManager) OnReadPublisherSetupPlay(req readpublisher.SetupPlayReq)
select { select {
case pm.rpSetupPlay <- req: case pm.rpSetupPlay <- req:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
req.Res <- readpublisher.SetupPlayRes{ req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")}
Path: nil,
Tracks: nil,
Err: fmt.Errorf("terminated"),
}
} }
} }

View file

@ -9,7 +9,6 @@ import (
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/streamproc"
) )
// Path is implemented by path.Path. // Path is implemented by path.Path.
@ -20,6 +19,7 @@ type Path interface {
OnReadPublisherPlay(PlayReq) OnReadPublisherPlay(PlayReq)
OnReadPublisherRecord(RecordReq) OnReadPublisherRecord(RecordReq)
OnReadPublisherPause(PauseReq) OnReadPublisherPause(PauseReq)
OnFrame(int, gortsplib.StreamType, []byte)
} }
// ErrNoOnePublishing is a "no one is publishing" error. // ErrNoOnePublishing is a "no one is publishing" error.
@ -63,7 +63,7 @@ type ReadPublisher interface {
// DescribeRes is a describe response. // DescribeRes is a describe response.
type DescribeRes struct { type DescribeRes struct {
SDP []byte Stream *gortsplib.ServerStream
Redirect string Redirect string
Err error Err error
} }
@ -79,10 +79,9 @@ type DescribeReq struct {
// SetupPlayRes is a setup/play response. // SetupPlayRes is a setup/play response.
type SetupPlayRes struct { type SetupPlayRes struct {
Path Path Path Path
Tracks gortsplib.Tracks Stream *gortsplib.ServerStream
TrackInfos []streamproc.TrackInfo Err error
Err error
} }
// SetupPlayReq is a setup/play request. // SetupPlayReq is a setup/play request.
@ -117,9 +116,7 @@ type RemoveReq struct {
} }
// PlayRes is a play response. // PlayRes is a play response.
type PlayRes struct { type PlayRes struct{}
TrackInfos []streamproc.TrackInfo
}
// PlayReq is a play request. // PlayReq is a play request.
type PlayReq struct { type PlayReq struct {
@ -129,7 +126,6 @@ type PlayReq struct {
// RecordRes is a record response. // RecordRes is a record response.
type RecordRes struct { type RecordRes struct {
SP *streamproc.StreamProc
Err error Err error
} }

View file

@ -238,7 +238,7 @@ func (c *Conn) runRead(ctx context.Context) error {
var audioClockRate int var audioClockRate int
var aacDecoder *rtpaac.Decoder var aacDecoder *rtpaac.Decoder
for i, t := range res.Tracks { for i, t := range res.Stream.Tracks() {
if t.IsH264() { if t.IsH264() {
if videoTrack != nil { if videoTrack != nil {
return fmt.Errorf("can't read track %d with RTMP: too many tracks", i+1) return fmt.Errorf("can't read track %d with RTMP: too many tracks", i+1)
@ -450,12 +450,12 @@ func (c *Conn) runPublish(ctx context.Context) error {
} }
}(c.path) }(c.path)
rtcpSenders := rtcpsenderset.New(tracks, rres.SP.OnFrame) rtcpSenders := rtcpsenderset.New(tracks, c.path.OnFrame)
defer rtcpSenders.Close() defer rtcpSenders.Close()
onFrame := func(trackID int, payload []byte) { onFrame := func(trackID int, payload []byte) {
rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
rres.SP.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) c.path.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
} }
for { for {

View file

@ -29,6 +29,7 @@ type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnExtSourceSetReady(req source.ExtSetReadyReq) OnExtSourceSetReady(req source.ExtSetReadyReq)
OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq)
OnFrame(int, gortsplib.StreamType, []byte)
} }
// Source is a RTMP external source. // Source is a RTMP external source.
@ -177,7 +178,7 @@ func (s *Source) runInner() bool {
Tracks: tracks, Tracks: tracks,
Res: cres, Res: cres,
}) })
res := <-cres <-cres
defer func() { defer func() {
res := make(chan struct{}) res := make(chan struct{})
@ -187,12 +188,12 @@ func (s *Source) runInner() bool {
<-res <-res
}() }()
rtcpSenders := rtcpsenderset.New(tracks, res.SP.OnFrame) rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnFrame)
defer rtcpSenders.Close() defer rtcpSenders.Close()
onFrame := func(trackID int, payload []byte) { onFrame := func(trackID int, payload []byte) {
rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
res.SP.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) s.parent.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
} }
for { for {

View file

@ -131,7 +131,7 @@ func (c *Conn) OnResponse(res *base.Response) {
} }
// OnDescribe is called by rtspserver.Server. // OnDescribe is called by rtspserver.Server.
func (c *Conn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) { func (c *Conn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
resc := make(chan readpublisher.DescribeRes) resc := make(chan readpublisher.DescribeRes)
c.pathMan.OnReadPublisherDescribe(readpublisher.DescribeReq{ c.pathMan.OnReadPublisherDescribe(readpublisher.DescribeReq{
PathName: ctx.Path, PathName: ctx.Path,
@ -178,7 +178,7 @@ func (c *Conn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Resp
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
}, res.SDP, nil }, res.Stream, nil
} }
// ValidateCredentials allows to validate the credentials of a path. // ValidateCredentials allows to validate the credentials of a path.

View file

@ -12,6 +12,7 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/pathman" "github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/rtspconn" "github.com/aler9/rtsp-simple-server/internal/rtspconn"
@ -53,7 +54,7 @@ type Server struct {
readTimeout time.Duration readTimeout time.Duration
isTLS bool isTLS bool
rtspAddress string rtspAddress string
protocols map[base.StreamProtocol]struct{} protocols map[conf.Protocol]struct{}
runOnConnect string runOnConnect string
runOnConnectRestart bool runOnConnectRestart bool
stats *stats.Stats stats *stats.Stats
@ -84,7 +85,7 @@ func New(
serverCert string, serverCert string,
serverKey string, serverKey string,
rtspAddress string, rtspAddress string,
protocols map[base.StreamProtocol]struct{}, protocols map[conf.Protocol]struct{},
runOnConnect string, runOnConnect string,
runOnConnectRestart bool, runOnConnectRestart bool,
stats *stats.Stats, stats *stats.Stats,
@ -274,7 +275,7 @@ func (s *Server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
} }
// OnDescribe implements gortsplib.ServerHandlerOnDescribe. // OnDescribe implements gortsplib.ServerHandlerOnDescribe.
func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) { func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
s.mutex.RLock() s.mutex.RLock()
c := s.conns[ctx.Conn] c := s.conns[ctx.Conn]
s.mutex.RUnlock() s.mutex.RUnlock()
@ -291,7 +292,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re
} }
// OnSetup implements gortsplib.ServerHandlerOnSetup. // OnSetup implements gortsplib.ServerHandlerOnSetup.
func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) { func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
s.mutex.RLock() s.mutex.RLock()
c := s.conns[ctx.Conn] c := s.conns[ctx.Conn]
se := s.sessions[ctx.Session] se := s.sessions[ctx.Session]

View file

@ -4,18 +4,17 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"strconv"
"time" "time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/readpublisher" "github.com/aler9/rtsp-simple-server/internal/readpublisher"
"github.com/aler9/rtsp-simple-server/internal/rtspconn" "github.com/aler9/rtsp-simple-server/internal/rtspconn"
"github.com/aler9/rtsp-simple-server/internal/streamproc"
) )
const ( const (
@ -36,7 +35,7 @@ type Parent interface {
// Session is a RTSP server-side session. // Session is a RTSP server-side session.
type Session struct { type Session struct {
rtspAddress string rtspAddress string
protocols map[gortsplib.StreamProtocol]struct{} protocols map[conf.Protocol]struct{}
visualID string visualID string
ss *gortsplib.ServerSession ss *gortsplib.ServerSession
pathMan PathMan pathMan PathMan
@ -45,14 +44,13 @@ type Session struct {
path readpublisher.Path path readpublisher.Path
setuppedTracks map[int]*gortsplib.Track // read setuppedTracks map[int]*gortsplib.Track // read
onReadCmd *externalcmd.Cmd // read onReadCmd *externalcmd.Cmd // read
sp *streamproc.StreamProc // publish
onPublishCmd *externalcmd.Cmd // publish onPublishCmd *externalcmd.Cmd // publish
} }
// New allocates a Session. // New allocates a Session.
func New( func New(
rtspAddress string, rtspAddress string,
protocols map[gortsplib.StreamProtocol]struct{}, protocols map[conf.Protocol]struct{},
visualID string, visualID string,
ss *gortsplib.ServerSession, ss *gortsplib.ServerSession,
sc *gortsplib.ServerConn, sc *gortsplib.ServerConn,
@ -107,11 +105,21 @@ func (s *Session) IsReadPublisher() {}
// IsSource implements source.Source. // IsSource implements source.Source.
func (s *Session) IsSource() {} func (s *Session) IsSource() {}
// IsRTSPSession implements path.rtspSession.
func (s *Session) IsRTSPSession() {}
// VisualID returns the visual ID of the session. // VisualID returns the visual ID of the session.
func (s *Session) VisualID() string { func (s *Session) VisualID() string {
return s.visualID return s.visualID
} }
func (s *Session) displayedProtocol() string {
if *s.ss.SetuppedDelivery() == base.StreamDeliveryMulticast {
return "UDP-multicast"
}
return s.ss.SetuppedProtocol().String()
}
func (s *Session) log(level logger.Level, format string, args ...interface{}) { func (s *Session) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.visualID}, args...)...) s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.visualID}, args...)...)
} }
@ -157,19 +165,25 @@ func (s *Session) OnAnnounce(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnAnn
} }
// OnSetup is called by rtspserver.Server. // OnSetup is called by rtspserver.Server.
func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) { func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
if ctx.Transport.Protocol == gortsplib.StreamProtocolUDP { if ctx.Transport.Protocol == base.StreamProtocolUDP {
if _, ok := s.protocols[gortsplib.StreamProtocolUDP]; !ok { if _, ok := s.protocols[conf.ProtocolUDP]; !ok {
return &base.Response{ return &base.Response{
StatusCode: base.StatusUnsupportedTransport, StatusCode: base.StatusUnsupportedTransport,
}, nil, nil }, nil, nil
} }
} else {
if _, ok := s.protocols[gortsplib.StreamProtocolTCP]; !ok { if ctx.Transport.Delivery != nil && *ctx.Transport.Delivery == base.StreamDeliveryMulticast {
return &base.Response{ if _, ok := s.protocols[conf.ProtocolMulticast]; !ok {
StatusCode: base.StatusUnsupportedTransport, return &base.Response{
}, nil, nil StatusCode: base.StatusUnsupportedTransport,
}, nil, nil
}
} }
} else if _, ok := s.protocols[conf.ProtocolTCP]; !ok {
return &base.Response{
StatusCode: base.StatusUnsupportedTransport,
}, nil, nil
} }
switch s.ss.State() { switch s.ss.State() {
@ -211,7 +225,7 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC
s.path = res.Path s.path = res.Path
if ctx.TrackID >= len(res.Tracks) { if ctx.TrackID >= len(res.Stream.Tracks()) {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, nil, fmt.Errorf("track %d does not exist", ctx.TrackID) }, nil, fmt.Errorf("track %d does not exist", ctx.TrackID)
@ -220,21 +234,17 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC
if s.setuppedTracks == nil { if s.setuppedTracks == nil {
s.setuppedTracks = make(map[int]*gortsplib.Track) s.setuppedTracks = make(map[int]*gortsplib.Track)
} }
s.setuppedTracks[ctx.TrackID] = res.Tracks[ctx.TrackID] s.setuppedTracks[ctx.TrackID] = res.Stream.Tracks()[ctx.TrackID]
var ssrc *uint32
if res.TrackInfos != nil && res.TrackInfos[ctx.TrackID].LastSSRC != 0 {
ssrc = &res.TrackInfos[ctx.TrackID].LastSSRC
}
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
}, ssrc, nil }, res.Stream, nil
}
return &base.Response{ default: // record
StatusCode: base.StatusOK, return &base.Response{
}, nil, nil StatusCode: base.StatusOK,
}, nil, nil
}
} }
// OnPlay is called by rtspserver.Server. // OnPlay is called by rtspserver.Server.
@ -250,7 +260,7 @@ func (s *Session) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response,
resc := make(chan readpublisher.PlayRes) resc := make(chan readpublisher.PlayRes)
s.path.OnReadPublisherPlay(readpublisher.PlayReq{s, resc}) //nolint:govet s.path.OnReadPublisherPlay(readpublisher.PlayReq{s, resc}) //nolint:govet
res := <-resc <-resc
tracksLen := len(s.ss.SetuppedTracks()) tracksLen := len(s.ss.SetuppedTracks())
@ -263,7 +273,7 @@ func (s *Session) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response,
} }
return "tracks" return "tracks"
}(), }(),
*s.ss.StreamProtocol()) s.displayedProtocol())
if s.path.Conf().RunOnRead != "" { if s.path.Conf().RunOnRead != "" {
_, port, _ := net.SplitHostPort(s.rtspAddress) _, port, _ := net.SplitHostPort(s.rtspAddress)
@ -272,40 +282,6 @@ func (s *Session) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response,
Port: port, Port: port,
}) })
} }
// add RTP-Info
var ri headers.RTPInfo
for trackID, ti := range res.TrackInfos {
if ti.LastTimeNTP == 0 {
continue
}
track, ok := s.setuppedTracks[trackID]
if !ok {
continue
}
u := &base.URL{
Scheme: ctx.Req.URL.Scheme,
User: ctx.Req.URL.User,
Host: ctx.Req.URL.Host,
Path: "/" + s.path.Name() + "/trackID=" + strconv.FormatInt(int64(trackID), 10),
}
clockRate, _ := track.ClockRate()
ts := uint32(uint64(ti.LastTimeRTP) +
uint64(time.Since(time.Unix(ti.LastTimeNTP, 0)).Seconds()*float64(clockRate)))
lsn := ti.LastSequenceNumber
ri = append(ri, &headers.RTPInfoEntry{
URL: u.String(),
SequenceNumber: &lsn,
Timestamp: &ts,
})
}
if len(ri) > 0 {
h["RTP-Info"] = ri.Write()
}
} }
return &base.Response{ return &base.Response{
@ -332,8 +308,6 @@ func (s *Session) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Respo
}, res.Err }, res.Err
} }
s.sp = res.SP
tracksLen := len(s.ss.AnnouncedTracks()) tracksLen := len(s.ss.AnnouncedTracks())
s.log(logger.Info, "is publishing to path '%s', %d %s with %s", s.log(logger.Info, "is publishing to path '%s', %d %s with %s",
@ -345,7 +319,7 @@ func (s *Session) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Respo
} }
return "tracks" return "tracks"
}(), }(),
*s.ss.StreamProtocol()) s.displayedProtocol())
if s.path.Conf().RunOnPublish != "" { if s.path.Conf().RunOnPublish != "" {
_, port, _ := net.SplitHostPort(s.rtspAddress) _, port, _ := net.SplitHostPort(s.rtspAddress)
@ -389,10 +363,6 @@ func (s *Session) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Respons
// OnFrame implements path.Reader. // OnFrame implements path.Reader.
func (s *Session) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { func (s *Session) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if _, ok := s.ss.SetuppedTracks()[trackID]; !ok {
return
}
s.ss.WriteFrame(trackID, streamType, payload) s.ss.WriteFrame(trackID, streamType, payload)
} }
@ -402,5 +372,5 @@ func (s *Session) OnIncomingFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
return return
} }
s.sp.OnFrame(ctx.TrackID, ctx.StreamType, ctx.Payload) s.path.OnFrame(ctx.TrackID, ctx.StreamType, ctx.Payload)
} }

View file

@ -28,12 +28,13 @@ type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnExtSourceSetReady(req source.ExtSetReadyReq) OnExtSourceSetReady(req source.ExtSetReadyReq)
OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq)
OnFrame(int, gortsplib.StreamType, []byte)
} }
// Source is a RTSP external source. // Source is a RTSP external source.
type Source struct { type Source struct {
ur string ur string
proto *gortsplib.StreamProtocol proto *base.StreamProtocol
anyPortEnable bool anyPortEnable bool
fingerprint string fingerprint string
readTimeout time.Duration readTimeout time.Duration
@ -52,7 +53,7 @@ type Source struct {
func New( func New(
ctxParent context.Context, ctxParent context.Context,
ur string, ur string,
proto *gortsplib.StreamProtocol, proto *base.StreamProtocol,
anyPortEnable bool, anyPortEnable bool,
fingerprint string, fingerprint string,
readTimeout time.Duration, readTimeout time.Duration,
@ -197,7 +198,7 @@ func (s *Source) runInner() bool {
Tracks: conn.Tracks(), Tracks: conn.Tracks(),
Res: cres, Res: cres,
}) })
res := <-cres <-cres
defer func() { defer func() {
res := make(chan struct{}) res := make(chan struct{})
@ -210,7 +211,7 @@ func (s *Source) runInner() bool {
readErr := make(chan error) readErr := make(chan error)
go func() { go func() {
readErr <- conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { readErr <- conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) {
res.SP.OnFrame(trackID, streamType, payload) s.parent.OnFrame(trackID, streamType, payload)
}) })
}() }()

View file

@ -16,15 +16,8 @@ type ExtSource interface {
Close() Close()
} }
// StreamProc is implemented by streamproc.StreamProc.
type StreamProc interface {
OnFrame(int, gortsplib.StreamType, []byte)
}
// ExtSetReadyRes is a set ready response. // ExtSetReadyRes is a set ready response.
type ExtSetReadyRes struct { type ExtSetReadyRes struct{}
SP StreamProc
}
// ExtSetReadyReq is a set ready request. // ExtSetReadyReq is a set ready request.
type ExtSetReadyReq struct { type ExtSetReadyReq struct {

View file

@ -1,82 +0,0 @@
package streamproc
import (
"encoding/binary"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
)
// Path is implemented by path.path.
type Path interface {
OnSPFrame(int, gortsplib.StreamType, []byte)
}
// TrackInfo contains infos about a track.
type TrackInfo struct {
LastSequenceNumber uint16
LastTimeRTP uint32
LastTimeNTP int64
LastSSRC uint32
}
type track struct {
lastSequenceNumber uint32
lastTimeRTP uint32
lastTimeNTP int64
lastSSRC uint32
}
// StreamProc is a stream processor, an intermediate layer between a source and a path.
type StreamProc struct {
path Path
tracks []*track
}
// New allocates a StreamProc.
func New(path Path, tracksLen int) *StreamProc {
sp := &StreamProc{
path: path,
}
sp.tracks = make([]*track, tracksLen)
for i := range sp.tracks {
sp.tracks[i] = &track{}
}
return sp
}
// TrackInfos returns infos about the tracks of the stream.
func (sp *StreamProc) TrackInfos() []TrackInfo {
ret := make([]TrackInfo, len(sp.tracks))
for trackID, track := range sp.tracks {
ret[trackID] = TrackInfo{
LastSequenceNumber: uint16(atomic.LoadUint32(&track.lastSequenceNumber)),
LastTimeRTP: atomic.LoadUint32(&track.lastTimeRTP),
LastTimeNTP: atomic.LoadInt64(&track.lastTimeNTP),
LastSSRC: atomic.LoadUint32(&track.lastSSRC),
}
}
return ret
}
// OnFrame processes a frame.
func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP && len(payload) >= 8 {
track := sp.tracks[trackID]
sequenceNumber := binary.BigEndian.Uint16(payload[2:4])
atomic.StoreUint32(&track.lastSequenceNumber, uint32(sequenceNumber))
timestamp := binary.BigEndian.Uint32(payload[4:8])
atomic.StoreUint32(&track.lastTimeRTP, timestamp)
atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix())
ssrc := binary.BigEndian.Uint32(payload[8:12])
atomic.StoreUint32(&track.lastSSRC, ssrc)
}
sp.path.OnSPFrame(trackID, streamType, payload)
}

View file

@ -7,7 +7,6 @@ import (
"reflect" "reflect"
"sync/atomic" "sync/atomic"
"github.com/aler9/gortsplib"
"gopkg.in/alecthomas/kingpin.v2" "gopkg.in/alecthomas/kingpin.v2"
"github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/conf"
@ -215,7 +214,7 @@ func (p *program) createResources(initial bool) error {
(p.conf.EncryptionParsed == conf.EncryptionNo || (p.conf.EncryptionParsed == conf.EncryptionNo ||
p.conf.EncryptionParsed == conf.EncryptionOptional) { p.conf.EncryptionParsed == conf.EncryptionOptional) {
if p.serverRTSPPlain == nil { if p.serverRTSPPlain == nil {
_, useUDP := p.conf.ProtocolsParsed[gortsplib.StreamProtocolUDP] _, useUDP := p.conf.ProtocolsParsed[conf.ProtocolUDP]
p.serverRTSPPlain, err = rtspserver.New( p.serverRTSPPlain, err = rtspserver.New(
p.ctx, p.ctx,
p.conf.RTSPAddress, p.conf.RTSPAddress,

View file

@ -18,7 +18,7 @@ func TestClientHLSRead(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"rtsp://" + ownDockerIP + ":8554/test/stream", "rtsp://localhost:8554/test/stream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -26,7 +26,7 @@ func TestClientHLSRead(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "http://" + ownDockerIP + ":8888/test/stream/stream.m3u8", "-i", "http://localhost:8888/test/stream/stream.m3u8",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -42,7 +42,7 @@ func TestClientHLSReadAuth(t *testing.T) {
" all:\n" + " all:\n" +
" readUser: testuser\n" + " readUser: testuser\n" +
" readPass: testpass\n" + " readPass: testpass\n" +
" readIps: [172.17.0.0/16]\n") " readIps: [127.0.0.0/16]\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -52,7 +52,7 @@ func TestClientHLSReadAuth(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"rtsp://" + ownDockerIP + ":8554/teststream", "rtsp://localhost:8554/teststream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -60,7 +60,7 @@ func TestClientHLSReadAuth(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "http://testuser:testpass@" + ownDockerIP + ":8888/teststream/stream.m3u8", "-i", "http://testuser:testpass@127.0.0.1:8888/teststream/stream.m3u8",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",

View file

@ -23,7 +23,7 @@ func TestClientRTMPPublish(t *testing.T) {
"-i", "empty" + source + ".mkv", "-i", "empty" + source + ".mkv",
"-c", "copy", "-c", "copy",
"-f", "flv", "-f", "flv",
"rtmp://" + ownDockerIP + ":1935/test1/test2", "rtmp://localhost:1935/test1/test2",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -32,7 +32,7 @@ func TestClientRTMPPublish(t *testing.T) {
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/test1/test2", "-i", "rtsp://localhost:8554/test1/test2",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -55,7 +55,7 @@ func TestClientRTMPRead(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"rtsp://" + ownDockerIP + ":8554/teststream", "rtsp://localhost:8554/teststream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -63,7 +63,7 @@ func TestClientRTMPRead(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + ":1935/teststream", "-i", "rtmp://localhost:1935/teststream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -81,7 +81,7 @@ func TestClientRTMPAuth(t *testing.T) {
" all:\n" + " all:\n" +
" publishUser: testuser\n" + " publishUser: testuser\n" +
" publishPass: testpass\n" + " publishPass: testpass\n" +
" readIps: [172.17.0.0/16]\n") " readIps: [127.0.0.0/16]\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -91,7 +91,7 @@ func TestClientRTMPAuth(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "flv", "-f", "flv",
"rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", "rtmp://localhost/teststream?user=testuser&pass=testpass",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -99,7 +99,7 @@ func TestClientRTMPAuth(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream", "-i", "rtmp://127.0.0.1/teststream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -116,7 +116,7 @@ func TestClientRTMPAuth(t *testing.T) {
" all:\n" + " all:\n" +
" readUser: testuser\n" + " readUser: testuser\n" +
" readPass: testpass\n" + " readPass: testpass\n" +
" readIps: [172.17.0.0/16]\n") " readIps: [127.0.0.0/16]\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -126,7 +126,7 @@ func TestClientRTMPAuth(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "flv", "-f", "flv",
"rtmp://" + ownDockerIP + "/teststream", "rtmp://localhost/teststream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -134,7 +134,7 @@ func TestClientRTMPAuth(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", "-i", "rtmp://127.0.0.1/teststream?user=testuser&pass=testpass",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -162,7 +162,7 @@ func TestClientRTMPAuthFail(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "flv", "-f", "flv",
"rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", "rtmp://localhost/teststream?user=testuser&pass=testpass",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -170,7 +170,7 @@ func TestClientRTMPAuthFail(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream", "-i", "rtmp://localhost/teststream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -196,7 +196,7 @@ func TestClientRTMPAuthFail(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "flv", "-f", "flv",
"rtmp://" + ownDockerIP + "/teststream", "rtmp://localhost/teststream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -204,7 +204,7 @@ func TestClientRTMPAuthFail(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass", "-i", "rtmp://localhost/teststream?user=testuser&pass=testpass",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",

View file

@ -33,9 +33,10 @@ func TestSourceRTMP(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
p, ok := testProgram("hlsDisable: yes\n" + p, ok := testProgram("hlsDisable: yes\n" +
"rtmpDisable: yes\n" +
"paths:\n" + "paths:\n" +
" proxied:\n" + " proxied:\n" +
" source: rtmp://" + cnt1.ip() + "/stream/test\n" + " source: rtmp://localhost/stream/test\n" +
" sourceOnDemand: yes\n") " sourceOnDemand: yes\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -45,7 +46,7 @@ func TestSourceRTMP(t *testing.T) {
cnt3, err := newContainer("ffmpeg", "dest", []string{ cnt3, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/proxied", "-i", "rtsp://localhost:8554/proxied",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",

View file

@ -13,7 +13,6 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/pion/rtp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -27,39 +26,31 @@ func mustParseURL(s string) *base.URL {
func TestClientRTSPPublishRead(t *testing.T) { func TestClientRTSPPublishRead(t *testing.T) {
for _, ca := range []struct { for _, ca := range []struct {
encrypted bool
publisherSoft string publisherSoft string
publisherProto string publisherProto string
readerSoft string readerSoft string
readerProto string readerProto string
}{ }{
{false, "ffmpeg", "udp", "ffmpeg", "udp"}, {"ffmpeg", "udp", "ffmpeg", "udp"},
{false, "ffmpeg", "udp", "ffmpeg", "tcp"}, {"ffmpeg", "udp", "ffmpeg", "multicast"},
{false, "ffmpeg", "udp", "gstreamer", "udp"}, {"ffmpeg", "udp", "ffmpeg", "tcp"},
{false, "ffmpeg", "udp", "gstreamer", "tcp"}, {"ffmpeg", "udp", "gstreamer", "udp"},
{false, "ffmpeg", "udp", "vlc", "udp"}, {"ffmpeg", "udp", "gstreamer", "multicast"},
{false, "ffmpeg", "udp", "vlc", "tcp"}, {"ffmpeg", "udp", "gstreamer", "tcp"},
{"ffmpeg", "udp", "vlc", "udp"},
{false, "ffmpeg", "tcp", "ffmpeg", "udp"}, {"ffmpeg", "udp", "vlc", "tcp"},
{false, "gstreamer", "udp", "ffmpeg", "udp"}, {"ffmpeg", "tcp", "ffmpeg", "udp"},
{false, "gstreamer", "tcp", "ffmpeg", "udp"}, {"gstreamer", "udp", "ffmpeg", "udp"},
{"gstreamer", "tcp", "ffmpeg", "udp"},
{true, "ffmpeg", "tcp", "ffmpeg", "tcp"}, {"ffmpeg", "tls", "ffmpeg", "tls"},
{true, "ffmpeg", "tcp", "gstreamer", "tcp"}, {"ffmpeg", "tls", "gstreamer", "tls"},
{true, "gstreamer", "tcp", "ffmpeg", "tcp"}, {"gstreamer", "tls", "ffmpeg", "tls"},
} { } {
encryptedStr := func() string { t.Run(ca.publisherSoft+"_"+ca.publisherProto+"_"+
if ca.encrypted {
return "encrypted"
}
return "plain"
}()
t.Run(encryptedStr+"_"+ca.publisherSoft+"_"+ca.publisherProto+"_"+
ca.readerSoft+"_"+ca.readerProto, func(t *testing.T) { ca.readerSoft+"_"+ca.readerProto, func(t *testing.T) {
var proto string var proto string
var port string var port string
if !ca.encrypted { if ca.publisherProto != "tls" {
proto = "rtsp" proto = "rtsp"
port = "8554" port = "8554"
@ -94,14 +85,25 @@ func TestClientRTSPPublishRead(t *testing.T) {
switch ca.publisherSoft { switch ca.publisherSoft {
case "ffmpeg": case "ffmpeg":
ps := func() string {
switch ca.publisherProto {
case "udp", "tcp":
return ca.publisherProto
default: // tls
return "tcp"
}
}()
cnt1, err := newContainer("ffmpeg", "source", []string{ cnt1, err := newContainer("ffmpeg", "source", []string{
"-re", "-re",
"-stream_loop", "-1", "-stream_loop", "-1",
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"-rtsp_transport", ca.publisherProto, "-rtsp_transport",
proto + "://" + ownDockerIP + ":" + port + "/teststream", ps,
proto + "://localhost:" + port + "/teststream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -109,10 +111,20 @@ func TestClientRTSPPublishRead(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
case "gstreamer": case "gstreamer":
ps := func() string {
switch ca.publisherProto {
case "udp", "tcp":
return ca.publisherProto
default: // tls
return "tcp"
}
}()
cnt1, err := newContainer("gstreamer", "source", []string{ cnt1, err := newContainer("gstreamer", "source", []string{
"filesrc location=emptyvideo.mkv ! matroskademux ! video/x-h264 ! rtspclientsink " + "filesrc location=emptyvideo.mkv ! matroskademux ! video/x-h264 ! rtspclientsink " +
"location=" + proto + "://" + ownDockerIP + ":" + port + "/teststream " + "location=" + proto + "://localhost:" + port + "/teststream " +
"protocols=" + ca.publisherProto + " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0", "protocols=" + ps + " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -124,9 +136,22 @@ func TestClientRTSPPublishRead(t *testing.T) {
switch ca.readerSoft { switch ca.readerSoft {
case "ffmpeg": case "ffmpeg":
ps := func() string {
switch ca.readerProto {
case "udp", "tcp":
return ca.publisherProto
case "multicast":
return "udp_multicast"
default: // tls
return "tcp"
}
}()
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", ca.readerProto, "-rtsp_transport", ps,
"-i", proto + "://" + ownDockerIP + ":" + port + "/teststream", "-i", proto + "://localhost:" + port + "/teststream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -136,8 +161,21 @@ func TestClientRTSPPublishRead(t *testing.T) {
require.Equal(t, 0, cnt2.wait()) require.Equal(t, 0, cnt2.wait())
case "gstreamer": case "gstreamer":
ps := func() string {
switch ca.readerProto {
case "udp", "tcp":
return ca.publisherProto
case "multicast":
return "udp-mcast"
default: // tls
return "tcp"
}
}()
cnt2, err := newContainer("gstreamer", "read", []string{ cnt2, err := newContainer("gstreamer", "read", []string{
"rtspsrc location=" + proto + "://" + ownDockerIP + ":" + port + "/teststream protocols=tcp tls-validation-flags=0 latency=0 " + "rtspsrc location=" + proto + "://127.0.0.1:" + port + "/teststream protocols=" + ps + " tls-validation-flags=0 latency=0 " +
"! application/x-rtp,media=video ! decodebin ! exitafterframe ! fakesink", "! application/x-rtp,media=video ! decodebin ! exitafterframe ! fakesink",
}) })
require.NoError(t, err) require.NoError(t, err)
@ -149,7 +187,7 @@ func TestClientRTSPPublishRead(t *testing.T) {
if ca.readerProto == "tcp" { if ca.readerProto == "tcp" {
args = append(args, "--rtsp-tcp") args = append(args, "--rtsp-tcp")
} }
args = append(args, proto+"://"+ownDockerIP+":"+port+"/teststream") args = append(args, proto+"://localhost:"+port+"/teststream")
cnt2, err := newContainer("vlc", "dest", args) cnt2, err := newContainer("vlc", "dest", args)
require.NoError(t, err) require.NoError(t, err)
defer cnt2.close() defer cnt2.close()
@ -167,7 +205,7 @@ func TestClientRTSPAuth(t *testing.T) {
" all:\n" + " all:\n" +
" publishUser: testuser\n" + " publishUser: testuser\n" +
" publishPass: test!$()*+.;<=>[]^_-{}\n" + " publishPass: test!$()*+.;<=>[]^_-{}\n" +
" publishIps: [172.17.0.0/16]\n") " publishIps: [127.0.0.0/16]\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -178,7 +216,7 @@ func TestClientRTSPAuth(t *testing.T) {
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"rtsp://testuser:test!$()*+.;<=>[]^_-{}@" + ownDockerIP + ":8554/test/stream", "rtsp://testuser:test!$()*+.;<=>[]^_-{}@127.0.0.1:8554/test/stream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -187,7 +225,7 @@ func TestClientRTSPAuth(t *testing.T) {
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/test/stream", "-i", "rtsp://localhost:8554/test/stream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -208,7 +246,7 @@ func TestClientRTSPAuth(t *testing.T) {
" all:\n" + " all:\n" +
" readUser: testuser\n" + " readUser: testuser\n" +
" readPass: test!$()*+.;<=>[]^_-{}\n" + " readPass: test!$()*+.;<=>[]^_-{}\n" +
" readIps: [172.17.0.0/16]\n") " readIps: [127.0.0.0/16]\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -219,7 +257,7 @@ func TestClientRTSPAuth(t *testing.T) {
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"rtsp://" + ownDockerIP + ":8554/test/stream", "rtsp://localhost:8554/test/stream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -229,7 +267,7 @@ func TestClientRTSPAuth(t *testing.T) {
if soft == "ffmpeg" { if soft == "ffmpeg" {
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"-i", "rtsp://testuser:test!$()*+.;<=>[]^_-{}@" + ownDockerIP + ":8554/test/stream", "-i", "rtsp://testuser:test!$()*+.;<=>[]^_-{}@127.0.0.1:8554/test/stream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -240,7 +278,7 @@ func TestClientRTSPAuth(t *testing.T) {
} else { } else {
cnt2, err := newContainer("vlc", "dest", []string{ cnt2, err := newContainer("vlc", "dest", []string{
"rtsp://testuser:test!$()*+.;<=>[]^_-{}@" + ownDockerIP + ":8554/test/stream", "rtsp://testuser:test!$()*+.;<=>[]^_-{}@localhost:8554/test/stream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt2.close() defer cnt2.close()
@ -266,14 +304,14 @@ func TestClientRTSPAuth(t *testing.T) {
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"rtsp://" + ownDockerIP + ":8554/test/stream", "rtsp://localhost:8554/test/stream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"-i", "rtsp://testuser:testpass@" + ownDockerIP + ":8554/test/stream", "-i", "rtsp://testuser:testpass@localhost:8554/test/stream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -320,7 +358,7 @@ func TestClientRTSPAuthFail(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
_, err = gortsplib.DialPublish( _, err = gortsplib.DialPublish(
"rtsp://"+ca.user+":"+ca.pass+"@"+ownDockerIP+":8554/test/stream", "rtsp://"+ca.user+":"+ca.pass+"@localhost:8554/test/stream",
gortsplib.Tracks{track}, gortsplib.Tracks{track},
) )
require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error()) require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error())
@ -359,7 +397,7 @@ func TestClientRTSPAuthFail(t *testing.T) {
defer p.close() defer p.close()
_, err := gortsplib.DialRead( _, err := gortsplib.DialRead(
"rtsp://" + ca.user + ":" + ca.pass + "@" + ownDockerIP + ":8554/test/stream", "rtsp://" + ca.user + ":" + ca.pass + "@localhost:8554/test/stream",
) )
require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error()) require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error())
}) })
@ -370,7 +408,7 @@ func TestClientRTSPAuthFail(t *testing.T) {
"hlsDisable: yes\n" + "hlsDisable: yes\n" +
"paths:\n" + "paths:\n" +
" all:\n" + " all:\n" +
" publishIps: [127.0.0.1/32]\n") " publishIps: [128.0.0.1/32]\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -378,7 +416,7 @@ func TestClientRTSPAuthFail(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
_, err = gortsplib.DialPublish( _, err = gortsplib.DialPublish(
"rtsp://"+ownDockerIP+":8554/test/stream", "rtsp://localhost:8554/test/stream",
gortsplib.Tracks{track}, gortsplib.Tracks{track},
) )
require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error()) require.Equal(t, "wrong status code: 401 (Unauthorized)", err.Error())
@ -403,14 +441,14 @@ func TestClientRTSPAutomaticProtocol(t *testing.T) {
"-i", "emptyvideo.mkv", "-i", "emptyvideo.mkv",
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"rtsp://" + ownDockerIP + ":8554/teststream", "rtsp://localhost:8554/teststream",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
} }
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtsp://" + ownDockerIP + ":8554/teststream", "-i", "rtsp://localhost:8554/teststream",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -442,12 +480,12 @@ func TestClientRTSPPublisherOverride(t *testing.T) {
track, err := gortsplib.NewTrackH264(68, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}) track, err := gortsplib.NewTrackH264(68, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04})
require.NoError(t, err) require.NoError(t, err)
s1, err := gortsplib.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", s1, err := gortsplib.DialPublish("rtsp://localhost:8554/teststream",
gortsplib.Tracks{track}) gortsplib.Tracks{track})
require.NoError(t, err) require.NoError(t, err)
defer s1.Close() defer s1.Close()
s2, err := gortsplib.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", s2, err := gortsplib.DialPublish("rtsp://localhost:8554/teststream",
gortsplib.Tracks{track}) gortsplib.Tracks{track})
if ca == "enabled" { if ca == "enabled" {
require.NoError(t, err) require.NoError(t, err)
@ -456,7 +494,7 @@ func TestClientRTSPPublisherOverride(t *testing.T) {
require.Error(t, err) require.Error(t, err)
} }
d1, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream") d1, err := gortsplib.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err) require.NoError(t, err)
defer d1.Close() defer d1.Close()
@ -465,12 +503,14 @@ func TestClientRTSPPublisherOverride(t *testing.T) {
go func() { go func() {
defer close(readDone) defer close(readDone)
d1.ReadFrames(func(trackID int, streamType base.StreamType, payload []byte) { d1.ReadFrames(func(trackID int, streamType base.StreamType, payload []byte) {
if ca == "enabled" { if streamType == gortsplib.StreamTypeRTP {
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, payload) if ca == "enabled" {
} else { require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, payload)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) } else {
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload)
}
close(frameRecv)
} }
close(frameRecv)
}) })
}() }()
@ -508,19 +548,19 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
client := &gortsplib.Client{ client := &gortsplib.Client{
StreamProtocol: func() *gortsplib.StreamProtocol { StreamProtocol: func() *base.StreamProtocol {
v := gortsplib.StreamProtocolTCP v := base.StreamProtocolTCP
return &v return &v
}(), }(),
ReadBufferSize: 4500, ReadBufferSize: 4500,
} }
source, err := client.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", source, err := client.DialPublish("rtsp://localhost:8554/teststream",
gortsplib.Tracks{track}) gortsplib.Tracks{track})
require.NoError(t, err) require.NoError(t, err)
defer source.Close() defer source.Close()
dest, err := client.DialRead("rtsp://" + ownDockerIP + ":8554/teststream") dest, err := client.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err) require.NoError(t, err)
defer dest.Close() defer dest.Close()
@ -558,14 +598,14 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
client := &gortsplib.Client{ client := &gortsplib.Client{
StreamProtocol: func() *gortsplib.StreamProtocol { StreamProtocol: func() *base.StreamProtocol {
v := gortsplib.StreamProtocolTCP v := base.StreamProtocolTCP
return &v return &v
}(), }(),
ReadBufferSize: 4500, ReadBufferSize: 4500,
} }
source, err := client.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", source, err := client.DialPublish("rtsp://localhost:8554/teststream",
gortsplib.Tracks{track}) gortsplib.Tracks{track})
require.NoError(t, err) require.NoError(t, err)
defer source.Close() defer source.Close()
@ -577,14 +617,14 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
"rtspAddress: :8555\n" + "rtspAddress: :8555\n" +
"paths:\n" + "paths:\n" +
" teststream:\n" + " teststream:\n" +
" source: rtsp://" + ownDockerIP + ":8554/teststream\n" + " source: rtsp://localhost:8554/teststream\n" +
" sourceProtocol: tcp\n") " sourceProtocol: tcp\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p2.close() defer p2.close()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
dest, err := client.DialRead("rtsp://" + ownDockerIP + ":8555/teststream") dest, err := client.DialRead("rtsp://localhost:8555/teststream")
require.NoError(t, err) require.NoError(t, err)
defer dest.Close() defer dest.Close()
@ -611,176 +651,13 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
}) })
} }
func TestClientRTSPAdditionalInfos(t *testing.T) {
getInfos := func() (*headers.RTPInfo, []*uint32, error) {
u, err := base.ParseURL("rtsp://" + ownDockerIP + ":8554/teststream")
if err != nil {
return nil, nil, err
}
conn, err := gortsplib.Dial(u.Scheme, u.Host)
if err != nil {
return nil, nil, err
}
defer conn.Close()
tracks, _, err := conn.Describe(u)
if err != nil {
return nil, nil, err
}
ssrcs := make([]*uint32, len(tracks))
for i, t := range tracks {
res, err := conn.Setup(headers.TransportModePlay, t, 0, 0)
if err != nil {
return nil, nil, err
}
var th headers.Transport
err = th.Read(res.Header["Transport"])
if err != nil {
return nil, nil, err
}
ssrcs[i] = th.SSRC
}
res, err := conn.Play(nil)
if err != nil {
return nil, nil, err
}
var ri headers.RTPInfo
err = ri.Read(res.Header["RTP-Info"])
if err != nil {
return nil, nil, err
}
return &ri, ssrcs, nil
}
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n")
require.Equal(t, true, ok)
defer p.close()
track1, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
track2, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
source, err := gortsplib.DialPublish("rtsp://"+ownDockerIP+":8554/teststream",
gortsplib.Tracks{track1, track2})
require.NoError(t, err)
defer source.Close()
pkt := rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 556,
Timestamp: 984512368,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
buf, err := pkt.Marshal()
require.NoError(t, err)
err = source.WriteFrame(track1.ID, gortsplib.StreamTypeRTP, buf)
require.NoError(t, err)
rtpInfo, ssrcs, err := getInfos()
require.NoError(t, err)
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*rtpInfo)[0].Timestamp,
},
}, rtpInfo)
require.Equal(t, []*uint32{
func() *uint32 {
v := uint32(96342362)
return &v
}(),
nil,
}, ssrcs)
pkt = rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 87,
Timestamp: 756436454,
SSRC: 536474323,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
buf, err = pkt.Marshal()
require.NoError(t, err)
err = source.WriteFrame(track2.ID, gortsplib.StreamTypeRTP, buf)
require.NoError(t, err)
rtpInfo, ssrcs, err = getInfos()
require.NoError(t, err)
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*rtpInfo)[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=1",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(87)
return &v
}(),
Timestamp: (*rtpInfo)[1].Timestamp,
},
}, rtpInfo)
require.Equal(t, []*uint32{
func() *uint32 {
v := uint32(96342362)
return &v
}(),
func() *uint32 {
v := uint32(536474323)
return &v
}(),
}, ssrcs)
}
func TestClientRTSPRedirect(t *testing.T) { func TestClientRTSPRedirect(t *testing.T) {
p1, ok := testProgram("rtmpDisable: yes\n" + p1, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" + "hlsDisable: yes\n" +
"paths:\n" + "paths:\n" +
" path1:\n" + " path1:\n" +
" source: redirect\n" + " source: redirect\n" +
" sourceRedirect: rtsp://" + ownDockerIP + ":8554/path2\n" + " sourceRedirect: rtsp://localhost:8554/path2\n" +
" path2:\n") " path2:\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p1.close() defer p1.close()
@ -792,7 +669,7 @@ func TestClientRTSPRedirect(t *testing.T) {
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"rtsp://" + ownDockerIP + ":8554/path2", "rtsp://localhost:8554/path2",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -801,7 +678,7 @@ func TestClientRTSPRedirect(t *testing.T) {
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/path1", "-i", "rtsp://localhost:8554/path1",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -819,7 +696,7 @@ func TestClientRTSPFallback(t *testing.T) {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
val := func() string { val := func() string {
if ca == "absolute" { if ca == "absolute" {
return "rtsp://" + ownDockerIP + ":8554/path2" return "rtsp://localhost:8554/path2"
} }
return "/path2" return "/path2"
}() }()
@ -840,7 +717,7 @@ func TestClientRTSPFallback(t *testing.T) {
"-c", "copy", "-c", "copy",
"-f", "rtsp", "-f", "rtsp",
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"rtsp://" + ownDockerIP + ":8554/path2", "rtsp://localhost:8554/path2",
}) })
require.NoError(t, err) require.NoError(t, err)
defer cnt1.close() defer cnt1.close()
@ -849,7 +726,7 @@ func TestClientRTSPFallback(t *testing.T) {
cnt2, err := newContainer("ffmpeg", "dest", []string{ cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp", "-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/path1", "-i", "rtsp://localhost:8554/path1",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -954,7 +831,7 @@ wait
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"2"}, "CSeq": base.HeaderValue{"2"},
"Transport": headers.Transport{ "Transport": headers.Transport{
Protocol: gortsplib.StreamProtocolTCP, Protocol: base.StreamProtocolTCP,
Delivery: func() *base.StreamDelivery { Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast v := base.StreamDeliveryUnicast
return &v return &v
@ -1007,7 +884,7 @@ wait
Header: base.Header{ Header: base.Header{
"CSeq": base.HeaderValue{"1"}, "CSeq": base.HeaderValue{"1"},
"Transport": headers.Transport{ "Transport": headers.Transport{
Protocol: gortsplib.StreamProtocolTCP, Protocol: base.StreamProtocolTCP,
Delivery: func() *base.StreamDelivery { Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast v := base.StreamDeliveryUnicast
return &v return &v

View file

@ -16,10 +16,12 @@ type testServer struct {
user string user string
pass string pass string
authValidator *auth.Validator authValidator *auth.Validator
done chan struct{} stream *gortsplib.ServerStream
done chan struct{}
} }
func (sh *testServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) { func (sh *testServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
if sh.authValidator == nil { if sh.authValidator == nil {
sh.authValidator = auth.NewValidator(sh.user, sh.pass, nil) sh.authValidator = auth.NewValidator(sh.user, sh.pass, nil)
} }
@ -35,26 +37,27 @@ func (sh *testServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*ba
} }
track, _ := gortsplib.NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x05, 0x06}) track, _ := gortsplib.NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x05, 0x06})
sh.stream = gortsplib.NewServerStream(gortsplib.Tracks{track})
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
}, gortsplib.Tracks{track}.Write(), nil }, sh.stream, nil
} }
func (sh *testServer) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) { func (sh *testServer) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
if sh.done != nil { if sh.done != nil {
close(sh.done) close(sh.done)
} }
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
}, nil, nil }, sh.stream, nil
} }
func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
ctx.Session.WriteFrame(0, gortsplib.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) sh.stream.WriteFrame(0, gortsplib.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04})
}() }()
return &base.Response{ return &base.Response{
@ -125,8 +128,10 @@ func TestSourceRTSP(t *testing.T) {
go func() { go func() {
defer close(readDone) defer close(readDone)
conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) {
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) if streamType == gortsplib.StreamTypeRTP {
close(received) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload)
close(received)
}
}) })
}() }()

View file

@ -2,7 +2,6 @@ package main
import ( import (
"io/ioutil" "io/ioutil"
"net"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -13,41 +12,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var ownDockerIP = func() string {
out, err := exec.Command("docker", "network", "inspect", "bridge",
"-f", "{{range .IPAM.Config}}{{.Subnet}}{{end}}").Output()
if err != nil {
panic(err)
}
_, ipnet, err := net.ParseCIDR(string(out[:len(out)-1]))
if err != nil {
panic(err)
}
ifaces, err := net.Interfaces()
if err != nil {
panic(err)
}
for _, i := range ifaces {
addrs, err := i.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
if v, ok := addr.(*net.IPNet); ok {
if ipnet.Contains(v.IP) {
return v.IP.String()
}
}
}
}
panic("IP not found")
}()
type container struct { type container struct {
name string name string
} }
@ -60,8 +24,10 @@ func newContainer(image string, name string, args []string) (*container, error)
exec.Command("docker", "kill", "rtsp-simple-server-test-"+name).Run() exec.Command("docker", "kill", "rtsp-simple-server-test-"+name).Run()
exec.Command("docker", "wait", "rtsp-simple-server-test-"+name).Run() exec.Command("docker", "wait", "rtsp-simple-server-test-"+name).Run()
// --network=host is needed to test multicast
cmd := []string{ cmd := []string{
"docker", "run", "docker", "run",
"--network=host",
"--name=rtsp-simple-server-test-" + name, "--name=rtsp-simple-server-test-" + name,
"rtsp-simple-server-test-" + image, "rtsp-simple-server-test-" + image,
} }
@ -203,7 +169,7 @@ func TestHotReloading(t *testing.T) {
func() { func() {
cnt1, err := newContainer("ffmpeg", "dest", []string{ cnt1, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtsp://" + ownDockerIP + ":8554/test1", "-i", "rtsp://localhost:8554/test1",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -226,7 +192,7 @@ func TestHotReloading(t *testing.T) {
func() { func() {
cnt1, err := newContainer("ffmpeg", "dest", []string{ cnt1, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtsp://" + ownDockerIP + ":8554/test1", "-i", "rtsp://localhost:8554/test1",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",
@ -238,7 +204,7 @@ func TestHotReloading(t *testing.T) {
func() { func() {
cnt1, err := newContainer("ffmpeg", "dest", []string{ cnt1, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtsp://" + ownDockerIP + ":8554/test2", "-i", "rtsp://localhost:8554/test2",
"-vframes", "1", "-vframes", "1",
"-f", "image2", "-f", "image2",
"-y", "/dev/null", "-y", "/dev/null",

View file

@ -43,9 +43,10 @@ rtspDisable: no
# supported RTSP stream protocols. # supported RTSP stream protocols.
# UDP is the most performant, but can cause problems if there's a NAT between # UDP is the most performant, but can cause problems if there's a NAT between
# server and clients, and doesn't support encryption. # server and clients, and doesn't support encryption.
# UDP-multicast allows to save bandwidth when clients are all in the same LAN.
# TCP is the most versatile, and does support encryption. # TCP is the most versatile, and does support encryption.
# The handshake is always performed with TCP. # The handshake is always performed with TCP.
protocols: [udp, tcp] protocols: [udp, multicast, tcp]
# encrypt handshake and TCP streams with TLS (RTSPS). # encrypt handshake and TCP streams with TLS (RTSPS).
# available values are "no", "strict", "optional". # available values are "no", "strict", "optional".
encryption: no encryption: no