feat: make monitor params configurable via flags #246

Merged · 1 commit · Aug 17, 2024
10 changes: 10 additions & 0 deletions cluster/config.go
@@ -13,12 +13,22 @@ type Config struct {
BlockedHostnames []string
DeploymentIngressStaticHosts bool
DeploymentIngressDomain string
+ MonitorMaxRetries uint
+ MonitorRetryPeriod time.Duration
+ MonitorRetryPeriodJitter time.Duration
+ MonitorHealthcheckPeriod time.Duration
+ MonitorHealthcheckPeriodJitter time.Duration
ClusterSettings map[interface{}]interface{}
}

func NewDefaultConfig() Config {
return Config{
InventoryResourcePollPeriod: time.Second * 5,
InventoryResourceDebugFrequency: 10,
+ MonitorMaxRetries: 40,
+ MonitorRetryPeriod: time.Second * 4, // nolint revive
+ MonitorRetryPeriodJitter: time.Second * 15,
+ MonitorHealthcheckPeriod: time.Second * 10, // nolint revive
+ MonitorHealthcheckPeriodJitter: time.Second * 5,
}
}
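As an aside from the diff itself, here is a minimal sketch of how the new knobs can be set programmatically on top of the defaults above. The package path, field names, and NewDefaultConfig come from this PR; the surrounding main function is only illustrative.

```go
package main

import (
	"fmt"
	"time"

	"github.com/akash-network/provider/cluster"
)

func main() {
	// Start from the defaults introduced in this PR: 40 retries, 4s retry
	// period, 15s retry jitter, 10s healthcheck period, 5s healthcheck jitter.
	cfg := cluster.NewDefaultConfig()

	// Override a couple of the monitor knobs. run.go (below) rejects retry and
	// healthcheck periods shorter than 4s, so stay at or above that.
	cfg.MonitorMaxRetries = 20
	cfg.MonitorRetryPeriod = 6 * time.Second

	fmt.Println(cfg.MonitorMaxRetries, cfg.MonitorRetryPeriod)
}
```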
35 changes: 13 additions & 22 deletions cluster/monitor.go
@@ -23,15 +23,6 @@ import (
"github.com/akash-network/provider/tools/fromctx"
)

- const (
- monitorMaxRetries = 40
- monitorRetryPeriodMin = time.Second * 4 // nolint revive
- monitorRetryPeriodJitter = time.Second * 15
-
- monitorHealthcheckPeriodMin = time.Second * 10 // nolint revive
- monitorHealthcheckPeriodJitter = time.Second * 5
- )

var (
deploymentHealthCheckCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "provider_deployment_monitor_health",
@@ -45,22 +36,22 @@ type deploymentMonitor struct {

deployment ctypes.IDeployment

- attempts int
+ attempts uint
log log.Logger
lc lifecycle.Lifecycle

- clusterSettings map[interface{}]interface{}
+ config Config
}

func newDeploymentMonitor(dm *deploymentManager) *deploymentMonitor {
m := &deploymentMonitor{
- bus: dm.bus,
- session: dm.session,
- client: dm.client,
- deployment: dm.deployment,
- log: dm.log.With("cmp", "deployment-monitor"),
- lc: lifecycle.New(),
- clusterSettings: dm.config.ClusterSettings,
+ bus: dm.bus,
+ session: dm.session,
+ client: dm.client,
+ deployment: dm.deployment,
+ log: dm.log.With("cmp", "deployment-monitor"),
+ lc: lifecycle.New(),
+ config: dm.config,
}

go m.lc.WatchChannel(dm.lc.ShuttingDown())
@@ -126,7 +117,7 @@ loop:

m.publishStatus(event.ClusterDeploymentPending)

- if m.attempts <= monitorMaxRetries {
+ if m.attempts <= m.config.MonitorMaxRetries {
// unhealthy. retry
tickch = m.scheduleRetry()
break
@@ -166,7 +157,7 @@ func (m *deploymentMonitor) runCheck(ctx context.Context) <-chan runner.Result {
}

func (m *deploymentMonitor) doCheck(ctx context.Context) (bool, error) {
- ctx = fromctx.ApplyToContext(ctx, m.clusterSettings)
+ ctx = fromctx.ApplyToContext(ctx, m.config.ClusterSettings)

status, err := m.client.LeaseStatus(ctx, m.deployment.LeaseID())

@@ -226,11 +217,11 @@ func (m *deploymentMonitor) publishStatus(status event.ClusterDeploymentStatus)
}

func (m *deploymentMonitor) scheduleRetry() <-chan time.Time {
- return m.schedule(monitorRetryPeriodMin, monitorRetryPeriodJitter)
+ return m.schedule(m.config.MonitorRetryPeriod, m.config.MonitorRetryPeriodJitter)
}

func (m *deploymentMonitor) scheduleHealthcheck() <-chan time.Time {
- return m.schedule(monitorHealthcheckPeriodMin, monitorHealthcheckPeriodJitter)
+ return m.schedule(m.config.MonitorHealthcheckPeriod, m.config.MonitorHealthcheckPeriodJitter)
}

func (m *deploymentMonitor) schedule(min, jitter time.Duration) <-chan time.Time {
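The body of schedule is collapsed in this view, so the diff does not show how the period and jitter values are combined. Purely as an illustration of the intent behind the paired Period/Jitter settings (an assumption, not the repository's actual implementation), a scheduler like this would wait at least the base period plus up to the jitter amount of random delay:

```go
package main

import (
	"math/rand"
	"time"
)

// schedule is a hypothetical stand-in for the collapsed method body above:
// fire after at least min, plus a random extra delay of up to jitter.
func schedule(min, jitter time.Duration) <-chan time.Time {
	period := min
	if jitter > 0 {
		period += time.Duration(rand.Int63n(int64(jitter)))
	}
	return time.After(period)
}

func main() {
	// With the new defaults, a retry would fire between 4s and 19s from now.
	<-schedule(4*time.Second, 15*time.Second)
}
```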
3 changes: 3 additions & 0 deletions cluster/monitor_test.go
@@ -39,6 +39,7 @@ func TestMonitorInstantiate(t *testing.T) {
deployment: deployment,
log: myLog,
lc: lc,
+ config: NewDefaultConfig(),
}
monitor := newDeploymentMonitor(myDeploymentManager)
require.NotNil(t, monitor)
@@ -78,6 +79,7 @@ func TestMonitorSendsClusterDeploymentPending(t *testing.T) {
deployment: deployment,
log: myLog,
lc: lc,
+ config: NewDefaultConfig(),
}
monitor := newDeploymentMonitor(myDeploymentManager)
require.NotNil(t, monitor)
@@ -134,6 +136,7 @@ func TestMonitorSendsClusterDeploymentDeployed(t *testing.T) {
deployment: deployment,
log: myLog,
lc: lc,
+ config: NewDefaultConfig(),
}
monitor := newDeploymentMonitor(myDeploymentManager)
require.NotNil(t, monitor)
48 changes: 48 additions & 0 deletions cmd/provider-services/cmd/run.go
@@ -102,6 +102,11 @@ const (
FlagBidPriceIPScale = "bid-price-ip-scale"
FlagEnableIPOperator = "ip-operator"
FlagTxBroadcastTimeout = "tx-broadcast-timeout"
+ FlagMonitorMaxRetries = "monitor-max-retries"
+ FlagMonitorRetryPeriod = "monitor-retry-period"
+ FlagMonitorRetryPeriodJitter = "monitor-retry-period-jitter"
+ FlagMonitorHealthcheckPeriod = "monitor-healthcheck-period"
+ FlagMonitorHealthcheckPeriodJitter = "monitor-healthcheck-period-jitter"
)

const (
@@ -131,6 +136,14 @@ func RunCmd() *cobra.Command {
return errors.Errorf(`flag "%s" value must be > "%s"`, FlagWithdrawalPeriod, FlagLeaseFundsMonitorInterval) // nolint: err113
}

+ if viper.GetDuration(FlagMonitorRetryPeriod) < 4*time.Second {
+ return errors.Errorf(`flag "%s" value must be > "%s"`, FlagMonitorRetryPeriod, 4*time.Second) // nolint: err113
+ }
+
+ if viper.GetDuration(FlagMonitorHealthcheckPeriod) < 4*time.Second {
+ return errors.Errorf(`flag "%s" value must be > "%s"`, FlagMonitorHealthcheckPeriod, 4*time.Second) // nolint: err113
+ }

group, ctx := errgroup.WithContext(cmd.Context())
cmd.SetContext(ctx)

@@ -397,6 +410,31 @@ func RunCmd() *cobra.Command {
panic(err)
}

+ cmd.Flags().Uint(FlagMonitorMaxRetries, 40, "max count of status retries before closing the lease. defaults to 40")
+ if err := viper.BindPFlag(FlagMonitorMaxRetries, cmd.Flags().Lookup(FlagMonitorMaxRetries)); err != nil {
+ panic(err)
+ }

+ cmd.Flags().Duration(FlagMonitorRetryPeriod, 4*time.Second, "monitor status retry period. defaults to 4s (min value)")
+ if err := viper.BindPFlag(FlagMonitorRetryPeriod, cmd.Flags().Lookup(FlagMonitorRetryPeriod)); err != nil {
+ panic(err)
+ }

+ cmd.Flags().Duration(FlagMonitorRetryPeriodJitter, 15*time.Second, "monitor status retry window. defaults to 15s")
+ if err := viper.BindPFlag(FlagMonitorRetryPeriodJitter, cmd.Flags().Lookup(FlagMonitorRetryPeriodJitter)); err != nil {
+ panic(err)
+ }

+ cmd.Flags().Duration(FlagMonitorHealthcheckPeriod, 10*time.Second, "monitor healthcheck period. defaults to 10s")
+ if err := viper.BindPFlag(FlagMonitorHealthcheckPeriod, cmd.Flags().Lookup(FlagMonitorHealthcheckPeriod)); err != nil {
+ panic(err)
+ }

+ cmd.Flags().Duration(FlagMonitorHealthcheckPeriodJitter, 5*time.Second, "monitor healthcheck window. defaults to 5s")
+ if err := viper.BindPFlag(FlagMonitorHealthcheckPeriodJitter, cmd.Flags().Lookup(FlagMonitorHealthcheckPeriodJitter)); err != nil {
+ panic(err)
+ }

if err := providerflags.AddServiceEndpointFlag(cmd, serviceHostnameOperator); err != nil {
panic(err)
}
@@ -522,6 +560,11 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge)
rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout)
enableIPOperator := viper.GetBool(FlagEnableIPOperator)
+ monitorMaxRetries := viper.GetUint(FlagMonitorMaxRetries)
+ monitorRetryPeriod := viper.GetDuration(FlagMonitorRetryPeriod)
+ monitorRetryPeriodJitter := viper.GetDuration(FlagMonitorRetryPeriodJitter)
+ monitorHealthcheckPeriod := viper.GetDuration(FlagMonitorHealthcheckPeriod)
+ monitorHealthcheckPeriodJitter := viper.GetDuration(FlagMonitorHealthcheckPeriodJitter)

pricing, err := createBidPricingStrategy(strategy)
if err != nil {
@@ -656,6 +699,11 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
config.DeploymentIngressDomain = deploymentIngressDomain
config.BidTimeout = bidTimeout
config.ManifestTimeout = manifestTimeout
+ config.MonitorMaxRetries = monitorMaxRetries
+ config.MonitorRetryPeriod = monitorRetryPeriod
+ config.MonitorRetryPeriodJitter = monitorRetryPeriodJitter
+ config.MonitorHealthcheckPeriod = monitorHealthcheckPeriod
+ config.MonitorHealthcheckPeriodJitter = monitorHealthcheckPeriodJitter

if len(providerConfig) != 0 {
pConf, err := config2.ReadConfigPath(providerConfig)
36 changes: 15 additions & 21 deletions config.go
@@ -10,30 +10,23 @@ import (
types "github.com/akash-network/akash-api/go/node/types/v1beta3"

"github.com/akash-network/provider/bidengine"
+ "github.com/akash-network/provider/cluster"
)

type Config struct {
- ClusterWaitReadyDuration time.Duration
- ClusterPublicHostname string
- ClusterExternalPortQuantity uint
- InventoryResourcePollPeriod time.Duration
- InventoryResourceDebugFrequency uint
- BidPricingStrategy bidengine.BidPricingStrategy
- BidDeposit sdk.Coin
- CPUCommitLevel float64
- MemoryCommitLevel float64
- StorageCommitLevel float64
- MaxGroupVolumes int
- BlockedHostnames []string
- BidTimeout time.Duration
- ManifestTimeout time.Duration
- BalanceCheckerCfg BalanceCheckerConfig
- Attributes types.Attributes
- DeploymentIngressStaticHosts bool
- DeploymentIngressDomain string
- ClusterSettings map[interface{}]interface{}
- RPCQueryTimeout time.Duration
- CachedResultMaxAge time.Duration
+ ClusterWaitReadyDuration time.Duration
+ ClusterPublicHostname string
+ ClusterExternalPortQuantity uint
+ BidPricingStrategy bidengine.BidPricingStrategy
+ BidDeposit sdk.Coin
+ BidTimeout time.Duration
+ ManifestTimeout time.Duration
+ BalanceCheckerCfg BalanceCheckerConfig
+ Attributes types.Attributes
+ MaxGroupVolumes int
+ RPCQueryTimeout time.Duration
+ CachedResultMaxAge time.Duration
+ cluster.Config
}

func NewDefaultConfig() Config {
@@ -45,5 +38,6 @@ func NewDefaultConfig() Config {
WithdrawalPeriod: 24 * time.Hour,
},
MaxGroupVolumes: constants.DefaultMaxGroupVolumes,
+ Config: cluster.NewDefaultConfig(),
}
}
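Embedding cluster.Config here (instead of mirroring its fields) means Go promotes the cluster-level settings onto the provider Config, which is what lets service.go below drop the field-by-field copying. A small sketch of that promotion; the providerConfig type is a stand-in for the Config struct above, since its package name is not shown in this diff:

```go
package main

import (
	"fmt"
	"time"

	"github.com/akash-network/provider/cluster"
)

// providerConfig mirrors the shape of the Config above: embedding
// cluster.Config promotes its exported fields onto the outer struct.
type providerConfig struct {
	ClusterWaitReadyDuration time.Duration
	cluster.Config
}

func main() {
	cfg := providerConfig{Config: cluster.NewDefaultConfig()}

	// Promoted field access: no manual copying between the two config types.
	cfg.MonitorRetryPeriod = 8 * time.Second

	// The embedded value can still be passed along as a whole, the way
	// service.go now hands cfg.Config to cluster.NewService.
	fmt.Println(cfg.Config.MonitorRetryPeriod)
}
```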
14 changes: 1 addition & 13 deletions service.go
@@ -74,18 +74,6 @@ func NewService(ctx context.Context,

session = session.ForModule("provider-service")

- clusterConfig := cluster.NewDefaultConfig()
- clusterConfig.InventoryResourcePollPeriod = cfg.InventoryResourcePollPeriod
- clusterConfig.InventoryResourceDebugFrequency = cfg.InventoryResourceDebugFrequency
- clusterConfig.InventoryExternalPortQuantity = cfg.ClusterExternalPortQuantity
- clusterConfig.CPUCommitLevel = cfg.CPUCommitLevel
- clusterConfig.MemoryCommitLevel = cfg.MemoryCommitLevel
- clusterConfig.StorageCommitLevel = cfg.StorageCommitLevel
- clusterConfig.BlockedHostnames = cfg.BlockedHostnames
- clusterConfig.DeploymentIngressStaticHosts = cfg.DeploymentIngressStaticHosts
- clusterConfig.DeploymentIngressDomain = cfg.DeploymentIngressDomain
- clusterConfig.ClusterSettings = cfg.ClusterSettings

cl, err := aclient.DiscoverQueryClient(ctx, cctx)
if err != nil {
cancel()
@@ -99,7 +87,7 @@
return nil, err
}

- cluster, err := cluster.NewService(ctx, session, bus, cclient, waiter, clusterConfig)
+ cluster, err := cluster.NewService(ctx, session, bus, cclient, waiter, cfg.Config)
if err != nil {
cancel()
<-bc.lc.Done()