Skip to content

Commit

Permalink
use context to create new loggers
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin <[email protected]>
  • Loading branch information
KPostOffice committed Nov 30, 2023
1 parent a7bb744 commit a9c5d25
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 185 deletions.
3 changes: 2 additions & 1 deletion controllers/appWrapper_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers

import (
"context"
"testing"

"github.com/onsi/gomega"
Expand Down Expand Up @@ -99,7 +100,7 @@ func (r *AppWrapperReconciler) TestDiscoverInstanceTypes(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := r.discoverInstanceTypes(test.input)
result := r.discoverInstanceTypes(context.TODO(), test.input)
g.Expect(result).To(gomega.Equal(test.expected))
})
}
Expand Down
52 changes: 39 additions & 13 deletions controllers/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -81,7 +80,6 @@ const (
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

_ = log.FromContext(ctx)
// todo: Move the getOCMClusterID call out of reconcile loop.
// Only reason we are calling it here is that the client is not able to make
Expand Down Expand Up @@ -122,7 +120,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

demandPerInstanceType := r.discoverInstanceTypes(&appwrapper)
demandPerInstanceType := r.discoverInstanceTypes(ctx, &appwrapper)
if ocmSecretRef := r.Config.OCMSecretRef; ocmSecretRef != nil {
return r.scaleMachinePool(ctx, &appwrapper, demandPerInstanceType)
} else {
Expand All @@ -137,6 +135,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context, appwrapper *arbv1.AppWrapper) error {
logger := ctrl.LoggerFrom(ctx)
if appwrapper.Status.State == arbv1.AppWrapperStateCompleted {
deletionMessage = "completed"
} else {
Expand All @@ -147,24 +146,41 @@ func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context,
case "reuse":
matchedAw := r.findExactMatch(ctx, appwrapper)
if matchedAw != nil {
klog.Infof("Appwrapper %s %s, swapping machines to %s", appwrapper.Name, deletionMessage, matchedAw.Name)
logger.Info(
"AppWrapper deleted transferring machines",
"oldAppWrapper", appwrapper,
"deletionMessage", deletionMessage,
"newAppWrapper", matchedAw,
)
if err := r.swapNodeLabels(ctx, appwrapper, matchedAw); err != nil {
return err
}
} else {
klog.Infof("Appwrapper %s %s, scaling down machines", appwrapper.Name, deletionMessage)
logger.Info(
"Scaling down machines associated with deleted AppWrapper",
"appWrapper", appwrapper,
"deletionMessage", deletionMessage,
)
if err := r.annotateToDeleteMachine(ctx, appwrapper); err != nil {
return err
}
}
case "duplicate":
klog.Infof("Appwrapper %s scale-down machineset: %s ", deletionMessage, appwrapper.Name)
logger.Info(
"AppWrapper deleted, scaling down machineset",
"appWrapper", appwrapper,
"deletionMessage", deletionMessage,
)
if err := r.deleteMachineSet(ctx, appwrapper); err != nil {
return err
}
}
} else {
klog.Infof("Appwrapper %s scale-down machine pool: %s ", deletionMessage, appwrapper.Name)
logger.Info(
"AppWrapper deleted, scaling down machine pool",
"appWrapper", appwrapper,
"deletionMessage", deletionMessage,
)
if _, err := r.deleteMachinePool(ctx, appwrapper); err != nil {
return err
}
Expand All @@ -175,6 +191,7 @@ func (r *AppWrapperReconciler) finalizeScalingDownMachines(ctx context.Context,
// SetupWithManager sets up the controller with the Manager.
func (r *AppWrapperReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {

logger := ctrl.LoggerFrom(ctx)
restConfig := mgr.GetConfig()

var err error
Expand All @@ -197,20 +214,21 @@ func (r *AppWrapperReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Ma
if ok, err := r.machinePoolExists(); err != nil {
return err
} else if ok {
klog.Info("Using machine pools for cluster auto-scaling")
logger.Info("Using machine pools for cluster auto-scaling")
}
}

return ctrl.NewControllerManagedBy(mgr).
For(&arbv1.AppWrapper{}).
For(&arbv1.AppWrapper{}).Named("instascale").
Complete(r)
}

func (r *AppWrapperReconciler) getOCMSecret(ctx context.Context, secretRef *corev1.SecretReference) (*corev1.Secret, error) {
return r.kubeClient.CoreV1().Secrets(secretRef.Namespace).Get(ctx, secretRef.Name, metav1.GetOptions{})
}

func (r *AppWrapperReconciler) discoverInstanceTypes(aw *arbv1.AppWrapper) map[string]int {
func (r *AppWrapperReconciler) discoverInstanceTypes(ctx context.Context, aw *arbv1.AppWrapper) map[string]int {
logger := ctrl.LoggerFrom(ctx)
demandMapPerInstanceType := make(map[string]int)
var instanceRequired []string
for k, v := range aw.Labels {
Expand All @@ -220,7 +238,10 @@ func (r *AppWrapperReconciler) discoverInstanceTypes(aw *arbv1.AppWrapper) map[s
}

if len(instanceRequired) < 1 {
klog.Infof("Found AW %s that cannot be scaled due to missing orderedinstance label", aw.ObjectMeta.Name)
logger.Info(
"AppWrapper cannot be scaled out due to missing orderedinstance label",
"appWrapper", aw,
)
return demandMapPerInstanceType
}

Expand All @@ -237,6 +258,7 @@ func (r *AppWrapperReconciler) discoverInstanceTypes(aw *arbv1.AppWrapper) map[s
}

func (r *AppWrapperReconciler) findExactMatch(ctx context.Context, aw *arbv1.AppWrapper) *arbv1.AppWrapper {
logger := ctrl.LoggerFrom(ctx)
var match *arbv1.AppWrapper = nil
appwrappers := arbv1.AppWrapperList{}

Expand All @@ -250,7 +272,7 @@ func (r *AppWrapperReconciler) findExactMatch(ctx context.Context, aw *arbv1.App

err := r.List(ctx, &appwrappers, listOptions)
if err != nil {
klog.Error("Cannot list queued appwrappers, associated machines will be deleted")
logger.Error(err, "Cannot list queued appwrappers, associated machines will be deleted")
return match
}
var existingAcquiredMachineTypes = ""
Expand All @@ -265,7 +287,11 @@ func (r *AppWrapperReconciler) findExactMatch(ctx context.Context, aw *arbv1.App
if eachAw.Status.State != arbv1.AppWrapperStateEnqueued {
if eachAw.Labels["orderedinstance"] == existingAcquiredMachineTypes {
match = &eachAw
klog.Infof("Found exact match, %v appwrapper has acquired machinetypes %v", eachAw.Name, existingAcquiredMachineTypes)
logger.Info(
"AppWrapper has successfully acquired requested machine types",
"appWrapper", eachAw,
"acquiredMachineTypes", existingAcquiredMachineTypes,
)
break
}
}
Expand Down
36 changes: 27 additions & 9 deletions controllers/machinepools.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"

"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand Down Expand Up @@ -44,6 +43,7 @@ func hasAwLabel(machinePool *cmv1.MachinePool, aw *arbv1.AppWrapper) bool {
}

func (r *AppWrapperReconciler) scaleMachinePool(ctx context.Context, aw *arbv1.AppWrapper, demandPerInstanceType map[string]int) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)
connection, err := r.createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
Expand Down Expand Up @@ -72,25 +72,36 @@ func (r *AppWrapperReconciler) scaleMachinePool(ctx context.Context, aw *arbv1.A
if numberOfMachines != replicas {
m := make(map[string]string)
m[aw.Name] = aw.Name
klog.Infof("The instanceRequired array: %v", userRequestedInstanceType)

machinePoolID := strings.ReplaceAll(aw.Name+"-"+userRequestedInstanceType, ".", "-")
createMachinePool, err := cmv1.NewMachinePool().ID(machinePoolID).InstanceType(userRequestedInstanceType).Replicas(replicas).Labels(m).Build()
if err != nil {
klog.Errorf(`Error building MachinePool: %v`, err)
logger.Error(
err, "Error building MachinePool",
"userRequestedInstanceType", userRequestedInstanceType,
)
}
klog.Infof("Built MachinePool with instance type %v and name %v", userRequestedInstanceType, createMachinePool.ID())
logger.Info(
"Sending MachinePool creation request",
"instanceType", userRequestedInstanceType,
"machinePoolName", createMachinePool.ID(),
)
response, err := clusterMachinePools.Add().Body(createMachinePool).SendContext(ctx)
if err != nil {
klog.Errorf(`Error creating MachinePool: %v`, err)
logger.Error(err, "Error creating MachinePool")
}
klog.Infof("Created MachinePool: %v", response)
logger.Info(
"Successfully created MachinePool",
"machinePoolName", createMachinePool.ID(),
"response", response,
)
}
}
return ctrl.Result{Requeue: false}, nil
}

func (r *AppWrapperReconciler) deleteMachinePool(ctx context.Context, aw *arbv1.AppWrapper) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)
connection, err := r.createOCMConnection()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating OCM connection: %v", err)
Expand All @@ -107,9 +118,15 @@ func (r *AppWrapperReconciler) deleteMachinePool(ctx context.Context, aw *arbv1.
if strings.Contains(id, aw.Name) {
targetMachinePool, err := connection.ClustersMgmt().V1().Clusters().Cluster(r.ocmClusterID).MachinePools().MachinePool(id).Delete().SendContext(ctx)
if err != nil {
klog.Infof("Error deleting target machinepool %v", targetMachinePool)
logger.Error(
err, "Error deleting machinepool",
"machinePool", targetMachinePool,
)
}
klog.Infof("Successfully Scaled down target machinepool %v", id)
logger.Info(
"Successfully scaled down target machinepool",
"machinePool", targetMachinePool,
)
}
return true
})
Expand All @@ -129,6 +146,7 @@ func (r *AppWrapperReconciler) machinePoolExists() (bool, error) {

// getOCMClusterID determines the internal clusterID to be used for OCM API calls
func (r *AppWrapperReconciler) getOCMClusterID(ctx context.Context) error {
logger := ctrl.LoggerFrom(ctx)
cv := &configv1.ClusterVersion{}
err := r.Get(ctx, types.NamespacedName{Name: "version"}, cv)
if err != nil {
Expand All @@ -148,7 +166,7 @@ func (r *AppWrapperReconciler) getOCMClusterID(ctx context.Context) error {

response, err := collection.List().Search(fmt.Sprintf("external_id = '%s'", internalClusterID)).Size(1).Page(1).SendContext(ctx)
if err != nil {
klog.Errorf(`Error getting cluster id: %v`, err)
logger.Error(err, "Error getting cluster id")
}

response.Items().Each(func(cluster *cmv1.Cluster) bool {
Expand Down
Loading

0 comments on commit a9c5d25

Please sign in to comment.