Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LPMS bottleneck fix #141

Merged
merged 4 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions ffmpeg/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package ffmpeg

import (
"testing"
)

func TestTranscoderAPI_InvalidFile(t *testing.T) {
// Test the following file open results on input: fail, success, fail, success

tc := NewTranscoder()
defer tc.StopTranscoder()
in := &TranscodeOptionsIn{}
out := []TranscodeOptions{TranscodeOptions{
Oname: "-",
AudioEncoder: ComponentOptions{Name: "copy"},
VideoEncoder: ComponentOptions{Name: "drop"},
Muxer: ComponentOptions{Name: "null"},
}}

// fail # 1
in.Fname = "none"
_, err := tc.Transcode(in, out)
if err == nil || err.Error() != "No such file or directory" {
t.Error("Expected 'No such file or directory', got ", err)
}

// success # 1
in.Fname = "../transcoder/test.ts"
_, err = tc.Transcode(in, out)
if err != nil {
t.Error(err)
}

// fail # 2
in.Fname = "none"
_, err = tc.Transcode(in, out)
if err == nil || err.Error() != "No such file or directory" {
t.Error("Expected 'No such file or directory', got ", err)
}

// success # 2
in.Fname = "../transcoder/test.ts"
_, err = tc.Transcode(in, out)
if err != nil {
t.Error(err)
}

// Now check invalid output filename
out[0].Muxer = ComponentOptions{Name: "md5"}
out[0].Oname = "/not/really/anywhere"
_, err = tc.Transcode(in, out)
if err == nil {
t.Error(err)
}

}

func TestTranscoderAPI_Stopped(t *testing.T) {

// Test stopped transcoder
tc := NewTranscoder()
tc.StopTranscoder()
in := &TranscodeOptionsIn{}
_, err := tc.Transcode(in, nil)
if err != ErrTranscoderStp {
t.Errorf("Unexpected error; wanted %v but got %v", ErrTranscoderStp, err)
}

// test somehow munged transcoder handle
tc2 := NewTranscoder()
tc2.handle = nil // technically this leaks memory ... OK for test
_, err = tc2.Transcode(in, nil)
if err != ErrTranscoderStp {
t.Errorf("Unexpected error; wanted %v but got %v", ErrTranscoderStp, err)
}
}

func TestTranscoderAPI_TooManyOutputs(t *testing.T) {

out := make([]TranscodeOptions, 11)
for i, _ := range out {
out[i].VideoEncoder = ComponentOptions{Name: "drop"}
}
in := &TranscodeOptionsIn{}
tc := NewTranscoder()
_, err := tc.Transcode(in, out)
if err == nil || err.Error() != "Too many outputs" {
t.Error("Expected 'Too many outputs', got ", err)
}
}
64 changes: 61 additions & 3 deletions ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"unsafe"
)

Expand All @@ -19,6 +20,7 @@ import "C"
var ErrTranscoderRes = errors.New("TranscoderInvalidResolution")
var ErrTranscoderHw = errors.New("TranscoderInvalidHardware")
var ErrTranscoderInp = errors.New("TranscoderInvalidInput")
var ErrTranscoderStp = errors.New("TranscoderStopped")

type Acceleration int

Expand All @@ -33,6 +35,12 @@ type ComponentOptions struct {
Opts map[string]string
}

type Transcoder struct {
handle *C.struct_transcode_thread
stopped bool
yondonfu marked this conversation as resolved.
Show resolved Hide resolved
mu *sync.Mutex
}

