forked from External/mediamtx
This commit is contained in:
parent
80a133afc9
commit
caa9fa6ca0
6 changed files with 373 additions and 58 deletions
10
README.md
10
README.md
|
|
@ -1180,12 +1180,20 @@ The JWT is expected to contain the `mediamtx_permissions` scope, with a list of
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Clients are expected to pass the JWT in query parameters, for instance:
|
Clients are expected to pass the JWT in the Authorization header (in case of HLS and WebRTC) or in query parameters (in case of any other protocol), for instance (RTSP):
|
||||||
|
|
||||||
```
|
```
|
||||||
ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/mystream?jwt=MY_JWT
|
ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/mystream?jwt=MY_JWT
|
||||||
```
|
```
|
||||||
|
|
||||||
|
For instance (HLS):
|
||||||
|
|
||||||
|
```
|
||||||
|
GET /mypath/index.m3u8 HTTP/1.1
|
||||||
|
Host: example.com
|
||||||
|
Authorization: Bearer MY_JWT
|
||||||
|
```
|
||||||
|
|
||||||
Here's a tutorial on how to setup the [Keycloak identity server](https://www.keycloak.org/) in order to provide such JWTs:
|
Here's a tutorial on how to setup the [Keycloak identity server](https://www.keycloak.org/) in order to provide such JWTs:
|
||||||
|
|
||||||
1. Start Keycloak:
|
1. Start Keycloak:
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
gopath "path"
|
gopath "path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -36,6 +37,17 @@ func mergePathAndQuery(path string, rawQuery string) string {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addJWTFromAuthorization(rawQuery string, auth string) string {
|
||||||
|
jwt := strings.TrimPrefix(auth, "Bearer ")
|
||||||
|
if rawQuery != "" {
|
||||||
|
if v, err := url.ParseQuery(rawQuery); err == nil && v.Get("jwt") == "" {
|
||||||
|
v.Set("jwt", jwt)
|
||||||
|
return v.Encode()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return url.Values{"jwt": []string{jwt}}.Encode()
|
||||||
|
}
|
||||||
|
|
||||||
type httpServer struct {
|
type httpServer struct {
|
||||||
address string
|
address string
|
||||||
encryption bool
|
encryption bool
|
||||||
|
|
@ -145,10 +157,15 @@ func (s *httpServer) onRequest(ctx *gin.Context) {
|
||||||
|
|
||||||
user, pass, hasCredentials := ctx.Request.BasicAuth()
|
user, pass, hasCredentials := ctx.Request.BasicAuth()
|
||||||
|
|
||||||
|
q := ctx.Request.URL.RawQuery
|
||||||
|
if h := ctx.Request.Header.Get("Authorization"); strings.HasPrefix(h, "Bearer ") {
|
||||||
|
q = addJWTFromAuthorization(q, h)
|
||||||
|
}
|
||||||
|
|
||||||
pathConf, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
|
pathConf, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
|
||||||
AccessRequest: defs.PathAccessRequest{
|
AccessRequest: defs.PathAccessRequest{
|
||||||
Name: dir,
|
Name: dir,
|
||||||
Query: ctx.Request.URL.RawQuery,
|
Query: q,
|
||||||
Publish: false,
|
Publish: false,
|
||||||
IP: net.ParseIP(ctx.ClientIP()),
|
IP: net.ParseIP(ctx.ClientIP()),
|
||||||
User: user,
|
User: user,
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,6 @@ import (
|
||||||
"github.com/bluenviron/gohlslib"
|
"github.com/bluenviron/gohlslib"
|
||||||
"github.com/bluenviron/gohlslib/pkg/codecs"
|
"github.com/bluenviron/gohlslib/pkg/codecs"
|
||||||
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
||||||
"github.com/bluenviron/mediamtx/internal/auth"
|
|
||||||
"github.com/bluenviron/mediamtx/internal/conf"
|
"github.com/bluenviron/mediamtx/internal/conf"
|
||||||
"github.com/bluenviron/mediamtx/internal/defs"
|
"github.com/bluenviron/mediamtx/internal/defs"
|
||||||
"github.com/bluenviron/mediamtx/internal/externalcmd"
|
"github.com/bluenviron/mediamtx/internal/externalcmd"
|
||||||
|
|
@ -49,21 +48,16 @@ func (pa *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type dummyPathManager struct {
|
type dummyPathManager struct {
|
||||||
stream *stream.Stream
|
findPathConf func(req defs.PathFindPathConfReq) (*conf.Path, error)
|
||||||
|
addReader func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *dummyPathManager) FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
func (pm *dummyPathManager) FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
if req.AccessRequest.User != "myuser" || req.AccessRequest.Pass != "mypass" {
|
return pm.findPathConf(req)
|
||||||
return nil, auth.Error{}
|
|
||||||
}
|
|
||||||
return &conf.Path{}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *dummyPathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
func (pm *dummyPathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
if req.AccessRequest.Name == "nonexisting" {
|
return pm.addReader(req)
|
||||||
return nil, nil, fmt.Errorf("not found")
|
|
||||||
}
|
|
||||||
return &dummyPath{}, pm.stream, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServerNotFound(t *testing.T) {
|
func TestServerNotFound(t *testing.T) {
|
||||||
|
|
@ -72,6 +66,19 @@ func TestServerNotFound(t *testing.T) {
|
||||||
"always remux on",
|
"always remux on",
|
||||||
} {
|
} {
|
||||||
t.Run(ca, func(t *testing.T) {
|
t.Run(ca, func(t *testing.T) {
|
||||||
|
pm := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "nonexisting", req.AccessRequest.Name)
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addReader: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
require.Equal(t, "nonexisting", req.AccessRequest.Name)
|
||||||
|
return nil, nil, fmt.Errorf("not found")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8888",
|
Address: "127.0.0.1:8888",
|
||||||
Encryption: false,
|
Encryption: false,
|
||||||
|
|
@ -88,7 +95,7 @@ func TestServerNotFound(t *testing.T) {
|
||||||
Directory: "",
|
Directory: "",
|
||||||
ReadTimeout: conf.StringDuration(10 * time.Second),
|
ReadTimeout: conf.StringDuration(10 * time.Second),
|
||||||
WriteQueueSize: 512,
|
WriteQueueSize: 512,
|
||||||
PathManager: &dummyPathManager{},
|
PathManager: pm,
|
||||||
Parent: test.NilLogger,
|
Parent: test.NilLogger,
|
||||||
}
|
}
|
||||||
err := s.Initialize()
|
err := s.Initialize()
|
||||||
|
|
@ -126,7 +133,7 @@ func TestServerRead(t *testing.T) {
|
||||||
t.Run("always remux off", func(t *testing.T) {
|
t.Run("always remux off", func(t *testing.T) {
|
||||||
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
||||||
|
|
||||||
stream, err := stream.New(
|
str, err := stream.New(
|
||||||
1460,
|
1460,
|
||||||
desc,
|
desc,
|
||||||
true,
|
true,
|
||||||
|
|
@ -134,7 +141,18 @@ func TestServerRead(t *testing.T) {
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
pathManager := &dummyPathManager{stream: stream}
|
pm := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "mystream", req.AccessRequest.Name)
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addReader: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
require.Equal(t, "mystream", req.AccessRequest.Name)
|
||||||
|
return &dummyPath{}, str, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8888",
|
Address: "127.0.0.1:8888",
|
||||||
|
|
@ -152,7 +170,7 @@ func TestServerRead(t *testing.T) {
|
||||||
Directory: "",
|
Directory: "",
|
||||||
ReadTimeout: conf.StringDuration(10 * time.Second),
|
ReadTimeout: conf.StringDuration(10 * time.Second),
|
||||||
WriteQueueSize: 512,
|
WriteQueueSize: 512,
|
||||||
PathManager: pathManager,
|
PathManager: pm,
|
||||||
Parent: test.NilLogger,
|
Parent: test.NilLogger,
|
||||||
}
|
}
|
||||||
err = s.Initialize()
|
err = s.Initialize()
|
||||||
|
|
@ -192,7 +210,7 @@ func TestServerRead(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 4; i++ {
|
||||||
stream.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
|
str.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
|
||||||
Base: unit.Base{
|
Base: unit.Base{
|
||||||
NTP: time.Time{},
|
NTP: time.Time{},
|
||||||
PTS: time.Duration(i) * time.Second,
|
PTS: time.Duration(i) * time.Second,
|
||||||
|
|
@ -210,7 +228,7 @@ func TestServerRead(t *testing.T) {
|
||||||
t.Run("always remux on", func(t *testing.T) {
|
t.Run("always remux on", func(t *testing.T) {
|
||||||
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
||||||
|
|
||||||
stream, err := stream.New(
|
str, err := stream.New(
|
||||||
1460,
|
1460,
|
||||||
desc,
|
desc,
|
||||||
true,
|
true,
|
||||||
|
|
@ -218,7 +236,18 @@ func TestServerRead(t *testing.T) {
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
pathManager := &dummyPathManager{stream: stream}
|
pm := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "mystream", req.AccessRequest.Name)
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addReader: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
require.Equal(t, "mystream", req.AccessRequest.Name)
|
||||||
|
return &dummyPath{}, str, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8888",
|
Address: "127.0.0.1:8888",
|
||||||
|
|
@ -236,7 +265,7 @@ func TestServerRead(t *testing.T) {
|
||||||
Directory: "",
|
Directory: "",
|
||||||
ReadTimeout: conf.StringDuration(10 * time.Second),
|
ReadTimeout: conf.StringDuration(10 * time.Second),
|
||||||
WriteQueueSize: 512,
|
WriteQueueSize: 512,
|
||||||
PathManager: pathManager,
|
PathManager: pm,
|
||||||
Parent: test.NilLogger,
|
Parent: test.NilLogger,
|
||||||
}
|
}
|
||||||
err = s.Initialize()
|
err = s.Initialize()
|
||||||
|
|
@ -248,7 +277,7 @@ func TestServerRead(t *testing.T) {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 4; i++ {
|
||||||
stream.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
|
str.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
|
||||||
Base: unit.Base{
|
Base: unit.Base{
|
||||||
NTP: time.Time{},
|
NTP: time.Time{},
|
||||||
PTS: time.Duration(i) * time.Second,
|
PTS: time.Duration(i) * time.Second,
|
||||||
|
|
@ -293,14 +322,10 @@ func TestServerRead(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDirectory(t *testing.T) {
|
func TestServerReadAuthorizationHeader(t *testing.T) {
|
||||||
dir, err := os.MkdirTemp("", "mediamtx-playback")
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer os.RemoveAll(dir)
|
|
||||||
|
|
||||||
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
||||||
|
|
||||||
stream, err := stream.New(
|
str, err := stream.New(
|
||||||
1460,
|
1460,
|
||||||
desc,
|
desc,
|
||||||
true,
|
true,
|
||||||
|
|
@ -308,7 +333,111 @@ func TestDirectory(t *testing.T) {
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
pathManager := &dummyPathManager{stream: stream}
|
pm := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "jwt=testing", req.AccessRequest.Query)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addReader: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
return &dummyPath{}, str, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Server{
|
||||||
|
Address: "127.0.0.1:8888",
|
||||||
|
Encryption: false,
|
||||||
|
ServerKey: "",
|
||||||
|
ServerCert: "",
|
||||||
|
AlwaysRemux: true,
|
||||||
|
Variant: conf.HLSVariant(gohlslib.MuxerVariantMPEGTS),
|
||||||
|
SegmentCount: 7,
|
||||||
|
SegmentDuration: conf.StringDuration(1 * time.Second),
|
||||||
|
PartDuration: conf.StringDuration(200 * time.Millisecond),
|
||||||
|
SegmentMaxSize: 50 * 1024 * 1024,
|
||||||
|
AllowOrigin: "",
|
||||||
|
TrustedProxies: conf.IPNetworks{},
|
||||||
|
Directory: "",
|
||||||
|
ReadTimeout: conf.StringDuration(10 * time.Second),
|
||||||
|
WriteQueueSize: 512,
|
||||||
|
PathManager: pm,
|
||||||
|
Parent: test.NilLogger,
|
||||||
|
}
|
||||||
|
err = s.Initialize()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
s.PathReady(&dummyPath{})
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
str.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{
|
||||||
|
Base: unit.Base{
|
||||||
|
NTP: time.Time{},
|
||||||
|
PTS: time.Duration(i) * time.Second,
|
||||||
|
},
|
||||||
|
AU: [][]byte{
|
||||||
|
{5, 1}, // IDR
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &gohlslib.Client{
|
||||||
|
URI: "http://127.0.0.1:8888/mystream/index.m3u8",
|
||||||
|
OnRequest: func(r *http.Request) {
|
||||||
|
r.Header.Set("Authorization", "Bearer testing")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
recv := make(chan struct{})
|
||||||
|
|
||||||
|
c.OnTracks = func(tracks []*gohlslib.Track) error {
|
||||||
|
require.Equal(t, []*gohlslib.Track{{
|
||||||
|
Codec: &codecs.H264{},
|
||||||
|
}}, tracks)
|
||||||
|
|
||||||
|
c.OnDataH26x(tracks[0], func(pts, dts time.Duration, au [][]byte) {
|
||||||
|
require.Equal(t, time.Duration(0), pts)
|
||||||
|
require.Equal(t, time.Duration(0), dts)
|
||||||
|
require.Equal(t, [][]byte{
|
||||||
|
test.FormatH264.SPS,
|
||||||
|
test.FormatH264.PPS,
|
||||||
|
{5, 1},
|
||||||
|
}, au)
|
||||||
|
close(recv)
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.Start()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() { <-c.Wait() }()
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
<-recv
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDirectory(t *testing.T) {
|
||||||
|
dir, err := os.MkdirTemp("", "mediamtx-playback")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
||||||
|
|
||||||
|
str, err := stream.New(
|
||||||
|
1460,
|
||||||
|
desc,
|
||||||
|
true,
|
||||||
|
test.NilLogger,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pm := &dummyPathManager{
|
||||||
|
addReader: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
return &dummyPath{}, str, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8888",
|
Address: "127.0.0.1:8888",
|
||||||
|
|
@ -326,7 +455,7 @@ func TestDirectory(t *testing.T) {
|
||||||
Directory: filepath.Join(dir, "mydir"),
|
Directory: filepath.Join(dir, "mydir"),
|
||||||
ReadTimeout: conf.StringDuration(10 * time.Second),
|
ReadTimeout: conf.StringDuration(10 * time.Second),
|
||||||
WriteQueueSize: 512,
|
WriteQueueSize: 512,
|
||||||
PathManager: pathManager,
|
PathManager: pm,
|
||||||
Parent: test.NilLogger,
|
Parent: test.NilLogger,
|
||||||
}
|
}
|
||||||
err = s.Initialize()
|
err = s.Initialize()
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -59,6 +60,17 @@ func sessionLocation(publish bool, path string, secret uuid.UUID) string {
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addJWTFromAuthorization(rawQuery string, auth string) string {
|
||||||
|
jwt := strings.TrimPrefix(auth, "Bearer ")
|
||||||
|
if rawQuery != "" {
|
||||||
|
if v, err := url.ParseQuery(rawQuery); err == nil && v.Get("jwt") == "" {
|
||||||
|
v.Set("jwt", jwt)
|
||||||
|
return v.Encode()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return url.Values{"jwt": []string{jwt}}.Encode()
|
||||||
|
}
|
||||||
|
|
||||||
type httpServer struct {
|
type httpServer struct {
|
||||||
address string
|
address string
|
||||||
encryption bool
|
encryption bool
|
||||||
|
|
@ -110,10 +122,15 @@ func (s *httpServer) close() {
|
||||||
func (s *httpServer) checkAuthOutsideSession(ctx *gin.Context, pathName string, publish bool) bool {
|
func (s *httpServer) checkAuthOutsideSession(ctx *gin.Context, pathName string, publish bool) bool {
|
||||||
user, pass, hasCredentials := ctx.Request.BasicAuth()
|
user, pass, hasCredentials := ctx.Request.BasicAuth()
|
||||||
|
|
||||||
|
q := ctx.Request.URL.RawQuery
|
||||||
|
if h := ctx.Request.Header.Get("Authorization"); strings.HasPrefix(h, "Bearer ") {
|
||||||
|
q = addJWTFromAuthorization(q, h)
|
||||||
|
}
|
||||||
|
|
||||||
_, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
|
_, err := s.pathManager.FindPathConf(defs.PathFindPathConfReq{
|
||||||
AccessRequest: defs.PathAccessRequest{
|
AccessRequest: defs.PathAccessRequest{
|
||||||
Name: pathName,
|
Name: pathName,
|
||||||
Query: ctx.Request.URL.RawQuery,
|
Query: q,
|
||||||
Publish: publish,
|
Publish: publish,
|
||||||
IP: net.ParseIP(ctx.ClientIP()),
|
IP: net.ParseIP(ctx.ClientIP()),
|
||||||
User: user,
|
User: user,
|
||||||
|
|
@ -178,10 +195,15 @@ func (s *httpServer) onWHIPPost(ctx *gin.Context, pathName string, publish bool)
|
||||||
|
|
||||||
user, pass, _ := ctx.Request.BasicAuth()
|
user, pass, _ := ctx.Request.BasicAuth()
|
||||||
|
|
||||||
|
q := ctx.Request.URL.RawQuery
|
||||||
|
if h := ctx.Request.Header.Get("Authorization"); strings.HasPrefix(h, "Bearer ") {
|
||||||
|
q = addJWTFromAuthorization(q, h)
|
||||||
|
}
|
||||||
|
|
||||||
res := s.parent.newSession(webRTCNewSessionReq{
|
res := s.parent.newSession(webRTCNewSessionReq{
|
||||||
pathName: pathName,
|
pathName: pathName,
|
||||||
remoteAddr: httpp.RemoteAddr(ctx),
|
remoteAddr: httpp.RemoteAddr(ctx),
|
||||||
query: ctx.Request.URL.RawQuery,
|
query: q,
|
||||||
user: user,
|
user: user,
|
||||||
pass: pass,
|
pass: pass,
|
||||||
offer: offer,
|
offer: offer,
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ import (
|
||||||
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
||||||
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
||||||
"github.com/bluenviron/mediamtx/internal/asyncwriter"
|
"github.com/bluenviron/mediamtx/internal/asyncwriter"
|
||||||
"github.com/bluenviron/mediamtx/internal/auth"
|
|
||||||
"github.com/bluenviron/mediamtx/internal/conf"
|
"github.com/bluenviron/mediamtx/internal/conf"
|
||||||
"github.com/bluenviron/mediamtx/internal/defs"
|
"github.com/bluenviron/mediamtx/internal/defs"
|
||||||
"github.com/bluenviron/mediamtx/internal/externalcmd"
|
"github.com/bluenviron/mediamtx/internal/externalcmd"
|
||||||
|
|
@ -72,34 +71,32 @@ func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type dummyPathManager struct {
|
type dummyPathManager struct {
|
||||||
path *dummyPath
|
findPathConf func(req defs.PathFindPathConfReq) (*conf.Path, error)
|
||||||
|
addPublisher func(req defs.PathAddPublisherReq) (defs.Path, error)
|
||||||
|
addReader func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *dummyPathManager) FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
func (pm *dummyPathManager) FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
if req.AccessRequest.User != "myuser" || req.AccessRequest.Pass != "mypass" {
|
return pm.findPathConf(req)
|
||||||
return nil, auth.Error{}
|
|
||||||
}
|
|
||||||
return &conf.Path{}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *dummyPathManager) AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) {
|
func (pm *dummyPathManager) AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) {
|
||||||
return pm.path, nil
|
return pm.addPublisher(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *dummyPathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
func (pm *dummyPathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
if req.AccessRequest.Name == "nonexisting" {
|
return pm.addReader(req)
|
||||||
return nil, nil, defs.PathNoOnePublishingError{}
|
|
||||||
}
|
|
||||||
return pm.path, pm.path.stream, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func initializeTestServer(t *testing.T) *Server {
|
func initializeTestServer(t *testing.T) *Server {
|
||||||
path := &dummyPath{
|
pm := &dummyPathManager{
|
||||||
streamCreated: make(chan struct{}),
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pathManager := &dummyPathManager{path: path}
|
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8886",
|
Address: "127.0.0.1:8886",
|
||||||
Encryption: false,
|
Encryption: false,
|
||||||
|
|
@ -118,7 +115,7 @@ func initializeTestServer(t *testing.T) *Server {
|
||||||
HandshakeTimeout: conf.StringDuration(10 * time.Second),
|
HandshakeTimeout: conf.StringDuration(10 * time.Second),
|
||||||
TrackGatherTimeout: conf.StringDuration(2 * time.Second),
|
TrackGatherTimeout: conf.StringDuration(2 * time.Second),
|
||||||
ExternalCmdPool: nil,
|
ExternalCmdPool: nil,
|
||||||
PathManager: pathManager,
|
PathManager: pm,
|
||||||
Parent: test.NilLogger,
|
Parent: test.NilLogger,
|
||||||
}
|
}
|
||||||
err := s.Initialize()
|
err := s.Initialize()
|
||||||
|
|
@ -174,7 +171,13 @@ func TestServerOptionsPreflight(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServerOptionsICEServer(t *testing.T) {
|
func TestServerOptionsICEServer(t *testing.T) {
|
||||||
pathManager := &dummyPathManager{}
|
pathManager := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8886",
|
Address: "127.0.0.1:8886",
|
||||||
|
|
@ -234,7 +237,20 @@ func TestServerPublish(t *testing.T) {
|
||||||
streamCreated: make(chan struct{}),
|
streamCreated: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
pathManager := &dummyPathManager{path: path}
|
pathManager := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "teststream", req.AccessRequest.Name)
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addPublisher: func(req defs.PathAddPublisherReq) (defs.Path, error) {
|
||||||
|
require.Equal(t, "teststream", req.AccessRequest.Name)
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return path, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8886",
|
Address: "127.0.0.1:8886",
|
||||||
|
|
@ -486,7 +502,7 @@ func TestServerRead(t *testing.T) {
|
||||||
t.Run(ca.name, func(t *testing.T) {
|
t.Run(ca.name, func(t *testing.T) {
|
||||||
desc := &description.Session{Medias: ca.medias}
|
desc := &description.Session{Medias: ca.medias}
|
||||||
|
|
||||||
stream, err := stream.New(
|
str, err := stream.New(
|
||||||
1460,
|
1460,
|
||||||
desc,
|
desc,
|
||||||
reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}),
|
reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}),
|
||||||
|
|
@ -494,9 +510,22 @@ func TestServerRead(t *testing.T) {
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
path := &dummyPath{stream: stream}
|
path := &dummyPath{stream: str}
|
||||||
|
|
||||||
pathManager := &dummyPathManager{path: path}
|
pathManager := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "teststream", req.AccessRequest.Name)
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addReader: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
require.Equal(t, "teststream", req.AccessRequest.Name)
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return path, str, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Address: "127.0.0.1:8886",
|
Address: "127.0.0.1:8886",
|
||||||
|
|
@ -556,9 +585,9 @@ func TestServerRead(t *testing.T) {
|
||||||
|
|
||||||
if g, ok := r.Interface().(*unit.Generic); ok {
|
if g, ok := r.Interface().(*unit.Generic); ok {
|
||||||
clone := *g.RTPPackets[0]
|
clone := *g.RTPPackets[0]
|
||||||
stream.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0)
|
str.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0)
|
||||||
} else {
|
} else {
|
||||||
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit))
|
str.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
@ -574,8 +603,118 @@ func TestServerRead(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServerPostNotFound(t *testing.T) {
|
func TestServerReadAuthorizationHeader(t *testing.T) {
|
||||||
s := initializeTestServer(t)
|
desc := &description.Session{Medias: []*description.Media{test.MediaH264}}
|
||||||
|
|
||||||
|
str, err := stream.New(
|
||||||
|
1460,
|
||||||
|
desc,
|
||||||
|
true,
|
||||||
|
test.NilLogger,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
path := &dummyPath{stream: str}
|
||||||
|
|
||||||
|
pm := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "jwt=testing", req.AccessRequest.Query)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addReader: func(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
require.Equal(t, "jwt=testing", req.AccessRequest.Query)
|
||||||
|
return path, str, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Server{
|
||||||
|
Address: "127.0.0.1:8886",
|
||||||
|
Encryption: false,
|
||||||
|
ServerKey: "",
|
||||||
|
ServerCert: "",
|
||||||
|
AllowOrigin: "",
|
||||||
|
TrustedProxies: conf.IPNetworks{},
|
||||||
|
ReadTimeout: conf.StringDuration(10 * time.Second),
|
||||||
|
WriteQueueSize: 512,
|
||||||
|
LocalUDPAddress: "127.0.0.1:8887",
|
||||||
|
LocalTCPAddress: "127.0.0.1:8887",
|
||||||
|
IPsFromInterfaces: true,
|
||||||
|
IPsFromInterfacesList: []string{},
|
||||||
|
AdditionalHosts: []string{},
|
||||||
|
ICEServers: []conf.WebRTCICEServer{},
|
||||||
|
HandshakeTimeout: conf.StringDuration(10 * time.Second),
|
||||||
|
TrackGatherTimeout: conf.StringDuration(2 * time.Second),
|
||||||
|
ExternalCmdPool: nil,
|
||||||
|
PathManager: pm,
|
||||||
|
Parent: test.NilLogger,
|
||||||
|
}
|
||||||
|
err = s.Initialize()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
tr := &http.Transport{}
|
||||||
|
defer tr.CloseIdleConnections()
|
||||||
|
hc := &http.Client{Transport: tr}
|
||||||
|
|
||||||
|
pc, err := pwebrtc.NewPeerConnection(pwebrtc.Configuration{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer pc.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
_, err = pc.AddTransceiverFromKind(pwebrtc.RTPCodecTypeVideo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
offer, err := pc.CreateOffer(nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodPost,
|
||||||
|
"http://localhost:8886/teststream/whep", bytes.NewReader([]byte(offer.SDP)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/sdp")
|
||||||
|
req.Header.Set("Authorization", "Bearer testing")
|
||||||
|
|
||||||
|
res, err := hc.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
require.Equal(t, http.StatusCreated, res.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerReadNotFound(t *testing.T) {
|
||||||
|
pm := &dummyPathManager{
|
||||||
|
findPathConf: func(req defs.PathFindPathConfReq) (*conf.Path, error) {
|
||||||
|
require.Equal(t, "myuser", req.AccessRequest.User)
|
||||||
|
require.Equal(t, "mypass", req.AccessRequest.Pass)
|
||||||
|
return &conf.Path{}, nil
|
||||||
|
},
|
||||||
|
addReader: func(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) {
|
||||||
|
return nil, nil, defs.PathNoOnePublishingError{}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Server{
|
||||||
|
Address: "127.0.0.1:8886",
|
||||||
|
Encryption: false,
|
||||||
|
ServerKey: "",
|
||||||
|
ServerCert: "",
|
||||||
|
AllowOrigin: "",
|
||||||
|
TrustedProxies: conf.IPNetworks{},
|
||||||
|
ReadTimeout: conf.StringDuration(10 * time.Second),
|
||||||
|
WriteQueueSize: 512,
|
||||||
|
LocalUDPAddress: "127.0.0.1:8887",
|
||||||
|
LocalTCPAddress: "127.0.0.1:8887",
|
||||||
|
IPsFromInterfaces: true,
|
||||||
|
IPsFromInterfacesList: []string{},
|
||||||
|
AdditionalHosts: []string{},
|
||||||
|
ICEServers: []conf.WebRTCICEServer{},
|
||||||
|
HandshakeTimeout: conf.StringDuration(10 * time.Second),
|
||||||
|
TrackGatherTimeout: conf.StringDuration(2 * time.Second),
|
||||||
|
ExternalCmdPool: nil,
|
||||||
|
PathManager: pm,
|
||||||
|
Parent: test.NilLogger,
|
||||||
|
}
|
||||||
|
err := s.Initialize()
|
||||||
|
require.NoError(t, err)
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
|
|
||||||
tr := &http.Transport{}
|
tr := &http.Transport{}
|
||||||
|
|
|
||||||
|
|
@ -117,7 +117,7 @@ authHTTPExclude:
|
||||||
# }
|
# }
|
||||||
# ]
|
# ]
|
||||||
# }
|
# }
|
||||||
# Users are then expected to pass the JWT as a query parameter, i.e. ?jwt=...
|
# Users are expected to pass the JWT in the Authorization header or as a query parameter.
|
||||||
# This is the JWKS URL that will be used to pull (once) the public key that allows
|
# This is the JWKS URL that will be used to pull (once) the public key that allows
|
||||||
# to validate JWTs.
|
# to validate JWTs.
|
||||||
authJWTJWKS:
|
authJWTJWKS:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue