Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

playaback: use a fixed fMP4 part duration #3203

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions internal/playback/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package playback
type muxer interface {
writeInit(init []byte)
setTrack(trackID int)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error
writeFinalDTS(dts int64)
flush() error
finalFlush() error
}
96 changes: 42 additions & 54 deletions internal/playback/muxer_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

import (
"io"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
)

var partSize = durationGoToMp4(1*time.Second, fmp4Timescale)

type muxerFMP4Track struct {
started bool
id int
firstDTS uint64
firstDTS int64
lastDTS int64
samples []*fmp4.PartSample
}
Expand Down Expand Up @@ -42,58 +44,27 @@
w.curTrack = findTrack(w.tracks, trackID)
if w.curTrack == nil {
w.curTrack = &muxerFMP4Track{
id: trackID,
id: trackID,
firstDTS: -1,
}
w.tracks = append(w.tracks, w.curTrack)
}
}

func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) {
if !w.curTrack.started {
if dts >= 0 {
w.curTrack.started = true
w.curTrack.firstDTS = uint64(dts)
func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error {
if dts >= 0 {
if w.curTrack.firstDTS < 0 {
w.curTrack.firstDTS = dts

// reset GOP preceding the first frame
if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
w.curTrack.samples = nil

Check warning on line 61 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L61

Added line #L61 was not covered by tests
}
w.curTrack.lastDTS = dts
} else {
ptsOffset = 0

if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
}
}
} else {
if w.curTrack.samples == nil {
w.curTrack.firstDTS = uint64(dts)
} else {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}

w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}

Expand All @@ -103,25 +74,48 @@
Payload: payload,
})
w.curTrack.lastDTS = dts

if (w.curTrack.lastDTS - w.curTrack.firstDTS) > int64(partSize) {
err := w.innerFlush(false)
if err != nil {
return err
}

Check warning on line 82 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}
} else {
// store GOP preceding the first frame, with PTSOffset = 0 and Duration = 0
if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
}

Check warning on line 96 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L92-L96

Added lines #L92 - L96 were not covered by tests
}

return nil
}

func (w *muxerFMP4) writeFinalDTS(dts int64) {
if w.curTrack.started && w.curTrack.samples != nil {
if w.curTrack.firstDTS >= 0 {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}

w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}
}

func (w *muxerFMP4) flush2(final bool) error {
func (w *muxerFMP4) innerFlush(final bool) error {
var part fmp4.Part

for _, track := range w.tracks {
if track.started && (len(track.samples) > 1 || (final && len(track.samples) != 0)) {
if track.firstDTS >= 0 && (len(track.samples) > 1 || (final && len(track.samples) != 0)) {
// do not write the final sample
// in order to allow changing its duration to compensate NTP-DTS differences
var samples []*fmp4.PartSample
if !final {
samples = track.samples[:len(track.samples)-1]
Expand All @@ -131,15 +125,13 @@

part.Tracks = append(part.Tracks, &fmp4.PartTrack{
ID: track.id,
BaseTime: track.firstDTS,
BaseTime: uint64(track.firstDTS),
Samples: samples,
})

if !final {
track.samples = track.samples[len(track.samples)-1:]
track.firstDTS = uint64(track.lastDTS)
} else {
track.samples = nil
track.firstDTS = track.lastDTS
}
}
}
Expand Down Expand Up @@ -173,9 +165,5 @@
}

func (w *muxerFMP4) flush() error {
return w.flush2(false)
}

func (w *muxerFMP4) finalFlush() error {
return w.flush2(true)
return w.innerFlush(true)
}
5 changes: 2 additions & 3 deletions internal/playback/on_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,13 @@ func seekAndMux(
}()
if err != nil {
if errors.Is(err, errStopIteration) {
return nil
break
}

return err
}
}

err = m.finalFlush()
err = m.flush()
if err != nil {
return err
}
Expand Down
44 changes: 16 additions & 28 deletions internal/playback/on_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func TestOnGet(t *testing.T) {

writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-04-500000.mp4"))

s := &Server{
Address: "127.0.0.1:9996",
Expand All @@ -252,7 +253,7 @@ func TestOnGet(t *testing.T) {
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "2")
v.Set("duration", "3")
v.Set("format", "fmp4")
u.RawQuery = v.Encode()

Expand Down Expand Up @@ -283,36 +284,29 @@ func TestOnGet(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
},
},
},
},
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
{
Duration: 90000,
Payload: []byte{7, 8},
},
},
},
},
},
{
SequenceNumber: 2,
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 90000,
BaseTime: 180000,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
Payload: []byte{7, 8},
Payload: []byte{9, 10},
},
},
},
Expand Down Expand Up @@ -385,6 +379,11 @@ func TestOnGetDifferentInit(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
},
},
Expand Down Expand Up @@ -456,17 +455,6 @@ func TestOnGetNTPCompensation(t *testing.T) {
Duration: 0,
Payload: []byte{3, 4},
},
},
},
},
},
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 45000, // 90 - 45
IsNonSyncSample: true,
Expand All @@ -481,11 +469,11 @@ func TestOnGetNTPCompensation(t *testing.T) {
},
},
{
SequenceNumber: 2,
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 135000, // 180 - 45
BaseTime: 135000,
Samples: []*fmp4.PartSample{
{
Duration: 90000,
Expand Down
22 changes: 8 additions & 14 deletions internal/playback/segment_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,15 @@
return nil, err
}

m.writeSample(
err = m.writeSample(
muxerDTS,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
payload,
)
if err != nil {
return nil, err
}

Check warning on line 385 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L384-L385

Added lines #L384 - L385 were not covered by tests

muxerDTS += int64(e.SampleDuration)
}
Expand All @@ -389,12 +392,6 @@
if muxerDTS > maxMuxerDTS {
maxMuxerDTS = muxerDTS
}

case "mdat":
err := m.flush()
if err != nil {
return nil, err
}
}
return nil, nil
})
Expand Down Expand Up @@ -474,12 +471,15 @@
return nil, err
}

m.writeSample(
err = m.writeSample(
muxerDTS,
e.SampleCompositionTimeOffsetV1,
(e.SampleFlags&sampleFlagIsNonSyncSample) != 0,
payload,
)
if err != nil {
return nil, err
}

Check warning on line 482 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L481-L482

Added lines #L481 - L482 were not covered by tests

muxerDTS += int64(e.SampleDuration)
}
Expand All @@ -489,12 +489,6 @@
if muxerDTS > maxMuxerDTS {
maxMuxerDTS = muxerDTS
}

case "mdat":
err := m.flush()
if err != nil {
return nil, err
}
}
return nil, nil
})
Expand Down
Loading