diff --git a/charts/templates/ovncni-ds.yaml b/charts/templates/ovncni-ds.yaml index 16f6a142c74a..3cee02330d94 100644 --- a/charts/templates/ovncni-ds.yaml +++ b/charts/templates/ovncni-ds.yaml @@ -79,6 +79,7 @@ spec: - --enable-metrics={{- .Values.networking.ENABLE_METRICS }} - --kubelet-dir={{ .Values.kubelet_conf.KUBELET_DIR }} - --enable-tproxy={{ .Values.func.ENABLE_TPROXY }} + - --ovs-vsctl-concurrency={{ .Values.performance.OVS_VSCTL_CONCURRENCY }} securityContext: runAsUser: 0 privileged: true diff --git a/charts/values.yaml b/charts/values.yaml index ad947b229700..861c3b56ef44 100644 --- a/charts/values.yaml +++ b/charts/values.yaml @@ -97,6 +97,7 @@ performance: RPMS: "openvswitch-kmod" GC_INTERVAL: 360 INSPECT_INTERVAL: 20 + OVS_VSCTL_CONCURRENCY: 100 debug: ENABLE_MIRROR: false diff --git a/cmd/daemon/cniserver.go b/cmd/daemon/cniserver.go index a8776c09bebc..58d6a105df08 100644 --- a/cmd/daemon/cniserver.go +++ b/cmd/daemon/cniserver.go @@ -20,6 +20,7 @@ import ( kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions" "github.com/kubeovn/kube-ovn/pkg/daemon" + "github.com/kubeovn/kube-ovn/pkg/ovs" "github.com/kubeovn/kube-ovn/pkg/util" "github.com/kubeovn/kube-ovn/versions" ) @@ -37,6 +38,8 @@ func CmdMain() { util.LogFatalAndExit(err, "failed to do the OS initialization") } + ovs.UpdateOVSVsctlLimiter(config.OVSVsctlConcurrency) + nicBridgeMappings, err := daemon.InitOVSBridges() if err != nil { util.LogFatalAndExit(err, "failed to initialize OVS bridges") diff --git a/dist/images/install.sh b/dist/images/install.sh index 30e21c08216c..7bfde7d4eb86 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -33,6 +33,7 @@ IFACE=${IFACE:-} DPDK_TUNNEL_IFACE=${DPDK_TUNNEL_IFACE:-br-phy} ENABLE_BIND_LOCAL_IP=${ENABLE_BIND_LOCAL_IP:-true} ENABLE_TPROXY=${ENABLE_TPROXY:-false} +OVS_VSCTL_CONCURRENCY=${OVS_VSCTL_CONCURRENCY:-100} # debug DEBUG_WRAPPER=${DEBUG_WRAPPER:-} @@ -4065,6 +4066,7 @@ spec: - --log_file_max_size=0 - --kubelet-dir=$KUBELET_DIR - --enable-tproxy=$ENABLE_TPROXY + - --ovs-vsctl-concurrency=$OVS_VSCTL_CONCURRENCY securityContext: runAsUser: 0 privileged: true diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index 6396caa6dd01..bfcfb8f689b0 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -64,6 +64,7 @@ type Configuration struct { TCPConnCheckPort int UDPConnCheckPort int EnableTProxy bool + OVSVsctlConcurrency int32 } // ParseFlags will parse cmd args then init kubeClient and configuration @@ -102,6 +103,7 @@ func ParseFlags() *Configuration { argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port") argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port") argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe") + argOVSVsctlConcurrency = pflag.Int32("ovs-vsctl-concurrency", 100, "concurrency limit of ovs-vsctl") ) // mute info log for ipset lib @@ -157,6 +159,7 @@ func ParseFlags() *Configuration { TCPConnCheckPort: *argTCPConnectivityCheckPort, UDPConnCheckPort: *argUDPConnectivityCheckPort, EnableTProxy: *argEnableTProxy, + OVSVsctlConcurrency: *argOVSVsctlConcurrency, } return config } diff --git a/pkg/ovs/ovs-vsctl.go b/pkg/ovs/ovs-vsctl.go index 681e312c0f59..0528694be9eb 100644 --- a/pkg/ovs/ovs-vsctl.go +++ b/pkg/ovs/ovs-vsctl.go @@ -1,6 +1,7 @@ package ovs import ( + "context" "fmt" "os/exec" "regexp" @@ -13,25 +14,57 @@ import ( "github.com/kubeovn/kube-ovn/pkg/util" ) +var limiter *Limiter + +func init() { + limiter = new(Limiter) +} + +func UpdateOVSVsctlLimiter(c int32) { + if c > 0 { + limiter.Update(c) + klog.V(4).Infof("update ovs-vsctl concurrency limit to %d", limiter.Limit()) + } +} + // Glory belongs to openvswitch/ovn-kubernetes // https://github.com/openvswitch/ovn-kubernetes/blob/master/go-controller/pkg/util/ovs.go var podNetNsRegexp = regexp.MustCompile(`pod_netns="([^"]+)"`) func Exec(args ...string) (string, error) { - start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var ( + start time.Time + elapsed float64 + output []byte + method, code string + err error + ) + + if err = limiter.Wait(ctx); err != nil { + klog.V(4).Infof("command %s %s waiting for execution timeout by concurrency limit of %d", OvsVsCtl, strings.Join(args, " "), limiter.Limit()) + return "", err + } + defer limiter.Done() + klog.V(4).Infof("command %s %s waiting for execution concurrency %d/%d", OvsVsCtl, strings.Join(args, " "), limiter.Current(), limiter.Limit()) + + start = time.Now() args = append([]string{"--timeout=30"}, args...) - output, err := exec.Command(OvsVsCtl, args...).CombinedOutput() - elapsed := float64((time.Since(start)) / time.Millisecond) + output, err = exec.Command(OvsVsCtl, args...).CombinedOutput() + elapsed = float64((time.Since(start)) / time.Millisecond) klog.V(4).Infof("command %s %s in %vms", OvsVsCtl, strings.Join(args, " "), elapsed) - method := "" + for _, arg := range args { if !strings.HasPrefix(arg, "--") { method = arg break } } - code := "0" + + code = "0" defer func() { ovsClientRequestLatency.WithLabelValues("ovsdb", method, code).Observe(elapsed) }() diff --git a/pkg/ovs/util.go b/pkg/ovs/util.go index 43cd59a920e0..0bc9f98dd402 100644 --- a/pkg/ovs/util.go +++ b/pkg/ovs/util.go @@ -1,9 +1,12 @@ package ovs import ( + "context" "fmt" "regexp" "strings" + "sync/atomic" + "time" kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" "github.com/kubeovn/kube-ovn/pkg/util" @@ -235,3 +238,44 @@ func (m aclMatch) String() string { rule, _ := m.Match() return rule } + +type Limiter struct { + limit int32 + current int32 +} + +func (l *Limiter) Limit() int32 { + return l.limit +} + +func (l *Limiter) Current() int32 { + return l.current +} + +func (l *Limiter) Update(limit int32) { + l.limit = limit +} + +func (l *Limiter) Wait(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled by timeout") + default: + if l.limit == 0 { + atomic.AddInt32(&l.current, 1) + return nil + } + + if atomic.LoadInt32(&l.current) < l.limit { + atomic.AddInt32(&l.current, 1) + return nil + } + time.Sleep(10 * time.Millisecond) + } + } +} + +func (l *Limiter) Done() { + atomic.AddInt32(&l.current, -1) +} diff --git a/pkg/ovs/util_test.go b/pkg/ovs/util_test.go index 0fd0c57e9e26..9a54a9fd8013 100644 --- a/pkg/ovs/util_test.go +++ b/pkg/ovs/util_test.go @@ -1,7 +1,9 @@ package ovs import ( + "context" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -229,3 +231,92 @@ func Test_OrAclMatch_Match(t *testing.T) { require.ErrorContains(t, err, "acl rule key is required") }) } + +func Test_Limiter(t *testing.T) { + t.Parallel() + + t.Run("without limit", func(t *testing.T) { + t.Parallel() + + var ( + limiter *Limiter + err error + ) + + limiter = new(Limiter) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), limiter.Current()) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + + limiter.Done() + require.Equal(t, int32(1), limiter.Current()) + + limiter.Done() + require.Equal(t, int32(0), limiter.Current()) + }) + + t.Run("with limit", func(t *testing.T) { + t.Parallel() + + var ( + limiter *Limiter + err error + ) + + limiter = new(Limiter) + limiter.Update(2) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), limiter.Current()) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + + time.AfterFunc(10*time.Second, func() { + limiter.Done() + require.Equal(t, int32(1), limiter.Current()) + }) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + }) + + t.Run("with timeout", func(t *testing.T) { + t.Parallel() + + var ( + limiter *Limiter + err error + ) + + limiter = new(Limiter) + limiter.Update(2) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(1), limiter.Current()) + + err = limiter.Wait(context.Background()) + require.NoError(t, err) + require.Equal(t, int32(2), limiter.Current()) + + time.AfterFunc(10*time.Second, func() { + limiter.Done() + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err = limiter.Wait(ctx) + require.ErrorContains(t, err, "context canceled by timeout") + require.Equal(t, int32(2), limiter.Current()) + }) +} diff --git a/yamls/kube-ovn-dual-stack.yaml b/yamls/kube-ovn-dual-stack.yaml index a8cfe7a11ff6..dde6e05e04a0 100644 --- a/yamls/kube-ovn-dual-stack.yaml +++ b/yamls/kube-ovn-dual-stack.yaml @@ -201,6 +201,7 @@ spec: - --log_file=/var/log/kube-ovn/kube-ovn-cni.log - --log_file_max_size=0 - --enable-tproxy=false + - --ovs-vsctl-concurrency=100 securityContext: runAsUser: 0 privileged: true diff --git a/yamls/kube-ovn-ipv6.yaml b/yamls/kube-ovn-ipv6.yaml index a0d3e8a2b20b..1ecd23c9cfdd 100644 --- a/yamls/kube-ovn-ipv6.yaml +++ b/yamls/kube-ovn-ipv6.yaml @@ -201,6 +201,7 @@ spec: - --log_file=/var/log/kube-ovn/kube-ovn-cni.log - --log_file_max_size=0 - --enable-tproxy=false + - --ovs-vsctl-concurrency=100 securityContext: runAsUser: 0 privileged: true