diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 9cc23585..7bafcef8 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -47,7 +47,7 @@ const index = ` -
+
@@ -57,8 +57,14 @@ const create = () => { const video = document.getElementById('video'); if (video.canPlayType('application/vnd.apple.mpegurl')) { - video.src = 'index.m3u8'; - video.play(); + // since it's not possible to detect timeout errors in iOS, + // wait for the playlist to be available before starting the stream + fetch('stream.m3u8') + .then(() => { + video.src = 'index.m3u8'; + video.play(); + }); + } else { const hls = new Hls({ progressive: false, @@ -68,9 +74,7 @@ const create = () => { if (data.fatal) { hls.destroy(); - setTimeout(() => { - create(); - }, 2000); + setTimeout(create, 2000); } }); @@ -79,8 +83,9 @@ const create = () => { video.play(); } -} -create(); +}; + +window.addEventListener('DOMContentLoaded', create); @@ -443,7 +448,7 @@ func (r *hlsMuxer) handleRequest(req hlsMuxerRequest) { req.Res <- r.muxer.StreamPlaylist() case strings.HasSuffix(req.File, ".ts"): - r := r.muxer.TSFile(req.File) + r := r.muxer.Segment(req.File) if r == nil { req.W.WriteHeader(http.StatusNotFound) req.Res <- nil diff --git a/internal/hls/multiaccessbuffer.go b/internal/hls/multiaccessbuffer.go deleted file mode 100644 index f70ca201..00000000 --- a/internal/hls/multiaccessbuffer.go +++ /dev/null @@ -1,68 +0,0 @@ -package hls - -import ( - "bytes" - "io" - "sync" -) - -type multiAccessBufferReader struct { - m *multiAccessBuffer - readPos int -} - -func (r *multiAccessBufferReader) Read(p []byte) (int, error) { - r.m.mutex.Lock() - defer r.m.mutex.Unlock() - - if r.m.closed && r.m.writePos == r.readPos { - return 0, io.EOF - } - - for !r.m.closed && r.m.writePos == r.readPos { - r.m.cond.Wait() - } - - buf := r.m.buf.Bytes() - n := copy(p, buf[r.readPos:]) - r.readPos += n - - return n, nil -} - -type multiAccessBuffer struct { - buf bytes.Buffer - closed bool - writePos int - mutex sync.Mutex - cond *sync.Cond -} - -func newMultiAccessBuffer() *multiAccessBuffer { - m := &multiAccessBuffer{} - m.cond = sync.NewCond(&m.mutex) - return m -} - -func (m *multiAccessBuffer) Close() error { - m.mutex.Lock() - m.closed = true - m.mutex.Unlock() - m.cond.Broadcast() - return nil -} - -func (m *multiAccessBuffer) Write(p []byte) (int, error) { - m.mutex.Lock() - n, _ := m.buf.Write(p) - m.writePos += n - m.mutex.Unlock() - m.cond.Broadcast() - return n, nil -} - -func (m *multiAccessBuffer) NewReader() io.Reader { - return &multiAccessBufferReader{ - m: m, - } -} diff --git a/internal/hls/multiaccessbuffer_test.go b/internal/hls/multiaccessbuffer_test.go deleted file mode 100644 index 41aafbd2..00000000 --- a/internal/hls/multiaccessbuffer_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package hls - -import ( - "io" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestMultiAccessBuffer(t *testing.T) { - m := newMultiAccessBuffer() - - m.Write([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}) - - r := m.NewReader() - - buf := make([]byte, 4) - n, err := r.Read(buf) - require.NoError(t, err) - require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n]) - - buf = make([]byte, 10) - n, err = r.Read(buf) - require.NoError(t, err) - require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n]) - - m.Write([]byte{0x09, 0x0a, 0x0b, 0x0c}) - - m.Close() - - buf = make([]byte, 10) - n, err = r.Read(buf) - require.NoError(t, err) - require.Equal(t, []byte{0x09, 0x0a, 0x0b, 0x0c}, buf[:n]) - - buf = make([]byte, 10) - _, err = r.Read(buf) - require.Equal(t, io.EOF, err) -} diff --git a/internal/hls/muxer.go b/internal/hls/muxer.go index 27b81d7b..ec48fade 100644 --- a/internal/hls/muxer.go +++ b/internal/hls/muxer.go @@ -1,13 +1,7 @@ package hls import ( - "bytes" - "encoding/hex" "io" - "math" - "strconv" - "strings" - "sync" "time" "github.com/aler9/gortsplib" @@ -32,18 +26,16 @@ type Muxer struct { videoTrack *gortsplib.Track audioTrack *gortsplib.Track - h264SPS []byte - h264PPS []byte - aacConfig rtpaac.MPEG4AudioConfig - videoDTSEst *h264.DTSEstimator - audioAUCount int - tsCurrent *tsFile - tsQueue []*tsFile - tsByName map[string]*tsFile - tsDeleteCount int - mutex sync.RWMutex - startPCR time.Time - startPTS time.Duration + h264SPS []byte + h264PPS []byte + aacConfig rtpaac.MPEG4AudioConfig + videoDTSEst *h264.DTSEstimator + audioAUCount int + currentSegment *segment + startPCR time.Time + startPTS time.Duration + primaryPlaylist *primaryPlaylist + streamPlaylist *streamPlaylist } // NewMuxer allocates a Muxer. @@ -84,19 +76,17 @@ func NewMuxer( h264PPS: h264PPS, aacConfig: aacConfig, videoDTSEst: h264.NewDTSEstimator(), - tsCurrent: newTSFile(videoTrack, audioTrack), - tsByName: make(map[string]*tsFile), + currentSegment: newSegment(videoTrack, audioTrack, h264SPS, h264PPS), + primaryPlaylist: newPrimaryPlaylist(videoTrack, audioTrack, h264SPS, h264PPS), + streamPlaylist: newStreamPlaylist(hlsSegmentCount), } - m.tsByName[m.tsCurrent.name] = m.tsCurrent - m.tsQueue = append(m.tsQueue, m.tsCurrent) - return m, nil } // Close closes a Muxer. func (m *Muxer) Close() { - m.tsCurrent.close() + m.streamPlaylist.close() } // WriteH264 writes H264 NALUs, grouped by PTS, into the muxer. @@ -112,38 +102,27 @@ func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error { }() // skip group silently until we find one with a IDR - if !m.tsCurrent.firstPacketWritten && !idrPresent { + if !m.currentSegment.firstPacketWritten && !idrPresent { return nil } - m.mutex.Lock() - defer m.mutex.Unlock() + if m.currentSegment.firstPacketWritten { + if idrPresent && + m.currentSegment.duration() >= m.hlsSegmentDuration { + m.streamPlaylist.pushSegment(m.currentSegment) - if idrPresent && - m.tsCurrent.firstPacketWritten && - m.tsCurrent.duration() >= m.hlsSegmentDuration { - m.tsCurrent.close() - - m.tsCurrent = newTSFile(m.videoTrack, m.audioTrack) - m.tsCurrent.setStartPCR(m.startPCR) - - m.tsByName[m.tsCurrent.name] = m.tsCurrent - m.tsQueue = append(m.tsQueue, m.tsCurrent) - if len(m.tsQueue) > m.hlsSegmentCount { - delete(m.tsByName, m.tsQueue[0].name) - m.tsQueue = m.tsQueue[1:] - m.tsDeleteCount++ + m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264SPS, m.h264PPS) + m.currentSegment.setStartPCR(m.startPCR) } - } else if !m.tsCurrent.firstPacketWritten { + } else { m.startPCR = time.Now() m.startPTS = pts - m.tsCurrent.setStartPCR(m.startPCR) + m.currentSegment.setStartPCR(m.startPCR) } pts = pts + ptsOffset - m.startPTS - err := m.tsCurrent.writeH264( - m.h264SPS, - m.h264PPS, + + err := m.currentSegment.writeH264( m.videoDTSEst.Feed(pts), pts, idrPresent, @@ -157,34 +136,24 @@ func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error { // WriteAAC writes AAC AUs, grouped by PTS, into the muxer. func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error { - m.mutex.Lock() - defer m.mutex.Unlock() - if m.videoTrack == nil { - if m.audioAUCount >= segmentMinAUCount && - m.tsCurrent.firstPacketWritten && - m.tsCurrent.duration() >= m.hlsSegmentDuration { - m.tsCurrent.close() + if m.currentSegment.firstPacketWritten { + if m.audioAUCount >= segmentMinAUCount && + m.currentSegment.duration() >= m.hlsSegmentDuration { + m.audioAUCount = 0 - m.audioAUCount = 0 + m.streamPlaylist.pushSegment(m.currentSegment) - m.tsCurrent = newTSFile(m.videoTrack, m.audioTrack) - m.tsCurrent.setStartPCR(m.startPCR) - - m.tsByName[m.tsCurrent.name] = m.tsCurrent - m.tsQueue = append(m.tsQueue, m.tsCurrent) - if len(m.tsQueue) > m.hlsSegmentCount { - delete(m.tsByName, m.tsQueue[0].name) - m.tsQueue = m.tsQueue[1:] - m.tsDeleteCount++ + m.currentSegment = newSegment(m.videoTrack, m.audioTrack, m.h264SPS, m.h264PPS) + m.currentSegment.setStartPCR(m.startPCR) } - } else if !m.tsCurrent.firstPacketWritten { + } else { m.startPCR = time.Now() m.startPTS = pts - m.tsCurrent.setStartPCR(m.startPCR) + m.currentSegment.setStartPCR(m.startPCR) } } else { - if !m.tsCurrent.firstPacketWritten { + if !m.currentSegment.firstPacketWritten { return nil } } @@ -194,7 +163,7 @@ func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error { for i, au := range aus { auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(m.aacConfig.SampleRate) - err := m.tsCurrent.writeAAC( + err := m.currentSegment.writeAAC( m.aacConfig.SampleRate, m.aacConfig.ChannelCount, auPTS, @@ -211,68 +180,15 @@ func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error { // PrimaryPlaylist returns a reader to read the primary playlist func (m *Muxer) PrimaryPlaylist() io.Reader { - var codecs []string - - if m.videoTrack != nil { - codecs = append(codecs, "avc1."+hex.EncodeToString(m.h264SPS[1:4])) - } - - if m.audioTrack != nil { - codecs = append(codecs, "mp4a.40.2") - } - - cnt := "#EXTM3U\n" - cnt += "#EXT-X-STREAM-INF:BANDWIDTH=200000,CODECS=\"" + strings.Join(codecs, ",") + "\"\n" - cnt += "stream.m3u8\n" - - return bytes.NewReader([]byte(cnt)) + return m.primaryPlaylist.reader() } // StreamPlaylist returns a reader to read the stream playlist. func (m *Muxer) StreamPlaylist() io.Reader { - cnt := "#EXTM3U\n" - cnt += "#EXT-X-VERSION:3\n" - cnt += "#EXT-X-ALLOW-CACHE:NO\n" - - m.mutex.RLock() - defer m.mutex.RUnlock() - - targetDuration := func() uint { - ret := uint(math.Ceil(m.hlsSegmentDuration.Seconds())) - - // EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION - for _, f := range m.tsQueue { - v2 := uint(math.Round(f.duration().Seconds())) - if v2 > ret { - ret = v2 - } - } - - return ret - }() - cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n" - - cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.tsDeleteCount), 10) + "\n" - - for _, f := range m.tsQueue { - cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n" - cnt += f.name + ".ts\n" - } - - return bytes.NewReader([]byte(cnt)) + return m.streamPlaylist.reader() } -// TSFile returns a reader to read a MPEG-TS file. -func (m *Muxer) TSFile(fname string) io.Reader { - base := strings.TrimSuffix(fname, ".ts") - - m.mutex.RLock() - f, ok := m.tsByName[base] - m.mutex.RUnlock() - - if !ok { - return nil - } - - return f.newReader() +// Segment returns a reader to read a segment. +func (m *Muxer) Segment(fname string) io.Reader { + return m.streamPlaylist.segment(fname) } diff --git a/internal/hls/muxer_test.go b/internal/hls/muxer_test.go index 2241e36e..94cfb983 100644 --- a/internal/hls/muxer_test.go +++ b/internal/hls/muxer_test.go @@ -80,13 +80,11 @@ func TestMuxer(t *testing.T) { `#EXT-X-TARGETDURATION:2\n` + `#EXT-X-MEDIA-SEQUENCE:0\n` + `#EXTINF:2,\n` + - `([0-9]+\.ts)\n` + - `#EXTINF:0,\n` + `([0-9]+\.ts)\n$`) ma := re.FindStringSubmatch(string(byts)) - require.NotEqual(t, nil, ma) + require.NotEqual(t, 0, len(ma)) - byts, err = ioutil.ReadAll(m.TSFile(ma[1])) + byts, err = ioutil.ReadAll(m.Segment(ma[1])) require.NoError(t, err) checkTSPacket(t, byts, 0, 1) @@ -107,3 +105,29 @@ func TestMuxer(t *testing.T) { byts[:24], ) } + +func TestMuxerCloseBeforeFirstSegment(t *testing.T) { + videoTrack, err := gortsplib.NewTrackH264(96, []byte{0x07, 0x01, 0x02, 0x03}, []byte{0x08}) + require.NoError(t, err) + + audioTrack, err := gortsplib.NewTrackAAC(97, []byte{17, 144}) + require.NoError(t, err) + + m, err := NewMuxer(3, 1*time.Second, videoTrack, audioTrack) + require.NoError(t, err) + + // group with IDR + err = m.WriteH264(2*time.Second, [][]byte{ + {5}, // IDR + {9}, // AUD + {8}, // PPS + {7}, // SPS + }) + require.NoError(t, err) + + m.Close() + + byts, err := ioutil.ReadAll(m.StreamPlaylist()) + require.NoError(t, err) + require.Equal(t, []byte{}, byts) +} diff --git a/internal/hls/primaryplaylist.go b/internal/hls/primaryplaylist.go new file mode 100644 index 00000000..54b7bdee --- /dev/null +++ b/internal/hls/primaryplaylist.go @@ -0,0 +1,55 @@ +package hls + +import ( + "bytes" + "encoding/hex" + "io" + "strings" + + "github.com/aler9/gortsplib" +) + +type primaryPlaylist struct { + videoTrack *gortsplib.Track + audioTrack *gortsplib.Track + h264SPS []byte + h264PPS []byte + + breader *bytes.Reader +} + +func newPrimaryPlaylist( + videoTrack *gortsplib.Track, + audioTrack *gortsplib.Track, + h264SPS []byte, + h264PPS []byte, +) *primaryPlaylist { + p := &primaryPlaylist{ + videoTrack: videoTrack, + audioTrack: audioTrack, + h264SPS: h264SPS, + h264PPS: h264PPS, + } + + var codecs []string + + if p.videoTrack != nil { + codecs = append(codecs, "avc1."+hex.EncodeToString(p.h264SPS[1:4])) + } + + if p.audioTrack != nil { + codecs = append(codecs, "mp4a.40.2") + } + + cnt := "#EXTM3U\n" + cnt += "#EXT-X-STREAM-INF:BANDWIDTH=200000,CODECS=\"" + strings.Join(codecs, ",") + "\"\n" + cnt += "stream.m3u8\n" + + p.breader = bytes.NewReader([]byte(cnt)) + + return p +} + +func (p *primaryPlaylist) reader() io.Reader { + return p.breader +} diff --git a/internal/hls/tsfile.go b/internal/hls/segment.go similarity index 85% rename from internal/hls/tsfile.go rename to internal/hls/segment.go index e82aa072..ac6a32b9 100644 --- a/internal/hls/tsfile.go +++ b/internal/hls/segment.go @@ -1,6 +1,7 @@ package hls import ( + "bytes" "context" "io" "strconv" @@ -13,10 +14,13 @@ import ( "github.com/aler9/rtsp-simple-server/internal/h264" ) -type tsFile struct { - videoTrack *gortsplib.Track +type segment struct { + videoTrack *gortsplib.Track + h264SPS []byte + h264PPS []byte + name string - buf *multiAccessBuffer + buf bytes.Buffer mux *astits.Muxer firstPacketWritten bool minPTS time.Duration @@ -25,14 +29,20 @@ type tsFile struct { pcrSendCounter int } -func newTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *tsFile { - t := &tsFile{ +func newSegment( + videoTrack *gortsplib.Track, + audioTrack *gortsplib.Track, + h264SPS []byte, + h264PPS []byte, +) *segment { + t := &segment{ videoTrack: videoTrack, + h264SPS: h264SPS, + h264PPS: h264PPS, name: strconv.FormatInt(time.Now().Unix(), 10), - buf: newMultiAccessBuffer(), } - t.mux = astits.NewMuxer(context.Background(), t.buf) + t.mux = astits.NewMuxer(context.Background(), &t.buf) if videoTrack != nil { t.mux.AddElementaryStream(astits.PMTElementaryStream{ @@ -62,25 +72,19 @@ func newTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *tsFile return t } -func (t *tsFile) close() error { - return t.buf.Close() -} - -func (t *tsFile) duration() time.Duration { +func (t *segment) duration() time.Duration { return t.maxPTS - t.minPTS } -func (t *tsFile) setStartPCR(startPCR time.Time) { +func (t *segment) setStartPCR(startPCR time.Time) { t.startPCR = startPCR } -func (t *tsFile) newReader() io.Reader { - return t.buf.NewReader() +func (t *segment) reader() io.Reader { + return bytes.NewReader(t.buf.Bytes()) } -func (t *tsFile) writeH264( - h264SPS []byte, - h264PPS []byte, +func (t *segment) writeH264( dts time.Duration, pts time.Duration, isIDR bool, @@ -113,8 +117,8 @@ func (t *tsFile) writeH264( // add SPS and PPS before IDR if typ == h264.NALUTypeIDR { - filteredNALUs = append(filteredNALUs, h264SPS) - filteredNALUs = append(filteredNALUs, h264PPS) + filteredNALUs = append(filteredNALUs, t.h264SPS) + filteredNALUs = append(filteredNALUs, t.h264PPS) } filteredNALUs = append(filteredNALUs, nalu) @@ -172,7 +176,7 @@ func (t *tsFile) writeH264( return err } -func (t *tsFile) writeAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error { +func (t *segment) writeAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error { if t.videoTrack == nil { if !t.firstPacketWritten { t.firstPacketWritten = true diff --git a/internal/hls/streamplaylist.go b/internal/hls/streamplaylist.go new file mode 100644 index 00000000..8de36819 --- /dev/null +++ b/internal/hls/streamplaylist.go @@ -0,0 +1,128 @@ +package hls + +import ( + "bytes" + "io" + "math" + "strconv" + "strings" + "sync" +) + +type readerFunc struct { + wrapped func() []byte + reader *bytes.Reader +} + +func (r *readerFunc) Read(buf []byte) (int, error) { + if r.reader == nil { + cnt := r.wrapped() + r.reader = bytes.NewReader(cnt) + } + return r.reader.Read(buf) +} + +type streamPlaylist struct { + hlsSegmentCount int + + mutex sync.Mutex + cond *sync.Cond + closed bool + segments []*segment + segmentByName map[string]*segment + segmentDeleteCount int +} + +func newStreamPlaylist(hlsSegmentCount int) *streamPlaylist { + p := &streamPlaylist{ + hlsSegmentCount: hlsSegmentCount, + segmentByName: make(map[string]*segment), + } + p.cond = sync.NewCond(&p.mutex) + return p +} + +func (p *streamPlaylist) close() { + func() { + p.mutex.Lock() + defer p.mutex.Unlock() + p.closed = true + }() + + p.cond.Broadcast() +} + +func (p *streamPlaylist) reader() io.Reader { + return &readerFunc{wrapped: func() []byte { + p.mutex.Lock() + defer p.mutex.Unlock() + + if !p.closed && len(p.segments) == 0 { + p.cond.Wait() + } + + if p.closed { + return nil + } + + cnt := "#EXTM3U\n" + cnt += "#EXT-X-VERSION:3\n" + cnt += "#EXT-X-ALLOW-CACHE:NO\n" + + targetDuration := func() uint { + ret := uint(0) + + // EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION + for _, f := range p.segments { + v2 := uint(math.Round(f.duration().Seconds())) + if v2 > ret { + ret = v2 + } + } + + return ret + }() + cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n" + + cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(p.segmentDeleteCount), 10) + "\n" + + for _, f := range p.segments { + cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n" + cnt += f.name + ".ts\n" + } + + return []byte(cnt) + }} +} + +func (p *streamPlaylist) segment(fname string) io.Reader { + base := strings.TrimSuffix(fname, ".ts") + + p.mutex.Lock() + f, ok := p.segmentByName[base] + p.mutex.Unlock() + + if !ok { + return nil + } + + return f.reader() +} + +func (p *streamPlaylist) pushSegment(t *segment) { + func() { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.segmentByName[t.name] = t + p.segments = append(p.segments, t) + + if len(p.segments) > p.hlsSegmentCount { + delete(p.segmentByName, p.segments[0].name) + p.segments = p.segments[1:] + p.segmentDeleteCount++ + } + }() + + p.cond.Broadcast() +}