diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index c3205133..7480e55a 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -13,6 +13,7 @@ import ( "github.com/aler9/gortsplib/pkg/rtph264" "github.com/notedit/rtmp/av" "github.com/notedit/rtmp/codec/h264" + "github.com/notedit/rtmp/format/flv/flvio" "github.com/notedit/rtmp/format/rtmp" "github.com/aler9/rtsp-simple-server/internal/logger" @@ -20,8 +21,10 @@ import ( ) const ( - retryPause = 5 * time.Second - analyzeTimeout = 8 * time.Second + retryPause = 5 * time.Second + + codecH264 = 7 + codecAAC = 10 ) // Parent is implemeneted by path.Path. @@ -107,6 +110,33 @@ func (s *Source) run() { } } +func readMetadata(conn *rtmp.Conn) (flvio.AMFMap, error) { + pkt, err := conn.ReadPacket() + if err != nil { + return nil, err + } + + if pkt.Type != av.Metadata { + return nil, fmt.Errorf("first packet must be metadata") + } + + arr, err := flvio.ParseAMFVals(pkt.Data, false) + if err != nil { + return nil, err + } + + if len(arr) != 1 { + return nil, fmt.Errorf("invalid metadata") + } + + ma, ok := arr[0].(flvio.AMFMap) + if !ok { + return nil, fmt.Errorf("invalid metadata") + } + + return ma, nil +} + func (s *Source) runInner() bool { s.log(logger.Info, "connecting") @@ -130,64 +160,6 @@ func (s *Source) runInner() bool { return true } - // gather video and audio features - var h264Sps []byte - var h264Pps []byte - var aacConfig []byte - confDone := make(chan struct{}) - confClose := uint32(0) - go func() { - defer close(confDone) - - for { - var pkt av.Packet - pkt, err = conn.ReadPacket() - if err != nil { - return - } - - if atomic.LoadUint32(&confClose) > 0 { - return - } - - switch pkt.Type { - case av.H264DecoderConfig: - codec, err := h264.FromDecoderConfig(pkt.Data) - if err != nil { - panic(err) - } - - h264Sps, h264Pps = codec.SPS[0], codec.PPS[0] - - if aacConfig != nil { - return - } - - case av.AACDecoderConfig: - aacConfig = pkt.Data - - if h264Sps != nil { - return - } - } - } - }() - - timer := time.NewTimer(analyzeTimeout) - defer timer.Stop() - - select { - case <-confDone: - case <-timer.C: - atomic.StoreUint32(&confClose, 1) - <-confDone - } - - if err != nil { - s.log(logger.Info, "ERR: %s", err) - return true - } - var tracks gortsplib.Tracks var videoTrack *gortsplib.Track @@ -198,47 +170,120 @@ func (s *Source) runInner() bool { var audioRTCPSender *rtcpsender.RTCPSender var aacEncoder *rtpaac.Encoder - if h264Sps != nil { - videoTrack, err = gortsplib.NewTrackH264(96, h264Sps, h264Pps) + confDone := make(chan error) + go func() { + confDone <- func() error { + md, err := readMetadata(conn) + if err != nil { + return err + } + + hasVideo := false + if v, ok := md.GetFloat64("videocodecid"); ok { + switch v { + case codecH264: + hasVideo = true + case 0: + default: + return fmt.Errorf("unsupported video codec %v", v) + } + + } + + hasAudio := false + if v, ok := md.GetFloat64("audiocodecid"); ok { + switch v { + case codecAAC: + hasAudio = true + case 0: + default: + return fmt.Errorf("unsupported audio codec %v", v) + } + } + + if !hasVideo && !hasAudio { + return fmt.Errorf("stream has no tracks") + } + + for { + var pkt av.Packet + pkt, err = conn.ReadPacket() + if err != nil { + return err + } + + switch pkt.Type { + case av.H264DecoderConfig: + if !hasVideo { + return fmt.Errorf("unexpected video packet") + } + if videoTrack != nil { + return fmt.Errorf("video track setupped twice") + } + + codec, err := h264.FromDecoderConfig(pkt.Data) + if err != nil { + return err + } + + videoTrack, err = gortsplib.NewTrackH264(96, codec.SPS[0], codec.PPS[0]) + if err != nil { + return err + } + + clockRate, _ := videoTrack.ClockRate() + videoRTCPSender = rtcpsender.New(clockRate) + + h264Encoder, err = rtph264.NewEncoder(96) + if err != nil { + return err + } + + tracks = append(tracks, videoTrack) + + case av.AACDecoderConfig: + if !hasAudio { + return fmt.Errorf("unexpected audio packet") + } + if audioTrack != nil { + return fmt.Errorf("audio track setupped twice") + } + + audioTrack, err = gortsplib.NewTrackAAC(96, pkt.Data) + if err != nil { + return err + } + + clockRate, _ := audioTrack.ClockRate() + audioRTCPSender = rtcpsender.New(clockRate) + + aacEncoder, err = rtpaac.NewEncoder(96, clockRate) + if err != nil { + return err + } + + tracks = append(tracks, audioTrack) + } + + if (!hasVideo || videoTrack != nil) && + (!hasAudio || audioTrack != nil) { + return nil + } + } + }() + }() + + select { + case err := <-confDone: if err != nil { s.log(logger.Info, "ERR: %s", err) return true } - clockRate, _ := videoTrack.ClockRate() - videoRTCPSender = rtcpsender.New(clockRate) - - h264Encoder, err = rtph264.NewEncoder(96) - if err != nil { - s.log(logger.Info, "ERR: %s", err) - return true - } - - tracks = append(tracks, videoTrack) - } - - if aacConfig != nil { - audioTrack, err = gortsplib.NewTrackAAC(96, aacConfig) - if err != nil { - s.log(logger.Info, "ERR: %s", err) - return true - } - - clockRate, _ := audioTrack.ClockRate() - audioRTCPSender = rtcpsender.New(clockRate) - - aacEncoder, err = rtpaac.NewEncoder(96, clockRate) - if err != nil { - s.log(logger.Info, "ERR: %s", err) - return true - } - - tracks = append(tracks, audioTrack) - } - - if len(tracks) == 0 { - s.log(logger.Info, "ERR: no tracks found") - return true + case <-s.terminate: + nconn.Close() + <-confDone + return false } for i, t := range tracks { @@ -284,61 +329,56 @@ func (s *Source) runInner() bool { readerDone := make(chan error) go func() { - for { - pkt, err := conn.ReadPacket() - if err != nil { - readerDone <- err - return - } - - switch pkt.Type { - case av.H264: - if h264Sps == nil { - readerDone <- fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up") - return - } - - // decode from AVCC format - nalus, typ := h264.SplitNALUs(pkt.Data) - if typ != h264.NALU_AVCC { - readerDone <- fmt.Errorf("invalid NALU format (%d)", typ) - return - } - - // encode into RTP/H264 format - frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus) + readerDone <- func() error { + for { + pkt, err := conn.ReadPacket() if err != nil { - readerDone <- err - return + return err } - for _, f := range frames { - videoRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f) - s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f) - } + switch pkt.Type { + case av.H264: + if videoTrack == nil { + return fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up") + } - case av.AAC: - if aacConfig == nil { - readerDone <- fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up") - return - } + // decode from AVCC format + nalus, typ := h264.SplitNALUs(pkt.Data) + if typ != h264.NALU_AVCC { + return fmt.Errorf("invalid NALU format (%d)", typ) + } - frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data) - if err != nil { - readerDone <- err - return - } + // encode into RTP/H264 format + frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus) + if err != nil { + return err + } - for _, f := range frames { - audioRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f) - s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f) - } + for _, f := range frames { + videoRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f) + s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f) + } - default: - readerDone <- fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type) - return + case av.AAC: + if audioTrack == nil { + return fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up") + } + + frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data) + if err != nil { + return err + } + + for _, f := range frames { + audioRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f) + s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f) + } + + default: + return fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type) + } } - } + }() }() for { diff --git a/main_test.go b/main_test.go index a2634184..002caea7 100644 --- a/main_test.go +++ b/main_test.go @@ -651,7 +651,8 @@ func TestSource(t *testing.T) { "rtsp_udp", "rtsp_tcp", "rtsps", - "rtmp", + "rtmp_videoaudio", + "rtmp_video", } { t.Run(source, func(t *testing.T) { switch source { @@ -730,15 +731,20 @@ func TestSource(t *testing.T) { require.Equal(t, true, ok) defer p2.close() - case "rtmp": + case "rtmp_videoaudio", "rtmp_video": cnt1, err := newContainer("nginx-rtmp", "rtmpserver", []string{}) require.NoError(t, err) defer cnt1.close() + input := "emptyvideoaudio.ts" + if source == "rtmp_video" { + input = "emptyvideo.ts" + } + cnt2, err := newContainer("ffmpeg", "source", []string{ "-re", "-stream_loop", "-1", - "-i", "emptyvideo.ts", + "-i", input, "-c", "copy", "-f", "flv", "rtmp://" + cnt1.ip() + "/stream/test", diff --git a/testimages/ffmpeg/Dockerfile b/testimages/ffmpeg/Dockerfile index 1f380e26..e4d2228b 100644 --- a/testimages/ffmpeg/Dockerfile +++ b/testimages/ffmpeg/Dockerfile @@ -3,7 +3,7 @@ FROM amd64/alpine:3.12 RUN apk add --no-cache \ ffmpeg -COPY emptyvideo.ts / +COPY *.ts / COPY start.sh / RUN chmod +x /start.sh diff --git a/testimages/ffmpeg/emptyvideoaudio.ts b/testimages/ffmpeg/emptyvideoaudio.ts new file mode 100644 index 00000000..a25061ea Binary files /dev/null and b/testimages/ffmpeg/emptyvideoaudio.ts differ