diff --git a/deploy/cluster/cluster_role.yaml b/deploy/cluster/cluster_role.yaml
index 8aa1a3cca..0796f7f74 100644
--- a/deploy/cluster/cluster_role.yaml
+++ b/deploy/cluster/cluster_role.yaml
@@ -19,6 +19,7 @@ rules:
   - ""
   resources:
   - configmaps
+  - pods/exec
   - secrets
   - services
   - events
diff --git a/deploy/namespace/role.yaml b/deploy/namespace/role.yaml
index b92bca55d..6248880de 100644
--- a/deploy/namespace/role.yaml
+++ b/deploy/namespace/role.yaml
@@ -19,6 +19,7 @@ rules:
   - ""
   resources:
   - configmaps
+  - pods/exec
   - secrets
   - services
   - events
diff --git a/go.sum b/go.sum
index 7bdd898aa..9e41eb1a0 100644
--- a/go.sum
+++ b/go.sum
@@ -203,6 +203,7 @@ github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
 github.com/docker/libnetwork v0.0.0-20180830151422-a9cd636e3789/go.mod h1:93m0aTqz6z+g32wla4l4WxTrdtvBRmVzYRkYvasA5Z8=
 github.com/docker/libtrust v0.0.0-20150526203908-9cbd2a1374f4/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE=
 github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
+github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c h1:ZfSZ3P3BedhKGUhzj7BQlPSU4OvT6tfOKe3DVHzOA7s=
 github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -211,7 +212,9 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
 github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
+github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f h1:8GDPb0tCY8LQ+OJ3dbHb5sA6YZWXFORQYZx5sdsTlMs=
 github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
+github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f h1:AUj1VoZUfhPhOPHULCQQDnGhRelpFWHMLhQVWDsS0v4=
 github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/emicklei/go-restful v2.9.3+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
diff --git a/pkg/apis/redis/v1alpha1/constants.go b/pkg/apis/redis/v1alpha1/constants.go
index 410ab52e8..6be886f7d 100644
--- a/pkg/apis/redis/v1alpha1/constants.go
+++ b/pkg/apis/redis/v1alpha1/constants.go
@@ -45,6 +45,8 @@ const (
     ClusterStatusRebalancing ClusterStatus = "Rebalancing"
     // ClusterStatusRollingUpdate ClusterStatus RollingUpdate
     ClusterStatusRollingUpdate ClusterStatus = "RollingUpdate"
+    // ClusterStatusResetPassword ClusterStatus ResetPassword
+    ClusterStatusResetPassword ClusterStatus = "ResetPassword"
 )

 // NodesPlacementInfo Redis Nodes placement mode information
diff --git a/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go b/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go
index 5ecd62645..cdac41db7 100644
--- a/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go
+++ b/pkg/controller/distributedrediscluster/distributedrediscluster_controller.go
@@ -8,7 +8,11 @@ import (
     "github.com/spf13/pflag"
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/runtime"
+    runtimeschema "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/runtime/serializer"
+    "k8s.io/client-go/kubernetes/scheme"
     "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
     "sigs.k8s.io/controller-runtime/pkg/controller"
     "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/handler"
@@ -22,6 +26,7 @@ import (
     "github.com/ucloud/redis-cluster-operator/pkg/config"
     "github.com/ucloud/redis-cluster-operator/pkg/controller/heal"
     clustermanger "github.com/ucloud/redis-cluster-operator/pkg/controller/manager"
+    "github.com/ucloud/redis-cluster-operator/pkg/exec"
     "github.com/ucloud/redis-cluster-operator/pkg/k8sutil"
     "github.com/ucloud/redis-cluster-operator/pkg/redisutil"
     "github.com/ucloud/redis-cluster-operator/pkg/resources/statefulsets"
@@ -56,11 +61,22 @@ func FlagSet() *pflag.FlagSet {
 // Add creates a new DistributedRedisCluster Controller and adds it to the Manager. The Manager will set fields on the Controller
 // and Start it when the Manager is Started.
 func Add(mgr manager.Manager) error {
-    return add(mgr, newReconciler(mgr))
+    gvk := runtimeschema.GroupVersionKind{
+        Group:   "",
+        Version: "v1",
+        Kind:    "Pod",
+    }
+    restClient, err := apiutil.RESTClientForGVK(gvk, mgr.GetConfig(), serializer.NewCodecFactory(scheme.Scheme))
+    if err != nil {
+        return err
+    }
+    execer := exec.NewRemoteExec(restClient, mgr.GetConfig(), log)
+
+    return add(mgr, newReconciler(mgr, execer))
 }

 // newReconciler returns a new reconcile.Reconciler
-func newReconciler(mgr manager.Manager) reconcile.Reconciler {
+func newReconciler(mgr manager.Manager, execer exec.IExec) reconcile.Reconciler {
     reconiler := &ReconcileDistributedRedisCluster{client: mgr.GetClient(), scheme: mgr.GetScheme()}
     reconiler.statefulSetController = k8sutil.NewStatefulSetController(reconiler.client)
     reconiler.serviceController = k8sutil.NewServiceController(reconiler.client)
@@ -69,6 +85,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
     reconiler.crController = k8sutil.NewCRControl(reconiler.client)
     reconiler.ensurer = clustermanger.NewEnsureResource(reconiler.client, log)
     reconiler.checker = clustermanger.NewCheck(reconiler.client)
+    reconiler.execer = execer
     return reconiler
 }

@@ -137,6 +154,7 @@ type ReconcileDistributedRedisCluster struct {
     scheme                *runtime.Scheme
     ensurer               clustermanger.IEnsureResource
     checker               clustermanger.ICheck
+    execer                exec.IExec
     statefulSetController k8sutil.IStatefulSetControl
     serviceController     k8sutil.IServiceControl
     pdbController         k8sutil.IPodDisruptionBudgetControl
diff --git a/pkg/controller/distributedrediscluster/status.go b/pkg/controller/distributedrediscluster/status.go
index 44df692c8..ff83c461f 100644
--- a/pkg/controller/distributedrediscluster/status.go
+++ b/pkg/controller/distributedrediscluster/status.go
@@ -38,6 +38,11 @@ func SetClusterUpdating(status *redisv1alpha1.DistributedRedisClusterStatus, rea
     status.Reason = reason
 }

+func SetClusterResetPassword(status *redisv1alpha1.DistributedRedisClusterStatus, reason string) {
+    status.Status = redisv1alpha1.ClusterStatusResetPassword
+    status.Reason = reason
+}
+
 func buildClusterStatus(clusterInfos *redisutil.ClusterInfos, pods []*corev1.Pod, cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) *redisv1alpha1.DistributedRedisClusterStatus {
     oldStatus := cluster.Status

diff --git a/pkg/controller/distributedrediscluster/sync_handler.go b/pkg/controller/distributedrediscluster/sync_handler.go
index 78deb2f0e..d2968c6ae 100644
--- a/pkg/controller/distributedrediscluster/sync_handler.go
+++ b/pkg/controller/distributedrediscluster/sync_handler.go
@@ -326,6 +326,9 @@ func (r *ReconcileDistributedRedisCluster) resetClusterPassword(ctx *syncContext
         return nil
     }

+    SetClusterResetPassword(&ctx.cluster.Status, "updating cluster's password")
+    r.crController.UpdateCRStatus(ctx.cluster)
+
     matchLabels := getLabels(ctx.cluster)
     redisClusterPods, err := r.statefulSetController.GetStatefulSetPodsByLabels(namespace, matchLabels)
     if err != nil {
@@ -342,12 +345,21 @@
         return err
     }

-    admin, err := newRedisAdmin(clusterPods(redisClusterPods.Items), oldPassword, config.RedisConf(), ctx.reqLogger)
+    podSet := clusterPods(redisClusterPods.Items)
+    admin, err := newRedisAdmin(podSet, oldPassword, config.RedisConf(), ctx.reqLogger)
     if err != nil {
         return err
     }
     defer admin.Close()

+    // Update the password recorded in /etc/redis_password; the redis pod's preStop hook
+    // needs /etc/redis_password to run CLUSTER FAILOVER.
+    cmd := fmt.Sprintf("echo %s > /etc/redis_password", newPassword)
+    if err := r.execer.ExecCommandInPodSet(podSet, "/bin/sh", "-c", cmd); err != nil {
+        return err
+    }
+
+    // Reset the password on all redis pods.
     if err := admin.ResetPassword(newPassword); err != nil {
         return err
     }
diff --git a/pkg/exec/exec.go b/pkg/exec/exec.go
new file mode 100644
index 000000000..643465c9a
--- /dev/null
+++ b/pkg/exec/exec.go
@@ -0,0 +1,129 @@
+package exec
+
+import (
+    "bytes"
+    "io"
+    "net/url"
+    "strings"
+
+    "github.com/go-logr/logr"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/kubernetes/scheme"
+    "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/remotecommand"
+)
+
+// IExec is an injectable interface for running remote exec commands.
+type IExec interface {
+    // ExecCommandInPodSet executes cmd in every pod of the pod set.
+    ExecCommandInPodSet(podSet []*corev1.Pod, cmd ...string) error
+}
+
+type remoteExec struct {
+    restGVKClient rest.Interface
+    logger        logr.Logger
+    config        *rest.Config
+}
+
+// NewRemoteExec returns a new IExec that runs commands in remote pods.
+func NewRemoteExec(restGVKClient rest.Interface, config *rest.Config, logger logr.Logger) IExec {
+    return &remoteExec{
+        restGVKClient: restGVKClient,
+        logger:        logger,
+        config:        config,
+    }
+}
+
+// ExecOptions passed to ExecWithOptions.
+type ExecOptions struct {
+    Command []string
+
+    Namespace     string
+    PodName       string
+    ContainerName string
+
+    Stdin         io.Reader
+    CaptureStdout bool
+    CaptureStderr bool
+    // If false, leading and trailing whitespace in std{err,out} is trimmed.
+    PreserveWhitespace bool
+}
+
+// ExecCommandInPodSet implements the IExec interface.
+func (e *remoteExec) ExecCommandInPodSet(podSet []*corev1.Pod, cmd ...string) error {
+    for _, pod := range podSet {
+        if _, err := e.ExecCommandInContainer(pod.Namespace, pod.Name, pod.Spec.Containers[0].Name, cmd...); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+// ExecCommandInContainer executes a command in the specified container.
+func (e *remoteExec) ExecCommandInContainer(namespace, podName, containerName string, cmd ...string) (string, error) {
+    stdout, stderr, err := e.ExecCommandInContainerWithFullOutput(namespace, podName, containerName, cmd...)
+    if stderr != "" {
+        e.logger.Info("ExecCommand", "command", cmd, "stderr", stderr)
+    }
+    return stdout, err
+}
+
+// ExecCommandInContainerWithFullOutput executes a command in the
+// specified container and returns stdout, stderr and error.
+func (e *remoteExec) ExecCommandInContainerWithFullOutput(namespace, podName, containerName string, cmd ...string) (string, string, error) {
+    return e.ExecWithOptions(ExecOptions{
+        Command:       cmd,
+        Namespace:     namespace,
+        PodName:       podName,
+        ContainerName: containerName,
+
+        Stdin:              nil,
+        CaptureStdout:      true,
+        CaptureStderr:      true,
+        PreserveWhitespace: false,
+    })
+}
+
+// ExecWithOptions executes a command in the specified container,
+// returning stdout, stderr and error. `options` allows additional
+// parameters to be passed.
+func (e *remoteExec) ExecWithOptions(options ExecOptions) (string, string, error) {
+    const tty = false
+
+    req := e.restGVKClient.Post().
+        Resource("pods").
+        Name(options.PodName).
+        Namespace(options.Namespace).
+        SubResource("exec").
+        Param("container", options.ContainerName)
+
+    req.VersionedParams(&corev1.PodExecOptions{
+        Container: options.ContainerName,
+        Command:   options.Command,
+        Stdin:     options.Stdin != nil,
+        Stdout:    options.CaptureStdout,
+        Stderr:    options.CaptureStderr,
+        TTY:       tty,
+    }, scheme.ParameterCodec)
+
+    var stdout, stderr bytes.Buffer
+    err := execute("POST", req.URL(), e.config, options.Stdin, &stdout, &stderr, tty)
+
+    if options.PreserveWhitespace {
+        return stdout.String(), stderr.String(), err
+    }
+    return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err
+}
+
+func execute(method string, url *url.URL, config *rest.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
+    exec, err := remotecommand.NewSPDYExecutor(config, method, url)
+    if err != nil {
+        return err
+    }
+    return exec.Stream(remotecommand.StreamOptions{
+        Stdin:  stdin,
+        Stdout: stdout,
+        Stderr: stderr,
+        Tty:    tty,
+    })
+}
diff --git a/pkg/resources/configmaps/configmap.go b/pkg/resources/configmaps/configmap.go
index fd7a450a5..07b05bef7 100644
--- a/pkg/resources/configmaps/configmap.go
+++ b/pkg/resources/configmaps/configmap.go
@@ -24,22 +24,23 @@ func NewConfigMapForCR(cluster *redisv1alpha1.DistributedRedisCluster, labels ma
     shutdownContent := `#!/bin/sh
 CLUSTER_CONFIG="/data/nodes.conf"
 failover() {
-    echo "Do CLUSTER FAILOVER"
-    masterID=$(cat ${CLUSTER_CONFIG} | grep "myself" | awk '{print $1}')
-    echo "Master: ${masterID}"
-    slave=$(cat ${CLUSTER_CONFIG} | grep ${masterID} | grep "slave" | awk 'NR==1{print $2}' | sed 's/:6379@16379//')
-    echo "Slave: ${slave}"
-    if [[ -z "${REDIS_PASSWORD}" ]]; then
-        redis-cli -h ${slave} CLUSTER FAILOVER
-    else
-        redis-cli -h ${slave} -a "${REDIS_PASSWORD}" CLUSTER FAILOVER
-    fi
-    echo "Wait for MASTER <-> SLAVE syncFinished"
-    sleep 20
+  echo "Do CLUSTER FAILOVER"
+  masterID=$(cat ${CLUSTER_CONFIG} | grep "myself" | awk '{print $1}')
+  echo "Master: ${masterID}"
+  slave=$(cat ${CLUSTER_CONFIG} | grep ${masterID} | grep "slave" | awk 'NR==1{print $2}' | sed 's/:6379@16379//')
+  echo "Slave: ${slave}"
+  password=$(cat /etc/redis_password)
+  if [[ -z "${password}" ]]; then
+    redis-cli -h ${slave} CLUSTER FAILOVER
+  else
+    redis-cli -h ${slave} -a "${password}" CLUSTER FAILOVER
+  fi
+  echo "Wait for MASTER <-> SLAVE syncFinished"
+  sleep 20
 }
 if [ -f ${CLUSTER_CONFIG} ]; then
-    cat ${CLUSTER_CONFIG} | grep "myself" | grep "master" && \
-    failover
+  cat ${CLUSTER_CONFIG} | grep "myself" | grep "master" && \
+  failover
 fi`

     // Fixed Nodes.conf does not update IP address of a node when IP changes after restart,
diff --git a/pkg/resources/statefulsets/statefulset.go b/pkg/resources/statefulsets/statefulset.go
index 9bc6177fa..2591f1889 100644
--- a/pkg/resources/statefulsets/statefulset.go
+++ b/pkg/resources/statefulsets/statefulset.go
@@ -258,6 +258,11 @@ func redisServerContainer(cluster *redisv1alpha1.DistributedRedisCluster, passwo
         Resources: *cluster.Spec.Resources,
         // TODO store redis data when pod stop
         Lifecycle: &corev1.Lifecycle{
+            PostStart: &corev1.Handler{
+                Exec: &corev1.ExecAction{
+                    Command: []string{"/bin/sh", "-c", "echo ${REDIS_PASSWORD} > /etc/redis_password"},
+                },
+            },
             PreStop: &corev1.Handler{
                 Exec: &corev1.ExecAction{
                     Command: []string{"/bin/sh", "/conf/shutdown.sh"},
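Review note (not part of the patch): the sketch below shows one way the new `exec.IExec` seam introduced in pkg/exec/exec.go could be exercised in a unit test, since the real `remoteExec` needs a live API server to stream over SPDY. The `fakeExec` type and the `main` wrapper are illustrative assumptions, not code from this repository; only the `exec.IExec` interface and the command shape mirror what `resetClusterPassword` does in pkg/controller/distributedrediscluster/sync_handler.go.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/ucloud/redis-cluster-operator/pkg/exec"
)

// fakeExec is a hypothetical test double that records the commands it was asked
// to run instead of opening a SPDY stream to the API server.
type fakeExec struct {
	calls [][]string
}

func (f *fakeExec) ExecCommandInPodSet(podSet []*corev1.Pod, cmd ...string) error {
	for range podSet {
		f.calls = append(f.calls, cmd)
	}
	return nil
}

// Compile-time check that fakeExec satisfies the interface added by this patch.
var _ exec.IExec = (*fakeExec)(nil)

func main() {
	pods := []*corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "drc-example-0", Namespace: "default"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "drc-example-1", Namespace: "default"}},
	}

	execer := &fakeExec{}
	// Same call shape as resetClusterPassword: refresh /etc/redis_password
	// inside every pod of the cluster before resetting the redis password.
	cmd := fmt.Sprintf("echo %s > /etc/redis_password", "new-password")
	if err := execer.ExecCommandInPodSet(pods, "/bin/sh", "-c", cmd); err != nil {
		panic(err)
	}
	fmt.Printf("recorded %d exec calls\n", len(execer.calls))
}
```

Being able to inject a fake like this is the payoff of threading `exec.IExec` through `newReconciler` instead of constructing the SPDY executor inside the reconciler itself.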