Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: zhangzujian <[email protected]>
  • Loading branch information
zhangzujian committed Dec 17, 2024
1 parent 1cd50fa commit 4280f84
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 328 deletions.
9 changes: 3 additions & 6 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,9 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
}

if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
if svc.Annotations == nil {
svc.Annotations = make(map[string]string, 1)
}
svc.Annotations[util.VpcAnnotation] = vpcName
if _, err = c.config.KubeClient.CoreV1().Services(namespace).Update(context.Background(), svc, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update service %s: %v", key, err)
patch := util.KVPatch{util.VpcAnnotation: vpcName}
if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Services(namespace), svc.Name, patch); err != nil {
klog.Errorf("failed to patch service %s: %v", key, err)
return err
}
}
Expand Down
15 changes: 4 additions & 11 deletions pkg/controller/inspection.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package controller

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

"github.com/kubeovn/kube-ovn/pkg/ovs"
Expand Down Expand Up @@ -48,15 +45,11 @@ func (c *Controller) inspectPod() error {
}

if !exists { // pod exists but not lsp
delete(pod.Annotations, fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName))
delete(pod.Annotations, fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName))
patch, err := util.GenerateStrategicMergePatchPayload(oriPod, pod)
if err != nil {
klog.Errorf("failed to generate patch payload, %v", err)
return err
patch := util.KVPatch{
fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName): nil,
fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName): nil,
}
if _, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
klog.Errorf("patch pod %s/%s failed %v during inspection", pod.Name, pod.Namespace, err)
return err
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,10 @@ func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) err
}

if len(node.Annotations) != 0 {
newNode := node.DeepCopy()
delete(newNode.Annotations, excludeAnno)
delete(newNode.Annotations, interfaceAnno)
if len(newNode.Annotations) != len(node.Annotations) {
if _, err = c.config.KubeClient.CoreV1().Nodes().Update(context.Background(), newNode, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update node %s: %v", node.Name, err)
return err
}
patch := util.KVPatch{excludeAnno: nil, interfaceAnno: nil}
if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Nodes(), node.Name, patch); err != nil {
klog.Errorf("failed to patch node %s: %v", node.Name, err)
return err
}
}

