diff --git a/internal/playback/on_get.go b/internal/playback/on_get.go index 350c24b6..c8ac4441 100644 --- a/internal/playback/on_get.go +++ b/internal/playback/on_get.go @@ -66,12 +66,12 @@ func seekAndMux( segmentStartOffset := start.Sub(segments[0].Start) - segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m) + segmentDuration, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m) if err != nil { return err } - segmentEnd = start.Add(segmentMaxElapsed) + segmentEnd = start.Add(segmentDuration) for _, seg := range segments[1:] { f, err = os.Open(seg.Fpath) @@ -92,13 +92,13 @@ func seekAndMux( segmentStartOffset := seg.Start.Sub(start) - var segmentMaxElapsed time.Duration - segmentMaxElapsed, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m) + var segmentDuration time.Duration + segmentDuration, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m) if err != nil { return err } - segmentEnd = start.Add(segmentMaxElapsed) + segmentEnd = start.Add(segmentDuration) } err = m.flush() diff --git a/internal/playback/on_list.go b/internal/playback/on_list.go index 7149ed71..4a44c46b 100644 --- a/internal/playback/on_list.go +++ b/internal/playback/on_list.go @@ -22,66 +22,110 @@ func (d listEntryDuration) MarshalJSON() ([]byte, error) { return json.Marshal(time.Duration(d).Seconds()) } +type parsedSegment struct { + start time.Time + init *fmp4.Init + duration time.Duration +} + +func parseSegment(seg *recordstore.Segment) (*parsedSegment, error) { + f, err := os.Open(seg.Fpath) + if err != nil { + return nil, err + } + defer f.Close() + + init, duration, err := segmentFMP4ReadHeader(f) + if err != nil { + return nil, err + } + + // if duration is not present in the header, compute it + // by parsing each part + if duration == 0 { + duration, err = segmentFMP4ReadDurationFromParts(f, init) + if err != nil { + return nil, err + } + } + + return &parsedSegment{ + start: seg.Start, + init: init, + duration: duration, + }, nil +} + +func parseSegments(segments []*recordstore.Segment) ([]*parsedSegment, error) { + parsed := make([]*parsedSegment, len(segments)) + ch := make(chan error) + + // process segments in parallel. + // parallel random access should improve performance in most cases. + // ref: https://pkolaczk.github.io/disk-parallelism/ + for i, seg := range segments { + go func(i int, seg *recordstore.Segment) { + var err error + parsed[i], err = parseSegment(seg) + ch <- err + }(i, seg) + } + + var err error + + for range segments { + err2 := <-ch + if err2 != nil { + err = err2 + } + } + + return parsed, err +} + type listEntry struct { Start time.Time `json:"start"` Duration listEntryDuration `json:"duration"` URL string `json:"url"` } -func readDurationAndConcatenate( +func concatenateSegments(parsed []*parsedSegment) []listEntry { + out := []listEntry{} + var prevInit *fmp4.Init + + for _, parsed := range parsed { + if len(out) != 0 && segmentFMP4CanBeConcatenated( + prevInit, + out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)), + parsed.init, + parsed.start) { + prevStart := out[len(out)-1].Start + curEnd := parsed.start.Add(parsed.duration) + out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart)) + } else { + out = append(out, listEntry{ + Start: parsed.start, + Duration: listEntryDuration(parsed.duration), + }) + } + + prevInit = parsed.init + } + + return out +} + +func parseAndConcatenate( recordFormat conf.RecordFormat, segments []*recordstore.Segment, ) ([]listEntry, error) { if recordFormat == conf.RecordFormatFMP4 { - out := []listEntry{} - var prevInit *fmp4.Init - - for _, seg := range segments { - err := func() error { - f, err := os.Open(seg.Fpath) - if err != nil { - return err - } - defer f.Close() - - init, duration, err := segmentFMP4ReadHeader(f) - if err != nil { - return err - } - - // if duration is not present in the header, compute it - // by parsing each part - if duration == 0 { - duration, err = segmentFMP4ReadDurationFromParts(f, init) - if err != nil { - return err - } - } - - if len(out) != 0 && segmentFMP4CanBeConcatenated( - prevInit, - out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)), - init, - seg.Start) { - prevStart := out[len(out)-1].Start - curEnd := seg.Start.Add(duration) - out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart)) - } else { - out = append(out, listEntry{ - Start: seg.Start, - Duration: listEntryDuration(duration), - }) - } - - prevInit = init - - return nil - }() - if err != nil { - return nil, err - } + parsed, err := parseSegments(segments) + if err != nil { + return nil, err } + out := concatenateSegments(parsed) return out, nil } @@ -135,7 +179,7 @@ func (s *Server) onList(ctx *gin.Context) { return } - entries, err := readDurationAndConcatenate(pathConf.RecordFormat, segments) + entries, err := parseAndConcatenate(pathConf.RecordFormat, segments) if err != nil { s.writeError(ctx, http.StatusInternalServerError, err) return