From 81bc9e505a5675b3672d5bf294a1b07b4cc1ce2b Mon Sep 17 00:00:00 2001 From: ad-astra-video <99882368+ad-astra-video@users.noreply.github.com> Date: Thu, 19 Sep 2024 15:35:09 -0500 Subject: [PATCH] feat: enable changing GetOrchestrator discovery timeout (#3150) This commit lets gateways configure the timeout used when querying orchestrators during discovery. It adds a `-discoveryTimeout` CLI flag with a default value of 500ms, matching the timeout previously hard-coded in the discovery package. --- cmd/livepeer/livepeer.go | 1 + cmd/livepeer/starter/starter.go | 9 ++-- discovery/db_discovery.go | 12 +++-- discovery/discovery.go | 22 ++++----- discovery/discovery_test.go | 86 ++++++++++++++++++++------------- discovery/wh_discovery.go | 24 ++++----- 6 files changed, 92 insertions(+), 62 deletions(-) diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index d28b579a32..446bbe5f20 100755 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -137,6 +137,7 @@ func parseLivepeerConfig() starter.LivepeerConfig { cfg.MaxPricePerUnit = flag.String("maxPricePerUnit", *cfg.MaxPricePerUnit, "The maximum transcoding price per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price. Can be specified in wei or a custom currency in the format <price><currency> (e.g. 0.50USD). When using a custom currency, a corresponding price feed must be configured with -priceFeedAddr") cfg.IgnoreMaxPriceIfNeeded = flag.Bool("ignoreMaxPriceIfNeeded", *cfg.IgnoreMaxPriceIfNeeded, "Set to true to allow exceeding max price condition if there is no O that meets this requirement") cfg.MinPerfScore = flag.Float64("minPerfScore", *cfg.MinPerfScore, "The minimum orchestrator's performance score a broadcaster is willing to accept") + cfg.DiscoveryTimeout = flag.Duration("discoveryTimeout", *cfg.DiscoveryTimeout, "Time to wait for orchestrators to return info to be included in transcoding sessions for manifest (default = 500ms)") // Transcoding: cfg.Orchestrator = flag.Bool("orchestrator", *cfg.Orchestrator, "Set to true to be an orchestrator") diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 23a7be0990..92ba9079bb 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -101,6 +101,7 @@ type LivepeerConfig struct { MaxPricePerUnit *string IgnoreMaxPriceIfNeeded *bool MinPerfScore *float64 + DiscoveryTimeout *time.Duration MaxSessions *string CurrentManifest *bool Nvidia *string @@ -180,6 +181,7 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultOrchPerfStatsURL := "" defaultRegion := "" defaultMinPerfScore := 0.0 + defaultDiscoveryTimeout := 500 * time.Millisecond defaultCurrentManifest := false defaultNvidia := "" defaultNetint := "" @@ -271,6 +273,7 @@ OrchPerfStatsURL: &defaultOrchPerfStatsURL, Region: &defaultRegion, MinPerfScore: &defaultMinPerfScore, + DiscoveryTimeout: &defaultDiscoveryTimeout, CurrentManifest: &defaultCurrentManifest, Nvidia: &defaultNvidia, Netint: &defaultNetint, @@ -1123,7 +1126,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { if *cfg.Network != "offchain" { ctx, cancel := context.WithCancel(ctx) defer cancel() - dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist) + dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout) if err != nil { exit("Could not create orchestrator pool with DB cache: %v", err) } @@ -1138,9
+1141,9 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { glog.Exit("Error setting orch webhook URL ", err) } glog.Info("Using orchestrator webhook URL ", whurl) - n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl) + n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout) } else if len(orchURLs) > 0 { - n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist) + n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout) } if n.OrchestratorPool == nil { diff --git a/discovery/db_discovery.go b/discovery/db_discovery.go index 7747be9b72..36f9162aae 100644 --- a/discovery/db_discovery.go +++ b/discovery/db_discovery.go @@ -36,9 +36,10 @@ type DBOrchestratorPoolCache struct { rm common.RoundsManager bcast common.Broadcaster orchBlacklist []string + discoveryTimeout time.Duration } -func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string) (*DBOrchestratorPoolCache, error) { +func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration) (*DBOrchestratorPoolCache, error) { if node.Eth == nil { return nil, fmt.Errorf("could not create DBOrchestratorPoolCache: LivepeerEthClient is nil") } @@ -50,6 +51,7 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm rm: rm, bcast: core.NewBroadcaster(node), orchBlacklist: orchBlacklist, + discoveryTimeout: discoveryTimeout, } if err := dbo.cacheTranscoderPool(); err != nil { @@ -135,7 +137,7 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(ctx context.Context, numOrc return true } - orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist) + orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist, dbo.discoveryTimeout) orchInfos, err := orchPool.GetOrchestrators(ctx, numOrchestrators, suspender, caps, scorePred) if err != nil || len(orchInfos) <= 0 { return nil, err @@ -187,7 +189,8 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error { } resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs)) - ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop) + timeout := getOrchestratorTimeoutLoop //needs to be same or longer than GRPCConnectTimeout in server/rpc.go + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() currentRound := dbo.rm.LastInitializedRound() @@ -263,7 +266,8 @@ func (dbo *DBOrchestratorPoolCache) cacheDBOrchs() error { } resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs)) - ctx, cancel := context.WithTimeout(context.Background(), getOrchestratorsTimeoutLoop) + timeout := getOrchestratorTimeoutLoop //needs to be same or longer than GRPCConnectTimeout in server/rpc.go + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() getOrchInfo := func(dbOrch *common.DBOrch) { diff --git a/discovery/discovery.go b/discovery/discovery.go index 4b37b9c934..bc488274cc 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -20,20 +20,20 @@ import ( "github.com/golang/glog" ) -var getOrchestratorsTimeoutLoop = 3 * time.Second -var getOrchestratorsCutoffTimeout = 500 * time.Millisecond +var getOrchestratorTimeoutLoop = 3 * 
time.Second var maxGetOrchestratorCutoffTimeout = 6 * time.Second var serverGetOrchInfo = server.GetOrchestratorInfo type orchestratorPool struct { - infos []common.OrchestratorLocalInfo - pred func(info *net.OrchestratorInfo) bool - bcast common.Broadcaster - orchBlacklist []string + infos []common.OrchestratorLocalInfo + pred func(info *net.OrchestratorInfo) bool + bcast common.Broadcaster + orchBlacklist []string + discoveryTimeout time.Duration } -func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float32, orchBlacklist []string) *orchestratorPool { +func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float32, orchBlacklist []string, discoveryTimeout time.Duration) *orchestratorPool { if len(uris) <= 0 { // Should we return here? glog.Error("Orchestrator pool does not have any URIs") @@ -42,13 +42,13 @@ func NewOrchestratorPool(bcast common.Broadcaster, uris []*url.URL, score float3 for _, uri := range uris { infos = append(infos, common.OrchestratorLocalInfo{URL: uri, Score: score}) } - return &orchestratorPool{infos: infos, bcast: bcast, orchBlacklist: orchBlacklist} + return &orchestratorPool{infos: infos, bcast: bcast, orchBlacklist: orchBlacklist, discoveryTimeout: discoveryTimeout} } func NewOrchestratorPoolWithPred(bcast common.Broadcaster, addresses []*url.URL, - pred func(*net.OrchestratorInfo) bool, score float32, orchBlacklist []string) *orchestratorPool { + pred func(*net.OrchestratorInfo) bool, score float32, orchBlacklist []string, discoveryTimeout time.Duration) *orchestratorPool { - pool := NewOrchestratorPool(bcast, addresses, score, orchBlacklist) + pool := NewOrchestratorPool(bcast, addresses, score, orchBlacklist, discoveryTimeout) pool.pred = pred return pool } @@ -136,7 +136,7 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator } // try to wait for orchestrators until at least 1 is found (with the exponential backoff timout) - timeout := getOrchestratorsCutoffTimeout + timeout := o.discoveryTimeout timer := time.NewTimer(timeout) for nbResp < numAvailableOrchs && len(ods) < numOrchestrators && !timedOut { diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 33e20e5b79..dccf8dab5c 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -43,7 +43,7 @@ func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) assert.Nil(pool) assert.EqualError(err, "could not create DBOrchestratorPoolCache: LivepeerEthClient is nil") } @@ -73,7 +73,7 @@ func TestDeadLock(t *testing.T) { uris := stringsToURIs(addresses) assert := assert.New(t) wg.Add(len(uris)) - pool := NewOrchestratorPool(nil, uris, common.Score_Trusted, []string{}) + pool := NewOrchestratorPool(nil, uris, common.Score_Trusted, []string{}, 50*time.Millisecond) infos, err := pool.GetOrchestrators(context.TODO(), 1, newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0)) assert.Nil(err, "Should not be error") assert.Len(infos, 1, "Should return one orchestrator") @@ -120,7 +120,7 @@ func TestDeadLock_NewOrchestratorPoolWithPred(t *testing.T) { } wg.Add(len(uris)) - pool := NewOrchestratorPoolWithPred(nil, uris, pred, common.Score_Trusted, []string{}) + pool := NewOrchestratorPoolWithPred(nil, uris, 
pred, common.Score_Trusted, []string{}, 50*time.Millisecond) infos, err := pool.GetOrchestrators(context.TODO(), 1, newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0)) assert.Nil(err, "Should not be error") @@ -132,12 +132,12 @@ func TestPoolSize(t *testing.T) { addresses := stringsToURIs([]string{"https://127.0.0.1:8936", "https://127.0.0.1:8937", "https://127.0.0.1:8938"}) assert := assert.New(t) - pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}) + pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}, 50*time.Millisecond) assert.Equal(3, pool.Size()) // will results in len(uris) <= 0 -> log Error errorLogsBefore := glog.Stats.Error.Lines() - pool = NewOrchestratorPool(nil, nil, common.Score_Trusted, []string{}) + pool = NewOrchestratorPool(nil, nil, common.Score_Trusted, []string{}, 50*time.Millisecond) errorLogsAfter := glog.Stats.Error.Lines() assert.Equal(0, pool.Size()) assert.NotZero(t, errorLogsAfter-errorLogsBefore) @@ -163,7 +163,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) { goleak.VerifyNone(t, common.IgnoreRoutines()...) }() - emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) require.NotNil(emptyPool) assert.Equal(0, emptyPool.Size()) @@ -174,7 +174,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) { dbh.UpdateOrch(ethOrchToDBOrch(o)) } - nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) require.NotNil(nonEmptyPool) assert.Equal(len(addresses), nonEmptyPool.Size()) @@ -218,7 +218,7 @@ func TestNewDBOrchestorPoolCache_NoEthAddress(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, 500*time.Millisecond) require.Nil(err) // Check that serverGetOrchInfo returns early and the orchestrator isn't updated @@ -272,7 +272,7 @@ func TestNewDBOrchestratorPoolCache_InvalidPrices(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, 500*time.Millisecond) require.Nil(err) // priceInfo.PixelsPerUnit = 0 @@ -343,7 +343,7 @@ func TestNewDBOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t sender.On("ValidateTicketParams", mock.Anything).Return(nil).Times(3) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) assert.Equal(pool.Size(), 3) orchs, err := pool.GetOrchestrators(context.TODO(), pool.Size(), newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0)) @@ -413,7 +413,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) // bad URLs are 
inserted in the database but are not included in the working set, as there is no returnable query for getting their priceInfo // And if URL is updated it won't be picked up until next cache update @@ -446,7 +446,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs_Empty(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) assert.Equal(0, pool.Size()) infos := pool.GetInfos() @@ -531,7 +531,7 @@ func TestNewDBOrchestorPoolCache_PollOrchestratorInfo(t *testing.T) { origCacheRefreshInterval := cacheRefreshInterval cacheRefreshInterval = 200 * time.Millisecond defer func() { cacheRefreshInterval = origCacheRefreshInterval }() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) // Ensure orchestrators exist in DB @@ -569,7 +569,7 @@ func TestNewOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t * assert := assert.New(t) // creating NewOrchestratorPool with orch addresses - offchainOrch := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}) + offchainOrch := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}, 50*time.Millisecond) for i, info := range offchainOrch.infos { assert.Equal(info.URL.String(), addresses[i].String()) @@ -595,7 +595,7 @@ func TestNewOrchestratorPoolWithPred_TestPredicate(t *testing.T) { } uris := stringsToURIs(addresses) - pool := NewOrchestratorPoolWithPred(nil, uris, pred, common.Score_Trusted, []string{}) + pool := NewOrchestratorPoolWithPred(nil, uris, pred, common.Score_Trusted, []string{}, 50*time.Millisecond) oInfo := &net.OrchestratorInfo{ PriceInfo: &net.PriceInfo{ @@ -685,7 +685,7 @@ func TestCachedPool_AllOrchestratorsTooExpensive_ReturnsAllOrchestrators(t *test sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -775,7 +775,7 @@ func TestCachedPool_GetOrchestrators_MaxBroadcastPriceNotSet(t *testing.T) { sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -881,7 +881,7 @@ func TestCachedPool_N_OrchestratorsGoodPricing_ReturnsNOrchestrators(t *testing. 
sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -928,7 +928,7 @@ func TestCachedPool_GetOrchestrators_TicketParamsValidation(t *testing.T) { gmp := runtime.GOMAXPROCS(50) defer runtime.GOMAXPROCS(gmp) // Disable retrying discovery with extended timeout - maxGetOrchestratorCutoffTimeout = getOrchestratorsCutoffTimeout + maxGetOrchestratorCutoffTimeout = 500 * time.Millisecond server.BroadcastCfg.SetMaxPrice(nil) @@ -971,7 +971,7 @@ func TestCachedPool_GetOrchestrators_TicketParamsValidation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond) require.NoError(err) // Test 25 out of 50 orchs pass ticket params validation @@ -1065,7 +1065,7 @@ func TestCachedPool_GetOrchestrators_OnlyActiveOrchestrators(t *testing.T) { sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)}, []string{}) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)}, []string{}, 500*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -1120,7 +1120,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) { // assert created webhook pool is correct length whURL, _ := url.ParseRequestURI("https://livepeer.live/api/orchestrator") - whpool := NewWebhookPool(nil, whURL) + whpool := NewWebhookPool(nil, whURL, 500*time.Millisecond) assert.Equal(3, whpool.Size()) // assert that list is not refreshed if lastRequest is less than 1 min ago and hash is the same @@ -1270,6 +1270,7 @@ func TestOrchestratorPool_GetOrchestrators(t *testing.T) { assert := assert.New(t) addresses := stringsToURIs([]string{"https://127.0.0.1:8936", "https://127.0.0.1:8937", "https://127.0.0.1:8938"}) + orchTimeout := 500 * time.Millisecond wg := sync.WaitGroup{} orchCb := func() error { return nil } @@ -1283,7 +1284,7 @@ func TestOrchestratorPool_GetOrchestrators(t *testing.T) { }, err } - pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}) + pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}, orchTimeout) // Check that we receive everything wg.Add(len(addresses)) @@ -1326,7 +1327,7 @@ func TestOrchestratorPool_GetOrchestrators(t *testing.T) { assert.Len(res, len(addresses)-1) // Ensure that the timeout did not fire assert.Less(end.Sub(start).Milliseconds(), - getOrchestratorsTimeoutLoop.Milliseconds()) + pool.discoveryTimeout.Milliseconds()) } @@ -1348,7 +1349,7 @@ func TestOrchestratorPool_GetOrchestrators_SuspendedOrchs(t *testing.T) { }, err } - pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}) + pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}, 50*time.Millisecond) // suspend https://127.0.0.1:8938 sus := newStubSuspender() @@ -1417,7 +1418,7 @@ func TestOrchestratorPool_ShuffleGetOrchestrators(t *testing.T) { return &net.OrchestratorInfo{Transcoder: server.String()}, nil } - pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}) + pool := NewOrchestratorPool(nil, addresses, 
common.Score_Trusted, []string{}, 50*time.Millisecond) // Check that randomization happens: check for elements in a different order // Could fail sometimes due to scheduling; the order of execution is undefined @@ -1480,14 +1481,12 @@ func TestOrchestratorPool_GetOrchestratorTimeout(t *testing.T) { return &net.OrchestratorInfo{}, nil } - oldTimeout := getOrchestratorsTimeoutLoop - getOrchestratorsTimeoutLoop = 1 * time.Millisecond - defer func() { getOrchestratorsTimeoutLoop = oldTimeout }() + timeout := 1 * time.Millisecond - pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}) + pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{}, timeout) timedOut := func(start, end time.Time) bool { - return end.Sub(start).Milliseconds() >= getOrchestratorsTimeoutLoop.Milliseconds() + return end.Sub(start).Milliseconds() >= pool.discoveryTimeout.Milliseconds() } // We may only return a subset of responses for a given test @@ -1529,7 +1528,7 @@ func TestOrchestratorPool_GetOrchestratorTimeout(t *testing.T) { assert.True(responsesDrained(), "Did not drain responses in time") // Sanity check we get addresses with a reasonable timeout and no forced delay - getOrchestratorsTimeoutLoop = 25 * time.Millisecond + pool.discoveryTimeout = 25 * time.Millisecond go drainOrchResponses(len(addresses)) start = time.Now() res, err = getOrchestrators(len(addresses)) @@ -1579,7 +1578,7 @@ func TestOrchestratorPool_Capabilities(t *testing.T) { responses := []*net.OrchestratorInfo{i1, i2, i3, i4, i5} addresses := stringsToURIs([]string{"a://b", "a://b", "a://b", "a://b", "a://b"}) - pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{hex.EncodeToString(address)}) + pool := NewOrchestratorPool(nil, addresses, common.Score_Trusted, []string{hex.EncodeToString(address)}, 50*time.Millisecond) // some sanity checks assert.Len(addresses, len(responses)) @@ -1629,3 +1628,24 @@ func TestOrchestratorPool_Capabilities(t *testing.T) { assert.Len(infos, 1) assert.Equal(i4, infos[0].RemoteInfo) } + +func TestSetGetOrchestratorTimeout(t *testing.T) { + assert := assert.New(t) + dbh, _, err := common.TempDB(t) + require := require.New(t) + require.Nil(err) + + sender := &pm.MockSender{} + node := &core.LivepeerNode{ + Database: dbh, + Eth: &eth.StubClient{TotalStake: big.NewInt(0)}, + Sender: sender, + } + + //set timeout to 1000ms + poolCache, err := NewDBOrchestratorPoolCache(context.TODO(), node, &stubRoundsManager{}, []string{}, 1000*time.Millisecond) + assert.Nil(err) + //confirm the timeout is now 1000ms + assert.Equal(poolCache.discoveryTimeout, 1000*time.Millisecond) + +} diff --git a/discovery/wh_discovery.go b/discovery/wh_discovery.go index 1c4c8539f3..7e552dfcce 100644 --- a/discovery/wh_discovery.go +++ b/discovery/wh_discovery.go @@ -21,19 +21,21 @@ type webhookResponse struct { } type webhookPool struct { - pool *orchestratorPool - callback *url.URL - responseHash ethcommon.Hash - lastRequest time.Time - mu *sync.RWMutex - bcast common.Broadcaster + pool *orchestratorPool + callback *url.URL + responseHash ethcommon.Hash + lastRequest time.Time + mu *sync.RWMutex + bcast common.Broadcaster
+ discoveryTimeout: discoveryTimeout, } go p.getInfos() return p @@ -71,7 +73,7 @@ func (w *webhookPool) getInfos() ([]common.OrchestratorLocalInfo, error) { } // pool = NewOrchestratorPool(w.bcast, addrs) - pool = &orchestratorPool{infos: infos, bcast: w.bcast} + pool = &orchestratorPool{infos: infos, bcast: w.bcast, discoveryTimeout: w.discoveryTimeout} w.mu.Lock() w.responseHash = hash
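Usage note (not part of the patch): on the CLI, gateways set the new value with `-discoveryTimeout`, which accepts Go duration strings (e.g. 750ms or 2s) since it is parsed by `flag.Duration`. The sketch below shows how a caller might pass a custom timeout into the off-chain constructor changed above; the import paths, the nil Broadcaster, the empty blacklist, and the example URI mirror the discovery package's own tests and are assumptions, not code from this PR.

```go
// Minimal sketch, assuming the go-livepeer module layout used in this patch.
package main

import (
	"net/url"
	"time"

	"github.com/livepeer/go-livepeer/common"
	"github.com/livepeer/go-livepeer/discovery"
)

func main() {
	// A single example orchestrator URI; nil Broadcaster and an empty
	// blacklist, as the package's own tests pass.
	uri, _ := url.Parse("https://127.0.0.1:8935")

	// Pass 1s instead of the 500ms default, e.g. for orchestrators
	// reachable only over high-latency links.
	pool := discovery.NewOrchestratorPool(nil, []*url.URL{uri}, common.Score_Trusted, []string{}, 1*time.Second)
	_ = pool
}
```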