type TranscodeOptionsIn struct {
Fname string
Accel Acceleration
Expand Down Expand Up @@ -155,6 +163,17 @@ func Transcode2(input *TranscodeOptionsIn, ps []TranscodeOptions) error {
}

func Transcode3(input *TranscodeOptionsIn, ps []TranscodeOptions) (*TranscodeResults, error) {
t := NewTranscoder()
defer t.StopTranscoder()
return t.Transcode(input, ps)
}

func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions) (*TranscodeResults, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.stopped || t.handle == nil {
return nil, ErrTranscoderStp
}
if input == nil {
return nil, ErrTranscoderInp
}
Expand Down Expand Up @@ -198,7 +217,7 @@ func Transcode3(input *TranscodeOptionsIn, ps []TranscodeOptions) (*TranscodeRes
filters += fmt.Sprintf("%s='w=if(gte(iw,ih),%d,-2):h=if(lt(iw,ih),%d,-2)'", scale_filter, w, h)
if input.Accel != Software && p.Accel == Software {
// needed for hw dec -> hw rescale -> sw enc
filters = filters + ":format=yuv420p,hwdownload"
filters = filters + ",hwdownload,format=nv12"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change from converting to yuv420p before hwdownload to converting to nv12 after hwdownload?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old way was actually broken - it only worked with the npp scaler which we phased out in favor of CUDA before the original GPU PR was even merged. See livepeer/go-livepeer#951

This seems to work better now that we're using cuvid as a decoder, but I have to check that it actually fixes livepeer/go-livepeer#951

}
muxOpts := C.component_opts{
opts: newAVOpts(p.Muxer.Opts), // don't free this bc of avformat_write_header API
Expand All @@ -207,6 +226,12 @@ func Transcode3(input *TranscodeOptionsIn, ps []TranscodeOptions) (*TranscodeRes
muxOpts.name = C.CString(p.Muxer.Name)
defer C.free(unsafe.Pointer(muxOpts.name))
}
// Set some default encoding options
if len(p.VideoEncoder.Name) <= 0 && len(p.VideoEncoder.Opts) <= 0 {
p.VideoEncoder.Opts = map[string]string{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, in what scenarios wouldn't we want to enforce IDR frames or closed GOPs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't set it if the user passes in their own options. Practical scenarios for open GOP and non-intra I frames include non-segmented VOD or real time streaming where you want to take advantage of a larger range of pictures for the bitrate savings.

On checking again, nvenc doesn't actually the cgop option, so it's useless for us now. Will remove. It was a leftover from when trying to kludge x264 into doing persistent transcode sessions. We aren't anymore, so this option is no longer needed.

The forced-idr option might not be needed as well...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So forced-idr is in fact needed here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the force-idr option is required - without it, we get decoder errors at the beginning of playback because SPS/PPS extradata isn't always is emitted for the first frame of new segments, which we signal via an explicit keyframe. This option forces that.

Ideally this option wouldn't be needed at all; ffmpeg itself should but smart enough to emit extradata for the first frame after a flush; I may add that in.

Also just checked and it seems that disabling this option doesn't lead to any unit test failures - we really should capture this behavior in tests. Will make a note to correct that.

"forced-idr": "1",
}
}
vidOpts := C.component_opts{
name: C.CString(encoder),
opts: newAVOpts(p.VideoEncoder.Opts),
Expand All @@ -230,13 +255,28 @@ func Transcode3(input *TranscodeOptionsIn, ps []TranscodeOptions) (*TranscodeRes
params[i] = C.output_params{fname: oname, fps: fps,
w: C.int(w), h: C.int(h), bitrate: C.int(bitrate),
muxer: muxOpts, audio: audioOpts, video: vidOpts, vfilters: vfilt}
defer func(param *C.output_params) {
// Work around the ownership rules:
// ffmpeg normally takes ownership of the following AVDictionary options
// However, if we don't pass these opts to ffmpeg, then we need to free
if param.muxer.opts != nil {
C.av_dict_free(&param.muxer.opts)
}
if param.audio.opts != nil {
C.av_dict_free(&param.audio.opts)
}
if param.video.opts != nil {
C.av_dict_free(&param.video.opts)
}
}(&params[i])
}
var device *C.char
if input.Device != "" {
device = C.CString(input.Device)
defer C.free(unsafe.Pointer(device))
}
inp := &C.input_params{fname: fname, hw_type: hw_type, device: device}
inp := &C.input_params{fname: fname, hw_type: hw_type, device: device,
handle: t.handle}
results := make([]C.output_results, len(ps))
decoded := &C.output_results{}
var (
Expand All @@ -249,7 +289,7 @@ func Transcode3(input *TranscodeOptionsIn, ps []TranscodeOptions) (*TranscodeRes
}
ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded))
if 0 != ret {
glog.Infof("Transcoder Return : %v\n", Strerror(ret))
glog.Error("Transcoder Return : ", ErrorMap[ret])
return nil, ErrorMap[ret]
}
tr := make([]MediaInfo, len(ps))
Expand All @@ -266,6 +306,24 @@ func Transcode3(input *TranscodeOptionsIn, ps []TranscodeOptions) (*TranscodeRes
return &TranscodeResults{Encoded: tr, Decoded: dec}, nil
}

func NewTranscoder() *Transcoder {
return &Transcoder{
handle: C.lpms_transcode_new(),
mu: &sync.Mutex{},
}
}

func (t *Transcoder) StopTranscoder() {
t.mu.Lock()
defer t.mu.Unlock()
if t.stopped {
return
}
C.lpms_transcode_stop(t.handle)
t.handle = nil // prevent accidental reuse
t.stopped = true
}

func InitFFmpeg() {
C.lpms_init()
}
1 change: 1 addition & 0 deletions ffmpeg/ffmpeg_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func error_map() map[int]error {
}{
{code: C.lpms_ERR_INPUT_PIXFMT, desc: "Unsupported input pixel format"},
{code: C.lpms_ERR_FILTERS, desc: "Error initializing filtergraph"},
{code: C.lpms_ERR_OUTPUTS, desc: "Too many outputs"},
}
for _, v := range lpmsErrors {
m[int(v.code)] = errors.New(v.desc)
Expand Down
Loading