Skip to content

Commit

Permalink
Merge branch 'master' into ai-video
Browse files Browse the repository at this point in the history
  • Loading branch information
rickstaa committed Sep 23, 2024
2 parents 71a2dcf + 81bc9e5 commit 9c20ce8
Show file tree
Hide file tree
Showing 35 changed files with 573 additions and 286 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ jobs:

upload:
name: Upload artifacts to google bucket
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository
permissions:
contents: "read"
id-token: "write"
Expand Down
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# Changelog

## v0.7.9

- [#3165](https://github.com/livepeer/go-livepeer/pull/3165) Add node version and orch addr to transcoded metadata

### Features ⚒

#### Broadcaster

- [#3158](https://github.com/livepeer/go-livepeer/pull/3158) Add a metric tag for Orchestrator version

### Bug Fixes 🐞

#### Broadcaster

- [#3164](https://github.com/livepeer/go-livepeer/pull/3164) Fix media compatibility check
- [#3166](https://github.com/livepeer/go-livepeer/pull/3166) Clean up inactive sessions
- [#3086](https://github.com/livepeer/go-livepeer/pull/3086) Clear known sessions with inadequate latency scores

## v0.7.8

### Features ⚒
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.8-ai.2
0.7.9-ai.1
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.MaxPricePerCapability = flag.String("maxPricePerCapability", *cfg.MaxPricePerCapability, `json list of prices per capability/model or path to json config file. Use "model_id": "default" to price all models in a pipeline the same. Example: {"capabilities_prices": [{"pipeline": "text-to-image", "model_id": "stabilityai/sd-turbo", "price_per_unit": 1000, "pixels_per_unit": 1}, {"pipeline": "upscale", "model_id": "default", price_per_unit": 1200, "pixels_per_unit": 1}]}`)
cfg.IgnoreMaxPriceIfNeeded = flag.Bool("ignoreMaxPriceIfNeeded", *cfg.IgnoreMaxPriceIfNeeded, "Set to true to allow exceeding max price condition if there is no O that meets this requirement")
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")
cfg.DiscoveryTimeout = flag.Duration("discoveryTimeout", *cfg.DiscoveryTimeout, "Time to wait for orchestrators to return info to be included in transcoding sessions for manifest (default = 500ms)")

// Transcoding:
cfg.Orchestrator = flag.Bool("orchestrator", *cfg.Orchestrator, "Set to true to be an orchestrator")
Expand Down
15 changes: 12 additions & 3 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type LivepeerConfig struct {
MaxPricePerCapability *string
IgnoreMaxPriceIfNeeded *bool
MinPerfScore *float64
DiscoveryTimeout *time.Duration
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Expand Down Expand Up @@ -189,6 +190,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultOrchPerfStatsURL := ""
defaultRegion := ""
defaultMinPerfScore := 0.0
defaultDiscoveryTimeout := 500 * time.Millisecond
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
Expand Down Expand Up @@ -287,6 +289,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
OrchPerfStatsURL: &defaultOrchPerfStatsURL,
Region: &defaultRegion,
MinPerfScore: &defaultMinPerfScore,
DiscoveryTimeout: &defaultDiscoveryTimeout,
CurrentManifest: &defaultCurrentManifest,
Nvidia: &defaultNvidia,
Netint: &defaultNetint,
Expand Down Expand Up @@ -877,6 +880,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Errorf("Error setting up orchestrator: %v", err)
return
}
n.RecipientAddr = recipientAddr.Hex()

sigVerifier := &pm.DefaultSigVerifier{}
validator := pm.NewValidator(sigVerifier, timeWatcher)
Expand Down Expand Up @@ -1419,7 +1423,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.Network != "offchain" {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist)
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout)
if err != nil {
exit("Could not create orchestrator pool with DB cache: %v", err)
}
Expand All @@ -1434,9 +1438,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Exit("Error setting orch webhook URL ", err)
}
glog.Info("Using orchestrator webhook URL ", whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout)
} else if len(orchURLs) > 0 {
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist)
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout)
}

