Skip to content

Commit

Permalink
Feat: migrate cluster management methods
Browse files Browse the repository at this point in the history
Signed-off-by: Yin Da <[email protected]>
  • Loading branch information
Somefive committed Apr 13, 2023
1 parent 8ab4e3b commit 6cb023e
Show file tree
Hide file tree
Showing 8 changed files with 768 additions and 79 deletions.
17 changes: 11 additions & 6 deletions cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server"
genericfilters "k8s.io/apiserver/pkg/server/filters"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-runtime/pkg/builder"

genericfilters "k8s.io/apiserver/pkg/server/filters"

"github.com/oam-dev/cluster-gateway/pkg/config"
"github.com/oam-dev/cluster-gateway/pkg/featuregates"
"github.com/oam-dev/cluster-gateway/pkg/metrics"
"github.com/oam-dev/cluster-gateway/pkg/options"
"github.com/oam-dev/cluster-gateway/pkg/util/singleton"
Expand All @@ -41,10 +42,9 @@ func main() {
// registering metrics
metrics.Register()

cmd, err := builder.APIServer.
apiserverBuilder := builder.APIServer.
// +kubebuilder:scaffold:resource-register
WithResource(&clusterv1alpha1.ClusterGateway{}).
WithResource(&clusterv1alpha1.VirtualCluster{}).
WithLocalDebugExtension().
ExposeLoopbackMasterClientConfig().
ExposeLoopbackAuthorizer().
Expand Down Expand Up @@ -74,8 +74,13 @@ func main() {
server.Handler.FullHandlerChain = clusterv1alpha1.NewClusterGatewayProxyRequestEscaper(server.Handler.FullHandlerChain)
return server
}).
WithPostStartHook("init-master-loopback-client", singleton.InitLoopbackClient).
Build()
WithPostStartHook("init-master-loopback-client", singleton.InitLoopbackClient)

if utilfeature.DefaultMutableFeatureGate.Enabled(featuregates.VirtualCluster) {
apiserverBuilder = apiserverBuilder.WithResource(&clusterv1alpha1.VirtualCluster{})
}

cmd, err := apiserverBuilder.Build()
if err != nil {
klog.Fatal(err)
}
Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/ghodss/yaml v1.0.0
github.com/oam-dev/cluster-register v1.0.3
github.com/onsi/ginkgo v1.16.5
github.com/onsi/ginkgo/v2 v2.6.1
github.com/onsi/gomega v1.24.2
Expand Down Expand Up @@ -33,6 +34,9 @@ require (
)

