feat: add maintenance mode (#164)
* feat: add maintenance mode

* refactor: enable region failover by default when using remote wal
zyy17 authored Jul 25, 2024
1 parent f28aa62 commit 3a14be7
Showing 9 changed files with 234 additions and 24 deletions.
6 changes: 5 additions & 1 deletion apis/v1alpha1/defaulting.go
@@ -100,12 +100,16 @@ func (in *GreptimeDBCluster) SetDefaults() error {
}

if in.Spec.Meta != nil {
enableRegionFailover := false
if in.Spec.RemoteWalProvider != nil { // If remote wal provider is enabled, enable region failover by default.
enableRegionFailover = true
}
defaultGreptimeDBClusterSpec.Meta = &MetaSpec{
ComponentSpec: ComponentSpec{
Template: &PodTemplateSpec{},
},
ServicePort: int32(defaultMetaServicePort),
EnableRegionFailover: false,
EnableRegionFailover: &enableRegionFailover,
}
if in.Spec.Meta.Replicas == nil {
in.Spec.Meta.Replicas = proto.Int32(defaultMetaReplicas)
47 changes: 44 additions & 3 deletions apis/v1alpha1/defaulting_test.go
@@ -109,7 +109,8 @@ func TestSetDefaults(t *testing.T) {
},
},
},
ServicePort: int32(defaultMetaServicePort),
ServicePort: int32(defaultMetaServicePort),
EnableRegionFailover: proto.Bool(false),
},
Datanode: &DatanodeSpec{
ComponentSpec: ComponentSpec{
@@ -300,7 +301,8 @@ func TestSetDefaults(t *testing.T) {
EtcdEndpoints: []string{
"etcd.default:2379",
},
ServicePort: int32(defaultMetaServicePort),
ServicePort: int32(defaultMetaServicePort),
EnableRegionFailover: proto.Bool(false),
},
Datanode: &DatanodeSpec{
ComponentSpec: ComponentSpec{
@@ -437,7 +439,8 @@ func TestSetDefaults(t *testing.T) {
},
},
},
ServicePort: int32(defaultMetaServicePort),
ServicePort: int32(defaultMetaServicePort),
EnableRegionFailover: proto.Bool(false),
},
Datanode: &DatanodeSpec{
ComponentSpec: ComponentSpec{
@@ -487,3 +490,41 @@ func TestSetDefaults(t *testing.T) {
}
}
}

func TestDefaultEnableRegionFailover(t *testing.T) {
clusterWithRemoteWAL := GreptimeDBCluster{
Spec: GreptimeDBClusterSpec{
Base: &PodTemplateSpec{
MainContainer: &MainContainerSpec{
Image: "greptime/greptimedb:latest",
},
},
Datanode: &DatanodeSpec{
ComponentSpec: ComponentSpec{
Replicas: proto.Int32(1),
},
},
Frontend: &FrontendSpec{
ComponentSpec: ComponentSpec{
Replicas: proto.Int32(1),
},
},
Meta: &MetaSpec{
ComponentSpec: ComponentSpec{
Replicas: proto.Int32(1),
},
},
RemoteWalProvider: &RemoteWalProvider{KafkaRemoteWal: &KafkaRemoteWal{
BrokerEndpoints: []string{"kafka.default:9092"},
}},
},
}

if err := clusterWithRemoteWAL.SetDefaults(); err != nil {
t.Errorf("set default cluster failed: %v", err)
}

if !*clusterWithRemoteWAL.Spec.Meta.EnableRegionFailover {
t.Errorf("default EnableRegionFailover should be true")
}
}
2 changes: 1 addition & 1 deletion apis/v1alpha1/greptimedbcluster_types.go
@@ -50,7 +50,7 @@ type MetaSpec struct {

// EnableRegionFailover indicates whether to enable region failover.
// +optional
EnableRegionFailover bool `json:"enableRegionFailover,omitempty"`
EnableRegionFailover *bool `json:"enableRegionFailover,omitempty"`

// The meta will store data with this key prefix.
// +optional
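The switch from bool to *bool above is what makes the new defaulting possible: a pointer lets SetDefaults() distinguish a field the user omitted from one they explicitly set to false. A minimal sketch of the three states (editor's illustration, not part of the commit; it assumes the same proto helper package the tests use):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/proto" // assumption: the proto helpers used by the tests
)

func main() {
	// A *bool field has three observable states, so SetDefaults() can apply
	// the remote-WAL-driven default only when the user was silent:
	var unset *bool                  // field omitted in the CR -> default may apply
	explicitOff := proto.Bool(false) // user explicitly disabled region failover
	explicitOn := proto.Bool(true)   // user explicitly enabled it

	fmt.Println(unset == nil, *explicitOff, *explicitOn) // true false true
}
```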
5 changes: 5 additions & 0 deletions apis/v1alpha1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

5 changes: 5 additions & 0 deletions controllers/greptimedbcluster/controller.go
@@ -301,6 +301,11 @@ func (r *Reconciler) validate(ctx context.Context, cluster *v1alpha1.GreptimeDBCluster
if err := r.validateTomlConfig(cluster.Spec.Meta.Config); err != nil {
return fmt.Errorf("invalid meta toml config: %v", err)
}
if cluster.Spec.Meta.EnableRegionFailover != nil && *cluster.Spec.Meta.EnableRegionFailover {
if cluster.Spec.RemoteWalProvider == nil {
return fmt.Errorf("remote wal provider must be specified when enable region failover")
}
}
}

if cluster.Spec.Datanode != nil {
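The new check above enforces a simple invariant: region failover may only be enabled together with a remote WAL provider, since failover needs the WAL to survive the failing datanode. A standalone restatement (editor's sketch, simplified from the diff):

```go
package main

import (
	"errors"
	"fmt"
)

// validateFailover mirrors the new check in validate(): enabling region
// failover without a remote WAL provider is rejected.
func validateFailover(enableRegionFailover *bool, hasRemoteWAL bool) error {
	if enableRegionFailover != nil && *enableRegionFailover && !hasRemoteWAL {
		return errors.New("remote wal provider must be specified when enabling region failover")
	}
	return nil
}

func main() {
	on := true
	fmt.Println(validateFailover(&on, false)) // rejected with the error above
	fmt.Println(validateFailover(&on, true))  // ok: <nil>
}
```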
161 changes: 158 additions & 3 deletions controllers/greptimedbcluster/deployers/datanode.go
@@ -17,7 +17,9 @@ package deployers
import (
"context"
"fmt"
"net/http"
"path"
"reflect"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -34,19 +36,21 @@ import (
"github.com/GreptimeTeam/greptimedb-operator/pkg/dbconfig"
"github.com/GreptimeTeam/greptimedb-operator/pkg/deployer"
"github.com/GreptimeTeam/greptimedb-operator/pkg/util"
k8sutil "github.com/GreptimeTeam/greptimedb-operator/pkg/util/k8s"
k8sutils "github.com/GreptimeTeam/greptimedb-operator/pkg/util/k8s"
)

// DatanodeDeployer is the deployer for datanode.
type DatanodeDeployer struct {
*CommonDeployer
maintenanceMode bool
}

var _ deployer.Deployer = &DatanodeDeployer{}

func NewDatanodeDeployer(mgr ctrl.Manager) *DatanodeDeployer {
return &DatanodeDeployer{
CommonDeployer: NewFromManager(mgr),
CommonDeployer: NewFromManager(mgr),
maintenanceMode: false,
}
}

@@ -118,7 +122,107 @@ func (d *DatanodeDeployer) CheckAndUpdateStatus(ctx context.Context, crdObject client.Object
klog.Errorf("Failed to update status: %s", err)
}

return k8sutil.IsStatefulSetReady(sts), nil
return k8sutils.IsStatefulSetReady(sts), nil
}

// Apply is re-implemented for datanode to handle the maintenance mode.
func (d *DatanodeDeployer) Apply(ctx context.Context, crdObject client.Object, objects []client.Object) error {
updateObject := false

cluster, err := d.GetCluster(crdObject)
if err != nil {
return err
}

for _, newObject := range objects {
oldObject, err := k8sutils.CreateObjectIfNotExist(ctx, d.Client, k8sutils.SourceObject(newObject), newObject)
if err != nil {
return err
}

if oldObject != nil {
equal, err := k8sutils.IsObjectSpecEqual(oldObject, newObject, deployer.LastAppliedResourceSpec)
if err != nil {
return err
}

// If the spec is not equal, update the object.
if !equal {
if sts, ok := newObject.(*appsv1.StatefulSet); ok && d.shouldUseMaintenanceMode(cluster) {
if err := d.turnOnMaintenanceMode(ctx, sts, cluster); err != nil {
return err
}
}
if err := d.Client.Patch(ctx, newObject, client.MergeFrom(oldObject)); err != nil {
return err
}
updateObject = true
}
}
}

if updateObject {
// If the object is updated, we need to wait for the object to be ready.
// When the related object is ready, we will receive the event and enter the next reconcile loop.
return deployer.ErrSyncNotReady
}

return nil
}

func (d *DatanodeDeployer) PostSyncHooks() []deployer.Hook {
return []deployer.Hook{
d.turnOffMaintenanceMode,
}
}

func (d *DatanodeDeployer) turnOnMaintenanceMode(ctx context.Context, newSts *appsv1.StatefulSet, cluster *v1alpha1.GreptimeDBCluster) error {
oldSts := new(appsv1.StatefulSet)
// The oldSts must exist since we have checked it before.
if err := d.Get(ctx, client.ObjectKeyFromObject(newSts), oldSts); err != nil {
return err
}

if !d.maintenanceMode && d.isOldPodRestart(*newSts, *oldSts) {
klog.Infof("Turn on maintenance mode for datanode, statefulset: %s", newSts.Name)
if err := d.requestMetasrvForMaintenance(cluster, true); err != nil {
return err
}
d.maintenanceMode = true
}

return nil
}

func (d *DatanodeDeployer) turnOffMaintenanceMode(ctx context.Context, crdObject client.Object) error {
cluster, err := d.GetCluster(crdObject)
if err != nil {
return err
}

if d.maintenanceMode && d.shouldUseMaintenanceMode(cluster) {
klog.Infof("Turn off maintenance mode for datanode, cluster: %s", cluster.Name)
if err := d.requestMetasrvForMaintenance(cluster, false); err != nil {
return err
}
d.maintenanceMode = false
}

return nil
}

func (d *DatanodeDeployer) requestMetasrvForMaintenance(cluster *v1alpha1.GreptimeDBCluster, enabled bool) error {
requestURL := fmt.Sprintf("http://%s.%s:%d/admin/maintenance?enable=%v", common.ResourceName(cluster.GetName(), v1alpha1.MetaComponentKind), cluster.GetNamespace(), cluster.Spec.Meta.ServicePort, enabled)
rsp, err := http.Get(requestURL)
if err != nil {
return err
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to turn off maintenance mode for datanode, status code: %d", rsp.StatusCode)
}
return nil
}
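For context, the metasrv admin API toggled here is a plain HTTP endpoint. The sketch below shows the equivalent standalone request (editor's example, not diff content; the service name, namespace, and port are placeholders that the real code derives from common.ResourceName and Meta.ServicePort):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholders: the operator builds these from the cluster object.
	const metaService, namespace, port = "mycluster-meta", "default", 3002

	url := fmt.Sprintf("http://%s.%s:%d/admin/maintenance?enable=%v",
		metaService, namespace, port, true)
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.StatusCode)
}
```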

func (d *DatanodeDeployer) deleteStorage(ctx context.Context, cluster *v1alpha1.GreptimeDBCluster) error {
@@ -153,6 +257,57 @@ func (d *DatanodeDeployer) deleteStorage(ctx context.Context, cluster *v1alpha1.GreptimeDBCluster) error {
return nil
}

// isOldPodRestart checks whether the existing pods need to be restarted. For convenience, we only compare the necessary fields.
// TODO(zyy17): Is there an easier way to implement this?
func (d *DatanodeDeployer) isOldPodRestart(new, old appsv1.StatefulSet) bool {
var (
newPodTemplate = new.Spec.Template
oldPodTemplate = old.Spec.Template
)

if !reflect.DeepEqual(newPodTemplate.GetObjectMeta().GetAnnotations(), oldPodTemplate.GetObjectMeta().GetAnnotations()) {
return true
}

if newPodTemplate.Spec.InitContainers[0].Image != oldPodTemplate.Spec.InitContainers[0].Image {
return true
}

// If the tolerations, affinity, nodeSelector are changed, the original Pod may need to be restarted for re-scheduling.
if !reflect.DeepEqual(newPodTemplate.Spec.Tolerations, oldPodTemplate.Spec.Tolerations) ||
!reflect.DeepEqual(newPodTemplate.Spec.Affinity, oldPodTemplate.Spec.Affinity) ||
!reflect.DeepEqual(newPodTemplate.Spec.NodeSelector, oldPodTemplate.Spec.NodeSelector) {
return true
}

// Compare the main container settings.
newMainContainer := newPodTemplate.Spec.Containers[constant.MainContainerIndex]
oldMainContainer := oldPodTemplate.Spec.Containers[constant.MainContainerIndex]
if newMainContainer.Image != oldMainContainer.Image {
return true
}

if !reflect.DeepEqual(newMainContainer.Command, oldMainContainer.Command) ||
!reflect.DeepEqual(newMainContainer.Args, oldMainContainer.Args) ||
!reflect.DeepEqual(newMainContainer.Env, oldMainContainer.Env) ||
!reflect.DeepEqual(newMainContainer.VolumeMounts, oldMainContainer.VolumeMounts) ||
!reflect.DeepEqual(newMainContainer.Ports, oldMainContainer.Ports) ||
!reflect.DeepEqual(newMainContainer.Resources, oldMainContainer.Resources) {
return true
}

return false
}
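One caveat with the reflect.DeepEqual comparisons above (editor's note, not part of the commit): Kubernetes resource quantities compare by internal representation, so semantically identical values written in different notations look unequal, and a comparison like this can report a restart when only the textual form changed. A small demonstration:

```go
package main

import (
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	a := resource.MustParse("1")
	b := resource.MustParse("1000m")
	// Semantically equal quantities...
	fmt.Println(a.Cmp(b) == 0) // true
	// ...but not DeepEqual, because the parsed representations differ.
	fmt.Println(reflect.DeepEqual(a, b)) // false
}
```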

func (d *DatanodeDeployer) shouldUseMaintenanceMode(cluster *v1alpha1.GreptimeDBCluster) bool {
if cluster.Spec.RemoteWalProvider != nil &&
cluster.Spec.Meta.EnableRegionFailover != nil &&
*cluster.Spec.Meta.EnableRegionFailover {
return true
}
return false
}

var _ deployer.Builder = &datanodeBuilder{}

type datanodeBuilder struct {
2 changes: 1 addition & 1 deletion pkg/dbconfig/metasrv_config.go
@@ -42,7 +42,7 @@ type MetasrvConfig struct {
// ConfigureByCluster configures the metasrv config based on the given cluster.
func (c *MetasrvConfig) ConfigureByCluster(cluster *v1alpha1.GreptimeDBCluster) error {
if cluster.Spec.Meta != nil {
c.EnableRegionFailover = &cluster.Spec.Meta.EnableRegionFailover
c.EnableRegionFailover = cluster.Spec.Meta.EnableRegionFailover

if len(cluster.Spec.Meta.StoreKeyPrefix) > 0 {
c.StoreKeyPrefix = &cluster.Spec.Meta.StoreKeyPrefix
19 changes: 4 additions & 15 deletions pkg/deployer/deployer.go
@@ -18,7 +18,6 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client"

k8sutils "github.com/GreptimeTeam/greptimedb-operator/pkg/util/k8s"
@@ -50,7 +49,7 @@ type ComponentOperator interface {
// Apply creates or updates the Kubernetes objects generated by Render().
// If an object does not exist, it will be created.
// If it exists and differs, it will be updated.
Apply(ctx context.Context, objects []client.Object) error
Apply(ctx context.Context, crdObject client.Object, objects []client.Object) error

// CleanUp cleans up the resources that created by the deployer.
CleanUp(ctx context.Context, crdObject client.Object) error
@@ -86,7 +85,7 @@ func (d *DefaultDeployer) Sync(ctx context.Context, crdObject client.Object, ope
return err
}

if err := operator.Apply(ctx, objects); err != nil {
if err := operator.Apply(ctx, crdObject, objects); err != nil {
return err
}

@@ -110,10 +109,10 @@ func (d *DefaultDeployer) Generate(_ client.Object) ([]client.Object, error) {
return nil, nil
}

func (d *DefaultDeployer) Apply(ctx context.Context, objects []client.Object) error {
func (d *DefaultDeployer) Apply(ctx context.Context, _ client.Object, objects []client.Object) error {
updateObject := false
for _, newObject := range objects {
oldObject, err := k8sutils.CreateObjectIfNotExist(ctx, d.Client, d.sourceObject(newObject), newObject)
oldObject, err := k8sutils.CreateObjectIfNotExist(ctx, d.Client, k8sutils.SourceObject(newObject), newObject)
if err != nil {
return err
}
@@ -178,13 +177,3 @@ func (d *DefaultDeployer) onPostSync(ctx context.Context, crdObject client.Object
}
return nil
}

// sourceObject creates the unstructured object from the given object.
func (d *DefaultDeployer) sourceObject(input client.Object) client.Object {
u := &unstructured.Unstructured{}

// MUST set the APIVersion and Kind.
u.SetGroupVersionKind(input.GetObjectKind().GroupVersionKind())

return u
}
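The deleted private helper is not gone: the hunks above now call k8sutils.SourceObject, so the same logic presumably moved to pkg/util/k8s so that both DefaultDeployer and DatanodeDeployer can share it. A sketch of that shared helper under this assumption, with the body taken from the deleted method:

```go
// Assumed new home of the helper (pkg/util/k8s); location is an editor's inference.
package k8sutils

import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// SourceObject creates an unstructured object with the same GVK as the input,
// used by CreateObjectIfNotExist as the destination when reading the existing object.
func SourceObject(input client.Object) client.Object {
	u := &unstructured.Unstructured{}

	// MUST set the APIVersion and Kind.
	u.SetGroupVersionKind(input.GetObjectKind().GroupVersionKind())

	return u
}
```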
