Skip to content

Commit

Permalink
feat: enable changing GetOrchestrator discovery timeout (#3150)
Browse files Browse the repository at this point in the history
This commit introduces a mechanism for gateways to configure the timeout used during the orchestrator discovery process. It adds the `-discoveryTimeout` CLI argument (stored in the `DiscoveryTimeout` config field), with a default value of 500ms, which matches the cutoff that was previously hard-coded.
  • Loading branch information
ad-astra-video authored Sep 19, 2024
1 parent 56a8a88 commit 81bc9e5
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 62 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.MaxPricePerUnit = flag.String("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr")
cfg.IgnoreMaxPriceIfNeeded = flag.Bool("ignoreMaxPriceIfNeeded", *cfg.IgnoreMaxPriceIfNeeded, "Set to true to allow exceeding max price condition if there is no O that meets this requirement")
cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept")
cfg.DiscoveryTimeout = flag.Duration("discoveryTimeout", *cfg.DiscoveryTimeout, "Time to wait for orchestrators to return info to be included in transcoding sessions for manifest (default = 500ms)")

// Transcoding:
cfg.Orchestrator = flag.Bool("orchestrator", *cfg.Orchestrator, "Set to true to be an orchestrator")
Expand Down
9 changes: 6 additions & 3 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type LivepeerConfig struct {
MaxPricePerUnit *string
IgnoreMaxPriceIfNeeded *bool
MinPerfScore *float64
DiscoveryTimeout *time.Duration
MaxSessions *string
CurrentManifest *bool
Nvidia *string
Expand Down Expand Up @@ -180,6 +181,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultOrchPerfStatsURL := ""
defaultRegion := ""
defaultMinPerfScore := 0.0
defaultDiscoveryTimeout := 500 * time.Millisecond
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
Expand Down Expand Up @@ -271,6 +273,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
OrchPerfStatsURL: &defaultOrchPerfStatsURL,
Region: &defaultRegion,
MinPerfScore: &defaultMinPerfScore,
DiscoveryTimeout: &defaultDiscoveryTimeout,
CurrentManifest: &defaultCurrentManifest,
Nvidia: &defaultNvidia,
Netint: &defaultNetint,
Expand Down Expand Up @@ -1123,7 +1126,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.Network != "offchain" {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist)
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout)
if err != nil {
exit("Could not create orchestrator pool with DB cache: %v", err)
}
Expand All @@ -1138,9 +1141,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Exit("Error setting orch webhook URL ", err)
}
glog.Info("Using orchestrator webhook URL ", whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl)
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout)
} else if len(orchURLs) > 0 {
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist)
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout)
}

if n.OrchestratorPool == nil {
Expand Down
12 changes: 8 additions & 4 deletions discovery/db_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type DBOrchestratorPoolCache struct {
rm common.RoundsManager
bcast common.Broadcaster
orchBlacklist []string
discoveryTimeout time.Duration
}

func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string) (*DBOrchestratorPoolCache, error) {
func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration) (*DBOrchestratorPoolCache, error) {
if node.Eth == nil {
return nil, fmt.Errorf("could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
}
Expand All @@ -50,6 +51,7 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
rm: rm,
bcast: core.NewBroadcaster(node),
orchBlacklist: orchBlacklist,
discoveryTimeout: discoveryTimeout,
}

if err := dbo.cacheTranscoderPool(); err != nil {
Expand Down Expand Up @@ -135,7 +137,7 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(ctx context.Context, numOrc
return true
}

orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist)
orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist, dbo.discoveryTimeout)
orchInfos, err := orchPool.GetOrchestrators(ctx, numOrchestrators, suspender, caps, scorePred)
if err != nil || len(orchInfos) <= 0 {
return nil, err
Expand Down Expand Up @@ -187,7 +189,8 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error {
}

resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs))
ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop)
timeout := getOrchestratorTimeoutLoop //needs to be same or longer than GRPCConnectTimeout in server/rpc.go
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

currentRound := dbo.rm.LastInitializedRound()
Expand Down Expand Up @@ -263,7 +266,8 @@ func (dbo *DBOrchestratorPoolCache) cacheDBOrchs() error {
}

resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs))
ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop)
timeout := getOrchestratorTimeoutLoop //needs to be same or longer than GRPCConnectTimeout in server/rpc.go
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

getOrchInfo := func(dbOrch *common.DBOrch) {
Expand Down
22 changes: 11 additions & 11 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ import (
"github.com/golang/glog"
)

var getOrchestratorsTimeoutLoop = 3 * time.Second
var getOrchestratorsCutoffTimeout = 500 * time.Millisecond
var getOrchestratorTimeoutLoop = 3 * time.Second
var maxGetOrchestratorCutoffTimeout = 6 * time.Second

var serverGetOrchInfo = server.GetOrchestratorInfo

type orchestratorPool struct {
infos []common.OrchestratorLocalInfo
pred func(info *net.OrchestratorInfo) bool
bcast common.Broadcaster
orchBlacklist []string
infos []common.OrchestratorLocalInfo
pred func(info *net.OrchestratorInfo) bool
bcast common.Broadcaster
orchBlacklist []string
discoveryTimeout time.Duration
}

func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float32, orchBlacklist []string) *orchestratorPool {
func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float32, orchBlacklist []string, discoveryTimeout time.Duration) *orchestratorPool {
if len(uris) <= 0 {
// Should we return here?
glog.Error("Orchestrator pool does not have any URIs")
Expand All @@ -42,13 +42,13 @@ func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float3
for _, uri := range uris {
infos = append(infos, common.OrchestratorLocalInfo{URL: uri, Score: score})
}
return &orchestratorPool{infos: infos, bcast: bcast, orchBlacklist: orchBlacklist}
return &orchestratorPool{infos: infos, bcast: bcast, orchBlacklist: orchBlacklist, discoveryTimeout: discoveryTimeout}
}

func NewOrchestratorPoolWithPred(bcast common.Broadcaster, addresses []*url.URL,
pred func(*net.OrchestratorInfo) bool, score float32, orchBlacklist []string) *orchestratorPool {
pred func(*net.OrchestratorInfo) bool, score float32, orchBlacklist []string, discoveryTimeout time.Duration) *orchestratorPool {

pool := NewOrchestratorPool(bcast, addresses, score, orchBlacklist)
pool := NewOrchestratorPool(bcast, addresses, score, orchBlacklist, discoveryTimeout)
pool.pred = pred
return pool
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
}

// try to wait for orchestrators until at least 1 is found (with the exponential backoff timeout)
timeout := getOrchestratorsCutoffTimeout
timeout := o.discoveryTimeout
timer := time.NewTimer(timeout)

for nbResp < numAvailableOrchs && len(ods) < numOrchestrators && !timedOut {
Expand Down
Loading

0 comments on commit 81bc9e5

Please sign in to comment.