From 7ca0529ef1330d2b6c98ac2f55df3973758b0256 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Wed, 11 Dec 2024 16:58:48 +0000 Subject: [PATCH] Fixes Kubernetes Service using expired credentials The Kubernetes service occasionally fails to forward requests to EKS clusters or retrieve the cluster schema due to AWS rejecting the request with an "expired token" error. EKS access tokens are generated using STS presigned URLs, which include details such as the cluster, backend credentials, and assumed roles. By default, these tokens are valid for 15 minutes, and the Kubernetes service refreshes them every $(15 - 1) / 2 = 7\text{ }minutes$. However, our cloud SDK caches the underlying `aws.Session`, particularly those with assumed roles, for 15 minutes. This leads to a scenario where the token is refreshed a second time at approximately 14 minutes, close to the token's 15-minute validity. If the underlying credentials expire before the next token refresh, given that they were reused from the previous query and cached since then, it results in the Kubernetes Service considering the token valid (since it is a Base64-encoded presigned URL without knowledge about the credentials), but AWS EKS cluster rejects the request, treating the credentials as expired. This PR adds an option to disable cache for EKS STS token signing which results in creating a session per EKS cluster sign process. Bellow one can find the error message EKS returns. ``` 2024-12-09T17:00:15Z ERRO [KUBERNETE] Failed to update cluster schema error:[ ERROR REPORT: Original Error: *errors.StatusError the server has asked for the client to provide credentials Stack Trace: github.com/gravitational/teleport/lib/kube/proxy/scheme.go:140 github.com/gravitational/teleport/lib/kube/proxy.newClusterSchemaBuilder github.com/gravitational/teleport/lib/kube/proxy/cluster_details.go:193 github.com/gravitational/teleport/lib/kube/proxy.newClusterDetails.func1 runtime/asm_amd64.s:1695 runtime.goexit User Message: the server has asked for the client to provide credentials] pid:7.1 start_time:2024-12-09T17:00:15Z proxy/cluster_details.go:210 2024-12-09T17:00:24Z ERRO [KUBERNETE] Failed to update cluster schema error:[ ERROR REPORT: Original Error: *errors.StatusError the server has asked for the client to provide credentials Stack Trace: github.com/gravitational/teleport/lib/kube/proxy/scheme.go:140 github.com/gravitational/teleport/lib/kube/proxy.newClusterSchemaBuilder github.com/gravitational/teleport/lib/kube/proxy/cluster_details.go:193 github.com/gravitational/teleport/lib/kube/proxy.newClusterDetails.func1 runtime/asm_amd64.s:1695 runtime.goexit User Message: the server has asked for the client to provide credentials] pid:7.1 start_time:2024-12-09T17:00:24Z proxy/cluster_details.go:210 ``` Changelog: Fixes an intermittent EKS authentication failure when dealing with EKS auto-discovery. Signed-off-by: Tiago Silva --- lib/cloud/clients.go | 59 +++++++++++++++++++++++++------ lib/kube/proxy/cluster_details.go | 9 +++-- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/lib/cloud/clients.go b/lib/cloud/clients.go index ac21c4c89c045..93bb27f90b246 100644 --- a/lib/cloud/clients.go +++ b/lib/cloud/clients.go @@ -390,6 +390,9 @@ type awsOptions struct { // maxRetries is the maximum number of retries to use for the session. maxRetries *int + + // withoutSessionCache disables the session cache for the AWS session. + withoutSessionCache bool } func (a *awsOptions) checkAndSetDefaults() error { @@ -421,6 +424,13 @@ func WithAssumeRole(roleARN, externalID string) AWSOptionsFn { } } +// WithoutSessionCache disables the session cache for the AWS session. +func WithoutSessionCache() AWSOptionsFn { + return func(options *awsOptions) { + options.withoutSessionCache = true + } +} + // WithAssumeRoleFromAWSMeta extracts options needed from AWS metadata for // assuming an AWS role. func WithAssumeRoleFromAWSMeta(meta types.AWS) AWSOptionsFn { @@ -487,7 +497,7 @@ func (c *cloudClients) GetAWSSession(ctx context.Context, region string, opts .. } var err error if options.baseSession == nil { - options.baseSession, err = c.getAWSSessionForRegion(region, options) + options.baseSession, err = c.getAWSSessionForRegion(ctx, region, options) if err != nil { return nil, trace.Wrap(err) } @@ -767,17 +777,12 @@ func awsAmbientSessionProvider(ctx context.Context, region string) (*awssession. } // getAWSSessionForRegion returns AWS session for the specified region. -func (c *cloudClients) getAWSSessionForRegion(region string, opts awsOptions) (*awssession.Session, error) { +func (c *cloudClients) getAWSSessionForRegion(ctx context.Context, region string, opts awsOptions) (*awssession.Session, error) { if err := opts.checkAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - cacheKey := awsSessionCacheKey{ - region: region, - integration: opts.integration, - } - - sess, err := utils.FnCacheGet(context.Background(), c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) { + createSession := func(ctx context.Context) (*awssession.Session, error) { if opts.credentialsSource == credentialsSourceIntegration { if c.awsIntegrationSessionProviderFn == nil { return nil, trace.BadParameter("missing aws integration session provider") @@ -791,6 +796,30 @@ func (c *cloudClients) getAWSSessionForRegion(region string, opts awsOptions) (* logrus.Debugf("Initializing AWS session for region %v using environment credentials.", region) session, err := awsAmbientSessionProvider(ctx, region) return session, trace.Wrap(err) + } + + if opts.withoutSessionCache { + sess, err := createSession(ctx) + if err != nil { + return nil, trace.Wrap(err) + } + if opts.customRetryer != nil || opts.maxRetries != nil { + return sess.Copy(&aws.Config{ + Retryer: opts.customRetryer, + MaxRetries: opts.maxRetries, + }), nil + } + return sess, trace.Wrap(err) + } + + cacheKey := awsSessionCacheKey{ + region: region, + integration: opts.integration, + } + + sess, err := utils.FnCacheGet(ctx, c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) { + session, err := createSession(ctx) + return session, trace.Wrap(err) }) if err != nil { return nil, trace.Wrap(err) @@ -810,6 +839,16 @@ func (c *cloudClients) getAWSSessionForRole(ctx context.Context, region string, return nil, trace.Wrap(err) } + createSession := func(ctx context.Context) (*awssession.Session, error) { + stsClient := sts.New(options.baseSession) + return newSessionWithRole(ctx, stsClient, region, options.assumeRoleARN, options.assumeRoleExternalID) + } + + if options.withoutSessionCache { + session, err := createSession(ctx) + return session, trace.Wrap(err) + } + cacheKey := awsSessionCacheKey{ region: region, integration: options.integration, @@ -817,8 +856,8 @@ func (c *cloudClients) getAWSSessionForRole(ctx context.Context, region string, externalID: options.assumeRoleExternalID, } return utils.FnCacheGet(ctx, c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) { - stsClient := sts.New(options.baseSession) - return newSessionWithRole(ctx, stsClient, region, options.assumeRoleARN, options.assumeRoleExternalID) + session, err := createSession(ctx) + return session, trace.Wrap(err) }) } diff --git a/lib/kube/proxy/cluster_details.go b/lib/kube/proxy/cluster_details.go index b74e81f1b5ddb..c949494ac982d 100644 --- a/lib/kube/proxy/cluster_details.go +++ b/lib/kube/proxy/cluster_details.go @@ -131,11 +131,9 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe go dynLabels.Start() } - kubeClient := creds.getKubeClient() - var isClusterOffline bool // Create the codec factory and the list of supported types for RBAC. - codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, kubeClient) + codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient()) if err != nil { cfg.log.WithError(err).Warn("Failed to create cluster schema. Possibly the cluster is offline.") // If the cluster is offline, we will not be able to create the codec factory @@ -145,7 +143,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe isClusterOffline = true } - kubeVersion, err := kubeClient.Discovery().ServerVersion() + kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion() if err != nil { cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.") } @@ -204,7 +202,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe continue } - kubeVersion, err := kubeClient.Discovery().ServerVersion() + kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion() if err != nil { cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.") } @@ -342,6 +340,7 @@ func getAWSClientRestConfig(cloudClients cloud.Clients, clock clockwork.Clock, r region := cluster.GetAWSConfig().Region opts := []cloud.AWSOptionsFn{ cloud.WithAmbientCredentials(), + cloud.WithoutSessionCache(), } if awsAssume := getAWSResourceMatcherToCluster(cluster, resourceMatchers); awsAssume != nil { opts = append(opts, cloud.WithAssumeRole(awsAssume.AssumeRoleARN, awsAssume.ExternalID))