From b67c089a5867fcf95fc28efd1f18fb00574b43b0 Mon Sep 17 00:00:00 2001 From: samrabelachew Date: Thu, 9 Nov 2023 11:40:04 -0800 Subject: [PATCH] Support upgrade CronJob api version (#2840) Support upgrade CronJob api version ### Description According to kube's deprecation [guide](https://kubernetes.io/docs/reference/using-api/deprecation-guide/#cronjob-v125), CronJob api on batch/v1beta1 should be deprecated on v1.25 in favor of batch/v1 which was introduced in v1.21. Let's add its support and eventually remove the legacy api as well. ### Testing Performed See unit tests ### TODOs No deadline on this right now, but eventually would be nice to go back and clean out the legacy references. --- backend/service/k8s/cronjob.go | 117 ++++++++++++++++++++++++++-- backend/service/k8s/cronjob_test.go | 77 ++++++++++++++---- 2 files changed, 171 insertions(+), 23 deletions(-) diff --git a/backend/service/k8s/cronjob.go b/backend/service/k8s/cronjob.go index d3688d1662..8ccec1a265 100644 --- a/backend/service/k8s/cronjob.go +++ b/backend/service/k8s/cronjob.go @@ -2,12 +2,15 @@ package k8s import ( "context" + "fmt" + "strconv" "strings" "github.com/golang/protobuf/ptypes/wrappers" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - v1beta1 "k8s.io/api/batch/v1beta1" + "k8s.io/api/batch/v1" + "k8s.io/api/batch/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sapiv1 "github.com/lyft/clutch/backend/api/k8s/v1" @@ -19,16 +22,38 @@ func (s *svc) DescribeCronJob(ctx context.Context, clientset, cluster, namespace return nil, err } - cronJobs, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, metav1.ListOptions{ + major, minor, err := fetchVersion(cs) + if err != nil { + return nil, err + } + + if major == 1 && minor < 21 { + cronJobs, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, metav1.ListOptions{ + FieldSelector: "metadata.name=" + name, + }) + if err != nil { + return nil, err + } + + if len(cronJobs.Items) == 1 { + return ProtoForV1Beta1CronJob(cs.Cluster(), &cronJobs.Items[0]), nil + } + if len(cronJobs.Items) > 1 { + return nil, status.Error(codes.FailedPrecondition, "located multiple cron jobs") + } + return nil, status.Error(codes.NotFound, "unable to locate specified cron job") + } + + cronJobs, err := cs.BatchV1().CronJobs(cs.Namespace()).List(ctx, metav1.ListOptions{ FieldSelector: "metadata.name=" + name, }) if err != nil { return nil, err } - if len(cronJobs.Items) == 1 { return ProtoForCronJob(cs.Cluster(), &cronJobs.Items[0]), nil - } else if len(cronJobs.Items) > 1 { + } + if len(cronJobs.Items) > 1 { return nil, status.Error(codes.FailedPrecondition, "located multiple cron jobs") } return nil, status.Error(codes.NotFound, "unable to locate specified cron job") @@ -45,7 +70,25 @@ func (s *svc) ListCronJobs(ctx context.Context, clientset, cluster, namespace st return nil, err } - cronJobList, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, opts) + major, minor, err := fetchVersion(cs) + if err != nil { + return nil, err + } + + if major == 1 && minor < 21 { + cronJobList, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, opts) + if err != nil { + return nil, err + } + var cronJobs []*k8sapiv1.CronJob + for _, d := range cronJobList.Items { + cronJob := d + cronJobs = append(cronJobs, ProtoForV1Beta1CronJob(cs.Cluster(), &cronJob)) + } + return cronJobs, nil + } + + cronJobList, err := cs.BatchV1().CronJobs(cs.Namespace()).List(ctx, opts) if err != nil { return nil, err } @@ -55,7 +98,6 @@ func (s *svc) ListCronJobs(ctx context.Context, clientset, cluster, namespace st cronJob := d cronJobs = append(cronJobs, ProtoForCronJob(cs.Cluster(), &cronJob)) } - return cronJobs, nil } @@ -66,10 +108,19 @@ func (s *svc) DeleteCronJob(ctx context.Context, clientset, cluster, namespace, } opts := metav1.DeleteOptions{} - return cs.BatchV1beta1().CronJobs(cs.Namespace()).Delete(ctx, name, opts) + + major, minor, err := fetchVersion(cs) + if err != nil { + return err + } + + if major == 1 && minor < 21 { + return cs.BatchV1beta1().CronJobs(cs.Namespace()).Delete(ctx, name, opts) + } + return cs.BatchV1().CronJobs(cs.Namespace()).Delete(ctx, name, opts) } -func ProtoForCronJob(cluster string, k8scronJob *v1beta1.CronJob) *k8sapiv1.CronJob { +func ProtoForV1Beta1CronJob(cluster string, k8scronJob *v1beta1.CronJob) *k8sapiv1.CronJob { clusterName := GetKubeClusterName(k8scronJob) if clusterName == "" { clusterName = cluster @@ -100,3 +151,53 @@ func ProtoForCronJob(cluster string, k8scronJob *v1beta1.CronJob) *k8sapiv1.Cron } return ret } + +func ProtoForCronJob(cluster string, k8scronJob *v1.CronJob) *k8sapiv1.CronJob { + clusterName := GetKubeClusterName(k8scronJob) + if clusterName == "" { + clusterName = cluster + } + // Required fields + ret := &k8sapiv1.CronJob{ + Cluster: clusterName, + Namespace: k8scronJob.Namespace, + Name: k8scronJob.Name, + Schedule: k8scronJob.Spec.Schedule, + Labels: k8scronJob.Labels, + Annotations: k8scronJob.Annotations, + } + + // Update optional fields + if k8scronJob.Spec.Suspend != nil { + ret.Suspend = *k8scronJob.Spec.Suspend + } + if k8scronJob.Spec.ConcurrencyPolicy != "" { + ret.ConcurrencyPolicy = k8sapiv1.CronJob_ConcurrencyPolicy( + k8sapiv1.CronJob_ConcurrencyPolicy_value[strings.ToUpper(string(k8scronJob.Spec.ConcurrencyPolicy))]) + } + if k8scronJob.Status.Active != nil { + ret.NumActiveJobs = int32(len(k8scronJob.Status.Active)) + } + if k8scronJob.Spec.StartingDeadlineSeconds != nil { + ret.StartingDeadlineSeconds = &wrappers.Int64Value{Value: *k8scronJob.Spec.StartingDeadlineSeconds} + } + return ret +} + +func fetchVersion(cs ContextClientset) (int, int, error) { + version, err := cs.Discovery().ServerVersion() + if err != nil { + return 0, 0, fmt.Errorf("unable to get kubernetes server version info: %w", err) + } + + major, err := strconv.Atoi(version.Major) + if err != nil { + return 0, 0, fmt.Errorf("unable to convert kube major version to int: %w", err) + } + + minor, err := strconv.Atoi(strings.Trim(version.Minor, "+")) + if err != nil { + return 0, 0, fmt.Errorf("unable to convert kube minor version to int: %w", err) + } + return major, minor, nil +} diff --git a/backend/service/k8s/cronjob_test.go b/backend/service/k8s/cronjob_test.go index 90cbdabcd2..9420c61cf5 100644 --- a/backend/service/k8s/cronjob_test.go +++ b/backend/service/k8s/cronjob_test.go @@ -6,25 +6,56 @@ import ( "testing" "github.com/stretchr/testify/assert" - v1beta1 "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/api/batch/v1beta1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/version" + fakediscovery "k8s.io/client-go/discovery/fake" "k8s.io/client-go/kubernetes/fake" k8sapiv1 "github.com/lyft/clutch/backend/api/k8s/v1" ) -func testCronService() *svc { - cron := &v1beta1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testing-cron-name", - Namespace: "testing-namespace", - Labels: map[string]string{"test": "foo"}, - Annotations: map[string]string{"test": "bar"}, - }, +func testCronService(t *testing.T, legacy bool) *svc { + var cs *fake.Clientset + if legacy { + cron := &v1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testing-cron-name", + Namespace: "testing-namespace", + Labels: map[string]string{"test": "foo"}, + Annotations: map[string]string{"test": "bar"}, + }, + } + cs = fake.NewSimpleClientset(cron) + fakeDiscovery, ok := cs.Discovery().(*fakediscovery.FakeDiscovery) + if !ok { + t.Fatalf("couldn't convert Discovery() to *FakeDiscovery") + } + fakeDiscovery.FakedServerVersion = &version.Info{ + Major: "1", + Minor: "20", + } + } else { + cron := &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testing-cron-name", + Namespace: "testing-namespace", + Labels: map[string]string{"test": "foo"}, + Annotations: map[string]string{"test": "bar"}, + }, + } + cs = fake.NewSimpleClientset(cron) + fakeDiscovery, ok := cs.Discovery().(*fakediscovery.FakeDiscovery) + if !ok { + t.Fatalf("couldn't convert Discovery() to *FakeDiscovery") + } + fakeDiscovery.FakedServerVersion = &version.Info{ + Major: "1", + Minor: "22", + } } - - cs := fake.NewSimpleClientset(cron) return &svc{ manager: &managerImpl{ clientsets: map[string]*ctxClientsetImpl{"foo": { @@ -37,18 +68,30 @@ func testCronService() *svc { } func TestDescribeCron(t *testing.T) { - s := testCronService() + s := testCronService(t, true) cron, err := s.DescribeCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name") assert.NoError(t, err) assert.NotNil(t, cron) + + s = testCronService(t, false) + cron, err = s.DescribeCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name") + assert.NoError(t, err) + assert.NotNil(t, cron) } func TestListCron(t *testing.T) { - s := testCronService() + s := testCronService(t, true) opts := &k8sapiv1.ListOptions{Labels: map[string]string{"test": "foo"}} list, err := s.ListCronJobs(context.Background(), "foo", "core-testing", "testing-namespace", opts) assert.NoError(t, err) assert.Equal(t, 1, len(list)) + + s = testCronService(t, false) + opts = &k8sapiv1.ListOptions{Labels: map[string]string{"test": "foo"}} + list, err = s.ListCronJobs(context.Background(), "foo", "core-testing", "testing-namespace", opts) + assert.NoError(t, err) + assert.Equal(t, 1, len(list)) + // Not Found opts = &k8sapiv1.ListOptions{Labels: map[string]string{"unknown": "bar"}} list, err = s.ListCronJobs(context.Background(), "foo", "core-testing", "testing-namespace", opts) @@ -57,7 +100,7 @@ func TestListCron(t *testing.T) { } func TestDeleteCron(t *testing.T) { - s := testCronService() + s := testCronService(t, true) // Not found. err := s.DeleteCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "abc") assert.Error(t, err) @@ -68,6 +111,10 @@ func TestDeleteCron(t *testing.T) { // Not found. _, err = s.DescribeCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name") assert.Error(t, err) + + s = testCronService(t, false) + err = s.DeleteCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name") + assert.NoError(t, err) } func TestProtoForCron(t *testing.T) { @@ -124,7 +171,7 @@ func TestProtoForCron(t *testing.T) { t.Run(tt.id, func(t *testing.T) { t.Parallel() - cron := ProtoForCronJob(tt.inputClusterName, tt.cron) + cron := ProtoForV1Beta1CronJob(tt.inputClusterName, tt.cron) assert.Equal(t, tt.expectedClusterName, cron.Cluster) assert.Equal(t, tt.expectedName, cron.Name) assert.Equal(t, tt.cron.Spec.Schedule, cron.Schedule)