Skip to content

Commit

Permalink
Merge pull request #472 from WangZzzhe/dev/overcommit-bindcpu
Browse files Browse the repository at this point in the history
feat(overcommit): add realtime overcommit advisor plugin
  • Loading branch information
luomingmeng authored Apr 28, 2024
2 parents 404d1b2 + 88bd3ae commit b1af056
Show file tree
Hide file tree
Showing 42 changed files with 4,242 additions and 81 deletions.
3 changes: 3 additions & 0 deletions cmd/base/context_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"reflect"
"strconv"

nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"

"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -241,6 +243,7 @@ func GenerateFakeGenericContext(objects ...[]runtime.Object) (*GenericContext, e
utilruntime.Must(v1alpha1.AddToScheme(scheme))
utilruntime.Must(overcommitapis.AddToScheme(scheme))
utilruntime.Must(apiregistration.AddToScheme(scheme))
utilruntime.Must(nodev1alpha1.AddToScheme(scheme))

fakeMetaClient := metaFake.NewSimpleMetadataClient(scheme, nilObjectFilter(metaObjects)...)
fakeInternalClient := externalfake.NewSimpleClientset(nilObjectFilter(internalObjects)...)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package overcommit

import (
"k8s.io/apimachinery/pkg/util/errors"
cliflag "k8s.io/component-base/cli/flag"

"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor/overcommit/realtime"
"github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/overcommit"
)

// OvercommitAwarePluginOptions groups the command-line options of the
// overcommit-aware plugin. It embeds the realtime overcommit options, so
// their fields and methods are promoted onto this type.
type OvercommitAwarePluginOptions struct {
*realtime.RealtimeOvercommitOptions
}

// NewOvercommitAwarePluginOptions returns plugin options whose embedded
// realtime overcommit options are initialised by
// realtime.NewRealtimeOvercommitOptions.
func NewOvercommitAwarePluginOptions() *OvercommitAwarePluginOptions {
	opts := new(OvercommitAwarePluginOptions)
	opts.RealtimeOvercommitOptions = realtime.NewRealtimeOvercommitOptions()
	return opts
}

// AddFlags registers the plugin's flags in the "overcommit_aware_plugin"
// named flag set of fss by delegating to the embedded realtime options.
func (o *OvercommitAwarePluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
	pluginFlags := fss.FlagSet("overcommit_aware_plugin")
	o.RealtimeOvercommitOptions.AddFlags(pluginFlags)
}

