diff --git a/internal/staticsources/rtp/format.go b/internal/staticsources/rtp/format.go new file mode 100644 index 00000000..753efe7f --- /dev/null +++ b/internal/staticsources/rtp/format.go @@ -0,0 +1,25 @@ +package rtp + +import ( + "time" + + "github.com/bluenviron/gortsplib/v5/pkg/format" + "github.com/bluenviron/gortsplib/v5/pkg/rtpreceiver" + "github.com/pion/rtcp" +) + +type rtpFormat struct { + desc format.Format + + rtpReceiver *rtpreceiver.Receiver +} + +func (f *rtpFormat) initialize() { + f.rtpReceiver = &rtpreceiver.Receiver{ + ClockRate: f.desc.ClockRate(), + UnrealiableTransport: true, + Period: 10 * time.Second, + WritePacketRTCP: func(_ rtcp.Packet) { + }, + } +} diff --git a/internal/staticsources/rtp/media.go b/internal/staticsources/rtp/media.go new file mode 100644 index 00000000..8fdc744d --- /dev/null +++ b/internal/staticsources/rtp/media.go @@ -0,0 +1,7 @@ +package rtp + +import "github.com/bluenviron/gortsplib/v5/pkg/description" + +type rtpMedia struct { + desc *description.Media +} diff --git a/internal/staticsources/rtp/source.go b/internal/staticsources/rtp/source.go index 31061cb6..8e3dc21d 100644 --- a/internal/staticsources/rtp/source.go +++ b/internal/staticsources/rtp/source.go @@ -8,7 +8,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v5/pkg/description" - "github.com/bluenviron/gortsplib/v5/pkg/format" "github.com/bluenviron/gortsplib/v5/pkg/rtptime" "github.com/bluenviron/gortsplib/v5/pkg/sdp" "github.com/bluenviron/mediamtx/internal/conf" @@ -103,6 +102,22 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { } func (s *Source) runReader(desc *description.Session, nc net.Conn) error { + packetsLost := &counterdumper.CounterDumper{ + OnReport: func(val uint64) { + s.Log(logger.Warn, "%d RTP %s lost", + val, + func() string { + if val == 1 { + return "packet" + } + return "packets" + }()) + }, + } + + packetsLost.Start() + defer packetsLost.Stop() + decodeErrors := &counterdumper.CounterDumper{ OnReport: func(val uint64) { s.Log(logger.Warn, "%d decode %s", @@ -123,13 +138,22 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { timeDecoder := &rtptime.GlobalDecoder{} timeDecoder.Initialize() - mediasByPayloadType := make(map[uint8]*description.Media) - formatsByPayloadType := make(map[uint8]format.Format) + mediasByPayloadType := make(map[uint8]*rtpMedia) + formatsByPayloadType := make(map[uint8]*rtpFormat) - for _, media := range desc.Medias { - for _, forma := range media.Formats { - mediasByPayloadType[forma.PayloadType()] = media - formatsByPayloadType[forma.PayloadType()] = forma + for _, descMedia := range desc.Medias { + rtpMedia := &rtpMedia{ + desc: descMedia, + } + + for _, descFormat := range descMedia.Formats { + rtpFormat := &rtpFormat{ + desc: descFormat, + } + rtpFormat.initialize() + + mediasByPayloadType[descFormat.PayloadType()] = rtpMedia + formatsByPayloadType[descFormat.PayloadType()] = rtpFormat } } @@ -173,12 +197,20 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error { forma := formatsByPayloadType[pkt.PayloadType] - pts, ok := timeDecoder.Decode(forma, &pkt) - if !ok { - continue + pkts, lost := forma.rtpReceiver.ProcessPacket2(&pkt, time.Now(), forma.desc.PTSEqualsDTS(&pkt)) + + if lost != 0 { + packetsLost.Add(lost) } - stream.WriteRTPPacket(media, forma, &pkt, time.Time{}, pts) + for _, pkt := range pkts { + pts, ok2 := timeDecoder.Decode(forma.desc, pkt) + if !ok2 { + continue + } + + stream.WriteRTPPacket(media.desc, forma.desc, pkt, time.Time{}, pts) + } } }