Skip to content

Commit

Permalink
playback: allow filtering timespans by start and end date (#3637) (#3489
Browse files Browse the repository at this point in the history
)
  • Loading branch information
aler9 committed Dec 28, 2024
1 parent df3362a commit 41a86bd
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 157 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1578,10 +1578,16 @@ playbackAddress: :9996
The server provides an endpoint to list recorded timespans:

```
http://localhost:9996/list?path=[mypath]
http://localhost:9996/list?path=[mypath]&start=[start]&end=[end]
```

Where [mypath] is the name of a path. The server will return a list of timespans in JSON format:
Where:

* [mypath] is the name of a path
* [start] (optional) is the start date in [RFC3339 format](https://www.utctime.net/)
* [end] (optional) is the end date in [RFC3339 format](https://www.utctime.net/)

The server will return a list of timespans in JSON format:

```json
[
Expand All @@ -1601,13 +1607,13 @@ Where [mypath] is the name of a path. The server will return a list of timespans
The server provides an endpoint to download recordings:

```
http://localhost:9996/get?path=[mypath]&start=[start_date]&duration=[duration]&format=[format]
http://localhost:9996/get?path=[mypath]&start=[start]&duration=[duration]&format=[format]
```

Where:

* [mypath] is the path name
* [start_date] is the start date in [RFC3339 format](https://www.utctime.net/)
* [start] is the start date in [RFC3339 format](https://www.utctime.net/)
* [duration] is the maximum duration of the recording in seconds
* [format] (optional) is the output format of the stream. Available values are "fmp4" (default) and "mp4"

Expand Down
2 changes: 1 addition & 1 deletion internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func recordingsOfPath(
Name: pathName,
}

segments, _ := recordstore.FindSegments(pathConf, pathName)
segments, _ := recordstore.FindSegments(pathConf, pathName, nil, nil)

ret.Segments = make([]*defs.APIRecordingSegment, len(segments))

Expand Down
3 changes: 2 additions & 1 deletion internal/playback/on_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func (s *Server) onGet(ctx *gin.Context) {
return
}

segments, err := recordstore.FindSegmentsInTimespan(pathConf, pathName, start, duration)
end := start.Add(duration)
segments, err := recordstore.FindSegments(pathConf, pathName, &start, &end)
if err != nil {
if errors.Is(err, recordstore.ErrNoSegmentsFound) {
s.writeError(ctx, http.StatusNotFound, err)
Expand Down
43 changes: 41 additions & 2 deletions internal/playback/on_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func computeDurationAndConcatenate(
return err
}

maxDuration, err := segmentFMP4ReadMaxDuration(f, init)
maxDuration, err := segmentFMP4ReadDuration(f, init)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,7 +103,31 @@ func (s *Server) onList(ctx *gin.Context) {
return
}

segments, err := recordstore.FindSegments(pathConf, pathName)
var start *time.Time
rawStart := ctx.Query("start")
if rawStart != "" {
var tmp time.Time
tmp, err = time.Parse(time.RFC3339, rawStart)
if err != nil {
s.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid start: %w", err))
return
}

Check warning on line 114 in internal/playback/on_list.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/on_list.go#L112-L114

Added lines #L112 - L114 were not covered by tests
start = &tmp
}

var end *time.Time
rawEnd := ctx.Query("end")
if rawEnd != "" {
var tmp time.Time
tmp, err = time.Parse(time.RFC3339, rawEnd)
if err != nil {
s.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid end: %w", err))
return
}

Check warning on line 126 in internal/playback/on_list.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/on_list.go#L124-L126

Added lines #L124 - L126 were not covered by tests
end = &tmp
}

segments, err := recordstore.FindSegments(pathConf, pathName, start, end)
if err != nil {
if errors.Is(err, recordstore.ErrNoSegmentsFound) {
s.writeError(ctx, http.StatusNotFound, err)
Expand All @@ -119,6 +143,21 @@ func (s *Server) onList(ctx *gin.Context) {
return
}

if start != nil {
firstEntry := entries[0]
if firstEntry.Start.Before(*start) {
entries[0].Duration -= listEntryDuration(start.Sub(firstEntry.Start))
entries[0].Start = *start
}
}

if end != nil {
lastEntry := entries[len(entries)-1]
if lastEntry.Start.Add(time.Duration(lastEntry.Duration)).After(*end) {
entries[len(entries)-1].Duration = listEntryDuration(end.Sub(lastEntry.Start))
}
}

var scheme string
if s.Encryption {
scheme = "https"
Expand Down
68 changes: 67 additions & 1 deletion internal/playback/on_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestOnList(t *testing.T) {
func TestOnListUnfiltered(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -78,6 +78,72 @@ func TestOnList(t *testing.T) {
}, out)
}

func TestOnListFiltered(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)

err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)

writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2009-11-07_11-23-02-500000.mp4"))

s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.StringDuration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Name: "mypath",
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
AuthManager: test.NilAuthManager,
Parent: test.NilLogger,
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()

u, err := url.Parse("http://myuser:mypass@localhost:9996/list?start=")
require.NoError(t, err)

v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 22, 1, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("end", time.Date(2009, 11, 0o7, 11, 23, 4, 500000000, time.Local).Format(time.RFC3339Nano))
u.RawQuery = v.Encode()

req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()

require.Equal(t, http.StatusOK, res.StatusCode)

var out interface{}
err = json.NewDecoder(res.Body).Decode(&out)
require.NoError(t, err)

require.Equal(t, []interface{}{
map[string]interface{}{
"duration": float64(64),
"start": time.Date(2008, 11, 0o7, 11, 22, 1, 500000000, time.Local).Format(time.RFC3339Nano),
"url": "http://localhost:9996/get?duration=64&path=mypath&start=" +
url.QueryEscape(time.Date(2008, 11, 0o7, 11, 22, 1, 500000000, time.Local).Format(time.RFC3339Nano)),
},
map[string]interface{}{
"duration": float64(2),
"start": time.Date(2009, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano),
"url": "http://localhost:9996/get?duration=2&path=mypath&start=" +
url.QueryEscape(time.Date(2009, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano)),
},
}, out)
}

func TestOnListDifferentInit(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/playback/segment_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func segmentFMP4ReadInit(r io.ReadSeeker) (*fmp4.Init, error) {
return &init, nil
}

func segmentFMP4ReadMaxDuration(
func segmentFMP4ReadDuration(
r io.ReadSeeker,
init *fmp4.Init,
) (time.Duration, error) {
Expand Down
9 changes: 4 additions & 5 deletions internal/recordcleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,15 @@ func (c *Cleaner) processPath(now time.Time, pathName string) error {
return nil
}

segments, err := recordstore.FindSegments(pathConf, pathName)
end := now.Add(-time.Duration(pathConf.RecordDeleteAfter))
segments, err := recordstore.FindSegments(pathConf, pathName, nil, &end)
if err != nil {
return err
}

for _, seg := range segments {
if now.Sub(seg.Start) > time.Duration(pathConf.RecordDeleteAfter) {
c.Log(logger.Debug, "removing %s", seg.Fpath)
os.Remove(seg.Fpath)
}
c.Log(logger.Debug, "removing %s", seg.Fpath)
os.Remove(seg.Fpath)
}

return nil
Expand Down
88 changes: 20 additions & 68 deletions internal/recordstore/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func regexpPathFindPathsWithSegments(pathConf *conf.Path) map[string]struct{} {
return ret
}

// FindAllPathsWithSegments returns all paths that do have segments.
// FindAllPathsWithSegments returns all paths that have at least one segment.
func FindAllPathsWithSegments(pathConfs map[string]*conf.Path) []string {
pathNames := make(map[string]struct{})

Expand Down Expand Up @@ -117,9 +117,12 @@ func FindAllPathsWithSegments(pathConfs map[string]*conf.Path) []string {
}

// FindSegments returns all segments of a path.
// Segments can be filtered by start date and end date.
func FindSegments(
pathConf *conf.Path,
pathName string,
start *time.Time,
end *time.Time,
) ([]*Segment, error) {
recordPath := PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
Expand All @@ -133,59 +136,6 @@ func FindSegments(
commonPath := CommonPath(recordPath)
var segments []*Segment

err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error {
if err != nil {
return err
}

if !info.IsDir() {
var pa Path
ok := pa.Decode(recordPath, fpath)
if ok {
segments = append(segments, &Segment{
Fpath: fpath,
Start: pa.Start,
})
}
}

return nil
})
if err != nil {
return nil, err
}

if segments == nil {
return nil, ErrNoSegmentsFound
}

sort.Slice(segments, func(i, j int) bool {
return segments[i].Start.Before(segments[j].Start)
})

return segments, nil
}

// FindSegmentsInTimespan returns all segments in a certain timestamp.
func FindSegmentsInTimespan(
pathConf *conf.Path,
pathName string,
start time.Time,
duration time.Duration,
) ([]*Segment, error) {
recordPath := PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)

// we have to convert to absolute paths
// otherwise, recordPath and fpath inside Walk() won't have common elements
recordPath, _ = filepath.Abs(recordPath)

commonPath := CommonPath(recordPath)
end := start.Add(duration)
var segments []*Segment

err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error {
if err != nil {
return err
Expand All @@ -196,7 +146,7 @@ func FindSegmentsInTimespan(
ok := pa.Decode(recordPath, fpath)

// gather all segments that starts before the end of the playback
if ok && !end.Before(pa.Start) {
if ok && (end == nil || !end.Before(pa.Start)) {
segments = append(segments, &Segment{
Fpath: fpath,
Start: pa.Start,
Expand All @@ -218,21 +168,23 @@ func FindSegmentsInTimespan(
return segments[i].Start.Before(segments[j].Start)
})

// find the segment that may contain the start of the playback and remove all previous ones
found := false
for i := 0; i < len(segments)-1; i++ {
if !start.Before(segments[i].Start) && start.Before(segments[i+1].Start) {
segments = segments[i:]
found = true
break
if start != nil {
// find the segment that may contain the start of the playback and remove all previous ones
found := false
for i := 0; i < len(segments)-1; i++ {
if !start.Before(segments[i].Start) && start.Before(segments[i+1].Start) {
segments = segments[i:]
found = true
break

Check warning on line 178 in internal/recordstore/segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recordstore/segment.go#L175-L178

Added lines #L175 - L178 were not covered by tests
}
}
}

// otherwise, keep the last segment only and check if it may contain the start of the playback
if !found {
segments = segments[len(segments)-1:]
if segments[len(segments)-1].Start.After(start) {
return nil, ErrNoSegmentsFound
// otherwise, keep the last segment only and check if it may contain the start of the playback
if !found {
segments = segments[len(segments)-1:]
if segments[len(segments)-1].Start.After(*start) {
return nil, ErrNoSegmentsFound
}

Check warning on line 187 in internal/recordstore/segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recordstore/segment.go#L186-L187

Added lines #L186 - L187 were not covered by tests
}
}

Expand Down
Loading

0 comments on commit 41a86bd

Please sign in to comment.