rtp: reorder packets before ingestion (#5194)
Some checks failed
code_lint / go (push) Has been cancelled
code_lint / go_mod (push) Has been cancelled
code_lint / docs (push) Has been cancelled
code_lint / api_docs (push) Has been cancelled
code_test / test_64 (push) Has been cancelled
code_test / test_32 (push) Has been cancelled
code_test / test_e2e (push) Has been cancelled

This commit is contained in:
Alessandro Ros 2025-11-14 17:19:48 +01:00 committed by GitHub
parent f52670e630
commit 3f599d236f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 75 additions and 11 deletions

View file

@ -0,0 +1,25 @@
package rtp
import (
"time"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/bluenviron/gortsplib/v5/pkg/rtpreceiver"
"github.com/pion/rtcp"
)
type rtpFormat struct {
desc format.Format
rtpReceiver *rtpreceiver.Receiver
}
func (f *rtpFormat) initialize() {
f.rtpReceiver = &rtpreceiver.Receiver{
ClockRate: f.desc.ClockRate(),
UnrealiableTransport: true,
Period: 10 * time.Second,
WritePacketRTCP: func(_ rtcp.Packet) {
},
}
}

View file

@ -0,0 +1,7 @@
package rtp
import "github.com/bluenviron/gortsplib/v5/pkg/description"
type rtpMedia struct {
desc *description.Media
}

View file

@ -8,7 +8,6 @@ import (
"time"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/bluenviron/gortsplib/v5/pkg/rtptime"
"github.com/bluenviron/gortsplib/v5/pkg/sdp"
"github.com/bluenviron/mediamtx/internal/conf"
@ -103,6 +102,22 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}
func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
packetsLost := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d RTP %s lost",
val,
func() string {
if val == 1 {
return "packet"
}
return "packets"
}())
},
}
packetsLost.Start()
defer packetsLost.Stop()
decodeErrors := &counterdumper.CounterDumper{
OnReport: func(val uint64) {
s.Log(logger.Warn, "%d decode %s",
@ -123,13 +138,22 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
timeDecoder := &rtptime.GlobalDecoder{}
timeDecoder.Initialize()
mediasByPayloadType := make(map[uint8]*description.Media)
formatsByPayloadType := make(map[uint8]format.Format)
mediasByPayloadType := make(map[uint8]*rtpMedia)
formatsByPayloadType := make(map[uint8]*rtpFormat)
for _, media := range desc.Medias {
for _, forma := range media.Formats {
mediasByPayloadType[forma.PayloadType()] = media
formatsByPayloadType[forma.PayloadType()] = forma
for _, descMedia := range desc.Medias {
rtpMedia := &rtpMedia{
desc: descMedia,
}
for _, descFormat := range descMedia.Formats {
rtpFormat := &rtpFormat{
desc: descFormat,
}
rtpFormat.initialize()
mediasByPayloadType[descFormat.PayloadType()] = rtpMedia
formatsByPayloadType[descFormat.PayloadType()] = rtpFormat
}
}
@ -173,12 +197,20 @@ func (s *Source) runReader(desc *description.Session, nc net.Conn) error {
forma := formatsByPayloadType[pkt.PayloadType]
pts, ok := timeDecoder.Decode(forma, &pkt)
if !ok {
continue
pkts, lost := forma.rtpReceiver.ProcessPacket2(&pkt, time.Now(), forma.desc.PTSEqualsDTS(&pkt))
if lost != 0 {
packetsLost.Add(lost)
}
stream.WriteRTPPacket(media, forma, &pkt, time.Time{}, pts)
for _, pkt := range pkts {
pts, ok2 := timeDecoder.Decode(forma.desc, pkt)
if !ok2 {
continue
}
stream.WriteRTPPacket(media.desc, forma.desc, pkt, time.Time{}, pts)
}
}
}