// ApplyTo fills c from the options by applying the embedded realtime options
// to c.RealtimeOvercommitConfiguration. Any error is returned wrapped in an
// aggregate; the result is nil when the realtime options apply cleanly.
func (o *OvercommitAwarePluginOptions) ApplyTo(c *overcommit.OvercommitAwarePluginConfiguration) error {
	errs := []error{
		o.RealtimeOvercommitOptions.ApplyTo(c.RealtimeOvercommitConfiguration),
	}
	return errors.NewAggregate(errs)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package realtime

import (
"time"

"github.com/spf13/pflag"

"github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/overcommit/realtime"
)

// RealtimeOvercommitOptions holds the command-line tunables of the realtime
// overcommit advisor.
type RealtimeOvercommitOptions struct {
	// SyncPeriod is the period at which the advisor calculates the node
	// resource overcommit ratio; SyncPodTimeout is the timeout for listing pods.
	SyncPeriod, SyncPodTimeout time.Duration

	// Target node load used to calculate the node CPU / memory overcommit ratio.
	TargetCPULoad, TargetMemoryLoad float64

	// Estimated pod load used to calculate the node CPU / memory overcommit ratio.
	EstimatedPodCPULoad, EstimatedPodMemoryLoad float64

	// Metrics lists used to calculate the node CPU / memory overcommitment ratio.
	CPUMetricsToGather, MemoryMetricsToGather []string
}

// NewRealtimeOvercommitOptions returns options pre-filled with the defaults:
// a 10s sync period, a 2s pod-list timeout, target loads of 0.6 (CPU) and
// 0.8 (memory), estimated pod loads of 0.4 (CPU) and 0.8 (memory), and empty
// (non-nil) metric lists.
func NewRealtimeOvercommitOptions() *RealtimeOvercommitOptions {
	opts := new(RealtimeOvercommitOptions)

	opts.SyncPeriod = 10 * time.Second
	opts.SyncPodTimeout = 2 * time.Second

	opts.TargetCPULoad = 0.6
	opts.TargetMemoryLoad = 0.8
	opts.EstimatedPodCPULoad = 0.4
	opts.EstimatedPodMemoryLoad = 0.8

	opts.CPUMetricsToGather = []string{}
	opts.MemoryMetricsToGather = []string{}

	return opts
}

// AddFlags registers the realtime overcommit advisor flags on fs. Each flag
// writes into the matching field of r and uses that field's current value as
// its default. Flag names are part of the command-line interface and are kept
// exactly as they are, including the mixed-case "CPU" segments.
func (r *RealtimeOvercommitOptions) AddFlags(fs *pflag.FlagSet) {
	durationFlags := []struct {
		dst         *time.Duration
		name, usage string
	}{
		{&r.SyncPeriod, "realtime-overcommit-sync-period",
			"period for realtime overcommit advisor to calculate node resource overcommit ratio"},
		{&r.SyncPodTimeout, "realtime-overcommit-sync-pod-timeout",
			"timeout for realtime overcommit advisor to list pod"},
	}
	for _, f := range durationFlags {
		fs.DurationVar(f.dst, f.name, *f.dst, f.usage)
	}

	loadFlags := []struct {
		dst         *float64
		name, usage string
	}{
		{&r.TargetCPULoad, "realtime-overcommit-CPU-targetload",
			"target node load for realtime overcommit advisor to calculate node CPU overcommit ratio"},
		{&r.TargetMemoryLoad, "realtime-overcommit-mem-targetload",
			"target node load for realtime overcommit advisor to calculate node memory overcommit ratio"},
		{&r.EstimatedPodCPULoad, "realtime-overcommit-estimated-cpuload",
			"estimated pod load for realtime overcommit advisor to calculate node CPU overcommit ratio"},
		{&r.EstimatedPodMemoryLoad, "realtime-overcommit-estimated-memload",
			"estimated pod load for realtime overcommit advisor to calculate node memory overcommit ratio"},
	}
	for _, f := range loadFlags {
		fs.Float64Var(f.dst, f.name, *f.dst, f.usage)
	}

	metricFlags := []struct {
		dst         *[]string
		name, usage string
	}{
		{&r.CPUMetricsToGather, "CPU-metrics-to-gather",
			"metrics list used to calculate node cpu overcommitment ratio"},
		{&r.MemoryMetricsToGather, "memory-metrics-to-gather",
			"metrics list used to calculate node memory overcommitment ratio"},
	}
	for _, f := range metricFlags {
		fs.StringSliceVar(f.dst, f.name, *f.dst, f.usage)
	}
}

// ApplyTo copies the option values onto o.
//
// SyncPeriod, SyncPodTimeout and the two metric lists are always copied.
// Each of the four load values is copied only when it lies strictly between
// 0 and 1; an out-of-range value is skipped without error, leaving the
// corresponding field of o untouched. The returned error is always nil.
func (r *RealtimeOvercommitOptions) ApplyTo(o *realtime.RealtimeOvercommitConfiguration) error {
	// A single predicate guarantees that every load is validated against its
	// own value, so one field's bound can never be checked by mistake against
	// a different field.
	validLoad := func(load float64) bool {
		return load > 0.0 && load < 1.0
	}

	o.SyncPeriod = r.SyncPeriod
	o.SyncPodTimeout = r.SyncPodTimeout

	if validLoad(r.TargetCPULoad) {
		o.TargetCPULoad = r.TargetCPULoad
	}
	if validLoad(r.TargetMemoryLoad) {
		o.TargetMemoryLoad = r.TargetMemoryLoad
	}
	if validLoad(r.EstimatedPodCPULoad) {
		o.EstimatedPodCPULoad = r.EstimatedPodCPULoad
	}
	if validLoad(r.EstimatedPodMemoryLoad) {
		o.EstimatedPodMemoryLoad = r.EstimatedPodMemoryLoad
	}

	o.CPUMetricsToGather = r.CPUMetricsToGather
	o.MemoryMetricsToGather = r.MemoryMetricsToGather

	return nil
}
13 changes: 9 additions & 4 deletions cmd/katalyst-agent/app/options/sysadvisor/sysadvisor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor/inference"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor/metacache"
metricemitter "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor/metric-emitter"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor/overcommit"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options/sysadvisor/qosaware"
"github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor"
)
Expand Down Expand Up @@ -75,15 +76,17 @@ type SysAdvisorPluginsOptions struct {
*metacache.MetaCachePluginOptions
*metricemitter.MetricEmitterPluginOptions
*inference.InferencePluginOptions
*overcommit.OvercommitAwarePluginOptions
}

