From 1a3832ba1ee9e723ede0fa692117e9ec7d83aac9 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 5 Oct 2022 17:33:15 +0100 Subject: [PATCH] fix(kuma-cp): remove Dataplane for Pod without IP (backport #4964) (#5097) fix(kuma-cp): remove Dataplane for Pod without IP (#4964) Signed-off-by: Jakub Dyszkiewicz Signed-off-by: Ilya Lobkov # Conflicts: # pkg/plugins/runtime/k8s/controllers/pod_status_controller.go # test/e2e/graceful/eviction.go # test/e2e_env/kubernetes/jobs/jobs.go # test/e2e_env/kubernetes/kubernetes_suite_test.go Co-authored-by: Jakub Dyszkiewicz --- .../k8s/controllers/gateway_converter.go | 4 +- .../runtime/k8s/controllers/pod_controller.go | 185 +++++++++++------- .../k8s/controllers/pod_status_controller.go | 15 +- 3 files changed, 124 insertions(+), 80 deletions(-) diff --git a/pkg/plugins/runtime/k8s/controllers/gateway_converter.go b/pkg/plugins/runtime/k8s/controllers/gateway_converter.go index c5f3c86a0a57..d5aac6d38878 100644 --- a/pkg/plugins/runtime/k8s/controllers/gateway_converter.go +++ b/pkg/plugins/runtime/k8s/controllers/gateway_converter.go @@ -62,8 +62,8 @@ func (r *PodReconciler) createorUpdateBuiltinGatewayDataplane(ctx context.Contex return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error()) return err @@ -71,8 +71,10 @@ func (r *PodReconciler) createorUpdateBuiltinGatewayDataplane(ctx context.Contex switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("Dataplane created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", dataplane.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("Dataplane updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", dataplane.Name) } return nil diff --git a/pkg/plugins/runtime/k8s/controllers/pod_controller.go b/pkg/plugins/runtime/k8s/controllers/pod_controller.go index 065dbc4a4885..8ec37a974538 100644 --- a/pkg/plugins/runtime/k8s/controllers/pod_controller.go +++ b/pkg/plugins/runtime/k8s/controllers/pod_controller.go @@ -23,6 +23,7 @@ import ( "github.com/kumahq/kuma/pkg/dns/vips" k8s_common "github.com/kumahq/kuma/pkg/plugins/common/k8s" mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1" + k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model" "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata" util_k8s "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util" ) @@ -53,45 +54,26 @@ type PodReconciler struct { func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (kube_ctrl.Result, error) { log := r.Log.WithValues("pod", req.NamespacedName) + log.V(1).Info("reconcile") // Fetch the Pod instance pod := &kube_core.Pod{} if err := r.Get(ctx, req.NamespacedName, pod); err != nil { if kube_apierrs.IsNotFound(err) { + log.V(1).Info("pod not found. Skipping") return kube_ctrl.Result{}, nil } log.Error(err, "unable to fetch Pod") return kube_ctrl.Result{}, err } - // skip a Pod if it doesn't have an IP address yet - if pod.Status.PodIP == "" { - return kube_ctrl.Result{}, nil - } - - // skip a Pod if is complete/terminated (most probably a completed job) - if r.isPodComplete(pod) { - return kube_ctrl.Result{}, nil - } - // for Pods marked with ingress annotation special type of Dataplane will be injected enabled, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaIngressAnnotation) if err != nil { return kube_ctrl.Result{}, err } if exist && enabled { - if pod.Namespace != r.SystemNamespace { - return kube_ctrl.Result{}, errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace) - } - services, err := r.findMatchingServices(ctx, pod) - if err != nil { - return kube_ctrl.Result{}, err - } - err = r.createOrUpdateIngress(ctx, pod, services) - if err != nil { - return kube_ctrl.Result{}, err - } - return kube_ctrl.Result{}, nil + return kube_ctrl.Result{}, r.reconcileZoneIngress(ctx, pod, log) } // for Pods marked with egress annotation special type of Dataplane will be injected @@ -100,58 +82,135 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req kube_ctrl.Request) (k return kube_ctrl.Result{}, err } if egressExist && egressEnabled { - if pod.Namespace != r.SystemNamespace { - return kube_ctrl.Result{}, errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace) - } - services, err := r.findMatchingServices(ctx, pod) - if err != nil { - return kube_ctrl.Result{}, err - } - err = r.createOrUpdateEgress(ctx, pod, services) - if err != nil { - return kube_ctrl.Result{}, err - } - - return kube_ctrl.Result{}, nil - } - - ns := kube_core.Namespace{} - if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil { - return kube_ctrl.Result{}, errors.Wrap(err, "unable to get Namespace for Pod") + return kube_ctrl.Result{}, r.reconcileZoneEgress(ctx, pod, log) } // If we are using a builtin gateway, we want to generate a builtin gateway // dataplane. if name, _ := metadata.Annotations(pod.Annotations).GetString(metadata.KumaGatewayAnnotation); name == metadata.AnnotationBuiltin { - return kube_ctrl.Result{}, r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns) + return kube_ctrl.Result{}, r.reconcileBuiltinGatewayDataplane(ctx, pod, log) } // only Pods with injected Kuma need a Dataplane descriptor - injected, exist, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation) + injected, _, err := metadata.Annotations(pod.Annotations).GetEnabled(metadata.KumaSidecarInjectedAnnotation) if err != nil { return kube_ctrl.Result{}, err } - if !exist || !injected { - return kube_ctrl.Result{}, nil + if injected { + return kube_ctrl.Result{}, r.reconcileDataplane(ctx, pod, log) + } + + return kube_ctrl.Result{}, nil +} + +func (r *PodReconciler) reconcileDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + dp := &mesh_k8s.Dataplane{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}, + } + if pod.Status.Phase == kube_core.PodSucceeded { + // Remove Dataplane object for Pods that are indefinitely in Succeeded phase, i.e. Jobs + return r.deleteObjectIfExist(ctx, dp, "pod succeeded", log) + } + if pod.Status.PodIP == "" { + return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log) + } + + ns := kube_core.Namespace{} + if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil { + return errors.Wrap(err, "unable to get Namespace for Pod") } services, err := r.findMatchingServices(ctx, pod) if err != nil { - return kube_ctrl.Result{}, err + return err } others, err := r.findOtherDataplanes(ctx, pod, &ns) if err != nil { - return kube_ctrl.Result{}, err + return err } - r.Log.WithValues("req", req).V(1).Info("other dataplanes", "others", others) - if err := r.createOrUpdateDataplane(ctx, pod, &ns, services, others); err != nil { - return kube_ctrl.Result{}, err + return err } + return nil +} - return kube_ctrl.Result{}, nil +func (r *PodReconciler) deleteObjectIfExist(ctx context.Context, object k8s_model.KubernetesObject, cause string, log logr.Logger) error { + log = log.WithValues( + "cause", cause, + "kind", object.GetObjectKind(), + "name", object.GetName(), + "namespace", object.GetNamespace(), + ) + if err := r.Client.Delete(ctx, object); err != nil { + if kube_apierrs.IsNotFound(err) { + log.V(1).Info("Object is not found, nothing to delete") + return nil + } + return errors.Wrapf(err, "could not delete %v %s/%s", object.GetObjectKind(), object.GetName(), object.GetNamespace()) + } + log.Info("Object deleted") + return nil +} + +func (r *PodReconciler) reconcileBuiltinGatewayDataplane(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + if pod.Status.PodIP == "" { + dp := &mesh_k8s.Dataplane{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}, + } + return r.deleteObjectIfExist(ctx, dp, "pod IP is empty", log) + } + + ns := kube_core.Namespace{} + if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: pod.Namespace}, &ns); err != nil { + return errors.Wrap(err, "unable to get Namespace for Pod") + } + return r.createorUpdateBuiltinGatewayDataplane(ctx, pod, &ns) +} + +func (r *PodReconciler) reconcileZoneIngress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + if pod.Status.PodIP == "" { + zi := &mesh_k8s.ZoneIngress{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name}, + } + return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log) + } + + if pod.Namespace != r.SystemNamespace { + return errors.Errorf("Ingress can only be deployed in system namespace %q", r.SystemNamespace) + } + services, err := r.findMatchingServices(ctx, pod) + if err != nil { + return err + } + err = r.createOrUpdateIngress(ctx, pod, services) + if err != nil { + return err + } + return nil +} + +func (r *PodReconciler) reconcileZoneEgress(ctx context.Context, pod *kube_core.Pod, log logr.Logger) error { + if pod.Status.PodIP == "" { + zi := &mesh_k8s.ZoneEgress{ + ObjectMeta: kube_meta.ObjectMeta{Name: pod.Name}, + } + return r.deleteObjectIfExist(ctx, zi, "pod IP is empty", log) + } + + if pod.Namespace != r.SystemNamespace { + return errors.Errorf("Egress can only be deployed in system namespace %q", r.SystemNamespace) + } + services, err := r.findMatchingServices(ctx, pod) + if err != nil { + return err + } + err = r.createOrUpdateEgress(ctx, pod, services) + if err != nil { + return err + } + return nil } func (r *PodReconciler) findMatchingServices(ctx context.Context, pod *kube_core.Pod) ([]*kube_core.Service, error) { @@ -218,16 +277,18 @@ func (r *PodReconciler) createOrUpdateDataplane( } return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Dataplane", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Dataplane: %s", err.Error()) return err } switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("Dataplane created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Dataplane: %s", pod.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("Dataplane updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Dataplane: %s", pod.Name) } return nil @@ -250,16 +311,18 @@ func (r *PodReconciler) createOrUpdateIngress(ctx context.Context, pod *kube_cor } return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Ingress", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Ingress: %s", err.Error()) return err } switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("ZoneIngress created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Ingress: %s", pod.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("ZoneIngress updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Ingress: %s", pod.Name) } return nil @@ -282,16 +345,18 @@ func (r *PodReconciler) createOrUpdateEgress(ctx context.Context, pod *kube_core } return nil }) + log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) if err != nil { - log := r.Log.WithValues("pod", kube_types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}) log.Error(err, "unable to create/update Egress", "operationResult", operationResult) r.EventRecorder.Eventf(pod, kube_core.EventTypeWarning, FailedToGenerateKumaDataplaneReason, "Failed to generate Kuma Egress: %s", err.Error()) return err } switch operationResult { case kube_controllerutil.OperationResultCreated: + log.Info("ZoneEgress created") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, CreatedKumaDataplaneReason, "Created Kuma Egress: %s", pod.Name) case kube_controllerutil.OperationResultUpdated: + log.Info("ZoneEgress updated") r.EventRecorder.Eventf(pod, kube_core.EventTypeNormal, UpdatedKumaDataplaneReason, "Updated Kuma Egress: %s", pod.Name) } return nil @@ -308,20 +373,6 @@ func (r *PodReconciler) SetupWithManager(mgr kube_ctrl.Manager) error { Complete(r) } -func (r *PodReconciler) isPodComplete(pod *kube_core.Pod) bool { - for _, cs := range pod.Status.ContainerStatuses { - // the sidecar amy or may not be terminated yet - if cs.Name == util_k8s.KumaSidecarContainerName { - continue - } - if cs.State.Terminated == nil { - // at least one container not terminated, therefore pod is still active - return false - } - } - return true -} - func ServiceToPodsMapper(l logr.Logger, client kube_client.Client) kube_handler.MapFunc { l = l.WithName("service-to-pods-mapper") return func(obj kube_client.Object) []kube_reconile.Request { diff --git a/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go b/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go index 9bbca14dd8ef..3b5a7e7c9f3c 100644 --- a/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go +++ b/pkg/plugins/runtime/k8s/controllers/pod_status_controller.go @@ -5,7 +5,6 @@ import ( "github.com/go-logr/logr" "github.com/pkg/errors" - "go.uber.org/multierr" kube_core "k8s.io/api/core/v1" kube_apierrs "k8s.io/apimachinery/pkg/api/errors" kube_runtime "k8s.io/apimachinery/pkg/runtime" @@ -64,18 +63,10 @@ func (r *PodStatusReconciler) Reconcile(ctx context.Context, req kube_ctrl.Reque return kube_ctrl.Result{}, err } - var errs error - err := r.EnvoyAdminClient.PostQuit(dp) - if err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "envoy admin client failed. Most probably the pod is already going down.")) + if err := r.EnvoyAdminClient.PostQuit(dp); err != nil { + return kube_ctrl.Result{}, errors.Wrap(err, "envoy admin client failed. Most probably the pod is already going down.") } - - err = r.Client.Delete(ctx, dataplane) - if err != nil { - errs = multierr.Append(errs, errors.Wrapf(err, "unable to delete the job's dataplane")) - } - - return kube_ctrl.Result{}, errs + return kube_ctrl.Result{}, nil } func (r *PodStatusReconciler) SetupWithManager(mgr kube_ctrl.Manager) error {