Skip to content

Commit

Permalink
Support upgrade CronJob api version (#2840)
Browse files Browse the repository at this point in the history
Support upgrade CronJob api version

### Description
According to kube's deprecation
[guide](https://kubernetes.io/docs/reference/using-api/deprecation-guide/#cronjob-v125),
CronJob api on batch/v1beta1 should be deprecated on v1.25 in favor of
batch/v1 which was introduced in v1.21. Let's add its support and
eventually remove the legacy api as well.

### Testing Performed
See unit tests

### TODOs
No deadline on this right now, but eventually would be nice to go back
and clean out the legacy references.
  • Loading branch information
samrabelachew authored Nov 9, 2023
1 parent 11a6f50 commit b67c089
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 23 deletions.
117 changes: 109 additions & 8 deletions backend/service/k8s/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package k8s

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/batch/v1"
"k8s.io/api/batch/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

k8sapiv1 "github.com/lyft/clutch/backend/api/k8s/v1"
Expand All @@ -19,16 +22,38 @@ func (s *svc) DescribeCronJob(ctx context.Context, clientset, cluster, namespace
return nil, err
}

cronJobs, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, metav1.ListOptions{
major, minor, err := fetchVersion(cs)
if err != nil {
return nil, err
}

if major == 1 && minor < 21 {
cronJobs, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + name,
})
if err != nil {
return nil, err
}

if len(cronJobs.Items) == 1 {
return ProtoForV1Beta1CronJob(cs.Cluster(), &cronJobs.Items[0]), nil
}
if len(cronJobs.Items) > 1 {
return nil, status.Error(codes.FailedPrecondition, "located multiple cron jobs")
}
return nil, status.Error(codes.NotFound, "unable to locate specified cron job")
}

cronJobs, err := cs.BatchV1().CronJobs(cs.Namespace()).List(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + name,
})
if err != nil {
return nil, err
}

if len(cronJobs.Items) == 1 {
return ProtoForCronJob(cs.Cluster(), &cronJobs.Items[0]), nil
} else if len(cronJobs.Items) > 1 {
}
if len(cronJobs.Items) > 1 {
return nil, status.Error(codes.FailedPrecondition, "located multiple cron jobs")
}
return nil, status.Error(codes.NotFound, "unable to locate specified cron job")
Expand All @@ -45,7 +70,25 @@ func (s *svc) ListCronJobs(ctx context.Context, clientset, cluster, namespace st
return nil, err
}

cronJobList, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, opts)
major, minor, err := fetchVersion(cs)
if err != nil {
return nil, err
}

if major == 1 && minor < 21 {
cronJobList, err := cs.BatchV1beta1().CronJobs(cs.Namespace()).List(ctx, opts)
if err != nil {
return nil, err
}
var cronJobs []*k8sapiv1.CronJob
for _, d := range cronJobList.Items {
cronJob := d
cronJobs = append(cronJobs, ProtoForV1Beta1CronJob(cs.Cluster(), &cronJob))
}
return cronJobs, nil
}

cronJobList, err := cs.BatchV1().CronJobs(cs.Namespace()).List(ctx, opts)
if err != nil {
return nil, err
}
Expand All @@ -55,7 +98,6 @@ func (s *svc) ListCronJobs(ctx context.Context, clientset, cluster, namespace st
cronJob := d
cronJobs = append(cronJobs, ProtoForCronJob(cs.Cluster(), &cronJob))
}

return cronJobs, nil
}

Expand All @@ -66,10 +108,19 @@ func (s *svc) DeleteCronJob(ctx context.Context, clientset, cluster, namespace,
}

opts := metav1.DeleteOptions{}
return cs.BatchV1beta1().CronJobs(cs.Namespace()).Delete(ctx, name, opts)

major, minor, err := fetchVersion(cs)
if err != nil {
return err
}

if major == 1 && minor < 21 {
return cs.BatchV1beta1().CronJobs(cs.Namespace()).Delete(ctx, name, opts)
}
return cs.BatchV1().CronJobs(cs.Namespace()).Delete(ctx, name, opts)
}