Expand Down
118 changes: 57 additions & 61 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,15 +449,14 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) {
defer func() { _ = c.podKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update pod %s", key)

cachedPod, err := c.podsLister.Pods(namespace).Get(name)
pod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
pod := cachedPod.DeepCopy()
if err := util.ValidatePodNetwork(pod.Annotations); err != nil {
klog.Errorf("validate pod %s/%s failed: %v", namespace, name, err)
c.recorder.Eventf(pod, v1.EventTypeWarning, "ValidatePodNetworkFailed", err.Error())
Expand All @@ -469,39 +468,34 @@ func (c *Controller) handleAddOrUpdatePod(key string) (err error) {
klog.Errorf("failed to get pod nets %v", err)
return err
}
if len(pod.Annotations) == 0 {
pod.Annotations = map[string]string{}
}

// check and do hotnoplug nic
if cachedPod, err = c.syncKubeOvnNet(cachedPod, pod, podNets); err != nil {
if pod, err = c.syncKubeOvnNet(pod, podNets); err != nil {
klog.Errorf("failed to sync pod nets %v", err)
return err
}
if cachedPod == nil {
if pod == nil {
// pod has been deleted
return nil
}
pod = cachedPod.DeepCopy()
needAllocatePodNets := needAllocateSubnets(pod, podNets)
if len(needAllocatePodNets) != 0 {
if cachedPod, err = c.reconcileAllocateSubnets(cachedPod, pod, needAllocatePodNets); err != nil {
if pod, err = c.reconcileAllocateSubnets(pod, needAllocatePodNets); err != nil {
klog.Error(err)
return err
}
if cachedPod == nil {
if pod == nil {
// pod has been deleted
return nil
}
}

// check if route subnet is need.
pod = cachedPod.DeepCopy()
return c.reconcileRouteSubnets(cachedPod, pod, needRouteSubnets(pod, podNets))
return c.reconcileRouteSubnets(pod, needRouteSubnets(pod, podNets))
}

// do the same thing as add pod
func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAllocatePodNets []*kubeovnNet) (*v1.Pod, error) {
func (c *Controller) reconcileAllocateSubnets(pod *v1.Pod, needAllocatePodNets []*kubeovnNet) (*v1.Pod, error) {
namespace := pod.Namespace
name := pod.Name
klog.Infof("sync pod %s/%s allocated", namespace, name)
Expand All @@ -518,6 +512,7 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca
vmKey = fmt.Sprintf("%s/%s", namespace, vmName)
}
// Avoid create lsp for already running pod in ovn-nb when controller restart
patch := util.KVPatch{}
for _, podNet := range needAllocatePodNets {
// the subnet may changed when alloc static ip from the latter subnet after ns supports multi subnets
v4IP, v6IP, mac, subnet, err := c.acquireAddress(pod, podNet)
Expand All @@ -527,26 +522,26 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca
return nil, err
}
ipStr := util.GetStringIP(v4IP, v6IP)
pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, podNet.ProviderName)] = ipStr
patch[fmt.Sprintf(util.IPAddressAnnotationTemplate, podNet.ProviderName)] = ipStr
if mac == "" {
delete(pod.Annotations, fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName))
patch[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)] = nil
} else {
pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)] = mac
patch[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)] = mac
}
pod.Annotations[fmt.Sprintf(util.CidrAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.CIDRBlock
pod.Annotations[fmt.Sprintf(util.GatewayAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Gateway
patch[fmt.Sprintf(util.CidrAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.CIDRBlock
patch[fmt.Sprintf(util.GatewayAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Gateway
if isOvnSubnet(podNet.Subnet) {
pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] = subnet.Name
patch[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] = subnet.Name
if pod.Annotations[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] == "" {
pod.Annotations[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] = c.config.PodNicType
patch[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] = c.config.PodNicType
}
} else {
delete(pod.Annotations, fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName))
delete(pod.Annotations, fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName))
patch[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] = nil
patch[fmt.Sprintf(util.PodNicAnnotationTemplate, podNet.ProviderName)] = nil
}
pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] = "true"
patch[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] = "true"
if vmKey != "" {
pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, podNet.ProviderName)] = vmName
patch[fmt.Sprintf(util.VMAnnotationTemplate, podNet.ProviderName)] = vmName
}
if err := util.ValidateNetworkBroadcast(podNet.Subnet.Spec.CIDRBlock, ipStr); err != nil {
klog.Errorf("validate pod %s/%s failed: %v", namespace, name, err)
Expand All @@ -556,7 +551,7 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca

if podNet.Type != providerTypeIPAM {
if (subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway || subnet.Spec.U2OInterconnection) && subnet.Spec.Vpc != "" {
pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Vpc
patch[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, podNet.ProviderName)] = subnet.Spec.Vpc
}

if subnet.Spec.Vlan != "" {
Expand All @@ -566,8 +561,8 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca
c.recorder.Eventf(pod, v1.EventTypeWarning, "GetVlanInfoFailed", err.Error())
return nil, err
}
pod.Annotations[fmt.Sprintf(util.VlanIDAnnotationTemplate, podNet.ProviderName)] = strconv.Itoa(vlan.Spec.ID)
pod.Annotations[fmt.Sprintf(util.ProviderNetworkTemplate, podNet.ProviderName)] = vlan.Spec.Provider
patch[fmt.Sprintf(util.VlanIDAnnotationTemplate, podNet.ProviderName)] = strconv.Itoa(vlan.Spec.ID)
patch[fmt.Sprintf(util.ProviderNetworkTemplate, podNet.ProviderName)] = vlan.Spec.Provider
}

portSecurity := false
Expand Down Expand Up @@ -628,14 +623,7 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca
return nil, err
}
}
patch, err := util.GenerateMergePatchPayload(cachedPod, pod)
if err != nil {
klog.Errorf("failed to generate patch for pod %s/%s: %v", name, namespace, err)
return nil, err
}
patchedPod, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name,
types.MergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {
if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(namespace), name, patch); err != nil {
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
Expand All @@ -644,18 +632,29 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca
c.deletePodQueue.AddRateLimited(key)
return nil, nil
}
klog.Errorf("patch pod %s/%s failed: %v", name, namespace, err)
klog.Errorf("failed to patch pod %s/%s: %v", namespace, name, err)
return nil, err
}

if pod, err = c.config.KubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}); err != nil {
if k8serrors.IsNotFound(err) {
key := strings.Join([]string{namespace, name}, "/")
c.deletingPodObjMap.Store(key, pod)
c.deletePodQueue.AddRateLimited(key)
return nil, nil
}
klog.Errorf("failed to get pod %s/%s: %v", namespace, name, err)
return nil, err
}

if vpcGwName, isVpcNatGw := pod.Annotations[util.VpcNatGatewayAnnotation]; isVpcNatGw {
c.initVpcNatGatewayQueue.Add(vpcGwName)
}
return patchedPod.DeepCopy(), nil
return pod, nil
}

