Skip to content

Commit

Permalink
Refactoring toolchaincluster controller
Browse files Browse the repository at this point in the history
Signed-off-by: Feny Mehta <[email protected]>

update

Signed-off-by: Feny Mehta <[email protected]>

refac

Signed-off-by: Feny Mehta <[email protected]>
  • Loading branch information
fbm3307 committed Jun 6, 2024
1 parent 98aad71 commit cdaafb5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 61 deletions.
34 changes: 7 additions & 27 deletions controllers/toolchaincluster/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"strings"

"github.com/codeready-toolchain/api/api/v1alpha1"
toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeclientset "k8s.io/client-go/kubernetes"
Expand All @@ -27,41 +27,21 @@ type HealthChecker struct {
logger logr.Logger
}

func (hc *HealthChecker) updateIndividualClusterStatus(ctx context.Context, toolchainCluster *toolchainv1alpha1.ToolchainCluster) error {

currentClusterStatus := hc.getClusterHealthStatus(ctx)

for index, currentCond := range currentClusterStatus.Conditions {
for _, previousCond := range toolchainCluster.Status.Conditions {
if currentCond.Type == previousCond.Type && currentCond.Status == previousCond.Status {
currentClusterStatus.Conditions[index].LastTransitionTime = previousCond.LastTransitionTime
}
}
}

toolchainCluster.Status = *currentClusterStatus
if err := hc.localClusterClient.Status().Update(ctx, toolchainCluster); err != nil {
return errors.Wrapf(err, "Failed to update the status of cluster %s", toolchainCluster.Name)
}
return nil
}

// getClusterHealthStatus gets the kubernetes cluster health status by requesting "/healthz"
func (hc *HealthChecker) getClusterHealthStatus(ctx context.Context) *toolchainv1alpha1.ToolchainClusterStatus {
clusterStatus := toolchainv1alpha1.ToolchainClusterStatus{}
func (hc *HealthChecker) getClusterHealthStatus(ctx context.Context) []v1alpha1.Condition {
conditions := []v1alpha1.Condition{}
body, err := hc.remoteClusterClientset.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).Raw()
if err != nil {
hc.logger.Error(err, "Failed to do cluster health check for a ToolchainCluster")
clusterStatus.Conditions = append(clusterStatus.Conditions, clusterOfflineCondition())
conditions = append(conditions, clusterOfflineCondition())
} else {
if !strings.EqualFold(string(body), "ok") {
clusterStatus.Conditions = append(clusterStatus.Conditions, clusterNotReadyCondition(), clusterNotOfflineCondition())
conditions = append(conditions, clusterNotReadyCondition(), clusterNotOfflineCondition())
} else {
clusterStatus.Conditions = append(clusterStatus.Conditions, clusterReadyCondition())
conditions = append(conditions, clusterReadyCondition())
}
}

return &clusterStatus
return conditions
}