func ProtoForCronJob(cluster string, k8scronJob *v1beta1.CronJob) *k8sapiv1.CronJob {
func ProtoForV1Beta1CronJob(cluster string, k8scronJob *v1beta1.CronJob) *k8sapiv1.CronJob {
clusterName := GetKubeClusterName(k8scronJob)
if clusterName == "" {
clusterName = cluster
Expand Down Expand Up @@ -100,3 +151,53 @@ func ProtoForCronJob(cluster string, k8scronJob *v1beta1.CronJob) *k8sapiv1.Cron
}
return ret
}

func ProtoForCronJob(cluster string, k8scronJob *v1.CronJob) *k8sapiv1.CronJob {
clusterName := GetKubeClusterName(k8scronJob)
if clusterName == "" {
clusterName = cluster
}
// Required fields
ret := &k8sapiv1.CronJob{
Cluster: clusterName,
Namespace: k8scronJob.Namespace,
Name: k8scronJob.Name,
Schedule: k8scronJob.Spec.Schedule,
Labels: k8scronJob.Labels,
Annotations: k8scronJob.Annotations,
}

// Update optional fields
if k8scronJob.Spec.Suspend != nil {
ret.Suspend = *k8scronJob.Spec.Suspend
}
if k8scronJob.Spec.ConcurrencyPolicy != "" {
ret.ConcurrencyPolicy = k8sapiv1.CronJob_ConcurrencyPolicy(
k8sapiv1.CronJob_ConcurrencyPolicy_value[strings.ToUpper(string(k8scronJob.Spec.ConcurrencyPolicy))])
}
if k8scronJob.Status.Active != nil {
ret.NumActiveJobs = int32(len(k8scronJob.Status.Active))
}
if k8scronJob.Spec.StartingDeadlineSeconds != nil {
ret.StartingDeadlineSeconds = &wrappers.Int64Value{Value: *k8scronJob.Spec.StartingDeadlineSeconds}
}
return ret
}

func fetchVersion(cs ContextClientset) (int, int, error) {
version, err := cs.Discovery().ServerVersion()
if err != nil {
return 0, 0, fmt.Errorf("unable to get kubernetes server version info: %w", err)
}

major, err := strconv.Atoi(version.Major)
if err != nil {
return 0, 0, fmt.Errorf("unable to convert kube major version to int: %w", err)
}

minor, err := strconv.Atoi(strings.Trim(version.Minor, "+"))
if err != nil {
return 0, 0, fmt.Errorf("unable to convert kube minor version to int: %w", err)
}
return major, minor, nil
}
77 changes: 62 additions & 15 deletions backend/service/k8s/cronjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,56 @@ import (
"testing"

"github.com/stretchr/testify/assert"
v1beta1 "k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/batch/v1beta1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/kubernetes/fake"

k8sapiv1 "github.com/lyft/clutch/backend/api/k8s/v1"
)

func testCronService() *svc {
cron := &v1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "testing-cron-name",
Namespace: "testing-namespace",
Labels: map[string]string{"test": "foo"},
Annotations: map[string]string{"test": "bar"},
},
func testCronService(t *testing.T, legacy bool) *svc {
var cs *fake.Clientset
if legacy {
cron := &v1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "testing-cron-name",
Namespace: "testing-namespace",
Labels: map[string]string{"test": "foo"},
Annotations: map[string]string{"test": "bar"},
},
}
cs = fake.NewSimpleClientset(cron)
fakeDiscovery, ok := cs.Discovery().(*fakediscovery.FakeDiscovery)
if !ok {
t.Fatalf("couldn't convert Discovery() to *FakeDiscovery")
}
fakeDiscovery.FakedServerVersion = &version.Info{
Major: "1",
Minor: "20",
}
} else {
cron := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "testing-cron-name",
Namespace: "testing-namespace",
Labels: map[string]string{"test": "foo"},
Annotations: map[string]string{"test": "bar"},
},
}
cs = fake.NewSimpleClientset(cron)
fakeDiscovery, ok := cs.Discovery().(*fakediscovery.FakeDiscovery)
if !ok {
t.Fatalf("couldn't convert Discovery() to *FakeDiscovery")
}
fakeDiscovery.FakedServerVersion = &version.Info{
Major: "1",
Minor: "22",
}
}

cs := fake.NewSimpleClientset(cron)
return &svc{
manager: &managerImpl{
clientsets: map[string]*ctxClientsetImpl{"foo": {
Expand All @@ -37,18 +68,30 @@ func testCronService() *svc {
}

func TestDescribeCron(t *testing.T) {
s := testCronService()
s := testCronService(t, true)
cron, err := s.DescribeCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name")
assert.NoError(t, err)
assert.NotNil(t, cron)

s = testCronService(t, false)
cron, err = s.DescribeCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name")
assert.NoError(t, err)
assert.NotNil(t, cron)
}

func TestListCron(t *testing.T) {
s := testCronService()
s := testCronService(t, true)
opts := &k8sapiv1.ListOptions{Labels: map[string]string{"test": "foo"}}
list, err := s.ListCronJobs(context.Background(), "foo", "core-testing", "testing-namespace", opts)
assert.NoError(t, err)
assert.Equal(t, 1, len(list))

s = testCronService(t, false)
opts = &k8sapiv1.ListOptions{Labels: map[string]string{"test": "foo"}}
list, err = s.ListCronJobs(context.Background(), "foo", "core-testing", "testing-namespace", opts)
assert.NoError(t, err)
assert.Equal(t, 1, len(list))

// Not Found
opts = &k8sapiv1.ListOptions{Labels: map[string]string{"unknown": "bar"}}
list, err = s.ListCronJobs(context.Background(), "foo", "core-testing", "testing-namespace", opts)
Expand All @@ -57,7 +100,7 @@ func TestListCron(t *testing.T) {
}

func TestDeleteCron(t *testing.T) {
s := testCronService()
s := testCronService(t, true)
// Not found.
err := s.DeleteCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "abc")
assert.Error(t, err)
Expand All @@ -68,6 +111,10 @@ func TestDeleteCron(t *testing.T) {
// Not found.
_, err = s.DescribeCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name")
assert.Error(t, err)

s = testCronService(t, false)
err = s.DeleteCronJob(context.Background(), "foo", "core-testing", "testing-namespace", "testing-cron-name")
assert.NoError(t, err)
}

func TestProtoForCron(t *testing.T) {
Expand Down Expand Up @@ -124,7 +171,7 @@ func TestProtoForCron(t *testing.T) {
t.Run(tt.id, func(t *testing.T) {
t.Parallel()

cron := ProtoForCronJob(tt.inputClusterName, tt.cron)
cron := ProtoForV1Beta1CronJob(tt.inputClusterName, tt.cron)
assert.Equal(t, tt.expectedClusterName, cron.Cluster)
assert.Equal(t, tt.expectedName, cron.Name)
assert.Equal(t, tt.cron.Spec.Schedule, cron.Schedule)
Expand Down

0 comments on commit b67c089

Please sign in to comment.