diff --git a/internal/client/client.go b/internal/client/client.go index 47ed8c72..493f6138 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -8,7 +8,7 @@ import ( "github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/rtsp-simple-server/internal/conf" - "github.com/aler9/rtsp-simple-server/internal/source" + "github.com/aler9/rtsp-simple-server/internal/streamproc" ) // ErrNoOnePublishing is a "no one is publishing" error. @@ -49,8 +49,6 @@ type Path interface { OnClientPlay(PlayReq) OnClientRecord(RecordReq) OnClientPause(PauseReq) - OnSetStartingPoint(source.SetStartingPointReq) - OnFrame(int, gortsplib.StreamType, []byte) } // DescribeRes is a describe response. @@ -106,7 +104,7 @@ type RemoveReq struct { // PlayRes is a play response. type PlayRes struct { - TrackStartingPoints []source.TrackStartingPoint + TrackStartingPoints []streamproc.TrackStartingPoint } // PlayReq is a play request. @@ -115,10 +113,16 @@ type PlayReq struct { Res chan PlayRes } +// RecordRes is a record response. +type RecordRes struct { + SP *streamproc.StreamProc + Err error +} + // RecordReq is a record request. type RecordReq struct { Client Client - Res chan struct{} + Res chan RecordRes } // PauseReq is a pause request. @@ -135,5 +139,5 @@ type Client interface { Authenticate([]headers.AuthMethod, string, []interface{}, string, string, interface{}) error - OnIncomingFrame(int, gortsplib.StreamType, []byte) + OnFrame(int, gortsplib.StreamType, []byte) } diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index c0e6f96f..e8bb127b 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -25,9 +25,7 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/rtcpsenderset" "github.com/aler9/rtsp-simple-server/internal/rtmputils" - "github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/stats" - "github.com/aler9/rtsp-simple-server/internal/streamproc" ) const ( @@ -465,21 +463,7 @@ func (c *Client) runPublish() { return res.Err } - resc2 := make(chan struct{}) - res.Path.OnClientRecord(client.RecordReq{c, resc2}) //nolint:govet - <-resc2 path = res.Path - - c.log(logger.Info, "is publishing to path '%s', %d %s", - path.Name(), - len(tracks), - func() string { - if len(tracks) == 1 { - return "track" - } - return "tracks" - }()) - return nil }() }() @@ -500,32 +484,48 @@ func (c *Client) runPublish() { return } - var onPublishCmd *externalcmd.Cmd - if path.Conf().RunOnPublish != "" { - onPublishCmd = externalcmd.New(path.Conf().RunOnPublish, - path.Conf().RunOnPublishRestart, externalcmd.Environment{ - Path: path.Name(), - Port: strconv.FormatInt(int64(c.rtspPort), 10), - }) - } - - defer func(path client.Path) { - if path.Conf().RunOnPublish != "" { - onPublishCmd.Close() - } - }(path) - - sp := streamproc.New(c, path, make([]source.TrackStartingPoint, len(tracks))) - readerDone := make(chan error) go func() { readerDone <- func() error { - rtcpSenders := rtcpsenderset.New(tracks, path.OnFrame) + resc := make(chan client.RecordRes) + path.OnClientRecord(client.RecordReq{Client: c, Res: resc}) + res := <-resc + + if res.Err != nil { + return res.Err + } + + c.log(logger.Info, "is publishing to path '%s', %d %s", + path.Name(), + len(tracks), + func() string { + if len(tracks) == 1 { + return "track" + } + return "tracks" + }()) + + var onPublishCmd *externalcmd.Cmd + if path.Conf().RunOnPublish != "" { + onPublishCmd = externalcmd.New(path.Conf().RunOnPublish, + path.Conf().RunOnPublishRestart, externalcmd.Environment{ + Path: path.Name(), + Port: strconv.FormatInt(int64(c.rtspPort), 10), + }) + } + + defer func(path client.Path) { + if path.Conf().RunOnPublish != "" { + onPublishCmd.Close() + } + }(path) + + rtcpSenders := rtcpsenderset.New(tracks, res.SP.OnFrame) defer rtcpSenders.Close() onFrame := func(trackID int, payload []byte) { rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) - sp.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) + res.SP.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) } for { @@ -641,8 +641,8 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, return nil } -// OnIncomingFrame implements path.Reader. -func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, buf []byte) { +// OnFrame implements path.Reader. +func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, buf []byte) { if streamType == gortsplib.StreamTypeRTP { c.ringBuffer.Push(trackIDBufPair{trackID, buf}) } diff --git a/internal/clientrtsp/client.go b/internal/clientrtsp/client.go index 8ac32dc7..53b50210 100644 --- a/internal/clientrtsp/client.go +++ b/internal/clientrtsp/client.go @@ -19,7 +19,6 @@ import ( "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/logger" - "github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/stats" "github.com/aler9/rtsp-simple-server/internal/streamproc" ) @@ -74,8 +73,8 @@ type Client struct { authFailures int // read only - trackStartingPoints []source.TrackStartingPoint - onReadCmd *externalcmd.Cmd + setuppedTracks map[int]*gortsplib.Track + onReadCmd *externalcmd.Cmd // publish only sp *streamproc.StreamProc @@ -296,6 +295,11 @@ func (c *Client) run() { StatusCode: base.StatusBadRequest, }, fmt.Errorf("track %d does not exist", ctx.TrackID) } + + if c.setuppedTracks == nil { + c.setuppedTracks = make(map[int]*gortsplib.Track) + } + c.setuppedTracks[ctx.TrackID] = res.Tracks[ctx.TrackID] } return &base.Response{ @@ -307,6 +311,10 @@ func (c *Client) run() { } onPlay := func(ctx *gortsplib.ServerConnPlayCtx) (*base.Response, error) { + h := base.Header{ + "Session": base.HeaderValue{sessionID}, + } + if c.conn.State() == gortsplib.ServerConnStatePrePlay { if ctx.Path != c.path.Name() { return &base.Response{ @@ -315,40 +323,40 @@ func (c *Client) run() { } res := c.playStart() - c.trackStartingPoints = res.TrackStartingPoints - } - h := base.Header{ - "Session": base.HeaderValue{sessionID}, - } + // add RTP-Info + var ri headers.RTPInfo + for trackID, v := range res.TrackStartingPoints { + if !v.Filled { + continue + } - // add RTP-Info - var ri headers.RTPInfo - for trackID, v := range c.trackStartingPoints { - if !v.Filled { - continue + track, ok := c.setuppedTracks[trackID] + if !ok { + continue + } + + u := &base.URL{ + Scheme: ctx.Req.URL.Scheme, + User: ctx.Req.URL.User, + Host: ctx.Req.URL.Host, + Path: "/" + c.path.Name(), + } + u.AddControlAttribute("trackID=" + strconv.FormatInt(int64(trackID), 10)) + + clockRate, _ := track.ClockRate() + ts := uint32(uint64(time.Since(v.NTPTime).Seconds()*float64(clockRate)) + + uint64(v.RTPTime)) + + ri = append(ri, &headers.RTPInfoEntry{ + URL: u, + SequenceNumber: 0, + Timestamp: ts, + }) } - - if _, ok := c.conn.SetuppedTracks()[trackID]; !ok { - continue + if len(ri) > 0 { + h["RTP-Info"] = ri.Write() } - - u := &base.URL{ - Scheme: ctx.Req.URL.Scheme, - User: ctx.Req.URL.User, - Host: ctx.Req.URL.Host, - Path: "/" + c.path.Name(), - } - u.AddControlAttribute("trackID=" + strconv.FormatInt(int64(trackID), 10)) - - ri = append(ri, &headers.RTPInfoEntry{ - URL: u, - SequenceNumber: v.SequenceNumber, - Timestamp: v.Timestamp, - }) - } - if len(ri) > 0 { - h["RTP-Info"] = ri.Write() } return &base.Response{ @@ -364,7 +372,12 @@ func (c *Client) run() { }, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), ctx.Path) } - c.recordStart() + err := c.recordStart() + if err != nil { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, err + } return &base.Response{ StatusCode: base.StatusOK, @@ -583,10 +596,16 @@ func (c *Client) playStop() { } } -func (c *Client) recordStart() { - resc := make(chan struct{}) - c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet - <-resc +func (c *Client) recordStart() error { + resc := make(chan client.RecordRes) + c.path.OnClientRecord(client.RecordReq{Client: c, Res: resc}) + res := <-resc + + if res.Err != nil { + return res.Err + } + + c.sp = res.SP tracksLen := len(c.conn.AnnouncedTracks()) @@ -608,7 +627,7 @@ func (c *Client) recordStart() { }) } - c.sp = streamproc.New(c, c.path, make([]source.TrackStartingPoint, len(c.conn.AnnouncedTracks()))) + return nil } func (c *Client) recordStop() { @@ -617,8 +636,8 @@ func (c *Client) recordStop() { } } -// OnIncomingFrame implements path.Reader. -func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { +// OnFrame implements path.Reader. +func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { if _, ok := c.conn.SetuppedTracks()[trackID]; !ok { return } diff --git a/internal/path/path.go b/internal/path/path.go index acb9dc20..3397d244 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -19,6 +19,7 @@ import ( "github.com/aler9/rtsp-simple-server/internal/sourcertmp" "github.com/aler9/rtsp-simple-server/internal/sourcertsp" "github.com/aler9/rtsp-simple-server/internal/stats" + "github.com/aler9/rtsp-simple-server/internal/streamproc" ) func newEmptyTimer() *time.Timer { @@ -76,7 +77,8 @@ type Path struct { setupPlayRequests []client.SetupPlayReq source source.Source sourceTracks gortsplib.Tracks - sourceTrackStartingPoints []source.TrackStartingPoint + sourceTrackStartingPoints []streamproc.TrackStartingPoint + sp *streamproc.StreamProc readers *readersMap onDemandCmd *externalcmd.Cmd describeTimer *time.Timer @@ -90,7 +92,6 @@ type Path struct { closeTimerStarted bool // in - setStartingPoint chan source.SetStartingPointReq extSourceSetReady chan source.ExtSetReadyReq extSourceSetNotReady chan source.ExtSetNotReadyReq clientDescribe chan client.DescribeReq @@ -100,6 +101,7 @@ type Path struct { clientRecord chan client.RecordReq clientPause chan client.PauseReq clientRemove chan client.RemoveReq + spSetStartingPoint chan streamproc.SetStartingPointReq terminate chan struct{} } @@ -135,7 +137,6 @@ func New( sourceCloseTimer: newEmptyTimer(), runOnDemandCloseTimer: newEmptyTimer(), closeTimer: newEmptyTimer(), - setStartingPoint: make(chan source.SetStartingPointReq), extSourceSetReady: make(chan source.ExtSetReadyReq), extSourceSetNotReady: make(chan source.ExtSetNotReadyReq), clientDescribe: make(chan client.DescribeReq), @@ -145,6 +146,7 @@ func New( clientRecord: make(chan client.RecordReq), clientPause: make(chan client.PauseReq), clientRemove: make(chan client.RemoveReq), + spSetStartingPoint: make(chan streamproc.SetStartingPointReq), terminate: make(chan struct{}), } @@ -224,21 +226,12 @@ outer: <-pa.terminate break outer - case req := <-pa.setStartingPoint: - pa.onSetStartingPoint(req) - case req := <-pa.extSourceSetReady: pa.sourceTracks = req.Tracks - - // clone - cl := make([]source.TrackStartingPoint, len(req.StartingPoints)) - for k, v := range req.StartingPoints { - cl[k] = v - } - pa.sourceTrackStartingPoints = cl - + pa.sourceTrackStartingPoints = make([]streamproc.TrackStartingPoint, len(req.Tracks)) + pa.sp = streamproc.New(pa, len(req.Tracks)) pa.onSourceSetReady() - close(req.Res) + req.Res <- source.ExtSetReadyRes{SP: pa.sp} case req := <-pa.extSourceSetNotReady: pa.onSourceSetNotReady() @@ -276,6 +269,9 @@ outer: pa.clientsWg.Done() close(req.Res) + case req := <-pa.spSetStartingPoint: + pa.onSPSetStartingPoint(req) + case <-pa.terminate: pa.exhaustChannels() break outer @@ -325,7 +321,6 @@ outer: } pa.clientsWg.Wait() - close(pa.setStartingPoint) close(pa.extSourceSetReady) close(pa.extSourceSetNotReady) close(pa.clientDescribe) @@ -335,17 +330,13 @@ outer: close(pa.clientRecord) close(pa.clientPause) close(pa.clientRemove) + close(pa.spSetStartingPoint) } func (pa *Path) exhaustChannels() { go func() { for { select { - case _, ok := <-pa.setStartingPoint: - if !ok { - return - } - case req, ok := <-pa.extSourceSetReady: if !ok { return @@ -406,6 +397,12 @@ func (pa *Path) exhaustChannels() { pa.clientsWg.Done() close(req.Res) + + case _, ok := <-pa.spSetStartingPoint: + if !ok { + return + } + } } }() @@ -570,14 +567,6 @@ func (pa *Path) fixedPublisherStart() { } } -func (pa *Path) onSetStartingPoint(req source.SetStartingPointReq) { - if req.Source != pa.source { - return - } - - pa.sourceTrackStartingPoints[req.TrackID] = req.StartingPoint -} - func (pa *Path) onClientDescribe(req client.DescribeReq) { if _, ok := pa.clients[req.Client]; ok { req.Res <- client.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet @@ -669,7 +658,7 @@ func (pa *Path) onClientPlay(req client.PlayReq) { pa.readers.add(req.Client) // clone - cl := make([]source.TrackStartingPoint, len(pa.sourceTrackStartingPoints)) + cl := make([]streamproc.TrackStartingPoint, len(pa.sourceTrackStartingPoints)) for k, v := range pa.sourceTrackStartingPoints { cl[k] = v } @@ -711,13 +700,12 @@ func (pa *Path) onClientAnnounce(req client.AnnounceReq) { pa.source = req.Client pa.sourceTracks = req.Tracks - pa.sourceTrackStartingPoints = make([]source.TrackStartingPoint, len(req.Tracks)) req.Res <- client.AnnounceRes{pa, nil} //nolint:govet } func (pa *Path) onClientRecord(req client.RecordReq) { if state, ok := pa.clients[req.Client]; !ok || state != clientStatePreRecord { - close(req.Res) + req.Res <- client.RecordRes{SP: nil, Err: fmt.Errorf("not recording anymore")} return } @@ -725,7 +713,10 @@ func (pa *Path) onClientRecord(req client.RecordReq) { pa.clients[req.Client] = clientStateRecord pa.onSourceSetReady() - close(req.Res) + pa.sourceTrackStartingPoints = make([]streamproc.TrackStartingPoint, len(pa.sourceTracks)) + pa.sp = streamproc.New(pa, len(pa.sourceTracks)) + + req.Res <- client.RecordRes{SP: pa.sp, Err: nil} } func (pa *Path) onClientPause(req client.PauseReq) { @@ -749,6 +740,14 @@ func (pa *Path) onClientPause(req client.PauseReq) { close(req.Res) } +func (pa *Path) onSPSetStartingPoint(req streamproc.SetStartingPointReq) { + if req.SP != pa.sp { + return + } + + pa.sourceTrackStartingPoints[req.TrackID] = req.StartingPoint +} + func (pa *Path) scheduleSourceClose() { if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil { return @@ -811,16 +810,6 @@ func (pa *Path) Name() string { return pa.name } -// OnSetStartingPoint is called by a source. -func (pa *Path) OnSetStartingPoint(req source.SetStartingPointReq) { - pa.setStartingPoint <- req -} - -// OnFrame is called by a source. -func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { - pa.readers.forwardFrame(trackID, streamType, payload) -} - // OnExtSourceSetReady is called by an external source. func (pa *Path) OnExtSourceSetReady(req source.ExtSetReadyReq) { pa.extSourceSetReady <- req @@ -865,3 +854,13 @@ func (pa *Path) OnClientRecord(req client.RecordReq) { func (pa *Path) OnClientPause(req client.PauseReq) { pa.clientPause <- req } + +// OnSPSetStartingPoint is called by streamproc.StreamProc. +func (pa *Path) OnSPSetStartingPoint(req streamproc.SetStartingPointReq) { + pa.spSetStartingPoint <- req +} + +// OnSPFrame is called by streamproc.StreamProc. +func (pa *Path) OnSPFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { + pa.readers.forwardFrame(trackID, streamType, payload) +} diff --git a/internal/path/readersmap.go b/internal/path/readersmap.go index a83880c6..882ddcd5 100644 --- a/internal/path/readersmap.go +++ b/internal/path/readersmap.go @@ -7,7 +7,7 @@ import ( ) type reader interface { - OnIncomingFrame(int, gortsplib.StreamType, []byte) + OnFrame(int, gortsplib.StreamType, []byte) } type readersMap struct { @@ -40,6 +40,6 @@ func (m *readersMap) forwardFrame(trackID int, streamType gortsplib.StreamType, defer m.mutex.RUnlock() for c := range m.ma { - c.OnIncomingFrame(trackID, streamType, buf) + c.OnFrame(trackID, streamType, buf) } } diff --git a/internal/source/source.go b/internal/source/source.go index 850fc982..105c65be 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -4,13 +4,6 @@ import ( "github.com/aler9/gortsplib" ) -// TrackStartingPoint is the starting point of a track. -type TrackStartingPoint struct { - Filled bool // used to avoid mutexes - SequenceNumber uint16 - Timestamp uint32 -} - // Source is implemented by all sources (clients and external sources). type Source interface { IsSource() @@ -23,18 +16,20 @@ type ExtSource interface { Close() } -// SetStartingPointReq is a set starting point request. -type SetStartingPointReq struct { - Source Source - TrackID int - StartingPoint TrackStartingPoint +// StreamProc is implemented by streamproc.StreamProc. +type StreamProc interface { + OnFrame(int, gortsplib.StreamType, []byte) +} + +// ExtSetReadyRes is a set ready response. +type ExtSetReadyRes struct { + SP StreamProc } // ExtSetReadyReq is a set ready request. type ExtSetReadyReq struct { - Tracks gortsplib.Tracks - StartingPoints []TrackStartingPoint - Res chan struct{} + Tracks gortsplib.Tracks + Res chan ExtSetReadyRes } // ExtSetNotReadyReq is a set not ready request. diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index 668900a3..395c471d 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -19,7 +19,6 @@ import ( "github.com/aler9/rtsp-simple-server/internal/rtmputils" "github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/stats" - "github.com/aler9/rtsp-simple-server/internal/streamproc" ) const ( @@ -31,8 +30,6 @@ type Parent interface { Log(logger.Level, string, ...interface{}) OnExtSourceSetReady(req source.ExtSetReadyReq) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) - OnSetStartingPoint(source.SetStartingPointReq) - OnFrame(int, gortsplib.StreamType, []byte) } // Source is a RTMP external source. @@ -177,13 +174,14 @@ func (s *Source) runInner() bool { } s.log(logger.Info, "ready") - res := make(chan struct{}) + + cres := make(chan source.ExtSetReadyRes) s.parent.OnExtSourceSetReady(source.ExtSetReadyReq{ - Tracks: tracks, - StartingPoints: make([]source.TrackStartingPoint, len(tracks)), - Res: res, + Tracks: tracks, + Res: cres, }) - <-res + res := <-cres + defer func() { res := make(chan struct{}) s.parent.OnExtSourceSetNotReady(source.ExtSetNotReadyReq{ @@ -195,14 +193,12 @@ func (s *Source) runInner() bool { readerDone := make(chan error) go func() { readerDone <- func() error { - rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnFrame) + rtcpSenders := rtcpsenderset.New(tracks, res.SP.OnFrame) defer rtcpSenders.Close() - sp := streamproc.New(s, s.parent, make([]source.TrackStartingPoint, len(tracks))) - onFrame := func(trackID int, payload []byte) { rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload) - sp.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, payload) + res.SP.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, payload) } for { diff --git a/internal/sourcertsp/source.go b/internal/sourcertsp/source.go index 6cdf4370..57a4fbb5 100644 --- a/internal/sourcertsp/source.go +++ b/internal/sourcertsp/source.go @@ -11,7 +11,6 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/source" "github.com/aler9/rtsp-simple-server/internal/stats" - "github.com/aler9/rtsp-simple-server/internal/streamproc" ) const ( @@ -23,8 +22,6 @@ type Parent interface { Log(logger.Level, string, ...interface{}) OnExtSourceSetReady(req source.ExtSetReadyReq) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) - OnSetStartingPoint(source.SetStartingPointReq) - OnFrame(int, gortsplib.StreamType, []byte) } // Source is a RTSP external source. @@ -150,51 +147,15 @@ func (s *Source) runInner() bool { return true } - trackStartingPoints := make([]source.TrackStartingPoint, len(conn.Tracks())) - - if conn.RTPInfo() != nil { - for _, info := range *conn.RTPInfo() { - ipath, ok := info.URL.RTSPPath() - if !ok { - continue - } - - trackID := func() int { - for _, tr := range conn.Tracks() { - u, err := tr.URL() - if err != nil { - continue - } - - tpath, ok := u.RTSPPath() - if !ok { - continue - } - - if tpath == ipath { - return tr.ID - } - } - return -1 - }() - if trackID < 0 { - continue - } - - trackStartingPoints[trackID].Filled = true - trackStartingPoints[trackID].SequenceNumber = info.SequenceNumber - trackStartingPoints[trackID].Timestamp = info.Timestamp - } - } - s.log(logger.Info, "ready") - res := make(chan struct{}) + + cres := make(chan source.ExtSetReadyRes) s.parent.OnExtSourceSetReady(source.ExtSetReadyReq{ - Tracks: conn.Tracks(), - StartingPoints: trackStartingPoints, - Res: res, + Tracks: conn.Tracks(), + Res: cres, }) - <-res + res := <-cres + defer func() { res := make(chan struct{}) s.parent.OnExtSourceSetNotReady(source.ExtSetNotReadyReq{ @@ -203,10 +164,8 @@ func (s *Source) runInner() bool { <-res }() - sp := streamproc.New(s, s.parent, trackStartingPoints) - done := conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) { - sp.OnFrame(trackID, streamType, payload) + res.SP.OnFrame(trackID, streamType, payload) }) for { diff --git a/internal/streamproc/streamproc.go b/internal/streamproc/streamproc.go index 340a6ca8..77c02200 100644 --- a/internal/streamproc/streamproc.go +++ b/internal/streamproc/streamproc.go @@ -1,31 +1,43 @@ package streamproc import ( + "time" + "github.com/aler9/gortsplib" "github.com/pion/rtp" - - "github.com/aler9/rtsp-simple-server/internal/source" ) +// TrackStartingPoint is the starting point of a track. +type TrackStartingPoint struct { + Filled bool // used to avoid mutexes + RTPTime uint32 + NTPTime time.Time +} + // Path is implemented by path.path. type Path interface { - OnSetStartingPoint(source.SetStartingPointReq) - OnFrame(int, gortsplib.StreamType, []byte) + OnSPSetStartingPoint(SetStartingPointReq) + OnSPFrame(int, gortsplib.StreamType, []byte) +} + +// SetStartingPointReq is a set starting point request. +type SetStartingPointReq struct { + SP *StreamProc + TrackID int + StartingPoint TrackStartingPoint } // StreamProc is a stream processor, an intermediate layer between a source and a path. type StreamProc struct { - source source.Source path Path - startingPoints []source.TrackStartingPoint + startingPoints []TrackStartingPoint } // New allocates a StreamProc. -func New(source source.Source, path Path, startingPoints []source.TrackStartingPoint) *StreamProc { +func New(path Path, tracksLen int) *StreamProc { return &StreamProc{ - source: source, path: path, - startingPoints: startingPoints, + startingPoints: make([]TrackStartingPoint, tracksLen), } } @@ -40,15 +52,15 @@ func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payl } sp.startingPoints[trackID].Filled = true - sp.startingPoints[trackID].SequenceNumber = pkt.SequenceNumber - sp.startingPoints[trackID].Timestamp = pkt.Timestamp + sp.startingPoints[trackID].RTPTime = pkt.Timestamp + sp.startingPoints[trackID].NTPTime = time.Now() - sp.path.OnSetStartingPoint(source.SetStartingPointReq{ - Source: sp.source, + sp.path.OnSPSetStartingPoint(SetStartingPointReq{ + SP: sp, TrackID: trackID, StartingPoint: sp.startingPoints[trackID], }) } - sp.path.OnFrame(trackID, streamType, payload) + sp.path.OnSPFrame(trackID, streamType, payload) } diff --git a/main_clientrtsp_test.go b/main_clientrtsp_test.go index 83edec01..385ad7d8 100644 --- a/main_clientrtsp_test.go +++ b/main_clientrtsp_test.go @@ -670,8 +670,8 @@ func TestClientRTSPRTPInfo(t *testing.T) { Host: ownDockerIP + ":8554", Path: "/teststream/trackID=0", }, - SequenceNumber: 556, - Timestamp: 984512368, + SequenceNumber: 0, + Timestamp: (*dest.RTPInfo())[0].Timestamp, }, }, dest.RTPInfo()) }() @@ -705,8 +705,8 @@ func TestClientRTSPRTPInfo(t *testing.T) { Host: ownDockerIP + ":8554", Path: "/teststream/trackID=0", }, - SequenceNumber: 556, - Timestamp: 984512368, + SequenceNumber: 0, + Timestamp: (*dest.RTPInfo())[0].Timestamp, }, &headers.RTPInfoEntry{ URL: &base.URL{ @@ -714,8 +714,8 @@ func TestClientRTSPRTPInfo(t *testing.T) { Host: ownDockerIP + ":8554", Path: "/teststream/trackID=1", }, - SequenceNumber: 87, - Timestamp: 756436454, + SequenceNumber: 0, + Timestamp: (*dest.RTPInfo())[1].Timestamp, }, }, dest.RTPInfo()) }() diff --git a/main_sourcertsp_test.go b/main_sourcertsp_test.go index 511f5c5d..4528243c 100644 --- a/main_sourcertsp_test.go +++ b/main_sourcertsp_test.go @@ -220,23 +220,34 @@ func TestSourceRTSPRTPInfo(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) - // provide a partial RTP-Info with only one track err = base.Response{ StatusCode: base.StatusOK, - Header: base.Header{ - "RTP-Info": headers.RTPInfo{ - { - URL: base.MustParseURL("rtsp://127.0.0.1/stream/trackID=1"), - SequenceNumber: 34254, - Timestamp: 156457686, - }, - }.Write(), - }, + Header: base.Header{}, }.Write(bconn.Writer) require.NoError(t, err) - // send a packet to fill the missing RTP-Info track pkt := rtp.Packet{ + Header: rtp.Header{ + Version: 0x80, + PayloadType: 96, + SequenceNumber: 34254, + Timestamp: 156457686, + SSRC: 96342362, + }, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + } + + buf, err := pkt.Marshal() + require.NoError(t, err) + + err = base.InterleavedFrame{ + TrackID: 1, + StreamType: gortsplib.StreamTypeRTP, + Payload: buf, + }.Write(bconn.Writer) + require.NoError(t, err) + + pkt = rtp.Packet{ Header: rtp.Header{ Version: 0x80, PayloadType: 96, @@ -247,7 +258,7 @@ func TestSourceRTSPRTPInfo(t *testing.T) { Payload: []byte{0x01, 0x02, 0x03, 0x04}, } - buf, err := pkt.Marshal() + buf, err = pkt.Marshal() require.NoError(t, err) err = base.InterleavedFrame{ @@ -277,7 +288,7 @@ func TestSourceRTSPRTPInfo(t *testing.T) { require.Equal(t, true, ok) defer p1.close() - time.Sleep(500 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) dest, err := gortsplib.DialRead("rtsp://127.0.1.2:8554/proxied") require.NoError(t, err) @@ -290,8 +301,8 @@ func TestSourceRTSPRTPInfo(t *testing.T) { Host: "127.0.1.2:8554", Path: "/proxied/trackID=0", }, - SequenceNumber: 87, - Timestamp: 756436454, + SequenceNumber: 0, + Timestamp: (*dest.RTPInfo())[0].Timestamp, }, &headers.RTPInfoEntry{ URL: &base.URL{ @@ -299,8 +310,8 @@ func TestSourceRTSPRTPInfo(t *testing.T) { Host: "127.0.1.2:8554", Path: "/proxied/trackID=1", }, - SequenceNumber: 34254, - Timestamp: 156457686, + SequenceNumber: 0, + Timestamp: (*dest.RTPInfo())[1].Timestamp, }, }, dest.RTPInfo()) }