rpi: support exposing a secondary stream from the same camera (#4426)

This commit is contained in:
Alessandro Ros 2025-04-14 11:56:08 +02:00 committed by GitHub
parent 9579989eff
commit 8ce49727d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 619 additions and 282 deletions

View file

@ -2,10 +2,16 @@
package rpicamera
import (
"context"
"errors"
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpmjpeg"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
@ -14,6 +20,10 @@ import (
"github.com/bluenviron/mediamtx/internal/unit"
)
const (
pauseBetweenErrors = 1 * time.Second
)
func paramsFromConf(logLevel conf.LogLevel, cnf *conf.Path) params {
return params{
LogLevel: func() string {
@ -63,13 +73,40 @@ func paramsFromConf(logLevel conf.LogLevel, cnf *conf.Path) params {
Bitrate: uint32(cnf.RPICameraBitrate),
Profile: cnf.RPICameraProfile,
Level: cnf.RPICameraLevel,
SecondaryWidth: uint32(cnf.RPICameraSecondaryWidth),
SecondaryHeight: uint32(cnf.RPICameraSecondaryHeight),
SecondaryFPS: float32(cnf.RPICameraSecondaryFPS),
SecondaryQuality: uint32(cnf.RPICameraSecondaryJPEGQuality),
}
}
type secondaryReader struct {
ctx context.Context
ctxCancel func()
}
// Close implements reader.
func (r *secondaryReader) Close() {
r.ctxCancel()
}
// APIReaderDescribe implements reader.
func (*secondaryReader) APIReaderDescribe() defs.APIPathSourceOrReader {
return defs.APIPathSourceOrReader{
Type: "rpiCameraSecondary",
ID: "",
}
}
type parent interface {
defs.StaticSourceParent
AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error)
}
// Source is a Raspberry Pi Camera static source.
type Source struct {
LogLevel conf.LogLevel
Parent defs.StaticSourceParent
Parent parent
}
// Log implements logger.Writer.
@ -79,6 +116,15 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
// Run implements StaticSource.
func (s *Source) Run(params defs.StaticSourceRunParams) error {
if !params.Conf.RPICameraSecondary {
return s.runPrimary(params)
}
return s.runSecondary(params)
}
func (s *Source) runPrimary(params defs.StaticSourceRunParams) error {
var medias []*description.Media
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
@ -86,29 +132,88 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
PacketizationMode: 1,
}},
}
medias := []*description.Media{medi}
medias = append(medias, medi)
var mediaSecondary *description.Media
if params.Conf.RPICameraSecondaryWidth != 0 {
mediaSecondary = &description.Media{
Type: description.MediaTypeApplication,
Formats: []format.Format{&format.Generic{
PayloadTyp: 96,
RTPMa: "rpicamera_secondary",
ClockRat: 90000,
}},
}
medias = append(medias, mediaSecondary)
}
var stream *stream.Stream
onData := func(pts int64, ntp time.Time, au [][]byte) {
initializeStream := func() {
if stream == nil {
res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,
GenerateRTPPackets: false,
})
if res.Err != nil {
return
panic("should not happen")
}
stream = res.Stream
}
}
stream.WriteUnit(medi, medi.Formats[0], &unit.H264{
Base: unit.Base{
PTS: pts,
NTP: ntp,
},
AU: au,
})
encH264 := &rtph264.Encoder{
PayloadType: 96,
PayloadMaxSize: 1460,
}
err := encH264.Init()
if err != nil {
return err
}
onData := func(pts int64, ntp time.Time, au [][]byte) {
initializeStream()
pkts, err2 := encH264.Encode(au)
if err2 != nil {
s.Log(logger.Error, err2.Error())
return
}
for _, pkt := range pkts {
pkt.Timestamp = uint32(pts)
stream.WriteRTPPacket(medi, medi.Formats[0], pkt, ntp, pts)
}
}
var onDataSecondary func(pts int64, ntp time.Time, au []byte)
if params.Conf.RPICameraSecondaryWidth != 0 {
encJpeg := &rtpmjpeg.Encoder{
PayloadMaxSize: 1460,
}
err = encJpeg.Init()
if err != nil {
panic(err)
}
onDataSecondary = func(pts int64, ntp time.Time, au []byte) {
initializeStream()
pkts, err2 := encJpeg.Encode(au)
if err2 != nil {
s.Log(logger.Error, err2.Error())
return
}
for _, pkt := range pkts {
pkt.Timestamp = uint32(pts)
pkt.PayloadType = 96
stream.WriteRTPPacket(mediaSecondary, mediaSecondary.Formats[0], pkt, ntp, pts)
}
}
}
defer func() {
@ -118,10 +223,11 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}()
cam := &camera{
params: paramsFromConf(s.LogLevel, params.Conf),
onData: onData,
params: paramsFromConf(s.LogLevel, params.Conf),
onData: onData,
onDataSecondary: onDataSecondary,
}
err := cam.initialize()
err = cam.initialize()
if err != nil {
return err
}
@ -146,6 +252,93 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
}
}
func (s *Source) runSecondary(params defs.StaticSourceRunParams) error {
r := &secondaryReader{}
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
defer r.ctxCancel()
path, origStream, err := s.waitForPrimary(r, params)
if err != nil {
return err
}
defer path.RemoveReader(defs.PathRemoveReaderReq{Author: r})
media := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.MJPEG{}},
}
res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
Desc: &description.Session{Medias: []*description.Media{media}},
GenerateRTPPackets: false,
})
if res.Err != nil {
return res.Err
}
origStream.AddReader(
s,
origStream.Desc.Medias[1],
origStream.Desc.Medias[1].Formats[0],
func(u unit.Unit) error {
pkt := u.GetRTPPackets()[0]
newPkt := &rtp.Packet{
Header: pkt.Header,
Payload: pkt.Payload,
}
newPkt.PayloadType = 26
res.Stream.WriteRTPPacket(media, media.Formats[0], newPkt, u.GetNTP(), u.GetPTS())
return nil
})
origStream.StartReader(s)
defer origStream.RemoveReader(s)
select {
case err := <-origStream.ReaderError(s):
return err
case <-r.ctx.Done():
return fmt.Errorf("primary stream closed")
case <-params.Context.Done():
return fmt.Errorf("terminated")
}
}
func (s *Source) waitForPrimary(
r *secondaryReader,
params defs.StaticSourceRunParams,
) (defs.Path, *stream.Stream, error) {
for {
path, origStream, err := s.Parent.AddReader(defs.PathAddReaderReq{
Author: r,
AccessRequest: defs.PathAccessRequest{
Name: params.Conf.RPICameraPrimaryName,
SkipAuth: true,
},
})
if err != nil {
var err2 defs.PathNoStreamAvailableError
if errors.As(err, &err2) {
select {
case <-time.After(pauseBetweenErrors):
case <-params.Context.Done():
return nil, nil, fmt.Errorf("terminated")
}
continue
}
return nil, nil, err
}
return path, origStream, nil
}
}
// APISourceDescribe implements StaticSource.
func (*Source) APISourceDescribe() defs.APIPathSourceOrReader {
return defs.APIPathSourceOrReader{