// do the same thing as update pod
func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodNets []*kubeovnNet) error {
func (c *Controller) reconcileRouteSubnets(pod *v1.Pod, needRoutePodNets []*kubeovnNet) error {
// the lb-svc pod has dependencies on Running state, check it when pod state get updated
if err := c.checkAndReInitLbSvcPod(pod); err != nil {
klog.Errorf("failed to init iptable rules for load-balancer pod %s/%s: %v", pod.Namespace, pod.Name, err)
Expand All @@ -673,7 +672,7 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN

var podIP string
var subnet *kubeovnv1.Subnet

patch := util.KVPatch{}
for _, podNet := range needRoutePodNets {
// in case update handler overlap the annotation when cache is not in sync
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "" {
Expand Down Expand Up @@ -858,15 +857,9 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN
}
}

pod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] = "true"
patch[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] = "true"
}
patch, err := util.GenerateMergePatchPayload(cachedPod, pod)
if err != nil {
klog.Errorf("failed to generate patch for pod %s/%s: %v", name, namespace, err)
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(context.Background(), name,
types.MergePatchType, patch, metav1.PatchOptions{}, ""); err != nil {
if err := util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(namespace), name, patch); err != nil {
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
Expand All @@ -875,7 +868,7 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN
c.deletePodQueue.AddRateLimited(key)
return nil
}
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
klog.Errorf("failed to patch pod %s/%s: %v", namespace, name, err)
return err
}
return nil
Expand Down Expand Up @@ -1150,7 +1143,7 @@ func (c *Controller) handleUpdatePodSecurity(key string) error {
return nil
}

func (c *Controller) syncKubeOvnNet(cachedPod, pod *v1.Pod, podNets []*kubeovnNet) (*v1.Pod, error) {
func (c *Controller) syncKubeOvnNet(pod *v1.Pod, podNets []*kubeovnNet) (*v1.Pod, error) {
podName := c.getNameByPod(pod)
key := fmt.Sprintf("%s/%s", pod.Namespace, podName)
targetPortNameList := strset.NewWithSize(len(podNets))
Expand Down Expand Up @@ -1203,32 +1196,35 @@ func (c *Controller) syncKubeOvnNet(cachedPod, pod *v1.Pod, podNets []*kubeovnNe
}
}

patch := util.KVPatch{}
for _, providerName := range annotationsNeedToDel {
for annotationKey := range pod.Annotations {
if strings.HasPrefix(annotationKey, providerName) {
delete(pod.Annotations, annotationKey)
for key := range pod.Annotations {
if strings.HasPrefix(key, providerName) {
patch[key] = nil
}
}
}
if len(cachedPod.Annotations) == len(pod.Annotations) {
if len(patch) == 0 {
return pod, nil
}
patch, err := util.GenerateMergePatchPayload(cachedPod, pod)
if err != nil {
klog.Errorf("failed to generate patch payload for pod '%s', %v", pod.Name, err)

if err = util.PatchAnnotations(c.config.KubeClient.CoreV1().Pods(pod.Namespace), pod.Name, patch); err != nil {
if k8serrors.IsNotFound(err) {
return nil, nil
}
klog.Errorf("failed to clean annotations for pod %s/%s: %v", pod.Namespace, pod.Name, err)
return nil, err
}
patchedPod, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Patch(context.Background(), pod.Name,
types.MergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {

if pod, err = c.config.KubeClient.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
if k8serrors.IsNotFound(err) {
return nil, nil
}
klog.Errorf("failed to delete useless annotations for pod %s: %v", pod.Name, err)
klog.Errorf("failed to get pod %s/%s: %v", pod.Namespace, pod.Name, err)
return nil, err
}

return patchedPod.DeepCopy(), nil
return pod, nil
}

func isStatefulSetPod(pod *v1.Pod) (bool, string, types.UID) {
Expand Down
16 changes: 2 additions & 14 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,6 @@ func (c *Controller) enqueueUpdateService(oldObj, newObj interface{}) {
return
}

var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
utilruntime.HandleError(err)
return
}

oldClusterIps := getVipIps(oldSvc)
newClusterIps := getVipIps(newSvc)
var ipsToDel []string
Expand All @@ -124,6 +117,7 @@ func (c *Controller) enqueueUpdateService(oldObj, newObj interface{}) {
}
}

key := cache.MetaObjectToName(newSvc).String()
klog.V(3).Infof("enqueue update service %s", key)
if len(ipsToDel) != 0 {
ipsToDelStr := strings.Join(ipsToDel, ",")
Expand All @@ -139,13 +133,7 @@ func (c *Controller) enqueueUpdateService(oldObj, newObj interface{}) {
}

func (c *Controller) handleDeleteService(service *vpcService) error {
key, err := cache.MetaNamespaceKeyFunc(service.Svc)
if err != nil {
klog.Error(err)
utilruntime.HandleError(fmt.Errorf("failed to get meta namespace key of %#v: %w", service.Svc, err))
return nil
}

key := cache.MetaObjectToName(service.Svc).String()
c.svcKeyMutex.LockKey(key)
defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete service %s", key)
Expand Down
Loading

0 comments on commit 4280f84

Please sign in to comment.