if n.OrchestratorPool == nil {
Expand Down Expand Up @@ -1496,6 +1500,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if err != nil {
glog.Exit("Error getting service URI: ", err)
}

if *cfg.Network != "offchain" && !common.ValidateServiceURI(suri) {
glog.Warning("**Warning -serviceAddr is a not a public address or hostname; this is not recommended for onchain networks**")
}

n.SetServiceURI(suri)
// if http addr is not provided, listen to all ifaces
// take the port to listen to from the service URI
Expand Down
6 changes: 6 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/big"
"math/rand"
"mime"
"net/url"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -565,3 +566,8 @@ func CalculateAudioDuration(audio types.File) (int64, error) {

return duration, nil
}

// ValidateServiceURI checks if the serviceURI is valid.
func ValidateServiceURI(serviceURI *url.URL) bool {
return !strings.Contains(serviceURI.Host, "0.0.0.0")
}
36 changes: 36 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"
"math/big"
"net/url"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -483,3 +484,38 @@ func TestParseAccelDevices_CustomSelection(t *testing.T) {
assert.Equal(ids[1], "3")
assert.Equal(ids[2], "1")
}
func TestValidateServiceURI(t *testing.T) {
// Valid service URIs
validURIs := []string{
"https://8.8.8.8:8935",
"https://127.0.0.1:8935",
}

for _, uri := range validURIs {
serviceURI, err := url.Parse(uri)
if err != nil {
t.Errorf("Failed to parse valid service URI: %v", err)
}

if !ValidateServiceURI(serviceURI) {
t.Errorf("Expected service URI to be valid, but got invalid: %v", uri)
}
}

// Invalid service URIs
invalidURIs := []string{
"http://0.0.0.0",
"https://0.0.0.0",
}

for _, uri := range invalidURIs {
serviceURI, err := url.Parse(uri)
if err != nil {
t.Errorf("Failed to parse invalid service URI: %v", err)
}

if ValidateServiceURI(serviceURI) {
t.Errorf("Expected service URI to be invalid, but got valid: %v", uri)
}
}
}
1 change: 1 addition & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type LivepeerNode struct {
// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Recipient pm.Recipient
RecipientAddr string
SelectionAlgorithm common.SelectionAlgorithm
OrchestratorPool common.OrchestratorPool
OrchPerfScore *common.PerfScore
Expand Down
12 changes: 12 additions & 0 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,17 @@ func (n *LivepeerNode) transcodeSeg(ctx context.Context, config transcodeConfig,
}
md.Fname = url

orchId := "offchain"
if n.RecipientAddr != "" {
orchId = n.RecipientAddr
}
if isRemote {
// huge hack to thread the orch id down to the transcoder
md.Metadata = map[string]string{"orchId": orchId}
} else {
md.Metadata = MakeMetadata(orchId)
}

//Do the transcoding
start := time.Now()
tData, err := transcoder.Transcode(ctx, md)
Expand Down Expand Up @@ -1116,6 +1127,7 @@ func (rt *RemoteTranscoder) Transcode(logCtx context.Context, md *SegTranscoding
msg := &net.NotifySegment{
Url: fname,
TaskId: taskID,
OrchId: md.Metadata["orchId"],
SegData: segData,
// Triggers failure on Os that don't know how to use SegData
Profiles: []byte("invalid"),
Expand Down
9 changes: 9 additions & 0 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type SegTranscodingMetadata struct {
AuthToken *net.AuthToken
CalcPerceptualHash bool
SegmentParameters *SegmentParameters
Metadata map[string]string
}

func (md *SegTranscodingMetadata) Flatten() []byte {
Expand Down Expand Up @@ -193,3 +194,11 @@ func (id StreamID) String() string {
func RandomManifestID() ManifestID {
return ManifestID(common.RandomIDGenerator(DefaultManifestIDLength))
}

func MakeMetadata(id string) map[string]string {
s := fmt.Sprintf("Livepeer Transcoder %s (%s)", LivepeerVersion, id)
return map[string]string{
"service_provider": s, // for mpegts
"comment": "Processed by " + s, // for mp4
}
}
16 changes: 11 additions & 5 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMeta
Profile: md.ProfileIn,
}
profiles := md.Profiles
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, profiles, md.CalcPerceptualHash, md.SegmentParameters)
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -101,7 +101,7 @@ func (nv *NetintTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Device: nv.device,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Netint, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -143,7 +143,7 @@ func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Profile: md.ProfileIn,
}
profiles := md.Profiles
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, profiles, md.CalcPerceptualHash, md.SegmentParameters)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, md)

_, seqNo, parseErr := parseURI(md.Fname)
start := time.Now()
Expand Down Expand Up @@ -437,8 +437,13 @@ func resToTranscodeData(ctx context.Context, res *ffmpeg.TranscodeResults, opts
}, nil
}

func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profiles []ffmpeg.VideoProfile, calcPHash bool,
segPar *SegmentParameters) []ffmpeg.TranscodeOptions {
func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, md *SegTranscodingMetadata) []ffmpeg.TranscodeOptions {
var (
profiles []ffmpeg.VideoProfile = md.Profiles
calcPHash bool = md.CalcPerceptualHash
segPar *SegmentParameters = md.SegmentParameters
metadata map[string]string = md.Metadata
)

opts := make([]ffmpeg.TranscodeOptions, len(profiles))
for i := range profiles {
Expand All @@ -448,6 +453,7 @@ func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profi
Accel: accel,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
CalcSign: calcPHash,
Metadata: metadata,
}
if segPar != nil && segPar.Clip != nil {
o.From = segPar.Clip.From
Expand Down
23 changes: 18 additions & 5 deletions core/transcoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,24 @@ func TestProfilesToTranscodeOptions(t *testing.T) {
}
defer func() { common.RandomIDGenerator = oldRandIDFunc }()

makeMeta := func(p []ffmpeg.VideoProfile, c bool) *SegTranscodingMetadata {
return &SegTranscodingMetadata{
Profiles: p,
CalcPerceptualHash: c,
Metadata: map[string]string{
"meta": "data",
},
}
}

// Test 0 profiles
profiles := []ffmpeg.VideoProfile{}
opts := profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts := profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(0, len(opts))

// Test 1 profile
profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(1, len(opts))
assert.Equal("foo/out_bar.tempfile", opts[0].Oname)
assert.Equal(ffmpeg.Software, opts[0].Accel)
Expand All @@ -194,22 +204,25 @@ func TestProfilesToTranscodeOptions(t *testing.T) {

// Test > 1 profile
profiles = []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9, ffmpeg.P240p30fps16x9}
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Software, makeMeta(profiles, false))
assert.Equal(2, len(opts))

for i, p := range profiles {
assert.Equal("foo/out_bar.tempfile", opts[i].Oname)
assert.Equal(ffmpeg.Software, opts[i].Accel)
assert.Equal(p, opts[i].Profile)
assert.Equal("copy", opts[i].AudioEncoder.Name)
assert.Equal(opts[i].Metadata, map[string]string{
"meta": "data",
})
}

// Test different acceleration value
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, profiles, false, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, makeMeta(profiles, false))
assert.Equal(2, len(opts))

