From 3a14be73ce48520e37a193a8fef9678217d8f9a0 Mon Sep 17 00:00:00 2001
From: zyy17
Date: Thu, 25 Jul 2024 15:00:16 +0800
Subject: [PATCH] feat: add maintenance mode (#164)

* feat: add maintenance mode

* refactor: enable region failover by default when using remote WAL
---
 apis/v1alpha1/defaulting.go                   |   6 +-
 apis/v1alpha1/defaulting_test.go              |  47 ++++-
 apis/v1alpha1/greptimedbcluster_types.go      |   2 +-
 apis/v1alpha1/zz_generated.deepcopy.go        |   5 +
 controllers/greptimedbcluster/controller.go   |   5 +
 .../greptimedbcluster/deployers/datanode.go   | 161 +++++++++++++++++-
 pkg/dbconfig/metasrv_config.go                |   2 +-
 pkg/deployer/deployer.go                      |  19 +--
 pkg/util/k8s/k8s.go                           |  11 ++
 9 files changed, 234 insertions(+), 24 deletions(-)

diff --git a/apis/v1alpha1/defaulting.go b/apis/v1alpha1/defaulting.go
index e0eae595..eae8e288 100644
--- a/apis/v1alpha1/defaulting.go
+++ b/apis/v1alpha1/defaulting.go
@@ -100,12 +100,16 @@ func (in *GreptimeDBCluster) SetDefaults() error {
 	}
 
 	if in.Spec.Meta != nil {
+		enableRegionFailover := false
+		if in.Spec.RemoteWalProvider != nil { // If remote wal provider is enabled, enable region failover by default.
+			enableRegionFailover = true
+		}
 		defaultGreptimeDBClusterSpec.Meta = &MetaSpec{
 			ComponentSpec: ComponentSpec{
 				Template: &PodTemplateSpec{},
 			},
 			ServicePort:          int32(defaultMetaServicePort),
-			EnableRegionFailover: false,
+			EnableRegionFailover: &enableRegionFailover,
 		}
 		if in.Spec.Meta.Replicas == nil {
 			in.Spec.Meta.Replicas = proto.Int32(defaultMetaReplicas)
diff --git a/apis/v1alpha1/defaulting_test.go b/apis/v1alpha1/defaulting_test.go
index 30269391..c953d42b 100644
--- a/apis/v1alpha1/defaulting_test.go
+++ b/apis/v1alpha1/defaulting_test.go
@@ -109,7 +109,8 @@ func TestSetDefaults(t *testing.T) {
 					},
 				},
 			},
-			ServicePort: int32(defaultMetaServicePort),
+			ServicePort:          int32(defaultMetaServicePort),
+			EnableRegionFailover: proto.Bool(false),
 		},
 		Datanode: &DatanodeSpec{
 			ComponentSpec: ComponentSpec{
@@ -300,7 +301,8 @@
 				EtcdEndpoints: []string{
 					"etcd.default:2379",
 				},
-				ServicePort: int32(defaultMetaServicePort),
+				ServicePort:          int32(defaultMetaServicePort),
+				EnableRegionFailover: proto.Bool(false),
 			},
 			Datanode: &DatanodeSpec{
 				ComponentSpec: ComponentSpec{
@@ -437,7 +439,8 @@
 					},
 				},
 			},
-			ServicePort: int32(defaultMetaServicePort),
+			ServicePort:          int32(defaultMetaServicePort),
+			EnableRegionFailover: proto.Bool(false),
 		},
 		Datanode: &DatanodeSpec{
 			ComponentSpec: ComponentSpec{
@@ -487,3 +490,41 @@
 		}
 	}
 }
+
+func TestDefaultEnableRegionFailover(t *testing.T) {
+	clusterWithRemoteWAL := GreptimeDBCluster{
+		Spec: GreptimeDBClusterSpec{
+			Base: &PodTemplateSpec{
+				MainContainer: &MainContainerSpec{
+					Image: "greptime/greptimedb:latest",
+				},
+			},
+			Datanode: &DatanodeSpec{
+				ComponentSpec: ComponentSpec{
+					Replicas: proto.Int32(1),
+				},
+			},
+			Frontend: &FrontendSpec{
+				ComponentSpec: ComponentSpec{
+					Replicas: proto.Int32(1),
+				},
+			},
+			Meta: &MetaSpec{
+				ComponentSpec: ComponentSpec{
+					Replicas: proto.Int32(1),
+				},
+			},
+			RemoteWalProvider: &RemoteWalProvider{KafkaRemoteWal: &KafkaRemoteWal{
+				BrokerEndpoints: []string{"kafka.default:9092"},
+			}},
+		},
+	}
+
+	if err := clusterWithRemoteWAL.SetDefaults(); err != nil {
+		t.Errorf("set default cluster failed: %v", err)
+	}
+
+	if *clusterWithRemoteWAL.Spec.Meta.EnableRegionFailover != true {
+		t.Errorf("default EnableRegionFailover should be true")
+	}
+}
diff --git a/apis/v1alpha1/greptimedbcluster_types.go b/apis/v1alpha1/greptimedbcluster_types.go
index dcf9863f..79cce7cc 100644
--- a/apis/v1alpha1/greptimedbcluster_types.go
+++ b/apis/v1alpha1/greptimedbcluster_types.go
@@ -50,7 +50,7 @@ type MetaSpec struct {
 
 	// EnableRegionFailover indicates whether to enable region failover.
 	// +optional
-	EnableRegionFailover bool `json:"enableRegionFailover,omitempty"`
+	EnableRegionFailover *bool `json:"enableRegionFailover,omitempty"`
 
 	// The meta will store data with this key prefix.
 	// +optional
diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go
index 4f5a8d31..67543475 100644
--- a/apis/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/v1alpha1/zz_generated.deepcopy.go
@@ -556,6 +556,11 @@ func (in *MetaSpec) DeepCopyInto(out *MetaSpec) {
 		*out = make([]string, len(*in))
 		copy(*out, *in)
 	}
+	if in.EnableRegionFailover != nil {
+		in, out := &in.EnableRegionFailover, &out.EnableRegionFailover
+		*out = new(bool)
+		**out = **in
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaSpec.
diff --git a/controllers/greptimedbcluster/controller.go b/controllers/greptimedbcluster/controller.go
index cb2726a4..e98d0e9b 100644
--- a/controllers/greptimedbcluster/controller.go
+++ b/controllers/greptimedbcluster/controller.go
@@ -301,6 +301,11 @@ func (r *Reconciler) validate(ctx context.Context, cluster *v1alpha1.GreptimeDBC
 		if err := r.validateTomlConfig(cluster.Spec.Meta.Config); err != nil {
 			return fmt.Errorf("invalid meta toml config: %v", err)
 		}
+		if cluster.Spec.Meta.EnableRegionFailover != nil && *cluster.Spec.Meta.EnableRegionFailover {
+			if cluster.Spec.RemoteWalProvider == nil {
+				return fmt.Errorf("remote WAL provider must be specified when region failover is enabled")
+			}
+		}
 	}
 
 	if cluster.Spec.Datanode != nil {
diff --git a/controllers/greptimedbcluster/deployers/datanode.go b/controllers/greptimedbcluster/deployers/datanode.go
index 7d41e231..27c22dbe 100644
--- a/controllers/greptimedbcluster/deployers/datanode.go
+++ b/controllers/greptimedbcluster/deployers/datanode.go
@@ -17,7 +17,9 @@ package deployers
 import (
 	"context"
 	"fmt"
+	"net/http"
 	"path"
+	"reflect"
 
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -34,19 +36,21 @@ import (
 	"github.com/GreptimeTeam/greptimedb-operator/pkg/dbconfig"
 	"github.com/GreptimeTeam/greptimedb-operator/pkg/deployer"
 	"github.com/GreptimeTeam/greptimedb-operator/pkg/util"
-	k8sutil "github.com/GreptimeTeam/greptimedb-operator/pkg/util/k8s"
+	k8sutils "github.com/GreptimeTeam/greptimedb-operator/pkg/util/k8s"
 )
 
 // DatanodeDeployer is the deployer for datanode.
 type DatanodeDeployer struct {
 	*CommonDeployer
+
+	maintenanceMode bool
 }
 
 var _ deployer.Deployer = &DatanodeDeployer{}
 
 func NewDatanodeDeployer(mgr ctrl.Manager) *DatanodeDeployer {
 	return &DatanodeDeployer{
-		CommonDeployer: NewFromManager(mgr),
+		CommonDeployer:  NewFromManager(mgr),
+		maintenanceMode: false,
 	}
 }
 
@@ -118,7 +122,107 @@ func (d *DatanodeDeployer) CheckAndUpdateStatus(ctx context.Context, crdObject c
 		klog.Errorf("Failed to update status: %s", err)
 	}
 
-	return k8sutil.IsStatefulSetReady(sts), nil
+	return k8sutils.IsStatefulSetReady(sts), nil
+}
+
+// Apply is re-implemented for datanode to handle maintenance mode.
+func (d *DatanodeDeployer) Apply(ctx context.Context, crdObject client.Object, objects []client.Object) error {
+	updateObject := false
+
+	cluster, err := d.GetCluster(crdObject)
+	if err != nil {
+		return err
+	}
+
+	for _, newObject := range objects {
+		oldObject, err := k8sutils.CreateObjectIfNotExist(ctx, d.Client, k8sutils.SourceObject(newObject), newObject)
+		if err != nil {
+			return err
+		}
+
+		if oldObject != nil {
+			equal, err := k8sutils.IsObjectSpecEqual(oldObject, newObject, deployer.LastAppliedResourceSpec)
+			if err != nil {
+				return err
+			}
+
+			// If the spec is not equal, update the object.
+			if !equal {
+				if sts, ok := newObject.(*appsv1.StatefulSet); ok && d.shouldUseMaintenanceMode(cluster) {
+					if err := d.turnOnMaintenanceMode(ctx, sts, cluster); err != nil {
+						return err
+					}
+				}
+				if err := d.Client.Patch(ctx, newObject, client.MergeFrom(oldObject)); err != nil {
+					return err
+				}
+				updateObject = true
+			}
+		}
+	}
+
+	if updateObject {
+		// If the object is updated, we need to wait for the object to be ready.
+		// When the related object is ready, we will receive the event and enter the next reconcile loop.
+		return deployer.ErrSyncNotReady
+	}
+
+	return nil
+}
+
+func (d *DatanodeDeployer) PostSyncHooks() []deployer.Hook {
+	return []deployer.Hook{
+		d.turnOffMaintenanceMode,
+	}
+}
+
+func (d *DatanodeDeployer) turnOnMaintenanceMode(ctx context.Context, newSts *appsv1.StatefulSet, cluster *v1alpha1.GreptimeDBCluster) error {
+	oldSts := new(appsv1.StatefulSet)
+	// The oldSts must exist since we have checked it before.
+	if err := d.Get(ctx, client.ObjectKeyFromObject(newSts), oldSts); err != nil {
+		return err
+	}
+
+	if !d.maintenanceMode && d.isOldPodRestart(*newSts, *oldSts) {
+		klog.Infof("Turn on maintenance mode for datanode, statefulset: %s", newSts.Name)
+		if err := d.requestMetasrvForMaintenance(cluster, true); err != nil {
+			return err
+		}
+		d.maintenanceMode = true
+	}
+
+	return nil
+}
+
+func (d *DatanodeDeployer) turnOffMaintenanceMode(ctx context.Context, crdObject client.Object) error {
+	cluster, err := d.GetCluster(crdObject)
+	if err != nil {
+		return err
+	}
+
+	if d.maintenanceMode && d.shouldUseMaintenanceMode(cluster) {
+		klog.Infof("Turn off maintenance mode for datanode, cluster: %s", cluster.Name)
+		if err := d.requestMetasrvForMaintenance(cluster, false); err != nil {
+			return err
+		}
+		d.maintenanceMode = false
+	}
+
+	return nil
+}
+
+func (d *DatanodeDeployer) requestMetasrvForMaintenance(cluster *v1alpha1.GreptimeDBCluster, enabled bool) error {
+	requestURL := fmt.Sprintf("http://%s.%s:%d/admin/maintenance?enable=%v", common.ResourceName(cluster.GetName(), v1alpha1.MetaComponentKind), cluster.GetNamespace(), cluster.Spec.Meta.ServicePort, enabled)
+	rsp, err := http.Get(requestURL)
+	if err != nil {
+		return err
+	}
+	defer rsp.Body.Close()
+
+	if rsp.StatusCode != http.StatusOK {
+		return fmt.Errorf("failed to set maintenance mode to %v for datanode, status code: %d", enabled, rsp.StatusCode)
+	}
+	return nil
 }
 
 func (d *DatanodeDeployer) deleteStorage(ctx context.Context, cluster *v1alpha1.GreptimeDBCluster) error {
@@ -153,6 +257,57 @@ func (d *DatanodeDeployer) deleteStorage(ctx context.Context, cluster *v1alpha1.
 	return nil
 }
 
+// isOldPodRestart checks if the existing pod needs to be restarted. For convenience, we only compare the necessary fields.
+// TODO(zyy17): Do we have an easy way to implement this?
+func (d *DatanodeDeployer) isOldPodRestart(new, old appsv1.StatefulSet) bool {
+	var (
+		newPodTemplate = new.Spec.Template
+		oldPodTemplate = old.Spec.Template
+	)
+
+	if !reflect.DeepEqual(newPodTemplate.GetObjectMeta().GetAnnotations(), oldPodTemplate.GetObjectMeta().GetAnnotations()) {
+		return true
+	}
+
+	if newPodTemplate.Spec.InitContainers[0].Image != oldPodTemplate.Spec.InitContainers[0].Image {
+		return true
+	}
+
+	// If the tolerations, affinity, or nodeSelector are changed, the original Pod may need to be restarted for re-scheduling.
+	if !reflect.DeepEqual(newPodTemplate.Spec.Tolerations, oldPodTemplate.Spec.Tolerations) ||
+		!reflect.DeepEqual(newPodTemplate.Spec.Affinity, oldPodTemplate.Spec.Affinity) ||
+		!reflect.DeepEqual(newPodTemplate.Spec.NodeSelector, oldPodTemplate.Spec.NodeSelector) {
+		return true
+	}
+
+	// Compare the main container settings.
+	newMainContainer := newPodTemplate.Spec.Containers[constant.MainContainerIndex]
+	oldMainContainer := oldPodTemplate.Spec.Containers[constant.MainContainerIndex]
+	if newMainContainer.Image != oldMainContainer.Image {
+		return true
+	}
+
+	if !reflect.DeepEqual(newMainContainer.Command, oldMainContainer.Command) ||
+		!reflect.DeepEqual(newMainContainer.Args, oldMainContainer.Args) ||
+		!reflect.DeepEqual(newMainContainer.Env, oldMainContainer.Env) ||
+		!reflect.DeepEqual(newMainContainer.VolumeMounts, oldMainContainer.VolumeMounts) ||
+		!reflect.DeepEqual(newMainContainer.Ports, oldMainContainer.Ports) ||
+		!reflect.DeepEqual(newMainContainer.Resources, oldMainContainer.Resources) {
+		return true
+	}
+
+	return false
+}
+
+func (d *DatanodeDeployer) shouldUseMaintenanceMode(cluster *v1alpha1.GreptimeDBCluster) bool {
+	if cluster.Spec.RemoteWalProvider != nil &&
+		cluster.Spec.Meta.EnableRegionFailover != nil &&
+		*cluster.Spec.Meta.EnableRegionFailover {
+		return true
+	}
+	return false
+}
+
 var _ deployer.Builder = &datanodeBuilder{}
 
 type datanodeBuilder struct {
diff --git a/pkg/dbconfig/metasrv_config.go b/pkg/dbconfig/metasrv_config.go
index 49e7b3d6..2136ab87 100644
--- a/pkg/dbconfig/metasrv_config.go
+++ b/pkg/dbconfig/metasrv_config.go
@@ -42,7 +42,7 @@ type MetasrvConfig struct {
 // ConfigureByCluster configures the metasrv config by the given cluster.
 func (c *MetasrvConfig) ConfigureByCluster(cluster *v1alpha1.GreptimeDBCluster) error {
 	if cluster.Spec.Meta != nil {
-		c.EnableRegionFailover = &cluster.Spec.Meta.EnableRegionFailover
+		c.EnableRegionFailover = cluster.Spec.Meta.EnableRegionFailover
 
 		if len(cluster.Spec.Meta.StoreKeyPrefix) > 0 {
 			c.StoreKeyPrefix = &cluster.Spec.Meta.StoreKeyPrefix
diff --git a/pkg/deployer/deployer.go b/pkg/deployer/deployer.go
index 96b6dd4f..ff16101f 100644
--- a/pkg/deployer/deployer.go
+++ b/pkg/deployer/deployer.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"fmt"
 
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	k8sutils "github.com/GreptimeTeam/greptimedb-operator/pkg/util/k8s"
@@ -50,7 +49,7 @@ type ComponentOperator interface {
 	// Apply creates or update the Kubernetes objects that generated by Render().
 	// If the object is not existed, it will be created.
 	// If the object is existed, it will be updated if the object is different.
-	Apply(ctx context.Context, objects []client.Object) error
+	Apply(ctx context.Context, crdObject client.Object, objects []client.Object) error
 
 	// CleanUp cleans up the resources that created by the deployer.
 	CleanUp(ctx context.Context, crdObject client.Object) error
@@ -86,7 +85,7 @@ func (d *DefaultDeployer) Sync(ctx context.Context, crdObject client.Object, ope
 		return err
 	}
 
-	if err := operator.Apply(ctx, objects); err != nil {
+	if err := operator.Apply(ctx, crdObject, objects); err != nil {
 		return err
 	}
 
@@ -110,10 +109,10 @@ func (d *DefaultDeployer) Generate(_ client.Object) ([]client.Object, error) {
 	return nil, nil
 }
 
-func (d *DefaultDeployer) Apply(ctx context.Context, objects []client.Object) error {
+func (d *DefaultDeployer) Apply(ctx context.Context, _ client.Object, objects []client.Object) error {
 	updateObject := false
 	for _, newObject := range objects {
-		oldObject, err := k8sutils.CreateObjectIfNotExist(ctx, d.Client, d.sourceObject(newObject), newObject)
+		oldObject, err := k8sutils.CreateObjectIfNotExist(ctx, d.Client, k8sutils.SourceObject(newObject), newObject)
 		if err != nil {
 			return err
 		}
@@ -178,13 +177,3 @@ func (d *DefaultDeployer) onPostSync(ctx context.Context, crdObject client.Objec
 	}
 	return nil
 }
-
-// sourceObject create the unstructured object from the given object.
-func (d *DefaultDeployer) sourceObject(input client.Object) client.Object {
-	u := &unstructured.Unstructured{}
-
-	// MUST set the APIVersion and Kind.
-	u.SetGroupVersionKind(input.GetObjectKind().GroupVersionKind())
-
-	return u
-}
diff --git a/pkg/util/k8s/k8s.go b/pkg/util/k8s/k8s.go
index cafa4437..b4228cbe 100644
--- a/pkg/util/k8s/k8s.go
+++ b/pkg/util/k8s/k8s.go
@@ -22,6 +22,7 @@ import (
 
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -122,3 +123,13 @@ func GetK8sResource(namespace, name string, obj client.Object) error {
 
 	return nil
 }
+
+// SourceObject create the unstructured object from the given object.
+func SourceObject(input client.Object) client.Object {
+	u := &unstructured.Unstructured{}
+
+	// MUST set the APIVersion and Kind.
+	u.SetGroupVersionKind(input.GetObjectKind().GroupVersionKind())
+
+	return u
+}
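
Note on exercising the maintenance API (not part of the patch above): requestMetasrvForMaintenance is a plain HTTP call to the metasrv admin endpoint /admin/maintenance?enable=<bool>, so the same toggle can be driven by hand when debugging a datanode rollout. The following is a minimal standalone sketch of that call; the address "http://mycluster-meta.default:3002" is a placeholder for the meta service name and port of a concrete cluster, not something defined by this patch.

package main

import (
	"fmt"
	"net/http"
	"os"
)

// setMaintenance toggles metasrv maintenance mode through the same admin
// endpoint the operator calls in requestMetasrvForMaintenance.
// metasrvAddr is a placeholder such as "http://mycluster-meta.default:3002".
func setMaintenance(metasrvAddr string, enable bool) error {
	url := fmt.Sprintf("%s/admin/maintenance?enable=%v", metasrvAddr, enable)
	rsp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer rsp.Body.Close()

	if rsp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", rsp.StatusCode)
	}
	return nil
}

func main() {
	// Example: enable maintenance mode before manually restarting datanodes.
	if err := setMaintenance("http://mycluster-meta.default:3002", true); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}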