Skip to content

Commit

Permalink
fix race condition (#52)
Browse files Browse the repository at this point in the history
* requeue in scaleUpWait instead of requeueDuration on initial check

* ignore incorrect instance state in attach

* set shouldRequeue to false on Failed

* fix race condition in cordon and healing

- cordon removed nodes
- reattach terminated nodes
  • Loading branch information
hyang200 authored Oct 16, 2022
1 parent 774be96 commit eaf643c
Show file tree
Hide file tree
Showing 8 changed files with 976 additions and 22 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/go-logr/logr v0.4.0
github.com/google/uuid v1.1.5
github.com/hashicorp/go-version v1.4.0
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/mattn/go-isatty v0.0.12
github.com/operator-framework/operator-lib v0.9.0
Expand All @@ -24,7 +25,6 @@ require (
k8s.io/client-go v0.22.6
k8s.io/klog v1.0.0
sigs.k8s.io/controller-runtime v0.10.0
github.com/hashicorp/go-version v1.4.0
)

require (
Expand Down
928 changes: 928 additions & 0 deletions go.sum

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions pkg/cloudprovider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,27 @@ func instanceIDToProviderID(instanceID, availabilityZone string) (string, error)
return fmt.Sprintf("aws:///%s/%s", availabilityZone, instanceID), nil
}

func verifyIfErrorOccured(apiErr error, expectedMessage string) (bool, error) {
if awsErr, ok := apiErr.(awserr.Error); ok {
// process SDK error: Unfortunately there's no generic ValidationError in the SDK and no FailedAttach/FailedDetach error. Check manually
if awsErr.Code() == validationErrorCode && strings.Contains(awsErr.Message(), expectedMessage) {
return true, apiErr
func verifyIfErrorOccurred(apiErr error, expectedMessage ...string) (bool, error) {
for _, msg := range expectedMessage {
if awsErr, ok := apiErr.(awserr.Error); ok {
// process SDK error: Unfortunately there's no generic ValidationError in the SDK and no FailedAttach/FailedDetach error. Check manually
if awsErr.Code() == validationErrorCode && strings.Contains(awsErr.Message(), msg) {
return true, apiErr
}
}
}

return false, apiErr
}

func verifyIfErrorOccurredWithDefaults(apiErr error, expectedMessage string) (bool, error) {
skip_errs := []string{
// default errors we wanted to skip
"is not in correct state",
expectedMessage,
}
return verifyIfErrorOccurred(apiErr, skip_errs...)
}

type provider struct {
autoScalingService *autoscaling.AutoScaling
ec2Service *ec2.EC2
Expand Down Expand Up @@ -267,7 +277,7 @@ func (a *autoscalingGroups) DetachInstance(providerID string) (alreadyDetaching
ShouldDecrementDesiredCapacity: aws.Bool(false),
})

return verifyIfErrorOccured(apiErr, alreadyDetachingMessage)
return verifyIfErrorOccurred(apiErr, alreadyDetachingMessage)
}

// AttachInstance attaches the instance to the Autoscaling group
Expand All @@ -287,7 +297,7 @@ func (a *autoscalingGroups) AttachInstance(providerID, nodeGroup string) (alread
InstanceIds: aws.StringSlice([]string{instanceID}),
})

return verifyIfErrorOccured(apiErr, alreadyAttachedMessage)
return verifyIfErrorOccurredWithDefaults(apiErr, alreadyAttachedMessage)
}

func (a *autoscalingGroups) instanceOutOfDate(instance *autoscaling.Instance) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/cyclenoderequest/transitioner/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (t *CycleNodeRequestTransitioner) sendPreTerminationTrigger(node v1.CycleNo
}

// Send the trigger, disregard the response body
statusCode, _, err := t.makeRequest(http.MethodPost, httpClient, endpoint)
statusCode, res, err := t.makeRequest(http.MethodPost, httpClient, endpoint)
if err != nil {
return fmt.Errorf("sending trigger failed: %v", err)
}
Expand All @@ -360,7 +360,7 @@ func (t *CycleNodeRequestTransitioner) sendPreTerminationTrigger(node v1.CycleNo
}

if !statusCodeFound {
return fmt.Errorf("got unexpected status code after sending trigger: %d", statusCode)
return fmt.Errorf("got unexpected status code after sending trigger: %d, resp: %s", statusCode, res)
}

now := metav1.Now()
Expand Down
32 changes: 23 additions & 9 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/atlassian-labs/cyclops/pkg/k8s"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -55,9 +57,9 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,

// transitionPending transitions any CycleNodeRequests in the pending phase to the initialised phase
// Does the following:
// 1. fetches the current nodes by the label selector, and saves them as nodes to be terminated
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
// 1. fetches the current nodes by the label selector, and saves them as nodes to be terminated
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")
Expand Down Expand Up @@ -303,8 +305,8 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result,

// Check we have waited long enough - give the node some time to start up
if time.Since(scaleUpStarted.Time) <= scaleUpWait {
t.rm.LogEvent(t.cycleNodeRequest, "ScalingUpWaiting", "Waiting for new nodes to be ready")
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
t.rm.LogEvent(t.cycleNodeRequest, "ScalingUpWaiting", "Waiting for new nodes to be warmed up")
return reconcile.Result{Requeue: true, RequeueAfter: scaleUpWait}, nil
}

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
Expand Down Expand Up @@ -374,7 +376,8 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result,
for i, node := range t.cycleNodeRequest.Status.CurrentNodes {
if nodeToRemove.Name == node.Name {
t.rm.LogEvent(t.cycleNodeRequest, "RaceCondition", "Node %v was prematurely terminated.", node.Name)
t.cycleNodeRequest.Status.CurrentNodes = append(t.cycleNodeRequest.Status.CurrentNodes[:i], t.cycleNodeRequest.Status.CurrentNodes[i+1:]...)
t.cycleNodeRequest.Status.CurrentNodes = append(t.cycleNodeRequest.Status.CurrentNodes[:i],
t.cycleNodeRequest.Status.CurrentNodes[i+1:]...)
break
}
}
Expand Down Expand Up @@ -415,6 +418,10 @@ func (t *CycleNodeRequestTransitioner) transitionCordoning() (reconcile.Result,
for _, node := range t.cycleNodeRequest.Status.CurrentNodes {
// If the node is not already cordoned, cordon it
cordoned, err := k8s.IsCordoned(node.Name, t.rm.RawClient)
// Skip handling the node if it doesn't exist
if apierrors.IsNotFound(err) {
continue
}
if err != nil {
t.rm.Logger.Error(err, "failed to check if node is cordoned", "nodeName", node.Name)
return t.transitionToHealing(err)
Expand All @@ -431,14 +438,18 @@ func (t *CycleNodeRequestTransitioner) transitionCordoning() (reconcile.Result,
// Try to send the trigger, if is has already been sent then this will
// be skipped in the function. The trigger must only be sent once
if err := t.sendPreTerminationTrigger(node); err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to send pre-termination trigger, %s is still cordononed", node.Name))
t.rm.LogEvent(t.cycleNodeRequest,
"PreTerminationTriggerFailed", "failed to send pre-termination trigger to %s, err: %v", node.Name, err)
return t.transitionToHealing(errors.Wrapf(err, "failed to send pre-termination trigger to %s", node.Name))
}

// After the trigger has been sent, perform health checks to monitor if the node
// can be terminated. If all checks pass then it can be terminated.
allHealthChecksPassed, err := t.performPreTerminationHealthChecks(node)
if err != nil {
return t.transitionToHealing(errors.Wrapf(err, "failed to perform pre-termination health checks, %s is still cordononed", node.Name))
t.rm.LogEvent(t.cycleNodeRequest, "PreTerminationHealChecks",
"failed to perform pre-termination health checks for %v, err: %v", node.Name, err)
return t.transitionToHealing(errors.Wrapf(err, "failed to perform pre-termination health checks for %s", node.Name))
}

// If not all health checks have passed, it is not ready for termination yet
Expand Down Expand Up @@ -499,7 +510,7 @@ func (t *CycleNodeRequestTransitioner) transitionWaitingTermination() (reconcile
return t.transitionObject(desiredPhase)
}

// transitionFailed handles failed CycleNodeRequests
// transitionHealing handles healing CycleNodeRequests
func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, error) {
nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
if err != nil {
Expand All @@ -511,6 +522,9 @@ func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, er
t.rm.LogEvent(t.cycleNodeRequest, "AttachingNodes", "Attaching instances to nodes group: %v", node.Name)
alreadyAttached, err := nodeGroups.AttachInstance(node.ProviderID, node.NodeGroupName)
if alreadyAttached {
t.rm.LogEvent(t.cycleNodeRequest,
"AttachingNodes", "Skip re-attaching instances to nodes group: %v, err: %v",
node.Name, err)
continue
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/cyclenoderequest/transitioner/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (t *CycleNodeRequestTransitioner) finalReapChildren() (shouldRequeue bool,
}

switch t.cycleNodeRequest.Status.Phase {
case v1.CycleNodeRequestInitialised:
case v1.CycleNodeRequestInitialised, v1.CycleNodeRequestFailed:
if t.cycleNodeRequest.Status.ActiveChildren == 0 {
// No more work to be done, stop processing this request
return false, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/cyclenodestatus/transitioner/transitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (t *CycleNodeStatusTransitioner) transitionUndefined() (reconcile.Result, e
// RemovingLabelsFromPods phase based on the .Spec.Method provided.
// Gets the requested node from the cloud provider and from kube and performs sanity checks. Depending on these checks
// the CycleNodeStatus may go straight to Failed or Successful.
// If the node has problems then it will transition straight to Failed.
//
// If the node has problems then it will transition straight to Failed.
func (t *CycleNodeStatusTransitioner) transitionPending() (reconcile.Result, error) {
t.rm.LogEvent(t.cycleNodeStatus, "FetchingNode", "Fetching information about node: %v", t.cycleNodeStatus.Spec.NodeName)
node, err := t.rm.GetNode(t.cycleNodeStatus.Spec.NodeName)
Expand Down
1 change: 1 addition & 0 deletions pkg/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down

0 comments on commit eaf643c

Please sign in to comment.