// NewSysAdvisorPluginsOptions creates a new Options with a default config.
func NewSysAdvisorPluginsOptions() *SysAdvisorPluginsOptions {
return &SysAdvisorPluginsOptions{
QoSAwarePluginOptions: qosaware.NewQoSAwarePluginOptions(),
MetaCachePluginOptions: metacache.NewMetaCachePluginOptions(),
MetricEmitterPluginOptions: metricemitter.NewMetricEmitterPluginOptions(),
InferencePluginOptions: inference.NewInferencePluginOptions(),
QoSAwarePluginOptions: qosaware.NewQoSAwarePluginOptions(),
MetaCachePluginOptions: metacache.NewMetaCachePluginOptions(),
MetricEmitterPluginOptions: metricemitter.NewMetricEmitterPluginOptions(),
InferencePluginOptions: inference.NewInferencePluginOptions(),
OvercommitAwarePluginOptions: overcommit.NewOvercommitAwarePluginOptions(),
}
}

Expand All @@ -93,6 +96,7 @@ func (o *SysAdvisorPluginsOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.MetaCachePluginOptions.AddFlags(fss)
o.MetricEmitterPluginOptions.AddFlags(fss)
o.InferencePluginOptions.AddFlags(fss)
o.OvercommitAwarePluginOptions.AddFlags(fss)
}

// ApplyTo fills up config with options
Expand All @@ -102,6 +106,7 @@ func (o *SysAdvisorPluginsOptions) ApplyTo(c *sysadvisor.SysAdvisorPluginsConfig
errList = append(errList, o.MetaCachePluginOptions.ApplyTo(c.MetaCachePluginConfiguration))
errList = append(errList, o.MetricEmitterPluginOptions.ApplyTo(c.MetricEmitterPluginConfiguration))
errList = append(errList, o.InferencePluginOptions.ApplyTo(c.InferencePluginConfiguration))
errList = append(errList, o.OvercommitAwarePluginOptions.ApplyTo(c.OvercommitAwarePluginConfiguration))
return errors.NewAggregate(errList)
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/katalyst-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,9 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
return nil, nil, err
}

eventhandlers.AddCNREventHandler(cc.InformerFactory, cc.InternalInformerFactory)
eventhandlers.AddPodEventHandler(cc.InformerFactory, cc.InternalInformerFactory)
for _, handlerFunc := range eventhandlers.ListEventHandlerFunc() {
handlerFunc(cc.InformerFactory, cc.InternalInformerFactory)
}

return &cc, sched, nil
}
2 changes: 2 additions & 0 deletions cmd/katalyst-scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/component-base/logs"

"github.com/kubewharf/katalyst-core/cmd/katalyst-scheduler/app"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/noderesourcetopology"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/qosawarenoderesources"

Expand All @@ -38,6 +39,7 @@ func main() {
app.WithPlugin(qosawarenoderesources.FitName, qosawarenoderesources.NewFit),
app.WithPlugin(qosawarenoderesources.BalancedAllocationName, qosawarenoderesources.NewBalancedAllocation),
app.WithPlugin(noderesourcetopology.TopologyMatchName, noderesourcetopology.New),
app.WithPlugin(nodeovercommitment.Name, nodeovercommitment.New),
)

