Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add concurrency limiter to ovs-vsctl #3288

Merged
merged 2 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/templates/ovncni-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ spec:
- --enable-metrics={{- .Values.networking.ENABLE_METRICS }}
- --kubelet-dir={{ .Values.kubelet_conf.KUBELET_DIR }}
- --enable-tproxy={{ .Values.func.ENABLE_TPROXY }}
- --ovs-vsctl-concurrency={{ .Values.performance.OVS_VSCTL_CONCURRENCY }}
securityContext:
runAsUser: 0
privileged: true
Expand Down
1 change: 1 addition & 0 deletions charts/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ performance:
RPMS: "openvswitch-kmod"
GC_INTERVAL: 360
INSPECT_INTERVAL: 20
OVS_VSCTL_CONCURRENCY: 100

debug:
ENABLE_MIRROR: false
Expand Down
3 changes: 3 additions & 0 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
"github.com/kubeovn/kube-ovn/pkg/daemon"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
"github.com/kubeovn/kube-ovn/versions"
)
Expand All @@ -37,6 +38,8 @@ func CmdMain() {
util.LogFatalAndExit(err, "failed to do the OS initialization")
}

ovs.UpdateOVSVsctlLimiter(config.OVSVsctlConcurrency)

nicBridgeMappings, err := daemon.InitOVSBridges()
if err != nil {
util.LogFatalAndExit(err, "failed to initialize OVS bridges")
Expand Down
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ IFACE=${IFACE:-}
DPDK_TUNNEL_IFACE=${DPDK_TUNNEL_IFACE:-br-phy}
ENABLE_BIND_LOCAL_IP=${ENABLE_BIND_LOCAL_IP:-true}
ENABLE_TPROXY=${ENABLE_TPROXY:-false}
OVS_VSCTL_CONCURRENCY=${OVS_VSCTL_CONCURRENCY:-100}

# debug
DEBUG_WRAPPER=${DEBUG_WRAPPER:-}
Expand Down Expand Up @@ -4065,6 +4066,7 @@ spec:
- --log_file_max_size=0
- --kubelet-dir=$KUBELET_DIR
- --enable-tproxy=$ENABLE_TPROXY
- --ovs-vsctl-concurrency=$OVS_VSCTL_CONCURRENCY
securityContext:
runAsUser: 0
privileged: true
Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Configuration struct {
TCPConnCheckPort int
UDPConnCheckPort int
EnableTProxy bool
OVSVsctlConcurrency int32
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -102,6 +103,7 @@ func ParseFlags() *Configuration {
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe")
argOVSVsctlConcurrency = pflag.Int32("ovs-vsctl-concurrency", 100, "concurrency limit of ovs-vsctl")
)

// mute info log for ipset lib
Expand Down Expand Up @@ -157,6 +159,7 @@ func ParseFlags() *Configuration {
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,
EnableTProxy: *argEnableTProxy,
OVSVsctlConcurrency: *argOVSVsctlConcurrency,
}
return config
}
Expand Down
43 changes: 38 additions & 5 deletions pkg/ovs/ovs-vsctl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ovs

import (
"context"
"fmt"
"os/exec"
"regexp"
Expand All @@ -13,25 +14,57 @@ import (
"github.com/kubeovn/kube-ovn/pkg/util"
)

// limiter bounds the number of ovs-vsctl commands executed concurrently by
// this package. It starts with a zero limit and is reconfigured through
// UpdateOVSVsctlLimiter.
var limiter = &Limiter{}

// UpdateOVSVsctlLimiter applies c as the new concurrency limit of the
// package-level ovs-vsctl limiter and logs the resulting limit at verbosity 4.
// Negative values are ignored and leave the current limit untouched.
func UpdateOVSVsctlLimiter(c int32) {
	if c < 0 {
		return
	}

	limiter.Update(c)
	klog.V(4).Infof("update ovs-vsctl concurrency limit to %d", limiter.Limit())
}

// Glory belongs to openvswitch/ovn-kubernetes
// https://github.com/openvswitch/ovn-kubernetes/blob/master/go-controller/pkg/util/ovs.go

// podNetNsRegexp matches a `pod_netns="<value>"` pair and captures the
// quoted value (any run of characters other than a double quote).
var podNetNsRegexp = regexp.MustCompile(`pod_netns="([^"]+)"`)

func Exec(args ...string) (string, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var (
start time.Time
elapsed float64
output []byte
method, code string
err error
)

if err = limiter.Wait(ctx); err != nil {
klog.V(4).Infof("command %s %s waiting for execution timeout by concurrency limit of %d", OvsVsCtl, strings.Join(args, " "), limiter.Limit())
return "", err
}
defer limiter.Done()
klog.V(4).Infof("command %s %s waiting for execution concurrency %d/%d", OvsVsCtl, strings.Join(args, " "), limiter.Current(), limiter.Limit())

start = time.Now()
args = append([]string{"--timeout=30"}, args...)
output, err := exec.Command(OvsVsCtl, args...).CombinedOutput()
elapsed := float64((time.Since(start)) / time.Millisecond)
output, err = exec.Command(OvsVsCtl, args...).CombinedOutput()
elapsed = float64((time.Since(start)) / time.Millisecond)
klog.V(4).Infof("command %s %s in %vms", OvsVsCtl, strings.Join(args, " "), elapsed)
method := ""

for _, arg := range args {
if !strings.HasPrefix(arg, "--") {
method = arg
break
}
}
code := "0"

code = "0"
defer func() {
ovsClientRequestLatency.WithLabelValues("ovsdb", method, code).Observe(elapsed)
}()
Expand Down
44 changes: 44 additions & 0 deletions pkg/ovs/util.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package ovs

import (
"context"
"fmt"
"regexp"
"strings"
"sync/atomic"
"time"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -235,3 +238,44 @@ func (m aclMatch) String() string {
rule, _ := m.Match()
return rule
}

// Limiter is a minimal concurrency limiter: callers acquire a slot with Wait
// and release it with Done. A limit of 0 means "unlimited". The zero value is
// ready to use and all methods are safe for concurrent use.
type Limiter struct {
	limit   int32 // maximum number of concurrently held slots; 0 disables limiting
	current int32 // number of slots currently held
}

// Limit returns the currently configured concurrency limit.
func (l *Limiter) Limit() int32 {
	return atomic.LoadInt32(&l.limit)
}

// Current returns the number of slots currently held.
func (l *Limiter) Current() int32 {
	return atomic.LoadInt32(&l.current)
}

// Update replaces the concurrency limit. Slots that are already held are not
// revoked; the new limit only affects subsequent acquisitions.
func (l *Limiter) Update(limit int32) {
	atomic.StoreInt32(&l.limit, limit)
}

// Wait blocks until a slot has been acquired or ctx is done. It polls every
// 10ms while the limiter is saturated. On success the caller must call Done
// exactly once. If ctx is done, a non-nil error wrapping ctx.Err() is returned
// and no slot is held.
func (l *Limiter) Wait(ctx context.Context) error {
	var ticker *time.Ticker
	for {
		// Cancellation takes precedence over acquisition, even when a slot is free.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("context canceled by timeout: %w", err)
		}

		if l.tryAcquire() {
			return nil
		}

		// Create the ticker lazily so that the uncontended path allocates nothing.
		if ticker == nil {
			ticker = time.NewTicker(10 * time.Millisecond)
			defer ticker.Stop()
		}

		// Wake up early on cancellation instead of sleeping unconditionally.
		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
	}
}

// Done releases a slot previously acquired with Wait.
func (l *Limiter) Done() {
	atomic.AddInt32(&l.current, -1)
}

// tryAcquire attempts to take one slot without blocking and reports whether
// it succeeded. The compare-and-swap makes the "check the limit, then
// increment" step atomic, so concurrent callers can never exceed the limit.
func (l *Limiter) tryAcquire() bool {
	for {
		limit := atomic.LoadInt32(&l.limit)
		held := atomic.LoadInt32(&l.current)
		if limit != 0 && held >= limit {
			return false
		}
		if atomic.CompareAndSwapInt32(&l.current, held, held+1) {
			return true
		}
		// Lost a race with another goroutine; re-read the state and retry.
	}
}
91 changes: 91 additions & 0 deletions pkg/ovs/util_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ovs

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -229,3 +231,92 @@ func Test_OrAclMatch_Match(t *testing.T) {
require.ErrorContains(t, err, "acl rule key is required")
})
}

// Test_Limiter exercises Limiter in three scenarios: unlimited (zero limit),
// a saturated limiter that is released while a caller is waiting, and a
// saturated limiter whose waiter gives up on context timeout.
func Test_Limiter(t *testing.T) {
	t.Parallel()

	t.Run("without limit", func(t *testing.T) {
		t.Parallel()

		limiter := new(Limiter)

		err := limiter.Wait(context.Background())
		require.NoError(t, err)
		require.Equal(t, int32(1), limiter.Current())

		err = limiter.Wait(context.Background())
		require.NoError(t, err)
		require.Equal(t, int32(2), limiter.Current())

		limiter.Done()
		require.Equal(t, int32(1), limiter.Current())

		limiter.Done()
		require.Equal(t, int32(0), limiter.Current())
	})

	t.Run("with limit", func(t *testing.T) {
		t.Parallel()

		limiter := new(Limiter)
		limiter.Update(2)

		err := limiter.Wait(context.Background())
		require.NoError(t, err)
		require.Equal(t, int32(1), limiter.Current())

		err = limiter.Wait(context.Background())
		require.NoError(t, err)
		require.Equal(t, int32(2), limiter.Current())

		// Release one slot shortly after the third Wait starts blocking. The
		// callback must not use require: FailNow may only be called from the
		// goroutine running the test.
		const releaseAfter = 100 * time.Millisecond
		start := time.Now()
		timer := time.AfterFunc(releaseAfter, limiter.Done)
		defer timer.Stop()

		err = limiter.Wait(context.Background())
		require.NoError(t, err)
		// The third Wait can only succeed once the timer released a slot.
		require.GreaterOrEqual(t, time.Since(start), releaseAfter)
		require.Equal(t, int32(2), limiter.Current())
	})

	t.Run("with timeout", func(t *testing.T) {
		t.Parallel()

		limiter := new(Limiter)
		limiter.Update(2)

		err := limiter.Wait(context.Background())
		require.NoError(t, err)
		require.Equal(t, int32(1), limiter.Current())

		err = limiter.Wait(context.Background())
		require.NoError(t, err)
		require.Equal(t, int32(2), limiter.Current())

		// The release is scheduled well after the context deadline, so the
		// waiter must time out first. Stop the timer on exit so it does not
		// fire after the subtest has finished.
		timer := time.AfterFunc(time.Second, limiter.Done)
		defer timer.Stop()

		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		err = limiter.Wait(ctx)
		require.ErrorContains(t, err, "context canceled by timeout")
		require.Equal(t, int32(2), limiter.Current())
	})
}
1 change: 1 addition & 0 deletions yamls/kube-ovn-dual-stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ spec:
- --log_file=/var/log/kube-ovn/kube-ovn-cni.log
- --log_file_max_size=0
- --enable-tproxy=false
- --ovs-vsctl-concurrency=100
securityContext:
runAsUser: 0
privileged: true
Expand Down
1 change: 1 addition & 0 deletions yamls/kube-ovn-ipv6.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ spec:
- --log_file=/var/log/kube-ovn/kube-ovn-cni.log
- --log_file_max_size=0
- --enable-tproxy=false
- --ovs-vsctl-concurrency=100
securityContext:
runAsUser: 0
privileged: true
Expand Down
Loading