Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Kubernetes Service using expired credentials #50074

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions lib/cloud/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ type awsOptions struct {

// maxRetries is the maximum number of retries to use for the session.
maxRetries *int

// withoutSessionCache disables the session cache for the AWS session.
withoutSessionCache bool
}

func (a *awsOptions) checkAndSetDefaults() error {
Expand Down Expand Up @@ -421,6 +424,13 @@ func WithAssumeRole(roleARN, externalID string) AWSOptionsFn {
}
}

// WithoutSessionCache disables the session cache for the AWS session.
func WithoutSessionCache() AWSOptionsFn {
return func(options *awsOptions) {
options.withoutSessionCache = true
}
}

// WithAssumeRoleFromAWSMeta extracts options needed from AWS metadata for
// assuming an AWS role.
func WithAssumeRoleFromAWSMeta(meta types.AWS) AWSOptionsFn {
Expand Down Expand Up @@ -487,7 +497,7 @@ func (c *cloudClients) GetAWSSession(ctx context.Context, region string, opts ..
}
var err error
if options.baseSession == nil {
options.baseSession, err = c.getAWSSessionForRegion(region, options)
options.baseSession, err = c.getAWSSessionForRegion(ctx, region, options)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -767,17 +777,12 @@ func awsAmbientSessionProvider(ctx context.Context, region string) (*awssession.
}

// getAWSSessionForRegion returns AWS session for the specified region.
func (c *cloudClients) getAWSSessionForRegion(region string, opts awsOptions) (*awssession.Session, error) {
func (c *cloudClients) getAWSSessionForRegion(ctx context.Context, region string, opts awsOptions) (*awssession.Session, error) {
if err := opts.checkAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

cacheKey := awsSessionCacheKey{
region: region,
integration: opts.integration,
}

sess, err := utils.FnCacheGet(context.Background(), c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) {
createSession := func(ctx context.Context) (*awssession.Session, error) {
if opts.credentialsSource == credentialsSourceIntegration {
if c.awsIntegrationSessionProviderFn == nil {
return nil, trace.BadParameter("missing aws integration session provider")
Expand All @@ -791,6 +796,30 @@ func (c *cloudClients) getAWSSessionForRegion(region string, opts awsOptions) (*
logrus.Debugf("Initializing AWS session for region %v using environment credentials.", region)
session, err := awsAmbientSessionProvider(ctx, region)
return session, trace.Wrap(err)
}

if opts.withoutSessionCache {
sess, err := createSession(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
if opts.customRetryer != nil || opts.maxRetries != nil {
return sess.Copy(&aws.Config{
Retryer: opts.customRetryer,
MaxRetries: opts.maxRetries,
}), nil
}
return sess, trace.Wrap(err)
}

cacheKey := awsSessionCacheKey{
region: region,
integration: opts.integration,
}

sess, err := utils.FnCacheGet(ctx, c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) {
session, err := createSession(ctx)
return session, trace.Wrap(err)
})
if err != nil {
return nil, trace.Wrap(err)
Expand All @@ -810,15 +839,25 @@ func (c *cloudClients) getAWSSessionForRole(ctx context.Context, region string,
return nil, trace.Wrap(err)
}

createSession := func(ctx context.Context) (*awssession.Session, error) {
stsClient := sts.New(options.baseSession)
return newSessionWithRole(ctx, stsClient, region, options.assumeRoleARN, options.assumeRoleExternalID)
}

if options.withoutSessionCache {
session, err := createSession(ctx)
return session, trace.Wrap(err)
}

cacheKey := awsSessionCacheKey{
region: region,
integration: options.integration,
roleARN: options.assumeRoleARN,
externalID: options.assumeRoleExternalID,
}
return utils.FnCacheGet(ctx, c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) {
stsClient := sts.New(options.baseSession)
return newSessionWithRole(ctx, stsClient, region, options.assumeRoleARN, options.assumeRoleExternalID)
session, err := createSession(ctx)
return session, trace.Wrap(err)
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/kube/proxy/cluster_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,9 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
go dynLabels.Start()
}

kubeClient := creds.getKubeClient()

var isClusterOffline bool
// Create the codec factory and the list of supported types for RBAC.
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, kubeClient)
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
cfg.log.WithError(err).Warn("Failed to create cluster schema. Possibly the cluster is offline.")
// If the cluster is offline, we will not be able to create the codec factory
Expand All @@ -145,7 +143,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
isClusterOffline = true
}

kubeVersion, err := kubeClient.Discovery().ServerVersion()
kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion()
if err != nil {
cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
}
Expand Down Expand Up @@ -204,7 +202,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
continue
}

kubeVersion, err := kubeClient.Discovery().ServerVersion()
kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion()
if err != nil {
cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
}
Expand Down Expand Up @@ -342,6 +340,7 @@ func getAWSClientRestConfig(cloudClients cloud.Clients, clock clockwork.Clock, r
region := cluster.GetAWSConfig().Region
opts := []cloud.AWSOptionsFn{
cloud.WithAmbientCredentials(),
cloud.WithoutSessionCache(),
}
if awsAssume := getAWSResourceMatcherToCluster(cluster, resourceMatchers); awsAssume != nil {
opts = append(opts, cloud.WithAssumeRole(awsAssume.AssumeRoleARN, awsAssume.ExternalID))
Expand Down
Loading