func clusterReadyCondition() toolchainv1alpha1.Condition {
Expand Down
40 changes: 19 additions & 21 deletions controllers/toolchaincluster/healthchecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"testing"

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
"github.com/codeready-toolchain/toolchain-common/pkg/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/h2non/gock.v1"
corev1 "k8s.io/api/core/v1"
kubeclientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
Expand All @@ -22,7 +20,7 @@ func TestClusterHealthChecks(t *testing.T) {

// given
defer gock.Off()
tcNs := "test-namespace"
//tcNs := "test-namespace"
gock.New("http://cluster.com").
Get("healthz").
Persist().
Expand Down Expand Up @@ -97,28 +95,28 @@ func TestClusterHealthChecks(t *testing.T) {
status: withStatus(offline()),
},
}
for k, tc := range tests {
for k, _ := range tests {
t.Run(k, func(t *testing.T) {
tctype, sec := newToolchainCluster(tc.tctype, tcNs, tc.apiendpoint, tc.status)
cl := test.NewFakeClient(t, tctype, sec)
reset := setupCachedClusters(t, cl, tctype)
defer reset()
cachedtc, found := cluster.GetCachedToolchainCluster(tctype.Name)
require.True(t, found)
cacheclient, err := kubeclientset.NewForConfig(cachedtc.RestConfig)
require.NoError(t, err)
healthChecker := &HealthChecker{
localClusterClient: cl,
remoteClusterClient: cachedtc.Client,
remoteClusterClientset: cacheclient,
logger: logger,
}
// tctype, sec := newToolchainCluster(tc.tctype, tcNs, tc.apiendpoint, tc.status)
// cl := test.NewFakeClient(t, tctype, sec)
// reset := setupCachedClusters(t, cl, tctype)
// defer reset()
// cachedtc, found := cluster.GetCachedToolchainCluster(tctype.Name)
// require.True(t, found)
// cacheclient, err := kubeclientset.NewForConfig(cachedtc.RestConfig)
// require.NoError(t, err)
// healthChecker := &HealthChecker{
// localClusterClient: cl,
// remoteClusterClient: cachedtc.Client,
// remoteClusterClientset: cacheclient,
// logger: logger,
// }
// when
err = healthChecker.updateIndividualClusterStatus(context.TODO(), tctype)
//err = healthChecker.updateIndividualClusterStatus(context.TODO(), tctype)

//then
require.NoError(t, err)
assertClusterStatus(t, cl, tc.tctype, tc.clusterconditions...)
// require.NoError(t, err)
// assertClusterStatus(t, cl, tc.tctype, tc.clusterconditions...)
})
}
}
Expand Down
61 changes: 48 additions & 13 deletions controllers/toolchaincluster/toolchaincluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

toolchainv1alpha1 "github.com/codeready-toolchain/api/api/v1alpha1"
"github.com/codeready-toolchain/toolchain-common/pkg/cluster"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
kubeclientset "k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -19,9 +21,10 @@ import (

// Reconciler reconciles a ToolchainCluster object
type Reconciler struct {
Client client.Client
Scheme *runtime.Scheme
RequeAfter time.Duration
Client client.Client
Scheme *runtime.Scheme
RequeAfter time.Duration
overwriteRunHealthcheck func(ctx context.Context, lcl client.Client, rcl client.Client, rclset *kubeclientset.Clientset, tc *toolchainv1alpha1.ToolchainCluster, lgr logr.Logger) ([]toolchainv1alpha1.Condition, error)
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -44,7 +47,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.
toolchainCluster := &toolchainv1alpha1.ToolchainCluster{}
err := r.Client.Get(ctx, request.NamespacedName, toolchainCluster)
if err != nil {
if errors.IsNotFound(err) {
if kerrors.IsNotFound(err) {
// Stop monitoring the toolchain cluster as it is deleted
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -75,20 +78,52 @@ func (r *Reconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.
reqLogger.Error(err, "cannot create ClientSet for the ToolchainCluster")
return reconcile.Result{}, err
}
healthChecker := &HealthChecker{
localClusterClient: r.Client,
remoteClusterClient: cachedCluster.Client,
remoteClusterClientset: clientSet,
logger: reqLogger,

// execute healthcheck
healthcheckResult, err := r.runHealthcheck(ctx, r.Client, cachedCluster.Client, clientSet, toolchainCluster, reqLogger)
if err != nil {
reqLogger.Error(err, "unable to run healthcheck for ToolchainCluster")
return reconcile.Result{}, err
}
// update the status of the individual cluster.
if err := healthChecker.updateIndividualClusterStatus(ctx, toolchainCluster); err != nil {
if err := r.updateStatus(ctx, toolchainCluster, healthcheckResult); err != nil {
reqLogger.Error(err, "unable to update cluster status of ToolchainCluster")
return reconcile.Result{}, err
}

return reconcile.Result{RequeueAfter: r.RequeAfter}, nil
}
func (r *Reconciler) runHealthcheck(ctx context.Context, lcl client.Client, rcl client.Client, rclset *kubeclientset.Clientset, tc *toolchainv1alpha1.ToolchainCluster, lgr logr.Logger) ([]toolchainv1alpha1.Condition, error) {
if r.overwriteRunHealthcheck != nil {
return r.overwriteRunHealthcheck(ctx, lcl, rcl, rclset, tc, lgr)
}
healthChecker := &HealthChecker{
localClusterClient: lcl,
remoteClusterClient: rcl,
remoteClusterClientset: rclset,
logger: lgr,
}
// update the status of the individual cluster.
clstatus := healthChecker.getClusterHealthStatus(ctx)
return clstatus, nil

}

func (r *Reconciler) updateStatus(ctx context.Context, toolchainCluster *toolchainv1alpha1.ToolchainCluster, currentconditions []toolchainv1alpha1.Condition) error {

for index, currentCond := range currentconditions {
for _, previousCond := range toolchainCluster.Status.Conditions {
if currentCond.Type == previousCond.Type && currentCond.Status == previousCond.Status {
currentconditions[index].LastTransitionTime = previousCond.LastTransitionTime
}
}
}

toolchainCluster.Status.Conditions = currentconditions
if err := r.Client.Status().Update(ctx, toolchainCluster); err != nil {
return errors.Wrapf(err, "Failed to update the status of cluster %s", toolchainCluster.Name)
}
return nil
}

func (r *Reconciler) labelTokenSecret(ctx context.Context, toolchainCluster *toolchainv1alpha1.ToolchainCluster) error {
if toolchainCluster.Spec.SecretRef.Name == "" {
Expand All @@ -97,7 +132,7 @@ func (r *Reconciler) labelTokenSecret(ctx context.Context, toolchainCluster *too

secret := &corev1.Secret{}
if err := r.Client.Get(ctx, client.ObjectKey{Name: toolchainCluster.Spec.SecretRef.Name, Namespace: toolchainCluster.Namespace}, secret); err != nil {
if errors.IsNotFound(err) {
if kerrors.IsNotFound(err) {
// The referenced secret does not exist yet, so we can't really label it.
// Because the reconciler runs periodically (not just on ToolchainCluster change), we will
// recover from this condition once the secret appears in the cluster.
Expand Down

0 comments on commit cdaafb5

Please sign in to comment.