From 02146232cb3b9ed95c3519c4d04f96bdc139b9a6 Mon Sep 17 00:00:00 2001 From: Rick Staa Date: Tue, 15 Oct 2024 09:40:43 +0200 Subject: [PATCH] feat(rewardService): add reward call retries This commit adds logic to retry reward calls if they fail. The retry mechanism can be enabled by setting the `rewardRetryInterval` to a value greater than 0. If `rewardRetryInterval` is set to 0, retries are disabled, and the reward call is attempted only once. --- cmd/livepeer/livepeer.go | 1 + cmd/livepeer/starter/starter.go | 5 ++- eth/rewardservice.go | 68 ++++++++++++++++++++++++--------- 3 files changed, 56 insertions(+), 18 deletions(-) diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go index 446bbe5f20..c71ae7bfc3 100755 --- a/cmd/livepeer/livepeer.go +++ b/cmd/livepeer/livepeer.go @@ -191,6 +191,7 @@ func parseLivepeerConfig() starter.LivepeerConfig { cfg.RedeemerAddr = flag.String("redeemerAddr", *cfg.RedeemerAddr, "URL of the ticket redemption service to use") // Reward service cfg.Reward = flag.Bool("reward", false, "Set to true to run a reward service") + cfg.RewardRetryInterval = flag.Duration("rewardRetryInterval", *cfg.RewardRetryInterval, "Interval at which the reward call is retried in case of failure, set to 0 to disable retries") // Metrics & logging: cfg.Monitor = flag.Bool("monitor", *cfg.Monitor, "Set to true to send performance metrics") cfg.MetricsPerStream = flag.Bool("metricsPerStream", *cfg.MetricsPerStream, "Set to true to group performance metrics per stream") diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 92ba9079bb..3b8defe1cf 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -135,6 +135,7 @@ type LivepeerConfig struct { Redeemer *bool RedeemerAddr *string Reward *bool + RewardRetryInterval *time.Duration Monitor *bool MetricsPerStream *bool MetricsExposeClientIP *bool @@ -216,6 +217,7 @@ func DefaultLivepeerConfig() LivepeerConfig { defaultBlockPollingInterval := 5 defaultRedeemer := false defaultRedeemerAddr := "" + defaultRewardTryInterval := 0 * time.Minute // disabled by default defaultMonitor := false defaultMetricsPerStream := false defaultMetricsExposeClientIP := false @@ -308,6 +310,7 @@ func DefaultLivepeerConfig() LivepeerConfig { BlockPollingInterval: &defaultBlockPollingInterval, Redeemer: &defaultRedeemer, RedeemerAddr: &defaultRedeemerAddr, + RewardRetryInterval: &defaultRewardTryInterval, Monitor: &defaultMonitor, MetricsPerStream: &defaultMetricsPerStream, MetricsExposeClientIP: &defaultMetricsExposeClientIP, @@ -1011,7 +1014,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { if reward { // Start reward service // The node will only call reward if it is active in the current round - rs := eth.NewRewardService(n.Eth, timeWatcher) + rs := eth.NewRewardService(n.Eth, timeWatcher, *cfg.RewardRetryInterval) go func() { if err := rs.Start(ctx); err != nil { serviceErr <- err diff --git a/eth/rewardservice.go b/eth/rewardservice.go index bd10a062a3..88d4965ed5 100644 --- a/eth/rewardservice.go +++ b/eth/rewardservice.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/core/types" "github.com/golang/glog" @@ -16,17 +17,20 @@ var ( ) type RewardService struct { - client LivepeerEthClient - working bool - cancelWorker context.CancelFunc - tw timeWatcher - mu sync.Mutex + client LivepeerEthClient + working bool + cancelWorker context.CancelFunc + tw timeWatcher + mu sync.Mutex + rewardRetryInterval time.Duration + cancelRetryWorker context.CancelFunc } -func NewRewardService(client LivepeerEthClient, tw timeWatcher) *RewardService { +func NewRewardService(client LivepeerEthClient, tw timeWatcher, rewardRetryInterval time.Duration) *RewardService { return &RewardService{ - client: client, - tw: tw, + client: client, + tw: tw, + rewardRetryInterval: rewardRetryInterval, } } @@ -54,15 +58,12 @@ func (s *RewardService) Start(ctx context.Context) error { glog.Errorf("Round subscription error err=%q", err) } case <-roundSink: - go func() { - err := s.tryReward() - if err != nil { - glog.Errorf("Error trying to call reward for round %v err=%q", s.tw.LastInitializedRound(), err) - if monitor.Enabled { - monitor.RewardCallError(err.Error()) - } - } - }() + s.cancelRetryWorker() + + retryCtx, cancelRetry := context.WithCancel(cancelCtx) + s.cancelRetryWorker = cancelRetry + + go s.callRewardWithRetries(retryCtx) case <-cancelCtx.Done(): glog.V(5).Infof("Reward service done") return nil @@ -113,3 +114,36 @@ func (s *RewardService) tryReward() error { return nil } + +func (s *RewardService) callRewardWithRetries(ctx context.Context) { + if s.rewardRetryInterval == 0 { + err := s.tryReward() + if err != nil { + glog.Errorf("Error trying to call reward for round %v err=%q", s.tw.LastInitializedRound(), err) + if monitor.Enabled { + monitor.RewardCallError(err.Error()) + } + } + return + } + + ticker := time.NewTicker(s.rewardRetryInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := s.tryReward() + if err == nil { + return + } + + glog.Errorf("Error trying to call reward for round %v err=%q", s.tw.LastInitializedRound(), err) + if monitor.Enabled { + monitor.RewardCallError(err.Error()) + } + case <-ctx.Done(): + return + } + } +}