require (
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -63,12 +67,15 @@ require (
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
299 changes: 298 additions & 1 deletion go.sum

Large diffs are not rendered by default.

244 changes: 235 additions & 9 deletions pkg/apis/cluster/v1alpha1/virtualcluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@ package v1alpha1

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/utils/strings/slices"
ocmclusterv1 "open-cluster-management.io/api/cluster/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/oam-dev/cluster-register/pkg/spoke"

"github.com/oam-dev/cluster-gateway/pkg/common"
"github.com/oam-dev/cluster-gateway/pkg/config"
)
Expand All @@ -42,6 +50,16 @@ import (
type VirtualClusterClient interface {
Get(ctx context.Context, name string) (*VirtualCluster, error)
List(ctx context.Context, options ...client.ListOption) (*VirtualClusterList, error)

UpdateStatus(ctx context.Context, name string, status VirtualClusterStatus) (*VirtualCluster, error)

Alias(ctx context.Context, name string, alias string) error
AddLabels(ctx context.Context, name string, labels map[string]string) (*VirtualCluster, error)
RemoveLabels(ctx context.Context, name string, labels []string) (*VirtualCluster, error)
Detach(ctx context.Context, name string, options ...DetachClusterOption) (*VirtualCluster, error)
DetachCluster(ctx context.Context, vc *VirtualCluster, options ...DetachClusterOption) error
Rename(ctx context.Context, name string, newName string) (*VirtualCluster, error)
RenameCluster(ctx context.Context, vc *VirtualCluster, newName string) (*VirtualCluster, error)
}

type virtualClusterClient struct {
Expand All @@ -57,15 +75,14 @@ func NewVirtualClusterClient(cli client.Client, namespace string, withControlPla

func (c *virtualClusterClient) Get(ctx context.Context, name string) (*VirtualCluster, error) {
if name == ClusterLocalName {
return NewLocalCluster(), nil
return NewLocalVirtualCluster(), nil
}
key := types.NamespacedName{Name: name, Namespace: c.namespace}
var cluster *VirtualCluster
secret := &corev1.Secret{}
err := c.Client.Get(ctx, key, secret)
err := c.Client.Get(ctx, types.NamespacedName{Name: name, Namespace: c.namespace}, secret)
var secretErr error
if err == nil {
if cluster, secretErr = NewClusterFromSecret(secret); secretErr == nil {
if cluster, secretErr = NewVirtualClusterFromSecret(secret); secretErr == nil {
return cluster, nil
}
}
Expand All @@ -74,10 +91,10 @@ func (c *virtualClusterClient) Get(ctx context.Context, name string) (*VirtualCl
}

managedCluster := &ocmclusterv1.ManagedCluster{}
err = c.Client.Get(ctx, key, managedCluster)
err = c.Client.Get(ctx, types.NamespacedName{Name: name}, managedCluster)
var managedClusterErr error
if err == nil {
if cluster, managedClusterErr = NewClusterFromManagedCluster(managedCluster); managedClusterErr == nil {
if cluster, managedClusterErr = NewVirtualClusterFromManagedCluster(managedCluster); managedClusterErr == nil {
return cluster, nil
}
}
Expand All @@ -104,7 +121,7 @@ func (c *virtualClusterClient) List(ctx context.Context, options ...client.ListO
for _, opt := range options {
opt.ApplyToList(opts)
}
local := NewLocalCluster()
local := NewLocalVirtualCluster()
clusters := &VirtualClusterList{Items: []VirtualCluster{*local}}

secrets := &corev1.SecretList{}
Expand All @@ -113,7 +130,7 @@ func (c *virtualClusterClient) List(ctx context.Context, options ...client.ListO
return nil, err
}
for _, secret := range secrets.Items {
if cluster, err := NewClusterFromSecret(secret.DeepCopy()); err == nil {
if cluster, err := NewVirtualClusterFromSecret(secret.DeepCopy()); err == nil {
clusters.Items = append(clusters.Items, *cluster)
}
}
Expand All @@ -125,7 +142,7 @@ func (c *virtualClusterClient) List(ctx context.Context, options ...client.ListO
}
for _, managedCluster := range managedClusters.Items {
if !clusters.HasCluster(managedCluster.Name) {
if cluster, err := NewClusterFromManagedCluster(managedCluster.DeepCopy()); err == nil {
if cluster, err := NewVirtualClusterFromManagedCluster(managedCluster.DeepCopy()); err == nil {
clusters.Items = append(clusters.Items, *cluster)
}
}
Expand Down Expand Up @@ -153,6 +170,215 @@ func (c *virtualClusterClient) List(ctx context.Context, options ...client.ListO
return clusters, nil
}

// detachClusterConfig config for detaching cluster
type detachClusterConfig struct {
managedClusterKubeConfigPath string
}

func newDetachClusterConfig(options ...DetachClusterOption) *detachClusterConfig {
args := &detachClusterConfig{}
for _, op := range options {
op.ApplyTo(args)
}
return args
}

// DetachClusterOption option for detach cluster
// +kubebuilder:object:generate=false
type DetachClusterOption interface {
ApplyTo(cfg *detachClusterConfig)
}

// DetachClusterManagedClusterKubeConfigPathOption configure the managed cluster kubeconfig path while detach ocm cluster
type DetachClusterManagedClusterKubeConfigPathOption string

// ApplyTo apply to args
func (op DetachClusterManagedClusterKubeConfigPathOption) ApplyTo(cfg *detachClusterConfig) {
cfg.managedClusterKubeConfigPath = string(op)
}

// UpdateStatus update the status of the virtual cluster
func (c *virtualClusterClient) UpdateStatus(ctx context.Context, name string, status VirtualClusterStatus) (*VirtualCluster, error) {
vc, err := c.Get(ctx, name)
if err != nil {
return nil, err
}
if vc.Raw == nil {
return nil, fmt.Errorf("no underlying object")
}
annot := vc.Raw.GetAnnotations()
if annot == nil {
annot = map[string]string{}
}
c.encodeStatusField(status.Version, AnnotationClusterVersion, annot)
c.encodeStatusField(status.VirtualClusterHealthiness, AnnotationClusterHealth, annot)
c.encodeStatusField(status.Resources, AnnotationClusterResources, annot)
vc.Raw.SetAnnotations(annot)
vc.Status = status
if err = c.Client.Update(ctx, vc.Raw); err != nil {
return nil, err
}
return vc, err
}

func (c *virtualClusterClient) encodeStatusField(obj interface{}, key string, annot map[string]string) {
if obj == nil {
delete(annot, key)
}
if bs, err := json.Marshal(obj); err == nil {
annot[key] = string(bs)
} else {
delete(annot, key)
}
}

func (c *virtualClusterClient) Alias(ctx context.Context, name string, alias string) error {
vc, err := c.Get(ctx, name)
if err != nil {
return err
}
switch vc.Spec.CredentialType {
case CredentialTypeX509Certificate, CredentialTypeServiceAccountToken, CredentialTypeOCMManagedCluster:
break
case CredentialTypeInternal:
return fmt.Errorf("internal cluster cannot be aliased")
default:
return fmt.Errorf("unrecognizable credential type %T for cluster %s", vc.Spec.CredentialType, name)
}

annot := vc.Raw.GetAnnotations()
if annot == nil {
annot = map[string]string{}
}
annot[AnnotationClusterAlias] = alias
vc.Raw.SetAnnotations(annot)
return c.Client.Update(ctx, vc.Raw)
}

func (c *virtualClusterClient) AddLabels(ctx context.Context, name string, labels map[string]string) (*VirtualCluster, error) {
vc, err := c.Get(ctx, name)
if err != nil {
return nil, err
}
switch vc.Spec.CredentialType {
case CredentialTypeInternal:
return vc, fmt.Errorf("internal cluster cannot set labels for now")
}
ls := vc.Raw.GetLabels()
for k, v := range labels {
if !strings.HasPrefix(k, config.MetaApiGroupName) {
ls[k] = v
}
}
vc.Raw.SetLabels(ls)
if err = c.Client.Update(ctx, vc.Raw); err != nil {
return nil, err
}
return NewVirtualClusterFromObject(vc.Raw)
}

func (c *virtualClusterClient) RemoveLabels(ctx context.Context, name string, labels []string) (*VirtualCluster, error) {
vc, err := c.Get(ctx, name)
if err != nil {
return nil, err
}
switch vc.Spec.CredentialType {
case CredentialTypeInternal:
return vc, fmt.Errorf("internal cluster cannot set labels for now")
}
ls := vc.Raw.GetLabels()
for _, k := range labels {
if !strings.HasPrefix(k, config.MetaApiGroupName) {
delete(ls, k)
}
}
vc.Raw.SetLabels(ls)
if err = c.Client.Update(ctx, vc.Raw); err != nil {
return nil, err
}
return NewVirtualClusterFromObject(vc.Raw)
}

func (c *virtualClusterClient) Rename(ctx context.Context, name string, newName string) (*VirtualCluster, error) {
vc, err := c.Get(ctx, name)
if err != nil {
return nil, err
}
return c.RenameCluster(ctx, vc, newName)
}

func (c *virtualClusterClient) RenameCluster(ctx context.Context, vc *VirtualCluster, newName string) (*VirtualCluster, error) {
switch vc.Spec.CredentialType {
case CredentialTypeX509Certificate, CredentialTypeServiceAccountToken:
break
case CredentialTypeOCMManagedCluster:
return nil, fmt.Errorf("rename ocm managed cluster unsupported")
case CredentialTypeInternal:
return nil, fmt.Errorf("internal cluster cannot be renamed")
default:
return nil, fmt.Errorf("unrecognizable credential type %T for cluster %s", vc.Spec.CredentialType, vc.Name)
}
if _, err := c.Get(ctx, newName); err == nil {
return nil, fmt.Errorf("cluster %s already exists", newName)
} else if !apierrors.IsNotFound(err) {
return nil, err
}

// rename secret
secret, ok := vc.Raw.(*corev1.Secret)
if !ok {
return nil, fmt.Errorf("unexpected underlying cluster type %T", vc.Raw)
}
newClusterSecret := secret.DeepCopy()
newClusterSecret.SetName(newName)
if err := c.Client.Create(ctx, newClusterSecret); err != nil {
return nil, fmt.Errorf("failed to create cluster %s: %w", newName, err)
}
if err := c.Client.Delete(ctx, secret); err != nil {
return nil, fmt.Errorf("failed to detach old cluster %s: %w", vc.Name, err)
}
return NewVirtualClusterFromSecret(secret)
}

func (c *virtualClusterClient) Detach(ctx context.Context, name string, options ...DetachClusterOption) (*VirtualCluster, error) {
vc, err := c.Get(ctx, name)
if err != nil {
return nil, err
}
return vc, c.DetachCluster(ctx, vc, options...)
}

func (c *virtualClusterClient) DetachCluster(ctx context.Context, vc *VirtualCluster, options ...DetachClusterOption) error {
cfg := newDetachClusterConfig(options...)
switch vc.Spec.CredentialType {
case CredentialTypeX509Certificate, CredentialTypeServiceAccountToken:
return c.Client.Delete(ctx, vc.Raw)
case CredentialTypeOCMManagedCluster:
if cfg.managedClusterKubeConfigPath == "" {
return fmt.Errorf("kubeconfig-path must be set to detach ocm managed cluster")
}
apiConfig, err := clientcmd.LoadFromFile(cfg.managedClusterKubeConfigPath)
if err != nil {
return err
}
restConfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return apiConfig, nil
})
if err != nil {
return err
}
if err = spoke.CleanSpokeClusterEnv(restConfig); err != nil {
return err
}
managedCluster := &ocmclusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{Name: vc.Name}}
return client.IgnoreNotFound(c.Client.Delete(ctx, managedCluster))
case CredentialTypeInternal:
return fmt.Errorf("cannot delete internal cluster `local`")
default:
return fmt.Errorf("unrecognizable credential type %s for cluster %s", vc.Spec.CredentialType, vc.Name)
}
}

// virtualClusterSelector filters the list/delete operation of cluster list
type virtualClusterSelector struct {
selector labels.Selector
Expand Down
Loading

0 comments on commit 6cb023e

Please sign in to comment.