// Test signature calculation
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, profiles, true, nil)
opts = profilesToTranscodeOptions(workDir, ffmpeg.Nvidia, makeMeta(profiles, true))
assert.True(opts[0].CalcSign)
assert.True(opts[1].CalcSign)

Expand Down
12 changes: 8 additions & 4 deletions discovery/db_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type DBOrchestratorPoolCache struct {
rm common.RoundsManager
bcast common.Broadcaster
orchBlacklist []string
discoveryTimeout time.Duration
}

func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string) (*DBOrchestratorPoolCache, error) {
func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration) (*DBOrchestratorPoolCache, error) {
if node.Eth == nil {
return nil, fmt.Errorf("could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
}
Expand All @@ -50,6 +51,7 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
rm: rm,
bcast: core.NewBroadcaster(node),
orchBlacklist: orchBlacklist,
discoveryTimeout: discoveryTimeout,
}

if err := dbo.cacheTranscoderPool(); err != nil {
Expand Down Expand Up @@ -135,7 +137,7 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(ctx context.Context, numOrc
return true
}

orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist)
orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist, dbo.discoveryTimeout)
orchInfos, err := orchPool.GetOrchestrators(ctx, numOrchestrators, suspender, caps, scorePred)
if err != nil || len(orchInfos) <= 0 {
return nil, err
Expand Down Expand Up @@ -187,7 +189,8 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error {
}

resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs))
ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop)
timeout := getOrchestratorTimeoutLoop //needs to be same or longer than GRPCConnectTimeout in server/rpc.go
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

currentRound := dbo.rm.LastInitializedRound()
Expand Down Expand Up @@ -263,7 +266,8 @@ func (dbo *DBOrchestratorPoolCache) cacheDBOrchs() error {
}

resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs))
ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop)
timeout := getOrchestratorTimeoutLoop //needs to be same or longer than GRPCConnectTimeout in server/rpc.go
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

getOrchInfo := func(dbOrch *common.DBOrch) {
Expand Down
Loading

0 comments on commit 9c20ce8

Please sign in to comment.