Skip to content

Commit

Permalink
add load balancer health check (#3216)
Browse files Browse the repository at this point in the history
* add load balancer health check

Signed-off-by: 夜微澜 <[email protected]>

* reuse a VIP named by subnet to health check

Signed-off-by: 夜微澜 <[email protected]>

* clear health check while slr deleted

Signed-off-by: 夜微澜 <[email protected]>

* fix health check unit test

Signed-off-by: 夜微澜 <[email protected]>

* fix go.mod

Signed-off-by: 夜微澜 <[email protected]>

* format && clean

Signed-off-by: 夜微澜 <[email protected]>

* revert changes to makefile parameters

Signed-off-by: 夜微澜 <[email protected]>

* update format

Signed-off-by: 夜微澜 <[email protected]>

---------

Signed-off-by: 夜微澜 <[email protected]>
  • Loading branch information
qiutingjun authored and zbb88888 committed Oct 19, 2023
1 parent e01a853 commit 127a87a
Show file tree
Hide file tree
Showing 23 changed files with 2,458 additions and 395 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/vishvananda/netlink v1.2.1-beta.2
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/mod v0.13.0
golang.org/x/sys v0.13.0
golang.org/x/time v0.3.0
Expand Down
383 changes: 375 additions & 8 deletions mocks/pkg/ovs/interface.go

Large diffs are not rendered by default.

254 changes: 192 additions & 62 deletions pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)

