Skip to content

Commit

Permalink
feat(rewardService): add reward call retries
Browse files Browse the repository at this point in the history
This commit adds logic to retry reward calls if they fail. The retry mechanism
can be enabled by setting the `rewardRetryInterval` to a value greater than 0.
If `rewardRetryInterval` is set to 0, retries are disabled, and the reward call
is attempted only once.
  • Loading branch information
rickstaa committed Oct 15, 2024
1 parent aad1a23 commit 0214623
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 18 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.RedeemerAddr = flag.String("redeemerAddr", *cfg.RedeemerAddr, "URL of the ticket redemption service to use")
// Reward service
cfg.Reward = flag.Bool("reward", false, "Set to true to run a reward service")
cfg.RewardRetryInterval = flag.Duration("rewardRetryInterval", *cfg.RewardRetryInterval, "Interval at which the reward call is retried in case of failure, set to 0 to disable retries")
// Metrics & logging:
cfg.Monitor = flag.Bool("monitor", *cfg.Monitor, "Set to true to send performance metrics")
cfg.MetricsPerStream = flag.Bool("metricsPerStream", *cfg.MetricsPerStream, "Set to true to group performance metrics per stream")
Expand Down
5 changes: 4 additions & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type LivepeerConfig struct {
Redeemer *bool
RedeemerAddr *string
Reward *bool
RewardRetryInterval *time.Duration
Monitor *bool
MetricsPerStream *bool
MetricsExposeClientIP *bool
Expand Down Expand Up @@ -216,6 +217,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultBlockPollingInterval := 5
defaultRedeemer := false
defaultRedeemerAddr := ""
defaultRewardTryInterval := 0 * time.Minute // disabled by default
defaultMonitor := false
defaultMetricsPerStream := false
defaultMetricsExposeClientIP := false
Expand Down Expand Up @@ -308,6 +310,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
BlockPollingInterval: &defaultBlockPollingInterval,
Redeemer: &defaultRedeemer,
RedeemerAddr: &defaultRedeemerAddr,
RewardRetryInterval: &defaultRewardTryInterval,
Monitor: &defaultMonitor,
MetricsPerStream: &defaultMetricsPerStream,
MetricsExposeClientIP: &defaultMetricsExposeClientIP,
Expand Down Expand Up @@ -1011,7 +1014,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if reward {
// Start reward service
// The node will only call reward if it is active in the current round
rs := eth.NewRewardService(n.Eth, timeWatcher)
rs := eth.NewRewardService(n.Eth, timeWatcher, *cfg.RewardRetryInterval)
go func() {
if err := rs.Start(ctx); err != nil {
serviceErr <- err
Expand Down
68 changes: 51 additions & 17 deletions eth/rewardservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/golang/glog"
Expand All @@ -16,17 +17,20 @@ var (
)

type RewardService struct {
client LivepeerEthClient
working bool
cancelWorker context.CancelFunc
tw timeWatcher
mu sync.Mutex
client LivepeerEthClient
working bool
cancelWorker context.CancelFunc
tw timeWatcher
mu sync.Mutex
rewardRetryInterval time.Duration
cancelRetryWorker context.CancelFunc
}

func NewRewardService(client LivepeerEthClient, tw timeWatcher) *RewardService {
func NewRewardService(client LivepeerEthClient, tw timeWatcher, rewardRetryInterval time.Duration) *RewardService {
return &RewardService{
client: client,
tw: tw,
client: client,
tw: tw,
rewardRetryInterval: rewardRetryInterval,
}
}

Expand Down Expand Up @@ -54,15 +58,12 @@ func (s *RewardService) Start(ctx context.Context) error {
glog.Errorf("Round subscription error err=%q", err)
}
case <-roundSink:
go func() {
err := s.tryReward()
if err != nil {
glog.Errorf("Error trying to call reward for round %v err=%q", s.tw.LastInitializedRound(), err)
if monitor.Enabled {
monitor.RewardCallError(err.Error())
}
}
}()
s.cancelRetryWorker()

retryCtx, cancelRetry := context.WithCancel(cancelCtx)
s.cancelRetryWorker = cancelRetry

go s.callRewardWithRetries(retryCtx)
case <-cancelCtx.Done():
glog.V(5).Infof("Reward service done")
return nil
Expand Down Expand Up @@ -113,3 +114,36 @@ func (s *RewardService) tryReward() error {

return nil
}

func (s *RewardService) callRewardWithRetries(ctx context.Context) {
if s.rewardRetryInterval == 0 {
err := s.tryReward()
if err != nil {
glog.Errorf("Error trying to call reward for round %v err=%q", s.tw.LastInitializedRound(), err)
if monitor.Enabled {
monitor.RewardCallError(err.Error())
}
}
return
}

ticker := time.NewTicker(s.rewardRetryInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
err := s.tryReward()
if err == nil {
return
}

glog.Errorf("Error trying to call reward for round %v err=%q", s.tw.LastInitializedRound(), err)
if monitor.Enabled {
monitor.RewardCallError(err.Error())
}
case <-ctx.Done():
return
}
}
}

0 comments on commit 0214623

Please sign in to comment.