Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix pod anno conflict #88

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/controller/tunnel/innercluster_tunnel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ func (ict *InnerClusterTunnelController) SpawnNewCIDRForNRIPod(pod *v1.Pod) (str
}

klog.Infof("pod get a cidr from %s with %s", existingCIDR, secondaryCIDR)
cachedPod := pod.DeepCopy()
pod.GetAnnotations()[fmt.Sprintf(known.DaemonCIDR, known.NautiPrefix)] = secondaryCIDR
pod.GetAnnotations()[fmt.Sprintf(known.CNFCIDR, known.NautiPrefix)] = ict.globalCIDR
if err := utils.PatchPodConfig(ict.kubeClientSet, cachedPod, pod); err != nil {
if err := utils.PatchPodWithRetry(ict.kubeClientSet, pod, secondaryCIDR, ict.globalCIDR); err != nil {
klog.Errorf("set pod annotation with error %v", err)
return "", err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/tunnel/peer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (p *PeerController) RecycleAllResources() {
}

func configHostRoutingRules(cidrs []string, operation known.RouteOperation) error {
klog.Infof("prepare to %v route with %s", operation, cidrs)
var ifaceIndex int
if wg, err := net.InterfaceByName(known.DefaultDeviceName); err == nil {
ifaceIndex = wg.Index
Expand Down
90 changes: 63 additions & 27 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,7 @@ func AddAnnotationToSelf(client kubernetes.Interface, annotationKey, annotationV
namespace := os.Getenv("NAUTI_PODNAMESPACE")

klog.Infof("podname is %s and namespace is %s ", podName, namespace)
// Get the Pod
pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
klog.Errorf("can't find pod with name %s in %s", podName, namespace)
return err
}
return setSpecificAnnotation(client, pod, annotationKey, annotationValue, override)
return setSpecificAnnotation(client, namespace, podName, annotationKey, annotationValue, override)
}

func UpdatePodLabels(client kubernetes.Interface, podName string, isLeader bool) {
Expand Down Expand Up @@ -101,6 +95,37 @@ func UpdatePodLabels(client kubernetes.Interface, podName string, isLeader bool)
}
}

// PatchPodWithRetry get and update specific pod.
func PatchPodWithRetry(client kubernetes.Interface, pod *v1.Pod, secondaryCIDR, globalCIDR string) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
cachedPod := pod.DeepCopy()
if pod.GetAnnotations() == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[fmt.Sprintf(known.DaemonCIDR, known.NautiPrefix)] = secondaryCIDR
pod.Annotations[fmt.Sprintf(known.CNFCIDR, known.NautiPrefix)] = globalCIDR
patch, err := GenerateMergePatchPayload(cachedPod, pod)
if err != nil {
klog.Errorf("failed to generate patch for pod %s/%s: %v", pod.Name, pod.Namespace, err)
return err
}
_, patchErr := client.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, "")
if patchErr != nil {
klog.Errorf("patch pod %s/%s failed: %v", pod.Name, pod.Namespace, err)
pod, err = client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.GetName(), metav1.GetOptions{})
if k8serrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
return patchErr
}
return nil
})
}

func PatchPodConfig(client kubernetes.Interface, cachedPod, pod *v1.Pod) error {
patch, err := GenerateMergePatchPayload(cachedPod, pod)
if err != nil {
Expand Down Expand Up @@ -140,32 +165,43 @@ func createMergePatch(originalJSON, modifiedJSON []byte, _ interface{}) ([]byte,
return jsonpatch.CreateMergePatch(originalJSON, modifiedJSON)
}

func setSpecificAnnotation(client kubernetes.Interface, pod *v1.Pod, annotationKey, annotationValue string,
func setSpecificAnnotation(client kubernetes.Interface, namespace, podName, annotationKey, annotationValue string,
override bool) error {
annoChanged := true
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
annotationKey = fmt.Sprintf(annotationKey, known.NautiPrefix)

existingValues, ok := pod.Annotations[annotationKey]
if ok && !override {
existingValuesSlice := strings.Split(existingValues, ",")
if ContainsString(existingValuesSlice, annotationValue) {
annoChanged = false
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
// Get the Pod
pod, err := client.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
klog.Errorf("can't find pod with name %s in %s", podName, namespace)
if k8serrors.IsNotFound(err) {
return nil
}
return err
}
cachedPod := pod.DeepCopy()
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
annotationKey = fmt.Sprintf(annotationKey, known.NautiPrefix)

existingValues, ok := pod.Annotations[annotationKey]
if ok && !override {
existingValuesSlice := strings.Split(existingValues, ",")
if !ContainsString(existingValuesSlice, annotationValue) {
pod.Annotations[annotationKey] = existingValues + "," + annotationValue
}
} else {
pod.Annotations[annotationKey] = existingValues + "," + annotationValue
pod.Annotations[annotationKey] = annotationValue
}
} else {
pod.Annotations[annotationKey] = annotationValue
}
if annoChanged {
_, err := client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})

patch, err := GenerateMergePatchPayload(cachedPod, pod)
if err != nil {
klog.Errorf("failed to generate patch for pod %s/%s: %v", pod.Name, pod.Namespace, err)
return err
}
}
return nil
_, patchErr := client.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, "")
return patchErr
})
}

// GetSpecificAnnotation get DaemonCIDR from pod annotation return "" if is empty.
Expand Down
Loading