From 16dc8d590cd70d59293b04145c991e2720bdec3e Mon Sep 17 00:00:00 2001 From: lmxia Date: Wed, 31 Jul 2024 15:11:11 +0800 Subject: [PATCH] fix pod anno conflict Signed-off-by: lmxia --- .../tunnel/innercluster_tunnel_controller.go | 5 +- pkg/controller/tunnel/peer_controller.go | 1 + utils/utils.go | 90 +++++++++++++------ 3 files changed, 65 insertions(+), 31 deletions(-) diff --git a/pkg/controller/tunnel/innercluster_tunnel_controller.go b/pkg/controller/tunnel/innercluster_tunnel_controller.go index 5948db79..da1e6198 100644 --- a/pkg/controller/tunnel/innercluster_tunnel_controller.go +++ b/pkg/controller/tunnel/innercluster_tunnel_controller.go @@ -72,10 +72,7 @@ func (ict *InnerClusterTunnelController) SpawnNewCIDRForNRIPod(pod *v1.Pod) (str } klog.Infof("pod get a cidr from %s with %s", existingCIDR, secondaryCIDR) - cachedPod := pod.DeepCopy() - pod.GetAnnotations()[fmt.Sprintf(known.DaemonCIDR, known.NautiPrefix)] = secondaryCIDR - pod.GetAnnotations()[fmt.Sprintf(known.CNFCIDR, known.NautiPrefix)] = ict.globalCIDR - if err := utils.PatchPodConfig(ict.kubeClientSet, cachedPod, pod); err != nil { + if err := utils.PatchPodWithRetry(ict.kubeClientSet, pod, secondaryCIDR, ict.globalCIDR); err != nil { klog.Errorf("set pod annotation with error %v", err) return "", err } diff --git a/pkg/controller/tunnel/peer_controller.go b/pkg/controller/tunnel/peer_controller.go index 52650771..315b237d 100644 --- a/pkg/controller/tunnel/peer_controller.go +++ b/pkg/controller/tunnel/peer_controller.go @@ -233,6 +233,7 @@ func (p *PeerController) RecycleAllResources() { } func configHostRoutingRules(cidrs []string, operation known.RouteOperation) error { + klog.Infof("prepare to %s route with %s", operation, cidrs) var ifaceIndex int if wg, err := net.InterfaceByName(known.DefaultDeviceName); err == nil { ifaceIndex = wg.Index diff --git a/utils/utils.go b/utils/utils.go index 80b2fd6e..94db0dbd 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -66,13 +66,7 @@ func AddAnnotationToSelf(client kubernetes.Interface, annotationKey, annotationV namespace := os.Getenv("NAUTI_PODNAMESPACE") klog.Infof("podname is %s and namespace is %s ", podName, namespace) - // Get the Pod - pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - klog.Errorf("can't find pod with name %s in %s", podName, namespace) - return err - } - return setSpecificAnnotation(client, pod, annotationKey, annotationValue, override) + return setSpecificAnnotation(client, namespace, podName, annotationKey, annotationValue, override) } func UpdatePodLabels(client kubernetes.Interface, podName string, isLeader bool) { @@ -101,6 +95,37 @@ func UpdatePodLabels(client kubernetes.Interface, podName string, isLeader bool) } } +// PatchPodWithRetry get and update specific pod. +func PatchPodWithRetry(client kubernetes.Interface, pod *v1.Pod, secondaryCIDR, globalCIDR string) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { + cachedPod := pod.DeepCopy() + if pod.GetAnnotations() == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[fmt.Sprintf(known.DaemonCIDR, known.NautiPrefix)] = secondaryCIDR + pod.Annotations[fmt.Sprintf(known.CNFCIDR, known.NautiPrefix)] = globalCIDR + patch, err := GenerateMergePatchPayload(cachedPod, pod) + if err != nil { + klog.Errorf("failed to generate patch for pod %s/%s: %v", pod.Name, pod.Namespace, err) + return err + } + _, patchErr := client.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, + types.MergePatchType, patch, metav1.PatchOptions{}, "") + if patchErr != nil { + klog.Errorf("patch pod %s/%s failed: %v", pod.Name, pod.Namespace, err) + pod, err = client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.GetName(), metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + return patchErr + } + return nil + }) +} + func PatchPodConfig(client kubernetes.Interface, cachedPod, pod *v1.Pod) error { patch, err := GenerateMergePatchPayload(cachedPod, pod) if err != nil { @@ -140,32 +165,43 @@ func createMergePatch(originalJSON, modifiedJSON []byte, _ interface{}) ([]byte, return jsonpatch.CreateMergePatch(originalJSON, modifiedJSON) } -func setSpecificAnnotation(client kubernetes.Interface, pod *v1.Pod, annotationKey, annotationValue string, +func setSpecificAnnotation(client kubernetes.Interface, namespace, podName, annotationKey, annotationValue string, override bool) error { - annoChanged := true - if pod.Annotations == nil { - pod.Annotations = map[string]string{} - } - annotationKey = fmt.Sprintf(annotationKey, known.NautiPrefix) - - existingValues, ok := pod.Annotations[annotationKey] - if ok && !override { - existingValuesSlice := strings.Split(existingValues, ",") - if ContainsString(existingValuesSlice, annotationValue) { - annoChanged = false + return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { + // Get the Pod + pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("can't find pod with name %s in %s", podName, namespace) + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + cachedPod := pod.DeepCopy() + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + annotationKey = fmt.Sprintf(annotationKey, known.NautiPrefix) + + existingValues, ok := pod.Annotations[annotationKey] + if ok && !override { + existingValuesSlice := strings.Split(existingValues, ",") + if !ContainsString(existingValuesSlice, annotationValue) { + pod.Annotations[annotationKey] = existingValues + "," + annotationValue + } } else { - pod.Annotations[annotationKey] = existingValues + "," + annotationValue + pod.Annotations[annotationKey] = annotationValue } - } else { - pod.Annotations[annotationKey] = annotationValue - } - if annoChanged { - _, err := client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) + + patch, err := GenerateMergePatchPayload(cachedPod, pod) if err != nil { + klog.Errorf("failed to generate patch for pod %s/%s: %v", pod.Name, pod.Namespace, err) return err } - } - return nil + _, patchErr := client.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name, + types.MergePatchType, patch, metav1.PatchOptions{}, "") + return patchErr + }) } // GetSpecificAnnotation get DaemonCIDR from pod annotation return "" if is empty.