Skip to content

Commit

Permalink
[Feature] add retries and backoffs for propeller sending events to ad…
Browse files Browse the repository at this point in the history
…min (#5166)

* add retries and backoffs for propeller sending events to admin

Signed-off-by: Paul Dittamo <[email protected]>

* update docs

Signed-off-by: Paul Dittamo <[email protected]>

* update retry types

Signed-off-by: Paul Dittamo <[email protected]>

* support expontential backoffs w/ jitter for catalog connection

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Apr 18, 2024
1 parent c899586 commit 0095165
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 23 deletions.
72 changes: 72 additions & 0 deletions docs/deployment/configuration/generated/flytepropeller_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,42 @@ Use the same gRPC credentials option as the flyteadmin client
"false"
max-retries (int)
------------------------------------------------------------------------------------------------------------------------

The max number of retries for event recording.

**Default Value**:

.. code-block:: yaml
"5"
base-scalar (int)
------------------------------------------------------------------------------------------------------------------------

The base/scalar backoff duration in milliseconds for event recording retries.

**Default Value**:

.. code-block:: yaml
"100"
backoff-jitter (string)
------------------------------------------------------------------------------------------------------------------------

A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.

**Default Value**:

.. code-block:: yaml
"0.1"
default-service-config (string)
------------------------------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -671,6 +707,42 @@ The max bucket size for event recording tokens.
"1000"
max-retries (int)
------------------------------------------------------------------------------------------------------------------------

The max number of retries for event recording.

**Default Value**:

.. code-block:: yaml
"5"
base-scalar (int)
------------------------------------------------------------------------------------------------------------------------

The base/scalar backoff duration in milliseconds for event recording retries.

**Default Value**:

.. code-block:: yaml
"100"
backoff-jitter (string)
------------------------------------------------------------------------------------------------------------------------

A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.

**Default Value**:

.. code-block:: yaml
"0.1"
Section: logger
========================================================================================================================

Expand Down
16 changes: 13 additions & 3 deletions flytepropeller/events/admin_eventsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package events
import (
"context"
"fmt"
"time"

"github.com/golang/protobuf/proto"
grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -128,15 +130,23 @@ func IDFromMessage(message proto.Message) ([]byte, error) {
return []byte(id), nil
}

func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) {
func initializeAdminClientFromConfig(ctx context.Context, config *Config) (client service.AdminServiceClient, err error) {
cfg := admin2.GetConfig(ctx)
tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer)
opt := grpc.WithUnaryInterceptor(

grpcOptions := []grpcRetry.CallOption{
grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.GetBackoffJitter(ctx))),
grpcRetry.WithMax(uint(config.MaxRetries)),
}

opt := grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(
otelgrpc.WithTracerProvider(tracerProvider),
otelgrpc.WithPropagators(propagation.TraceContext{}),
),
grpcRetry.UnaryClientInterceptor(grpcOptions...),
)

clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err)
Expand All @@ -152,7 +162,7 @@ func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Sco
case EventSinkFile:
return NewFileSink(config.FilePath)
case EventSinkAdmin:
adminClient, err := initializeAdminClientFromConfig(ctx)
adminClient, err := initializeAdminClientFromConfig(ctx, config)
if err != nil {
return nil, err
}
Expand Down
41 changes: 33 additions & 8 deletions flytepropeller/events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package events

import (
"context"
"strconv"

"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand All @@ -21,22 +22,46 @@ const (
)

type Config struct {
Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."`
FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."`
Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."`
Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."`
Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."`
FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."`
Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."`
Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`
}

var (
defaultConfig = Config{
Rate: int64(500),
Capacity: 1000,
Type: EventSinkAdmin,
Rate: int64(500),
Capacity: 1000,
Type: EventSinkAdmin,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
}

configSection = config.MustRegisterSection(configSectionKey, &defaultConfig)
configSection = config.MustRegisterSectionWithUpdates(configSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) {
if newValue.(*Config).MaxRetries < 0 {
logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.")
}

if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 {
logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter)
}
})
)

func (c Config) GetBackoffJitter(ctx context.Context) float64 {
jitter, err := strconv.ParseFloat(c.BackoffJitter, 64)
if err != nil {
logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err)
return 0.1
}

return jitter
}

// GetConfig Retrieves current global config for storage.
func GetConfig(ctx context.Context) *Config {
if c, ok := configSection.GetConfig().(*Config); ok {
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/events/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions flytepropeller/events/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 35 additions & 9 deletions flytepropeller/pkg/controller/nodes/catalog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,37 @@ package catalog
import (
"context"
"fmt"
"strconv"

"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/catalog/datacatalog"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

//go:generate pflags Config --default-var defaultConfig

const ConfigSectionKey = "catalog-cache"

var (
defaultConfig = &Config{
Type: NoOpDiscoveryType,
defaultConfig = Config{
Type: NoOpDiscoveryType,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
}

configSection = config.MustRegisterSection(ConfigSectionKey, defaultConfig)
configSection = config.MustRegisterSectionWithUpdates(ConfigSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) {
if newValue.(*Config).MaxRetries < 0 {
logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.")
}

if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 {
logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter)
}
})
)

type DiscoveryType = string
Expand All @@ -31,11 +44,14 @@ const (
)

type Config struct {
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`

// Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md
// eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]}
Expand All @@ -44,6 +60,16 @@ type Config struct {
DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"`
}

func (c Config) GetBackoffJitter(ctx context.Context) float64 {
jitter, err := strconv.ParseFloat(c.BackoffJitter, 64)
if err != nil {
logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err)
return 0.1
}

return jitter
}

// GetConfig gets loaded config for Discovery
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand All @@ -56,7 +82,7 @@ func NewCatalogClient(ctx context.Context, authOpt ...grpc.DialOption) (catalog.
case DataCatalogType:
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig,
authOpt...)
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...)
case NoOpDiscoveryType, "":
return NOOPCatalog{}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/pkg/controller/nodes/catalog/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0095165

Please sign in to comment.