dilution bug fix

This commit is contained in:
Eran Gonen 2025-11-27 11:55:51 +02:00
parent 41f6002521
commit 17b72a6027

View file

@ -36,8 +36,10 @@ type streamFormat struct {
ntpEstimator *ntpestimator.Estimator
onDatas map[*Reader]OnDataFunc
// For codec-agnostic keyframe detection using RTP-level indicators
lastRTPTimestamp uint32
// For keyframe-only mode
lastRTPTimestamp uint32
currentFrameIsKeyframe bool
outputSeqNum uint16
}
func (sf *streamFormat) initialize() error {
@ -66,6 +68,84 @@ func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u *unit.Un
sf.writeUnitInner(s, medi, u)
}
// isKeyframePacket reports whether pkt, expected to be the first RTP packet
// of a frame, looks like the start of a keyframe.
//
// For H264 and H265 the NAL unit type is read from the payload header, and
// the packet is treated as a keyframe packet when it carries an IDR/CRA
// slice or a parameter set (SPS/PPS, plus VPS for H265), either directly,
// inside a fragmentation unit, or inside an aggregation packet.
// For every other format the function returns true so that no frame is
// filtered out. An empty payload always yields false.
//
// NOTE(review): the fragmentation-unit branches do not check the FU start
// bit; they rely on the caller passing the first packet of the frame.
func (sf *streamFormat) isKeyframePacket(pkt *rtp.Packet) bool {
	if len(pkt.Payload) == 0 {
		return false
	}

	switch sf.format.(type) {
	case *format.H264:
		// The NAL unit type is the low 5 bits of the first payload byte.
		nalType := pkt.Payload[0] & 0x1F

		switch nalType {
		case 5: // IDR
			return true

		case 7, 8: // SPS, PPS - usually precede IDR in same frame
			return true

		case 28, 29: // FU-A, FU-B
			// The type of the fragmented NAL unit is in the FU header (byte 1).
			if len(pkt.Payload) >= 2 {
				fuNalType := pkt.Payload[1] & 0x1F
				if fuNalType == 5 { // IDR
					return true
				}
			}

		case 24: // STAP-A
			// Walk the aggregated units: each is a 2-byte big-endian size
			// followed by that many bytes of NAL unit.
			payload := pkt.Payload[1:]
			for len(payload) > 2 {
				size := uint16(payload[0])<<8 | uint16(payload[1])
				payload = payload[2:]

				if int(size) > len(payload) {
					// Truncated or malformed unit: stop scanning.
					break
				}

				if size > 0 {
					innerNalType := payload[0] & 0x1F
					if innerNalType == 5 || innerNalType == 7 || innerNalType == 8 {
						return true
					}
				}

				payload = payload[size:]
			}
		}

		return false

	case *format.H265:
		// The NAL unit type is bits 1..6 of the first payload byte.
		nalType := (pkt.Payload[0] >> 1) & 0x3F

		switch nalType {
		case 19, 20, 21: // IDR_W_RADL, IDR_N_LP, CRA_NUT
			return true

		case 32, 33, 34: // VPS, SPS, PPS
			return true

		case 49: // FU
			// The type of the fragmented NAL unit is in the FU header (byte 2).
			if len(pkt.Payload) >= 3 {
				fuNalType := pkt.Payload[2] & 0x3F
				if fuNalType == 19 || fuNalType == 20 || fuNalType == 21 {
					return true
				}
			}

		case 48: // AP
			// Only a non-empty payload is guaranteed at this point; a 1-byte
			// payload would make the slice expression below panic, so treat
			// it as "not a keyframe" instead.
			if len(pkt.Payload) < 2 {
				return false
			}

			// Walk the aggregated units: each is a 2-byte big-endian size
			// followed by that many bytes of NAL unit.
			payload := pkt.Payload[2:]
			for len(payload) > 2 {
				size := uint16(payload[0])<<8 | uint16(payload[1])
				payload = payload[2:]

				if int(size) > len(payload) || size < 2 {
					// Truncated unit, or too short to hold the 2-byte
					// NAL header: stop scanning.
					break
				}

				innerNalType := (payload[0] >> 1) & 0x3F
				if innerNalType == 19 || innerNalType == 20 || innerNalType == 21 ||
					innerNalType == 32 || innerNalType == 33 || innerNalType == 34 {
					return true
				}

				payload = payload[size:]
			}
		}

		return false

	default:
		// For other codecs, allow all frames
		return true
	}
}
func (sf *streamFormat) writeRTPPacket(
s *Stream,
medi *description.Media,
@ -98,43 +178,31 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u *un
// Drop non-keyframes if configured
if sf.dropNonKeyframes && medi.Type == description.MediaTypeVideo {
// Codec-agnostic keyframe detection using RTP-level indicators:
// - Marker bit: indicates the last packet of a frame (complete frame)
// - Timestamp change: indicates a new frame
// Keyframe heuristic: complete frame (marker bit) on a new timestamp
if len(u.RTPPackets) > 0 {
firstPkt := u.RTPPackets[0]
isNewFrame := firstPkt.Timestamp != sf.lastRTPTimestamp
// Check if any packet has marker bit set (complete frame)
hasMarker := false
for _, pkt := range u.RTPPackets {
if pkt.Marker {
hasMarker = true
break
}
// On new frame, check if it's a keyframe by inspecting NAL type
if isNewFrame {
sf.lastRTPTimestamp = firstPkt.Timestamp
sf.currentFrameIsKeyframe = sf.isKeyframePacket(firstPkt)
}
// Allow complete frames (with marker bit) on new timestamps
// Also allow the first frame through (when lastRTPTimestamp is 0)
if hasMarker {
// Complete frame - allow through if it's a new frame or first frame
if isNewFrame || sf.lastRTPTimestamp == 0 {
// Update last timestamp when we output a new frame
if isNewFrame {
sf.lastRTPTimestamp = firstPkt.Timestamp
}
} else {
// Same timestamp as last frame - likely a duplicate or continuation, drop it
return
}
} else {
// Incomplete frame (no marker bit) - drop it
// Drop if not part of a keyframe
if !sf.currentFrameIsKeyframe {
return
}
}
}
// Rewrite sequence numbers when dropping non-keyframes to ensure continuity
if sf.dropNonKeyframes && medi.Type == description.MediaTypeVideo {
for _, pkt := range u.RTPPackets {
pkt.SequenceNumber = sf.outputSeqNum
sf.outputSeqNum++
}
}
size := unitSize(u)
atomic.AddUint64(s.bytesReceived, size)