if err := runCommand(command); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/google/cadvisor v0.44.2
github.com/google/uuid v1.3.0
github.com/klauspost/cpuid/v2 v2.2.6
github.com/kubewharf/katalyst-api v0.4.1-0.20240416065828-9edab1e2f1f1
github.com/kubewharf/katalyst-api v0.4.1-0.20240423064035-1a0977f4e08c
github.com/montanaflynn/stats v0.7.1
github.com/opencontainers/runc v1.1.6
github.com/opencontainers/selinux v1.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240416065828-9edab1e2f1f1 h1:wRSFa6v3ONl2D8ZsEyIj3O/I2euSqbwXgWsPdy7w6oY=
github.com/kubewharf/katalyst-api v0.4.1-0.20240416065828-9edab1e2f1f1/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/katalyst-api v0.4.1-0.20240423064035-1a0977f4e08c h1:8H3twj9fHNtR06Tn5quV7oJuDsuP2R8wFeNlQvJTHB8=
github.com/kubewharf/katalyst-api v0.4.1-0.20240423064035-1a0977f4e08c/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package overcommitmentaware

import (
"context"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/overcommitmentaware/realtime"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
)

// PluginName is the name of the overcommitment-aware plugin.
const PluginName = "overcommitment-aware-plugin"

// OvercommitmentAwarePlugin calculates the node overcommitment ratio;
// the values are reported to node KCNR annotations by the reporter.
type OvercommitmentAwarePlugin struct {
// name is the plugin name returned by Name.
name string

// realtimeAdvisor and reporter are each started in their own goroutine by Run.
realtimeAdvisor *realtime.RealtimeOvercommitmentAdvisor
reporter reporter.OvercommitRatioReporter

// emitter is a metrics emitter handle.
// NOTE(review): none of the methods shown alongside this type read it — confirm intended use.
emitter metrics.MetricEmitter
}

// NewOvercommitmentAwarePlugin builds the overcommitment-aware plugin.
//
// It takes the default metrics emitter from emitterPool, creates the realtime
// overcommitment advisor, and then creates the overcommit ratio reporter on
// top of that advisor. The third and the last parameters are ignored. If the
// reporter cannot be created, its error is returned unchanged together with a
// nil plugin.
func NewOvercommitmentAwarePlugin(
	pluginName string, conf *config.Configuration,
	_ interface{},
	emitterPool metricspool.MetricsEmitterPool,
	metaServer *metaserver.MetaServer,
	_ metacache.MetaCache,
) (plugin.SysAdvisorPlugin, error) {
	emitter := emitterPool.GetDefaultMetricsEmitter()

	realtimeOvercommitmentAdvisor := realtime.NewRealtimeOvercommitmentAdvisor(conf, metaServer, emitter)

	overcommitRatioReporter, err := reporter.NewOvercommitRatioReporter(emitter, conf, realtimeOvercommitmentAdvisor, metaServer)
	if err != nil {
		return nil, err
	}

	op := &OvercommitmentAwarePlugin{
		name: pluginName,

		realtimeAdvisor: realtimeOvercommitmentAdvisor,
		reporter:        overcommitRatioReporter,

		// Store the emitter so the struct's emitter field is never left as a
		// nil interface for code that later emits metrics through the plugin.
		emitter: emitter,
	}

	return op, nil
}

// Run starts the realtime advisor and the reporter, each in its own
// goroutine, passing ctx to both. It returns immediately without waiting
// for either of them.
func (op *OvercommitmentAwarePlugin) Run(ctx context.Context) {
go op.realtimeAdvisor.Run(ctx)

go op.reporter.Run(ctx)
}

// Name returns the plugin name stored in the name field.
func (op *OvercommitmentAwarePlugin) Name() string {
return op.name
}

// Init performs no work and always returns nil.
func (op *OvercommitmentAwarePlugin) Init() error {
return nil
}
Loading

0 comments on commit b1af056

Please sign in to comment.