From 2f3baafb548deba167916eb0261bf2a67885277d Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 28 Dec 2019 22:07:03 +0100 Subject: [PATCH] initial commit --- .dockerignore | 2 + .gitignore | 1 + LICENSE | 21 +++ Makefile | 101 +++++++++++ README.md | 58 +++++- go.mod | 10 + go.sum | 17 ++ main.go | 116 ++++++++++++ rtsp/conn.go | 39 ++++ rtsp/request.go | 88 +++++++++ rtsp/request_test.go | 134 ++++++++++++++ rtsp/response.go | 95 ++++++++++ rtsp/response_test.go | 105 +++++++++++ rtsp/utils.go | 161 ++++++++++++++++ rtsp_client.go | 414 ++++++++++++++++++++++++++++++++++++++++++ rtsp_listener.go | 51 ++++++ udp_listener.go | 53 ++++++ 17 files changed, 1465 insertions(+), 1 deletion(-) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 rtsp/conn.go create mode 100644 rtsp/request.go create mode 100644 rtsp/request_test.go create mode 100644 rtsp/response.go create mode 100644 rtsp/response_test.go create mode 100644 rtsp/utils.go create mode 100644 rtsp_client.go create mode 100644 rtsp_listener.go create mode 100644 udp_listener.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..70d34cec --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +.git +/release diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..4f062d69 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/release diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..50bbbd83 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 aler9 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..ccc3c5a5 --- /dev/null +++ b/Makefile @@ -0,0 +1,101 @@ + +.PHONY: $(shell ls) + +BASE_IMAGE = amd64/golang:1.13-alpine3.10 + +help: + @echo "usage: make [action]" + @echo "" + @echo "available actions:" + @echo "" + @echo " mod-tidy run go mod tidy" + @echo " format format source files" + @echo " test run available tests" + @echo " run run app" + @echo "" + +mod-tidy: + docker run --rm -it -v $(PWD):/s $(BASE_IMAGE) \ + sh -c "apk add git && cd /s && go get && go mod tidy" + +format: + docker run --rm -it -v $(PWD):/s $(BASE_IMAGE) \ + sh -c "cd /s && find . -type f -name '*.go' | xargs gofmt -l -w -s" + +define DOCKERFILE_TEST +FROM $(BASE_IMAGE) +RUN apk add --no-cache make git +WORKDIR /s +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ +endef +export DOCKERFILE_TEST + +test: + echo "$$DOCKERFILE_TEST" | docker build -q . -f - -t temp + docker run --rm -it \ + --name temp \ + temp \ + make test-nodocker + +IMAGES = $(shell echo test-images/*/ | xargs -n1 basename) + +test-nodocker: + $(eval export CGO_ENABLED = 0) + go test -v ./rtsp + +define DOCKERFILE_RUN +FROM $(BASE_IMAGE) +RUN apk add --no-cache git +WORKDIR /s +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ +RUN go build -o /out . +endef +export DOCKERFILE_RUN + +run: + echo "$$DOCKERFILE_RUN" | docker build -q . -f - -t temp + docker run --rm -it \ + --network=host \ + --name temp \ + temp \ + /out + +define DOCKERFILE_RELEASE +FROM $(BASE_IMAGE) +RUN apk add --no-cache zip make git tar +WORKDIR /s +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ +RUN make release-nodocker +endef +export DOCKERFILE_RELEASE + +release: + echo "$$DOCKERFILE_RELEASE" | docker build . -f - -t temp \ + && docker run --rm -it -v $(PWD):/out \ + temp sh -c "rm -rf /out/release && cp -r /s/release /out/" + +release-nodocker: + $(eval VERSION := $(shell git describe --tags)) + $(eval GOBUILD := go build -ldflags '-X "main.Version=$(VERSION)"') + rm -rf release && mkdir release + + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server.exe + cd /tmp && zip -q $(PWD)/release/rtsp-simple-server_$(VERSION)_windows_amd64.zip rtsp-simple-server.exe + + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server + + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm6.tar.gz --owner=0 --group=0 rtsp-simple-server + + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm7.tar.gz --owner=0 --group=0 rtsp-simple-server + + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm64.tar.gz --owner=0 --group=0 rtsp-simple-server diff --git a/README.md b/README.md index a6878f1c..3a599827 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,58 @@ + # rtsp-simple-server -RTSP video server written in Go + +_rtsp-simple-server_ is a simple an ready-to-use RTSP video server, a program that allows multiple users to publish or read live video and audio streams. RTSP a standardized protocol that defines how to perform these operations with the help of a server, that both publishers and readers can contact in order to negotiate a streaming protocol and write or read data. The server is then responsible of linking the publisher stream with the readers. + +This software was developed with the aim of simulating a live camera feed for debugging purposes, and therefore to use files instead of real streams. Another reason for the development was the deprecation of _FFserver_, the component of the FFmpeg project that allowed to create a RTSP server with _FFmpeg_ (but this server can be used with any software that supports RTSP). + +It actually supports *one* publisher, while readers can be more than one. + + +## Installation + +Precompiled binaries are available in the [release](https://github.com/aler9/rtsp-simple-server/releases) page. Just download and extract the executable. + + +## Usage + +1. Start the server: + ``` + ./rtsp-simple-server + ``` + +2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want): + ``` + ffmpeg -re -stream_loop -1 -i file.ts -map 0:v:0 -c:v copy -f rtsp rtsp://localhost:8554/ + ``` + +3. Open the stream with VLC: + ``` + vlc rtsp://localhost:8554/ + ``` + + you can alternatively use GStreamer: + ``` + gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/ ! rtph264depay ! decodebin ! autovideosink + ``` + +## Full command-line usage + +``` +usage: rtsp-simple-server [] + +rtsp-simple-server + +RTSP server. + +Flags: + --help Show context-sensitive help (also try --help-long and --help-man). + --version print rtsp-simple-server version + --rtsp-port=8554 port of the RTSP TCP listener + --rtp-port=8000 port of the RTP UDP listener + --rtcp-port=8001 port of the RTCP UDP listener +``` + +## Links + +Specifications +* https://tools.ietf.org/html/rfc7826 diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..9dd47e71 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module rtsp-server + +go 1.13 + +require ( + github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect + github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect + github.com/stretchr/testify v1.4.0 + gopkg.in/alecthomas/kingpin.v2 v2.2.6 +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..edf40129 --- /dev/null +++ b/go.sum @@ -0,0 +1,17 @@ +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 00000000..6fef8441 --- /dev/null +++ b/main.go @@ -0,0 +1,116 @@ +package main + +import ( + "fmt" + "log" + "net" + "os" + "sync" + + "gopkg.in/alecthomas/kingpin.v2" +) + +var Version string = "v0.0.0" + +type program struct { + rtspPort int + rtpPort int + rtcpPort int + mutex sync.Mutex + rtspl *rtspListener + rtpl *udpListener + rtcpl *udpListener + clients map[*rtspClient]struct{} + streamAuthor *rtspClient + streamSdp []byte +} + +func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { + p := &program{ + rtspPort: rtspPort, + rtpPort: rtpPort, + rtcpPort: rtcpPort, + clients: make(map[*rtspClient]struct{}), + } + + var err error + + p.rtpl, err = newUdpListener(rtpPort, "RTP", func(l *udpListener, buf []byte) { + p.mutex.Lock() + defer p.mutex.Unlock() + for c := range p.clients { + if c.state == "PLAY" { + l.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtpPort, + }) + } + } + }) + if err != nil { + return nil, err + } + + p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", func(l *udpListener, buf []byte) { + p.mutex.Lock() + defer p.mutex.Unlock() + for c := range p.clients { + if c.state == "PLAY" { + l.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtcpPort, + }) + } + } + }) + if err != nil { + return nil, err + } + + p.rtspl, err = newRtspListener(p) + if err != nil { + return nil, err + } + + return p, nil +} + +func (p *program) run() { + var wg sync.WaitGroup + + wg.Add(1) + go p.rtpl.run(wg) + + wg.Add(1) + go p.rtcpl.run(wg) + + wg.Add(1) + go p.rtspl.run(wg) + + wg.Wait() +} + +func main() { + kingpin.CommandLine.Help = "rtsp-simple-server " + Version + "\n\n" + + "RTSP server." + + version := kingpin.Flag("version", "print rtsp-simple-server version").Bool() + + rtspPort := kingpin.Flag("rtsp-port", "port of the RTSP TCP listener").Default("8554").Int() + rtpPort := kingpin.Flag("rtp-port", "port of the RTP UDP listener").Default("8000").Int() + rtcpPort := kingpin.Flag("rtcp-port", "port of the RTCP UDP listener").Default("8001").Int() + + kingpin.Parse() + + if *version == true { + fmt.Println("rtsp-simple-server " + Version) + os.Exit(0) + } + + p, err := newProgram(*rtspPort, *rtpPort, *rtcpPort) + if err != nil { + log.Fatal("ERR:", err) + } + + p.run() +} diff --git a/rtsp/conn.go b/rtsp/conn.go new file mode 100644 index 00000000..3a5aca7b --- /dev/null +++ b/rtsp/conn.go @@ -0,0 +1,39 @@ +package rtsp + +import ( + "net" +) + +type Conn struct { + c net.Conn +} + +func NewConn(c net.Conn) *Conn { + return &Conn{ + c: c, + } +} + +func (c *Conn) Close() error { + return c.c.Close() +} + +func (c *Conn) RemoteAddr() net.Addr { + return c.c.RemoteAddr() +} + +func (c *Conn) ReadRequest() (*Request, error) { + return requestDecode(c.c) +} + +func (c *Conn) WriteRequest(req *Request) error { + return requestEncode(c.c, req) +} + +func (c *Conn) ReadResponse() (*Response, error) { + return responseDecode(c.c) +} + +func (c *Conn) WriteResponse(res *Response) error { + return responseEncode(c.c, res) +} diff --git a/rtsp/request.go b/rtsp/request.go new file mode 100644 index 00000000..44b08bd1 --- /dev/null +++ b/rtsp/request.go @@ -0,0 +1,88 @@ +package rtsp + +import ( + "bufio" + "fmt" + "io" +) + +type Request struct { + Method string + Path string + Headers map[string]string + Content []byte +} + +func requestDecode(r io.Reader) (*Request, error) { + rb := bufio.NewReader(r) + + req := &Request{} + + byts, err := readBytesLimited(rb, ' ', 255) + if err != nil { + return nil, err + } + req.Method = string(byts[:len(byts)-1]) + + if len(req.Method) == 0 { + return nil, fmt.Errorf("empty method") + } + + byts, err = readBytesLimited(rb, ' ', 255) + if err != nil { + return nil, err + } + req.Path = string(byts[:len(byts)-1]) + + if len(req.Path) == 0 { + return nil, fmt.Errorf("empty path") + } + + byts, err = readBytesLimited(rb, '\r', 255) + if err != nil { + return nil, err + } + proto := string(byts[:len(byts)-1]) + + if proto != _RTSP_PROTO { + return nil, fmt.Errorf("expected '%s', got '%s'", _RTSP_PROTO, proto) + } + + err = readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + req.Headers, err = readHeaders(rb) + if err != nil { + return nil, err + } + + req.Content, err = readContent(rb, req.Headers) + if err != nil { + return nil, err + } + + return req, nil +} + +func requestEncode(w io.Writer, req *Request) error { + wb := bufio.NewWriter(w) + + _, err := wb.Write([]byte(req.Method + " " + req.Path + " " + _RTSP_PROTO + "\r\n")) + if err != nil { + return err + } + + err = writeHeaders(wb, req.Headers) + if err != nil { + return err + } + + err = writeContent(wb, req.Content) + if err != nil { + return err + } + + return wb.Flush() +} diff --git a/rtsp/request_test.go b/rtsp/request_test.go new file mode 100644 index 00000000..fb910a8d --- /dev/null +++ b/rtsp/request_test.go @@ -0,0 +1,134 @@ +package rtsp + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var casesRequest = []struct { + name string + byts []byte + req *Request +}{ + { + "options", + []byte("OPTIONS rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 1\r\n" + + "Proxy-Require: gzipped-messages\r\n" + + "Require: implicit-play\r\n" + + "\r\n"), + &Request{ + Method: "OPTIONS", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "1", + "Require": "implicit-play", + "Proxy-Require": "gzipped-messages", + }, + }, + }, + { + "describe", + []byte("DESCRIBE rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 2\r\n" + + "\r\n"), + &Request{ + Method: "DESCRIBE", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "2", + }, + }, + }, + { + "announce", + []byte("ANNOUNCE rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 7\r\n" + + "Content-Length: 306\r\n" + + "Content-Type: application/sdp\r\n" + + "Date: 23 Jan 1997 15:35:06 GMT\r\n" + + "Session: 12345678\r\n" + + "\r\n" + + "v=0\n" + + "o=mhandley 2890844526 2890845468 IN IP4 126.16.64.4\n" + + "s=SDP Seminar\n" + + "i=A Seminar on the session description protocol\n" + + "u=http://www.cs.ucl.ac.uk/staff/M.Handley/sdp.03.ps\n" + + "e=mjh@isi.edu (Mark Handley)\n" + + "c=IN IP4 224.2.17.12/127\n" + + "t=2873397496 2873404696\n" + + "a=recvonly\n" + + "m=audio 3456 RTP/AVP 0\n" + + "m=video 2232 RTP/AVP 31\n"), + &Request{ + Method: "ANNOUNCE", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "7", + "Date": "23 Jan 1997 15:35:06 GMT", + "Session": "12345678", + "Content-Type": "application/sdp", + "Content-Length": "306", + }, + Content: []byte("v=0\n" + + "o=mhandley 2890844526 2890845468 IN IP4 126.16.64.4\n" + + "s=SDP Seminar\n" + + "i=A Seminar on the session description protocol\n" + + "u=http://www.cs.ucl.ac.uk/staff/M.Handley/sdp.03.ps\n" + + "e=mjh@isi.edu (Mark Handley)\n" + + "c=IN IP4 224.2.17.12/127\n" + + "t=2873397496 2873404696\n" + + "a=recvonly\n" + + "m=audio 3456 RTP/AVP 0\n" + + "m=video 2232 RTP/AVP 31\n", + ), + }, + }, + { + "get_parameter", + []byte("GET_PARAMETER rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 9\r\n" + + "Content-Length: 24\r\n" + + "Content-Type: text/parameters\r\n" + + "Session: 12345678\r\n" + + "\r\n" + + "packets_received\n" + + "jitter\n"), + &Request{ + Method: "GET_PARAMETER", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "9", + "Content-Type": "text/parameters", + "Session": "12345678", + "Content-Length": "24", + }, + Content: []byte("packets_received\n" + + "jitter\n", + ), + }, + }, +} + +func TestRequestDecode(t *testing.T) { + for _, c := range casesRequest { + t.Run(c.name, func(t *testing.T) { + req, err := requestDecode(bytes.NewBuffer(c.byts)) + require.NoError(t, err) + require.Equal(t, c.req, req) + }) + } +} + +func TestRequestEncode(t *testing.T) { + for _, c := range casesRequest { + t.Run(c.name, func(t *testing.T) { + var buf bytes.Buffer + err := requestEncode(&buf, c.req) + require.NoError(t, err) + require.Equal(t, c.byts, buf.Bytes()) + }) + } +} diff --git a/rtsp/response.go b/rtsp/response.go new file mode 100644 index 00000000..3edf1789 --- /dev/null +++ b/rtsp/response.go @@ -0,0 +1,95 @@ +package rtsp + +import ( + "bufio" + "fmt" + "io" + "strconv" +) + +type Response struct { + StatusCode int + Status string + Headers map[string]string + Content []byte +} + +func responseDecode(r io.Reader) (*Response, error) { + rb := bufio.NewReader(r) + + res := &Response{} + + byts, err := readBytesLimited(rb, ' ', 255) + if err != nil { + return nil, err + } + proto := string(byts[:len(byts)-1]) + + if proto != _RTSP_PROTO { + return nil, fmt.Errorf("expected '%s', got '%s'", _RTSP_PROTO, proto) + } + + byts, err = readBytesLimited(rb, ' ', 4) + if err != nil { + return nil, err + } + statusCodeStr := string(byts[:len(byts)-1]) + + statusCode64, err := strconv.ParseInt(statusCodeStr, 10, 32) + res.StatusCode = int(statusCode64) + if err != nil { + return nil, fmt.Errorf("unable to parse status code") + } + + byts, err = readBytesLimited(rb, '\r', 255) + if err != nil { + return nil, err + } + res.Status = string(byts[:len(byts)-1]) + + if len(res.Status) == 0 { + return nil, fmt.Errorf("empty status") + } + + err = readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + res.Headers, err = readHeaders(rb) + if err != nil { + return nil, err + } + + res.Content, err = readContent(rb, res.Headers) + if err != nil { + return nil, err + } + + return res, nil +} + +func responseEncode(w io.Writer, res *Response) error { + wb := bufio.NewWriter(w) + + _, err := wb.Write([]byte(_RTSP_PROTO + " " + strconv.FormatInt(int64(res.StatusCode), 10) + " " + res.Status + "\r\n")) + if err != nil { + return err + } + + if len(res.Content) != 0 { + res.Headers["Content-Length"] = strconv.FormatInt(int64(len(res.Content)), 10) + } + + err = writeHeaders(wb, res.Headers) + if err != nil { + return err + } + + err = writeContent(wb, res.Content) + if err != nil { + return err + } + + return wb.Flush() +} diff --git a/rtsp/response_test.go b/rtsp/response_test.go new file mode 100644 index 00000000..087456fc --- /dev/null +++ b/rtsp/response_test.go @@ -0,0 +1,105 @@ +package rtsp + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var casesResponse = []struct { + name string + byts []byte + res *Response +}{ + { + "ok", + []byte("RTSP/1.0 200 OK\r\n" + + "CSeq: 1\r\n" + + "Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n" + + "\r\n", + ), + &Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": "1", + "Public": "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE", + }, + }, + }, + { + "ok with content", + []byte("RTSP/1.0 200 OK\r\n" + + "CSeq: 2\r\n" + + "Content-Base: rtsp://example.com/media.mp4\r\n" + + "Content-Length: 444\r\n" + + "Content-Type: application/sdp\r\n" + + "\r\n" + + "m=video 0 RTP/AVP 96\n" + + "a=control:streamid=0\n" + + "a=range:npt=0-7.741000\n" + + "a=length:npt=7.741000\n" + + "a=rtpmap:96 MP4V-ES/5544\n" + + "a=mimetype:string;\"video/MP4V-ES\"\n" + + "a=AvgBitRate:integer;304018\n" + + "a=StreamName:string;\"hinted video track\"\n" + + "m=audio 0 RTP/AVP 97\n" + + "a=control:streamid=1\n" + + "a=range:npt=0-7.712000\n" + + "a=length:npt=7.712000\n" + + "a=rtpmap:97 mpeg4-generic/32000/2\n" + + "a=mimetype:string;\"audio/mpeg4-generic\"\n" + + "a=AvgBitRate:integer;65790\n" + + "a=StreamName:string;\"hinted audio track\"\n", + ), + &Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "Content-Base": "rtsp://example.com/media.mp4", + "Content-Length": "444", + "Content-Type": "application/sdp", + "CSeq": "2", + }, + Content: []byte("m=video 0 RTP/AVP 96\n" + + "a=control:streamid=0\n" + + "a=range:npt=0-7.741000\n" + + "a=length:npt=7.741000\n" + + "a=rtpmap:96 MP4V-ES/5544\n" + + "a=mimetype:string;\"video/MP4V-ES\"\n" + + "a=AvgBitRate:integer;304018\n" + + "a=StreamName:string;\"hinted video track\"\n" + + "m=audio 0 RTP/AVP 97\n" + + "a=control:streamid=1\n" + + "a=range:npt=0-7.712000\n" + + "a=length:npt=7.712000\n" + + "a=rtpmap:97 mpeg4-generic/32000/2\n" + + "a=mimetype:string;\"audio/mpeg4-generic\"\n" + + "a=AvgBitRate:integer;65790\n" + + "a=StreamName:string;\"hinted audio track\"\n", + ), + }, + }, +} + +func TestResponseDecode(t *testing.T) { + for _, c := range casesResponse { + t.Run(c.name, func(t *testing.T) { + res, err := responseDecode(bytes.NewBuffer(c.byts)) + require.NoError(t, err) + require.Equal(t, c.res, res) + }) + } +} + +func TestResponseEncode(t *testing.T) { + for _, c := range casesResponse { + t.Run(c.name, func(t *testing.T) { + var buf bytes.Buffer + err := responseEncode(&buf, c.res) + require.NoError(t, err) + require.Equal(t, c.byts, buf.Bytes()) + }) + } +} diff --git a/rtsp/utils.go b/rtsp/utils.go new file mode 100644 index 00000000..0ec613fc --- /dev/null +++ b/rtsp/utils.go @@ -0,0 +1,161 @@ +package rtsp + +import ( + "bufio" + "fmt" + "io" + "sort" + "strconv" +) + +const ( + _RTSP_PROTO = "RTSP/1.0" + _MAX_HEADER_COUNT = 255 + _MAX_HEADER_KEY_LENGTH = 255 + _MAX_HEADER_VALUE_LENGTH = 255 + _MAX_CONTENT_LENGTH = 4096 +) + +func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) { + for i := 1; i <= n; i++ { + byts, err := rb.Peek(i) + if err != nil { + return nil, err + } + + if byts[len(byts)-1] == delim { + rb.Discard(len(byts)) + return byts, nil + } + } + return nil, fmt.Errorf("buffer length exceeds %d", n) +} + +func readByteEqual(rb *bufio.Reader, cmp byte) error { + byt, err := rb.ReadByte() + if err != nil { + return err + } + + if byt != cmp { + return fmt.Errorf("expected '%c', got '%c'", cmp, byt) + } + + return nil +} + +func readHeaders(rb *bufio.Reader) (map[string]string, error) { + ret := make(map[string]string) + + for { + byt, err := rb.ReadByte() + if err != nil { + return nil, err + } + + if byt == '\r' { + err := readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + break + } + + if len(ret) >= _MAX_HEADER_COUNT { + return nil, fmt.Errorf("headers count exceeds %d", _MAX_HEADER_COUNT) + } + + key := string([]byte{byt}) + byts, err := readBytesLimited(rb, ':', _MAX_HEADER_KEY_LENGTH-1) + if err != nil { + return nil, err + } + key += string(byts[:len(byts)-1]) + + err = readByteEqual(rb, ' ') + if err != nil { + return nil, err + } + + byts, err = readBytesLimited(rb, '\r', _MAX_HEADER_VALUE_LENGTH) + if err != nil { + return nil, err + } + val := string(byts[:len(byts)-1]) + + if len(val) == 0 { + return nil, fmt.Errorf("empty header value") + } + + err = readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + ret[key] = val + } + + return ret, nil +} + +func writeHeaders(wb *bufio.Writer, headers map[string]string) error { + // sort headers by key + // in order to obtain deterministic results + var keys []string + for key := range headers { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + _, err := wb.Write([]byte(key + ": " + headers[key] + "\r\n")) + if err != nil { + return err + } + } + + _, err := wb.Write([]byte("\r\n")) + if err != nil { + return err + } + + return nil +} + +func readContent(rb *bufio.Reader, headers map[string]string) ([]byte, error) { + cls, ok := headers["Content-Length"] + if !ok { + return nil, nil + } + + cl, err := strconv.ParseInt(cls, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Content-Length") + } + + if cl > _MAX_CONTENT_LENGTH { + return nil, fmt.Errorf("Content-Length exceeds %d", _MAX_CONTENT_LENGTH) + } + + ret := make([]byte, cl) + n, err := io.ReadFull(rb, ret) + if err != nil && n != len(ret) { + return nil, err + } + + return ret, nil +} + +func writeContent(wb *bufio.Writer, content []byte) error { + if len(content) == 0 { + return nil + } + + _, err := wb.Write(content) + if err != nil { + return err + } + + return nil +} diff --git a/rtsp_client.go b/rtsp_client.go new file mode 100644 index 00000000..c3410615 --- /dev/null +++ b/rtsp_client.go @@ -0,0 +1,414 @@ +package main + +import ( + "fmt" + "io" + "log" + "net" + "net/url" + "strconv" + "strings" + "sync" + + "rtsp-server/rtsp" +) + +type rtspClient struct { + p *program + rconn *rtsp.Conn + state string + IP net.IP + rtpPort int + rtcpPort int +} + +func newRtspClient(p *program, rconn *rtsp.Conn) *rtspClient { + c := &rtspClient{ + p: p, + rconn: rconn, + state: "STARTING", + } + + c.p.mutex.Lock() + c.p.clients[c] = struct{}{} + c.p.mutex.Unlock() + + return c +} + +func (c *rtspClient) close() error { + delete(c.p.clients, c) + + if c.p.streamAuthor == c { + c.p.streamAuthor = nil + c.p.streamSdp = nil + + // if the streamer has disconnected + // close all other connections + for oc := range c.p.clients { + oc.close() + } + } + + return c.rconn.Close() +} + +func (c *rtspClient) log(format string, args ...interface{}) { + format = "[RTSP client " + c.rconn.RemoteAddr().String() + "] " + format + log.Printf(format, args...) +} + +func (c *rtspClient) run(wg sync.WaitGroup) { + defer wg.Done() + defer c.log("disconnected") + defer func() { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + c.close() + }() + + c.log("connected") + + for { + req, err := c.rconn.ReadRequest() + if err != nil { + if err != io.EOF { + c.log("ERR: %s", err) + } + return + } + + c.log(req.Method) + + cseq, ok := req.Headers["CSeq"] + if !ok { + c.log("ERR: cseq missing") + return + } + + ur, err := url.Parse(req.Path) + if err != nil { + c.log("ERR: unable to parse path '%s'", req.Path) + return + } + + switch req.Method { + case "OPTIONS": + // do not check state, since OPTIONS can be requested + // in any state + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Public": strings.Join([]string{ + "DESCRIBE", + "SETUP", + "PLAY", + "PAUSE", + "TEARDOWN", + }, ", "), + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + case "DESCRIBE": + if c.state != "STARTING" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + sdp, err := func() ([]byte, error) { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.p.streamSdp) == 0 { + return nil, fmt.Errorf("no one is streaming") + } + + return c.p.streamSdp, nil + }() + if err != nil { + c.log("ERR: %s", err) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Content-Base": ur.String(), + "Content-Type": "application/sdp", + }, + Content: sdp, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + case "SETUP": + transport, ok := req.Headers["Transport"] + if !ok { + c.log("ERR: transport header missing") + return + } + + transports := strings.Split(transport, ";") + + ok = func() bool { + for _, t := range transports { + if t == "unicast" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain unicast") + return + } + + clientPort1, clientPort2 := func() (int, int) { + for _, t := range transports { + if !strings.HasPrefix(t, "client_port=") { + continue + } + t = t[len("client_port="):] + + ports := strings.Split(t, "-") + if len(ports) != 2 { + return 0, 0 + } + + port1, err := strconv.ParseInt(ports[0], 10, 64) + if err != nil { + return 0, 0 + } + + port2, err := strconv.ParseInt(ports[1], 10, 64) + if err != nil { + return 0, 0 + } + + return int(port1), int(port2) + } + return 0, 0 + }() + if clientPort1 == 0 || clientPort2 == 0 { + c.log("ERR: transport header does not have valid client ports (%s)", transport) + return + } + + switch c.state { + // play + case "STARTING": + ok = func() bool { + for _, t := range transports { + if t == "RTP/AVP" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain RTP/AVP") + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.rtpPort = clientPort1 + c.rtcpPort = clientPort2 + c.state = "PRE_PLAY" + c.p.mutex.Unlock() + + // record + case "ANNOUNCE": + ok = func() bool { + for _, t := range transports { + if t == "RTP/AVP/UDP" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain RTP/AVP/UDP") + return + } + + ok = func() bool { + for _, t := range transports { + if t == "mode=record" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain mode=record") + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + ipstr, _, _ := net.SplitHostPort(c.rconn.RemoteAddr().String()) + c.IP = net.ParseIP(ipstr) + c.rtpPort = clientPort1 + c.rtcpPort = clientPort2 + c.state = "PRE_RECORD" + c.p.mutex.Unlock() + + default: + c.log("ERR: client is in state '%s'", c.state) + return + } + + case "PLAY": + if c.state != "PRE_PLAY" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.state = "PLAY" + c.p.mutex.Unlock() + + case "RECORD": + if c.state != "PRE_RECORD" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.state = "RECORD" + c.p.mutex.Unlock() + + case "ANNOUNCE": + if c.state != "STARTING" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + ct, ok := req.Headers["Content-Type"] + if !ok { + c.log("ERR: Content-Type header missing") + return + } + + if ct != "application/sdp" { + c.log("ERR: unsupported Content-Type '%s'", ct) + return + } + + err := func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if c.p.streamAuthor != nil { + return fmt.Errorf("another client is already streaming") + } + + c.p.streamAuthor = c + c.p.streamSdp = req.Content + return nil + }() + if err != nil { + c.log("ERR: %s", err) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.state = "ANNOUNCE" + c.p.mutex.Unlock() + + case "TEARDOWN": + return + + default: + c.log("ERR: method %s unhandled", req.Method) + } + } +} diff --git a/rtsp_listener.go b/rtsp_listener.go new file mode 100644 index 00000000..a677296b --- /dev/null +++ b/rtsp_listener.go @@ -0,0 +1,51 @@ +package main + +import ( + "log" + "net" + "sync" + + "rtsp-server/rtsp" +) + +type rtspListener struct { + p *program + netl *net.TCPListener +} + +func newRtspListener(p *program) (*rtspListener, error) { + netl, err := net.ListenTCP("tcp", &net.TCPAddr{ + Port: p.rtspPort, + }) + if err != nil { + return nil, err + } + + s := &rtspListener{ + p: p, + netl: netl, + } + + s.log("opened on :%d", p.rtspPort) + return s, nil +} + +func (l *rtspListener) log(format string, args ...interface{}) { + log.Printf("[RTSP listener] "+format, args...) +} + +func (l *rtspListener) run(wg sync.WaitGroup) { + defer wg.Done() + + for { + conn, err := l.netl.AcceptTCP() + if err != nil { + break + } + + rconn := rtsp.NewConn(conn) + rsc := newRtspClient(l.p, rconn) + wg.Add(1) + go rsc.run(wg) + } +} diff --git a/udp_listener.go b/udp_listener.go new file mode 100644 index 00000000..4aabbd27 --- /dev/null +++ b/udp_listener.go @@ -0,0 +1,53 @@ +package main + +import ( + "log" + "net" + "sync" +) + +type udpListenerCb func(*udpListener, []byte) + +type udpListener struct { + nconn *net.UDPConn + logPrefix string + cb udpListenerCb +} + +func newUdpListener(port int, logPrefix string, cb udpListenerCb) (*udpListener, error) { + nconn, err := net.ListenUDP("udp", &net.UDPAddr{ + Port: port, + }) + if err != nil { + return nil, err + } + + l := &udpListener{ + nconn: nconn, + logPrefix: logPrefix, + cb: cb, + } + + l.log("opened on :%d", port) + return l, nil +} + +func (l *udpListener) log(format string, args ...interface{}) { + log.Printf("["+l.logPrefix+" listener] "+format, args...) +} + +func (l *udpListener) run(wg sync.WaitGroup) { + defer wg.Done() + + buf := make([]byte, 2048) // UDP MTU is 1400 + + for { + n, _, err := l.nconn.ReadFromUDP(buf) + if err != nil { + l.log("ERR: %s", err) + break + } + + l.cb(l, buf[:n]) + } +}