func (c *Controller) enqueueAddEndpoint(obj interface{}) {
var key string
var err error
var (
key string
err error
)

if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
Expand All @@ -37,8 +41,11 @@ func (c *Controller) enqueueUpdateEndpoint(oldObj, newObj interface{}) {
return
}

var key string
var err error
var (
key string
err error
)

if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
utilruntime.HandleError(err)
return
Expand All @@ -59,23 +66,28 @@ func (c *Controller) processNextUpdateEndpointWorkItem() bool {
return false
}

err := func(obj interface{}) error {
if err := func(obj interface{}) error {
defer c.updateEndpointQueue.Done(obj)
var key string
var ok bool

var (
key string
ok bool
err error
)

if key, ok = obj.(string); !ok {
c.updateEndpointQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleUpdateEndpoint(key); err != nil {

if err = c.handleUpdateEndpoint(key); err != nil {
c.updateEndpointQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.updateEndpointQueue.Forget(obj)
return nil
}(obj)
if err != nil {
}(obj); err != nil {
utilruntime.HandleError(err)
return true
}
Expand Down Expand Up @@ -112,55 +124,48 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
}
svc := cachedService.DeepCopy()

var LbIPs []string
if vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
LbIPs = []string{vip}
} else if LbIPs = util.ServiceClusterIPs(*svc); len(LbIPs) == 0 {
return nil
}
var (
pods []*v1.Pod
lbVips []string
vip, vpcName, subnetName string
ok bool
ignoreHealthCheck = true
)

pods, err := c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector())
if err != nil {
klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
return err
}

var vpcName string
for _, pod := range pods {
if len(pod.Annotations) == 0 {
continue
}
if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
lbVips = []string{vip}

for _, subset := range ep.Subsets {
for _, addr := range subset.Addresses {
if addr.IP == pod.Status.PodIP {
if vpcName = pod.Annotations[util.LogicalRouterAnnotation]; vpcName != "" {
break
}
for _, address := range subset.Addresses {
// TODO: IPv6
if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 &&
address.TargetRef.Name != "" {
ignoreHealthCheck = false
}
}
if vpcName != "" {
break
}
}
if vpcName != "" {
break
}
} else if lbVips = util.ServiceClusterIPs(*svc); len(lbVips) == 0 {
return nil
}

if vpcName == "" {
if vpcName = svc.Annotations[util.VpcAnnotation]; vpcName == "" {
vpcName = c.config.ClusterRouter
}
if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
return err
}

vpc, err := c.vpcsLister.Get(vpcName)
if err != nil {
vpcName, subnetName = c.getVpcSubnetName(pods, ep, svc)

var (
vpc *kubeovnv1.Vpc
svcVpc string
)

if vpc, err = c.vpcsLister.Get(vpcName); err != nil {
klog.Errorf("failed to get vpc %s of lb, %v", vpcName, err)
return err
}

if svcVpc := svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
if svcVpc = svc.Annotations[util.VpcAnnotation]; svcVpc != vpcName {
if svc.Annotations == nil {
svc.Annotations = make(map[string]string, 1)
}
Expand All @@ -177,7 +182,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
tcpLb, udpLb, sctpLb, oldTCPLb, oldUDPLb, oldSctpLb = oldTCPLb, oldUDPLb, oldSctpLb, tcpLb, udpLb, sctpLb
}

for _, settingIP := range LbIPs {
for _, lbVip := range lbVips {
for _, port := range svc.Spec.Ports {
var lb, oldLb string
switch port.Protocol {
Expand All @@ -189,37 +194,159 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
lb, oldLb = sctpLb, oldSctpLb
}

vip := util.JoinHostPort(settingIP, port.Port)
backends := getServicePortBackends(ep, pods, port, settingIP)
var (
vip, checkIP string
backends []string
ipPortMapping, externals map[string]string
)
vip = util.JoinHostPort(lbVip, port.Port)

if !ignoreHealthCheck {
if checkIP, err = c.getHealthCheckVip(vpcName, subnetName, lbVip); err != nil {
return err
}

externals = map[string]string{
util.SwitchLBRuleSubnet: subnetName,
}
}

ipPortMapping, backends = getIPPortMappingBackend(ep, pods, port, lbVip, checkIP, ignoreHealthCheck)

// for performance reason delete lb with no backends
if len(backends) != 0 {
klog.V(3).Infof("update vip %s with backends %s to LB %s", vip, backends, lb)
klog.Infof("add vip endpoint %s, backends %v to LB %s", vip, backends, lb)
if err = c.OVNNbClient.LoadBalancerAddVip(lb, vip, backends...); err != nil {
klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", vip, backends, lb, err)
klog.Errorf("failed to add vip %s with backends %s to LB %s: %v", lbVip, backends, lb, err)
return err
}
if !ignoreHealthCheck && len(ipPortMapping) != 0 {
klog.Infof("add health check ip port mapping %v to LB %s", ipPortMapping, lb)
if err = c.OVNNbClient.LoadBalancerAddHealthCheck(lb, vip, ignoreHealthCheck, ipPortMapping, externals); err != nil {
klog.Errorf("failed to add health check for vip %s with ip port mapping %s to LB %s: %v", lbVip, ipPortMapping, lb, err)
return err
}
}
} else {
klog.V(3).Infof("delete vip %s from LB %s", vip, lb)
if err := c.OVNNbClient.LoadBalancerDeleteVip(lb, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
klog.V(3).Infof("delete vip endpoint %s from LB %s", vip, lb)
if err = c.OVNNbClient.LoadBalancerDeleteVip(lb, vip, ignoreHealthCheck); err != nil {
klog.Errorf("failed to delete vip endpoint %s from LB %s: %v", vip, lb, err)
return err
}
klog.V(3).Infof("delete vip %s from old LB %s", vip, lb)
if err := c.OVNNbClient.LoadBalancerDeleteVip(oldLb, vip); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)

klog.V(3).Infof("delete vip endpoint %s from old LB %s", vip, oldLb)
if err = c.OVNNbClient.LoadBalancerDeleteVip(oldLb, vip, ignoreHealthCheck); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oldLb, err)
return err
}
}
}
}

return nil
}

func getServicePortBackends(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP string) []string {
backends := []string{}
protocol := util.CheckProtocol(serviceIP)
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, service *v1.Service) (string, string) {
var (
vpcName string
subnetName string
)

for _, pod := range pods {
if len(pod.Annotations) == 0 {
continue
}

if subnetName == "" {
subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
}

LOOP:
for _, subset := range endpoints.Subsets {
for _, addr := range subset.Addresses {
if addr.IP == pod.Status.PodIP {
if vpcName == "" {
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
}

if vpcName != "" {
break LOOP
}
}
}
}

if vpcName != "" && subnetName != "" {
break
}
}

if vpcName == "" {
if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
vpcName = c.config.ClusterRouter
}
}

if subnetName == "" {
subnetName = util.DefaultSubnet
}

return vpcName, subnetName
}

func (c *Controller) getHealthCheckVip(vpcName, subnetName, lbVip string) (string, error) {
var (
checkVip *kubeovnv1.Vip
checkIP string
err error
)

if checkVip, err = c.config.KubeOvnClient.KubeovnV1().Vips().Get(context.Background(), subnetName, metav1.GetOptions{}); err != nil {
if errors.IsNotFound(err) {
if checkVip, err = c.config.KubeOvnClient.
KubeovnV1().
Vips().
Create(context.Background(),
&kubeovnv1.Vip{
ObjectMeta: metav1.ObjectMeta{
Name: subnetName,
},
Spec: kubeovnv1.VipSpec{
Subnet: subnetName,
},
},
metav1.CreateOptions{},
); err != nil {
klog.Errorf("failed to create health check vip from vpc %s subnet %s, %v", vpcName, subnetName, err)
return checkIP, err
}
} else {
klog.Errorf("failed to get health check vip from vpc %s subnet %s, %v", vpcName, subnetName, err)
return checkIP, err
}
}

switch util.CheckProtocol(lbVip) {
case kubeovnv1.ProtocolIPv4:
checkIP = checkVip.Status.V4ip
case kubeovnv1.ProtocolIPv6:
checkIP = checkVip.Status.V6ip
}
if checkIP == "" {
err = fmt.Errorf("failed to get health check vip from vpc %s subnet %s", vpcName, subnetName)
klog.Error(err)
return checkIP, err
}

return checkIP, nil
}

func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP, checkVip string, ignoreHealthCheck bool) (map[string]string, []string) {
var (
ipPortMapping = map[string]string{}
backends = []string{}
protocol = util.CheckProtocol(serviceIP)
)

for _, subset := range endpoints.Subsets {
var targetPort int32
for _, port := range subset.Ports {
Expand All @@ -233,13 +360,16 @@ func getServicePortBackends(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort
}

for _, address := range subset.Addresses {
if !ignoreHealthCheck && address.TargetRef.Name != "" {
ipName := fmt.Sprintf("%s.%s", address.TargetRef.Name, endpoints.Namespace)
ipPortMapping[address.IP] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, ipName, checkVip)
}
if address.TargetRef == nil || address.TargetRef.Kind != "Pod" {
if util.CheckProtocol(address.IP) == protocol {
backends = append(backends, util.JoinHostPort(address.IP, targetPort))
}
continue
}

var ip string
for _, pod := range pods {
if pod.Name == address.TargetRef.Name {
Expand All @@ -265,5 +395,5 @@ func getServicePortBackends(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort
}
}

return backends
return ipPortMapping, backends
}
File renamed without changes.
Loading

0 comments on commit 127a87a

Please sign in to comment.