Skip to content

Commit

Permalink
Refactor predictkube scaler config (#6282)
Browse files Browse the repository at this point in the history
Signed-off-by: wangrushen <[email protected]>
  • Loading branch information
dovics authored Nov 3, 2024
1 parent 5062262 commit a825451
Showing 1 changed file with 55 additions and 108 deletions.
163 changes: 55 additions & 108 deletions pkg/scalers/predictkube_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,51 @@ type PredictKubeScaler struct {
}

type predictKubeMetadata struct {
predictHorizon time.Duration
historyTimeWindow time.Duration
stepDuration time.Duration
apiKey string
prometheusAddress string
prometheusAuth *authentication.AuthMeta
query string
threshold float64
activationThreshold float64
triggerIndex int
PrometheusAddress string `keda:"name=prometheusAddress, order=triggerMetadata"`
PrometheusAuth *authentication.Config `keda:"optional"`
Query string `keda:"name=query, order=triggerMetadata"`
PredictHorizon string `keda:"name=predictHorizon, order=triggerMetadata"`
QueryStep string `keda:"name=queryStep, order=triggerMetadata"`
HistoryTimeWindow string `keda:"name=historyTimeWindow, order=triggerMetadata"`
APIKey string `keda:"name=apiKey, order=authParams"`
Threshold float64 `keda:"name=threshold, order=triggerMetadata, optional"`
ActivationThreshold float64 `keda:"name=activationThreshold, order=triggerMetadata, optional"`

predictHorizon time.Duration
historyTimeWindow time.Duration
stepDuration time.Duration
triggerIndex int
}

func (p *predictKubeMetadata) Validate() error {
validate := validator.New()
err := validate.Var(p.PrometheusAddress, "url")
if err != nil {
return fmt.Errorf("invalid prometheusAddress")
}

p.predictHorizon, err = str2duration.ParseDuration(p.PredictHorizon)
if err != nil {
return fmt.Errorf("predictHorizon parsing error %w", err)
}

p.stepDuration, err = str2duration.ParseDuration(p.QueryStep)
if err != nil {
return fmt.Errorf("queryStep parsing error %w", err)
}

p.historyTimeWindow, err = str2duration.ParseDuration(p.HistoryTimeWindow)
if err != nil {
return fmt.Errorf("historyTimeWindow parsing error %w", err)
}

err = validate.Var(p.APIKey, "jwt")
if err != nil {
return fmt.Errorf("invalid apiKey")
}

return nil
}
func (s *PredictKubeScaler) setupClientConn() error {
clientOpt, err := pc.SetGrpcClientOptions(grpcConf,
&libs.Base{
Expand All @@ -108,7 +141,7 @@ func (s *PredictKubeScaler) setupClientConn() error {
Enabled: false,
},
},
pc.InjectPublicClientMetadataInterceptor(s.metadata.apiKey),
pc.InjectPublicClientMetadataInterceptor(s.metadata.APIKey),
)

if !grpcConf.Conn.Insecure {
Expand Down Expand Up @@ -186,7 +219,7 @@ func (s *PredictKubeScaler) GetMetricSpecForScaling(context.Context) []v2.Metric
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.threshold),
Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold),
}

metricSpec := v2.MetricSpec{
Expand All @@ -211,7 +244,7 @@ func (s *PredictKubeScaler) GetMetricsAndActivity(ctx context.Context, metricNam

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, activationValue > s.metadata.activationThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, activationValue > s.metadata.ActivationThreshold, nil
}

func (s *PredictKubeScaler) doPredictRequest(ctx context.Context) (float64, float64, error) {
Expand Down Expand Up @@ -257,7 +290,7 @@ func (s *PredictKubeScaler) doQuery(ctx context.Context) ([]*commonproto.Item, e
Step: s.metadata.stepDuration,
}

val, warns, err := s.api.QueryRange(ctx, s.metadata.query, r)
val, warns, err := s.api.QueryRange(ctx, s.metadata.Query, r)

if len(warns) > 0 {
s.logger.V(1).Info("warnings", warns)
Expand Down Expand Up @@ -345,103 +378,17 @@ func (s *PredictKubeScaler) parsePrometheusResult(result model.Value) (out []*co
}

func parsePredictKubeMetadata(config *scalersconfig.ScalerConfig) (result *predictKubeMetadata, err error) {
validate := validator.New()
meta := predictKubeMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
if len(val) == 0 {
return nil, fmt.Errorf("no query given")
}

meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["prometheusAddress"]; ok {
err = validate.Var(val, "url")
if err != nil {
return nil, fmt.Errorf("invalid prometheusAddress")
}

meta.prometheusAddress = val
} else {
return nil, fmt.Errorf("no prometheusAddress given")
}

if val, ok := config.TriggerMetadata["predictHorizon"]; ok {
predictHorizon, err := str2duration.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("predictHorizon parsing error %w", err)
}
meta.predictHorizon = predictHorizon
} else {
return nil, fmt.Errorf("no predictHorizon given")
}

if val, ok := config.TriggerMetadata["queryStep"]; ok {
stepDuration, err := str2duration.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("queryStep parsing error %w", err)
}
meta.stepDuration = stepDuration
} else {
return nil, fmt.Errorf("no queryStep given")
}

if val, ok := config.TriggerMetadata["historyTimeWindow"]; ok {
historyTimeWindow, err := str2duration.ParseDuration(val)
if err != nil {
return nil, fmt.Errorf("historyTimeWindow parsing error %w", err)
}
meta.historyTimeWindow = historyTimeWindow
} else {
return nil, fmt.Errorf("no historyTimeWindow given")
}

if val, ok := config.TriggerMetadata["threshold"]; ok {
threshold, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("threshold parsing error %w", err)
}
meta.threshold = threshold
} else {
if config.AsMetricSource {
meta.threshold = 0
} else {
return nil, fmt.Errorf("no threshold given")
}
meta := &predictKubeMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing arango metadata: %w", err)
}

meta.activationThreshold = 0
if val, ok := config.TriggerMetadata["activationThreshold"]; ok {
activationThreshold, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationThreshold parsing error %w", err)
}
meta.activationThreshold = activationThreshold
if !config.AsMetricSource && meta.Threshold == 0 {
return nil, fmt.Errorf("no threshold given")
}

meta.triggerIndex = config.TriggerIndex

if val, ok := config.AuthParams["apiKey"]; ok {
err = validate.Var(val, "jwt")
if err != nil {
return nil, fmt.Errorf("invalid apiKey")
}

meta.apiKey = val
} else {
return nil, fmt.Errorf("no api key given")
}

// parse auth configs from ScalerConfig
auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams)
if err != nil {
return nil, err
}
meta.prometheusAuth = auth
return &meta, nil
return meta, nil
}

func (s *PredictKubeScaler) ping(ctx context.Context) (err error) {
Expand All @@ -454,14 +401,14 @@ func (s *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) (
// create http.RoundTripper with auth settings from ScalerConfig
roundTripper, err := authentication.CreateHTTPRoundTripper(
authentication.FastHTTP,
s.metadata.prometheusAuth,
s.metadata.PrometheusAuth.ToAuthMeta(),
)
if err != nil {
s.logger.V(1).Error(err, "init Prometheus client http transport")
return err
}
client, err := api.NewClient(api.Config{
Address: s.metadata.prometheusAddress,
Address: s.metadata.PrometheusAddress,
RoundTripper: roundTripper,
})
if err != nil {
Expand Down

0 comments on commit a825451

Please sign in to comment.