diff --git a/cmd/katalyst-agent/app/options/qrm/network_plugin.go b/cmd/katalyst-agent/app/options/qrm/network_plugin.go index 913e18835..b2b68298f 100644 --- a/cmd/katalyst-agent/app/options/qrm/network_plugin.go +++ b/cmd/katalyst-agent/app/options/qrm/network_plugin.go @@ -26,6 +26,10 @@ import ( type NetworkOptions struct { PolicyName string NetClass NetClassOptions + ReservedBandwidth uint32 + EgressCapacityRate float32 + IngressCapacityRate float32 + SkipNetworkStateCorruption bool PodLevelNetClassAnnoKey string PodLevelNetAttributesAnnoKeys string IPv4ResourceAllocationAnnotationKey string @@ -51,6 +55,10 @@ func NewNetworkOptions() *NetworkOptions { return &NetworkOptions{ PolicyName: "static", PodLevelNetClassAnnoKey: consts.PodAnnotationNetClassKey, + ReservedBandwidth: 0, + EgressCapacityRate: 0.94, + IngressCapacityRate: 0.9, + SkipNetworkStateCorruption: false, PodLevelNetAttributesAnnoKeys: "", IPv4ResourceAllocationAnnotationKey: "qrm.katalyst.kubewharf.io/inet_addr", IPv6ResourceAllocationAnnotationKey: "qrm.katalyst.kubewharf.io/inet_addr_ipv6", @@ -74,6 +82,14 @@ func (o *NetworkOptions) AddFlags(fss *cliflag.NamedFlagSets) { o.NetClass.DedicatedCores, "net class id for dedicated_cores") fs.Uint32Var(&o.NetClass.SystemCores, "network-resource-plugin-class-id-system-cores", o.NetClass.SystemCores, "net class id for system_cores") + fs.Uint32Var(&o.ReservedBandwidth, "network-resource-plugin-reserved-bandwidth", + o.ReservedBandwidth, "reserved bandwidth for business-critical jobs") + fs.Float32Var(&o.EgressCapacityRate, "network-resource-plugin-egress-capacity-rate", + o.EgressCapacityRate, "ratio of available egress capacity to egress line speed") + fs.Float32Var(&o.IngressCapacityRate, "network-resource-plugin-ingress-capacity-rate", + o.IngressCapacityRate, "ratio of available ingress capacity to ingress line speed") + fs.BoolVar(&o.SkipNetworkStateCorruption, "skip-network-state-corruption", + o.SkipNetworkStateCorruption, "if set true, we will skip network state corruption") fs.StringVar(&o.PodLevelNetClassAnnoKey, "network-resource-plugin-net-class-annotation-key", o.PodLevelNetClassAnnoKey, "The annotation key of pod-level net class") fs.StringVar(&o.PodLevelNetAttributesAnnoKeys, "network-resource-plugin-net-attributes-keys", @@ -98,6 +114,10 @@ func (o *NetworkOptions) ApplyTo(conf *qrmconfig.NetworkQRMPluginConfig) error { conf.NetClass.SharedCores = o.NetClass.SharedCores conf.NetClass.DedicatedCores = o.NetClass.DedicatedCores conf.NetClass.SystemCores = o.NetClass.SystemCores + conf.ReservedBandwidth = o.ReservedBandwidth + conf.EgressCapacityRate = o.EgressCapacityRate + conf.IngressCapacityRate = o.IngressCapacityRate + conf.SkipNetworkStateCorruption = o.SkipNetworkStateCorruption conf.PodLevelNetClassAnnoKey = o.PodLevelNetClassAnnoKey conf.PodLevelNetAttributesAnnoKeys = o.PodLevelNetAttributesAnnoKeys conf.IPv4ResourceAllocationAnnotationKey = o.IPv4ResourceAllocationAnnotationKey diff --git a/pkg/agent/qrm-plugins/network/state/checkpoint.go b/pkg/agent/qrm-plugins/network/state/checkpoint.go new file mode 100644 index 000000000..f4a352820 --- /dev/null +++ b/pkg/agent/qrm-plugins/network/state/checkpoint.go @@ -0,0 +1,62 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "encoding/json" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" +) + +var _ checkpointmanager.Checkpoint = &NetworkPluginCheckpoint{} + +type NetworkPluginCheckpoint struct { + PolicyName string `json:"policyName"` + MachineState NICMap `json:"machineState"` + PodEntries PodEntries `json:"pod_entries"` + Checksum checksum.Checksum `json:"checksum"` +} + +func NewNetworkPluginCheckpoint() *NetworkPluginCheckpoint { + return &NetworkPluginCheckpoint{ + PodEntries: make(PodEntries), + MachineState: make(NICMap), + } +} + +// MarshalCheckpoint returns marshaled checkpoint +func (cp *NetworkPluginCheckpoint) MarshalCheckpoint() ([]byte, error) { + // make sure checksum wasn't set before, so it doesn't affect output checksum + cp.Checksum = 0 + cp.Checksum = checksum.New(cp) + return json.Marshal(*cp) +} + +// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint +func (cp *NetworkPluginCheckpoint) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum verifies that current checksum of checkpoint is valid +func (cp *NetworkPluginCheckpoint) VerifyChecksum() error { + ck := cp.Checksum + cp.Checksum = 0 + err := ck.Verify(cp) + cp.Checksum = ck + return err +} diff --git a/pkg/agent/qrm-plugins/network/state/state.go b/pkg/agent/qrm-plugins/network/state/state.go new file mode 100644 index 000000000..ac5554db0 --- /dev/null +++ b/pkg/agent/qrm-plugins/network/state/state.go @@ -0,0 +1,305 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package state + +import ( + "encoding/json" + "fmt" + + info "github.com/google/cadvisor/info/v1" + pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +type AllocationInfo struct { + PodUid string `json:"pod_uid,omitempty"` + PodNamespace string `json:"pod_namespace,omitempty"` + PodName string `json:"pod_name,omitempty"` + ContainerName string `json:"container_name,omitempty"` + ContainerType string `json:"container_type,omitempty"` + ContainerIndex uint64 `json:"container_index,omitempty"` + RampUp bool `json:"ramp_up,omitempty"` + PodRole string `json:"pod_role,omitempty"` + PodType string `json:"pod_type,omitempty"` + Egress uint32 `json:"egress"` + Ingress uint32 `json:"ingress"` + IfName string `json:"if_name"` // we do not support cross-nic bandwidth + NumaNodes machine.CPUSet `json:"numa_node"` // associated numa nodes of the socket connecting to the selected NIC + + Labels map[string]string `json:"labels"` + Annotations map[string]string `json:"annotations"` +} + +type ContainerEntries map[string]*AllocationInfo // Keyed by container name +type PodEntries map[string]ContainerEntries // Keyed by pod UID + +// NICState indicates the status of a NIC, including the capacity/reservation/allocation (in Mbps) +type NICState struct { + EgressState BandwidthInfo `json:"egress_state"` + IngressState BandwidthInfo `json:"ingress_state"` + PodEntries PodEntries `json:"pod_entries"` +} + +type BandwidthInfo struct { + // Per K8s definition: allocatable = capacity - reserved, free = allocatable - allocated + // All rates are in unit of Mbps + + // Actual line speed of a NIC. E.g. a 25Gbps NIC's max bandwidth is around 23.5Gbps + // It's configurable. Its value = NIC line speed x configured CapacityRate + Capacity uint32 + // Reserved bandwidth on this NIC (e.g. for system components or high priority tasks) + // For the sake of safety, we generally keep an overflow buffer and do not allocate all bandwidth to tasks + // Thus, both reservations should be set slightly larger than the actual required amount + SysReservation uint32 + Reservation uint32 + Allocatable uint32 + Allocated uint32 + Free uint32 +} + +type NICMap map[string]*NICState // keyed by NIC name i.e. 
eth0 + +func (ai *AllocationInfo) String() string { + if ai == nil { + return "" + } + + contentBytes, err := json.Marshal(ai) + if err != nil { + general.LoggerWithPrefix("AllocationInfo.String", general.LoggingPKGFull).Errorf("marshal AllocationInfo failed with error: %v", err) + return "" + } + return string(contentBytes) +} + +func (ai *AllocationInfo) Clone() *AllocationInfo { + if ai == nil { + return nil + } + + clone := &AllocationInfo{ + PodUid: ai.PodUid, + PodNamespace: ai.PodNamespace, + PodName: ai.PodName, + ContainerName: ai.ContainerName, + ContainerType: ai.ContainerType, + ContainerIndex: ai.ContainerIndex, + RampUp: ai.RampUp, + PodRole: ai.PodRole, + PodType: ai.PodType, + Egress: ai.Egress, + Ingress: ai.Ingress, + IfName: ai.IfName, + NumaNodes: ai.NumaNodes.Clone(), + Labels: general.DeepCopyMap(ai.Labels), + Annotations: general.DeepCopyMap(ai.Annotations), + } + + return clone +} + +// CheckMainContainer returns true if the AllocationInfo is for main container +func (ai *AllocationInfo) CheckMainContainer() bool { + return ai.ContainerType == pluginapi.ContainerType_MAIN.String() +} + +// CheckSideCar returns true if the AllocationInfo is for side-car container +func (ai *AllocationInfo) CheckSideCar() bool { + return ai.ContainerType == pluginapi.ContainerType_SIDECAR.String() +} + +func (pe PodEntries) Clone() PodEntries { + clone := make(PodEntries) + for podUID, containerEntries := range pe { + clone[podUID] = make(ContainerEntries) + for containerName, allocationInfo := range containerEntries { + clone[podUID][containerName] = allocationInfo.Clone() + } + } + return clone +} + +func (pe PodEntries) String() string { + if pe == nil { + return "" + } + + contentBytes, err := json.Marshal(pe) + if err != nil { + general.LoggerWithPrefix("PodEntries.String", general.LoggingPKGFull).Errorf("marshal PodEntries failed with error: %v", err) + return "" + } + return string(contentBytes) +} + +// GetMainContainerAllocation returns AllocationInfo that belongs +// the main container for this pod +func (pe PodEntries) GetMainContainerAllocation(podUID string) (*AllocationInfo, bool) { + for _, allocationInfo := range pe[podUID] { + if allocationInfo.CheckMainContainer() { + return allocationInfo, true + } + } + return nil, false +} + +func (ns *NICState) String() string { + if ns == nil { + return "" + } + + contentBytes, err := json.Marshal(ns) + if err != nil { + general.LoggerWithPrefix("NICState.String", general.LoggingPKGFull).Errorf("marshal NICState failed with error: %v", err) + return "" + } + return string(contentBytes) +} + +func (ns *NICState) Clone() *NICState { + if ns == nil { + return nil + } + + return &NICState{ + EgressState: BandwidthInfo{ + Capacity: ns.EgressState.Capacity, + SysReservation: ns.EgressState.SysReservation, + Reservation: ns.EgressState.Reservation, + Allocatable: ns.EgressState.Allocatable, + Allocated: ns.EgressState.Allocated, + Free: ns.EgressState.Free, + }, + IngressState: BandwidthInfo{ + Capacity: ns.IngressState.Capacity, + SysReservation: ns.IngressState.SysReservation, + Reservation: ns.IngressState.Reservation, + Allocatable: ns.IngressState.Allocatable, + Allocated: ns.IngressState.Allocated, + Free: ns.IngressState.Free, + }, + PodEntries: ns.PodEntries.Clone(), + } +} + +// SetAllocationInfo adds a new AllocationInfo (for pod/container pairs) into the given NICState +func (ns *NICState) SetAllocationInfo(podUID string, containerName string, allocationInfo *AllocationInfo) { + if ns == nil { + return + } + + if 
allocationInfo == nil { + general.LoggerWithPrefix("NICState.SetAllocationInfo", general.LoggingPKGFull).Errorf("passed allocationInfo is nil") + return + } + + if ns.PodEntries == nil { + ns.PodEntries = make(PodEntries) + } + + if _, ok := ns.PodEntries[podUID]; !ok { + ns.PodEntries[podUID] = make(ContainerEntries) + } + + ns.PodEntries[podUID][containerName] = allocationInfo.Clone() +} + +func (nm NICMap) Clone() NICMap { + clone := make(NICMap) + for ifname, ns := range nm { + clone[ifname] = ns.Clone() + } + return clone +} + +// EgressBandwidthPerNIC is a helper function to parse egress bandwidth per NIC +func (nm NICMap) EgressBandwidthPerNIC() (uint32, error) { + if len(nm) == 0 { + return 0, fmt.Errorf("getEgressBandwidthPerNICFromMachineState got nil nicMap") + } + + for _, nicState := range nm { + if nicState != nil { + return nicState.EgressState.Allocatable, nil + } + } + + return 0, fmt.Errorf("getEgressBandwidthPerNICFromMachineState doesn't get valid nicState") +} + +// IngressBandwidthPerNIC is a helper function to parse egress bandwidth per NIC +func (nm NICMap) IngressBandwidthPerNIC() (uint32, error) { + if len(nm) == 0 { + return 0, fmt.Errorf("getIngressBandwidthPerNICFromMachineState got nil nicMap") + } + + for _, nicState := range nm { + if nicState != nil { + return nicState.IngressState.Allocatable, nil + } + } + + return 0, fmt.Errorf("getIngressBandwidthPerNICFromMachineState doesn't get valid nicState") +} + +func (nm NICMap) String() string { + if nm == nil { + return "" + } + + contentBytes, err := json.Marshal(nm) + if err != nil { + general.LoggerWithPrefix("NICMap.String", general.LoggingPKGFull).Errorf("marshal NICMap failed with error: %v", err) + return "" + } + return string(contentBytes) +} + +// reader is used to get information from local states +type reader interface { + GetMachineState() NICMap + GetPodEntries() PodEntries + GetAllocationInfo(podUID, containerName string) *AllocationInfo +} + +// writer is used to store information into local states, +// and it also provides functionality to maintain the local files +type writer interface { + SetMachineState(nicMap NICMap) + SetPodEntries(podEntries PodEntries) + SetAllocationInfo(podUID, containerName string, allocationInfo *AllocationInfo) + + Delete(podUID, containerName string) + ClearState() +} + +// ReadonlyState interface only provides methods for tracking pod assignments +type ReadonlyState interface { + reader + + GetMachineInfo() *info.MachineInfo + GetEnabledNICs() []machine.InterfaceInfo + GetReservedBandwidth() map[string]uint32 +} + +// State interface provides methods for tracking and setting pod assignments +type State interface { + writer + ReadonlyState +} diff --git a/pkg/agent/qrm-plugins/network/state/state_checkpoint.go b/pkg/agent/qrm-plugins/network/state/state_checkpoint.go new file mode 100644 index 000000000..fe08cd0c0 --- /dev/null +++ b/pkg/agent/qrm-plugins/network/state/state_checkpoint.go @@ -0,0 +1,250 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "fmt" + "path" + "reflect" + "sync" + + info "github.com/google/cadvisor/info/v1" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/machine" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" +) + +var _ State = &stateCheckpoint{} +var generalLog general.Logger = general.LoggerWithPrefix("network_plugin", general.LoggingPKGFull) + +// stateCheckpoint is an in-memory implementation of State; +// everytime we want to read or write states, those requests will always +// go to in-memory State, and then go to disk State, i.e. in write-back mode +type stateCheckpoint struct { + sync.RWMutex + cache State + policyName string + checkpointManager checkpointmanager.CheckpointManager + checkpointName string + // when we add new properties to checkpoint, + // it will cause checkpoint corruption and we should skip it + skipStateCorruption bool +} + +func NewCheckpointState(conf *qrm.QRMPluginsConfiguration, stateDir, checkpointName, policyName string, + machineInfo *info.MachineInfo, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32, + skipStateCorruption bool) (State, error) { + + checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + if err != nil { + return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) + } + + defaultCache, err := NewNetworkPluginState(conf, machineInfo, nics, reservedBandwidth) + if err != nil { + return nil, fmt.Errorf("NewNetworkPluginState failed with error: %v", err) + } + + stateCheckpoint := &stateCheckpoint{ + cache: defaultCache, + policyName: policyName, + checkpointManager: checkpointManager, + checkpointName: checkpointName, + skipStateCorruption: skipStateCorruption, + } + + if err := stateCheckpoint.restoreState(conf, nics, reservedBandwidth); err != nil { + return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the network plugin checkpoint file %q before restarting Kubelet", + err, path.Join(stateDir, checkpointName)) + } + + return stateCheckpoint, nil +} + +func (sc *stateCheckpoint) restoreState(conf *qrm.QRMPluginsConfiguration, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32) error { + sc.Lock() + defer sc.Unlock() + var err error + var foundAndSkippedStateCorruption bool + + checkpoint := NewNetworkPluginCheckpoint() + if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { + if err == errors.ErrCheckpointNotFound { + return sc.storeState() + } else if err == errors.ErrCorruptCheckpoint { + if !sc.skipStateCorruption { + return err + } + + foundAndSkippedStateCorruption = true + generalLog.Infof("restore checkpoint failed with err: %s, but we skip it", err) + } else { + return err + } + } + + if sc.policyName != checkpoint.PolicyName && !sc.skipStateCorruption { + return fmt.Errorf("[network_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) + } + + generatedNetworkState, err := GenerateMachineStateFromPodEntries(conf, nics, checkpoint.PodEntries, reservedBandwidth) + if err != nil { + return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err) + } + + sc.cache.SetMachineState(generatedNetworkState) + 
sc.cache.SetPodEntries(checkpoint.PodEntries) + + if !reflect.DeepEqual(generatedNetworkState, checkpoint.MachineState) { + generalLog.Warningf("machine state changed: "+ + "generatedNetworkState: %s; checkpointMachineState: %s", + generatedNetworkState.String(), checkpoint.MachineState.String()) + + err = sc.storeState() + if err != nil { + return fmt.Errorf("storeState when machine state changed failed with error: %v", err) + } + } + + if foundAndSkippedStateCorruption { + generalLog.Infof("found and skipped state corruption, we should store to rectify the checksum") + + err = sc.storeState() + if err != nil { + return fmt.Errorf("storeState failed with error: %v", err) + } + } + + generalLog.InfoS("state checkpoint: restored state from checkpoint") + + return nil +} + +func (sc *stateCheckpoint) storeState() error { + checkpoint := NewNetworkPluginCheckpoint() + checkpoint.PolicyName = sc.policyName + checkpoint.MachineState = sc.cache.GetMachineState() + checkpoint.PodEntries = sc.cache.GetPodEntries() + + err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) + if err != nil { + generalLog.ErrorS(err, "could not save checkpoint") + return err + } + return nil +} + +func (sc *stateCheckpoint) GetReservedBandwidth() map[string]uint32 { + sc.RLock() + defer sc.RUnlock() + + return sc.cache.GetReservedBandwidth() +} + +func (sc *stateCheckpoint) GetMachineInfo() *info.MachineInfo { + sc.RLock() + defer sc.RUnlock() + + return sc.cache.GetMachineInfo() +} + +func (sc *stateCheckpoint) GetEnabledNICs() []machine.InterfaceInfo { + sc.RLock() + defer sc.RUnlock() + + return sc.cache.GetEnabledNICs() +} + +func (sc *stateCheckpoint) GetMachineState() NICMap { + sc.RLock() + defer sc.RUnlock() + + return sc.cache.GetMachineState() +} + +func (sc *stateCheckpoint) GetAllocationInfo(podUID, containerName string) *AllocationInfo { + sc.RLock() + defer sc.RUnlock() + + return sc.cache.GetAllocationInfo(podUID, containerName) +} + +func (sc *stateCheckpoint) GetPodEntries() PodEntries { + sc.RLock() + defer sc.RUnlock() + + return sc.cache.GetPodEntries() +} + +func (sc *stateCheckpoint) SetMachineState(nicMap NICMap) { + sc.Lock() + defer sc.Unlock() + + sc.cache.SetMachineState(nicMap) + err := sc.storeState() + if err != nil { + generalLog.ErrorS(err, "store machineState to checkpoint error") + } +} + +func (sc *stateCheckpoint) SetAllocationInfo(podUID, containerName string, allocationInfo *AllocationInfo) { + sc.Lock() + defer sc.Unlock() + + sc.cache.SetAllocationInfo(podUID, containerName, allocationInfo) + err := sc.storeState() + if err != nil { + generalLog.ErrorS(err, "store allocationInfo to checkpoint error") + } +} + +func (sc *stateCheckpoint) SetPodEntries(podEntries PodEntries) { + sc.Lock() + defer sc.Unlock() + + sc.cache.SetPodEntries(podEntries) + err := sc.storeState() + if err != nil { + generalLog.ErrorS(err, "store pod entries to checkpoint error") + } +} + +func (sc *stateCheckpoint) Delete(podUID, containerName string) { + sc.Lock() + defer sc.Unlock() + + sc.cache.Delete(podUID, containerName) + err := sc.storeState() + if err != nil { + generalLog.ErrorS(err, "store state after delete operation to checkpoint error") + } +} + +func (sc *stateCheckpoint) ClearState() { + sc.Lock() + defer sc.Unlock() + + sc.cache.ClearState() + err := sc.storeState() + if err != nil { + generalLog.ErrorS(err, "store state after clear operation to checkpoint error") + } +} diff --git a/pkg/agent/qrm-plugins/network/state/state_net.go 
b/pkg/agent/qrm-plugins/network/state/state_net.go new file mode 100644 index 000000000..995408a72 --- /dev/null +++ b/pkg/agent/qrm-plugins/network/state/state_net.go @@ -0,0 +1,172 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "fmt" + "sync" + + info "github.com/google/cadvisor/info/v1" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +// networkPluginState is an in-memory implementation of State; +// everytime we want to read or write states, those requests will always +// go to in-memory State, and then go to disk State, i.e. in write-back mode +type networkPluginState struct { + sync.RWMutex + + qrmConf *qrm.QRMPluginsConfiguration + machineInfo *info.MachineInfo + nics []machine.InterfaceInfo + reservedBandwidth map[string]uint32 + + machineState NICMap + podEntries PodEntries +} + +var _ State = &networkPluginState{} + +func NewNetworkPluginState(conf *qrm.QRMPluginsConfiguration, machineInfo *info.MachineInfo, nics []machine.InterfaceInfo, reservedBandwidth map[string]uint32) (State, error) { + generalLog.InfoS("initializing new network plugin in-memory state store") + + defaultMachineState, err := GenerateMachineState(conf, nics, reservedBandwidth) + if err != nil { + return nil, fmt.Errorf("GenerateMachineState failed with error: %v", err) + } + + return &networkPluginState{ + qrmConf: conf, + machineState: defaultMachineState, + machineInfo: machineInfo.Clone(), + reservedBandwidth: reservedBandwidth, + podEntries: make(PodEntries), + }, nil +} + +func (s *networkPluginState) GetReservedBandwidth() map[string]uint32 { + s.RLock() + defer s.RUnlock() + + clonedReservedBandwidth := make(map[string]uint32) + for iface, bandwidth := range s.reservedBandwidth { + clonedReservedBandwidth[iface] = bandwidth + } + + return clonedReservedBandwidth +} + +func (s *networkPluginState) GetMachineState() NICMap { + s.RLock() + defer s.RUnlock() + + return s.machineState.Clone() +} + +func (s *networkPluginState) GetMachineInfo() *info.MachineInfo { + s.RLock() + defer s.RUnlock() + + return s.machineInfo.Clone() +} + +func (s *networkPluginState) GetEnabledNICs() []machine.InterfaceInfo { + s.RLock() + defer s.RUnlock() + + clonedNics := make([]machine.InterfaceInfo, len(s.nics)) + copy(clonedNics, s.nics) + + return clonedNics +} + +func (s *networkPluginState) GetAllocationInfo(podUID, containerName string) *AllocationInfo { + s.RLock() + defer s.RUnlock() + + if res, ok := s.podEntries[podUID][containerName]; ok { + return res.Clone() + } + return nil +} + +func (s *networkPluginState) GetPodEntries() PodEntries { + s.RLock() + defer s.RUnlock() + + return s.podEntries.Clone() +} + +func (s *networkPluginState) SetMachineState(nicMap NICMap) { + s.Lock() + defer s.Unlock() + + s.machineState = nicMap.Clone() + generalLog.InfoS("updated network plugin machine state", + "NICMap", nicMap.String()) +} + +func (s *networkPluginState) SetAllocationInfo(podUID, 
containerName string, allocationInfo *AllocationInfo) { + s.Lock() + defer s.Unlock() + + if _, ok := s.podEntries[podUID]; !ok { + s.podEntries[podUID] = make(ContainerEntries) + } + + s.podEntries[podUID][containerName] = allocationInfo.Clone() + generalLog.InfoS("updated network plugin pod resource entries", + "podUID", podUID, + "containerName", containerName, + "allocationInfo", allocationInfo.String()) +} + +func (s *networkPluginState) SetPodEntries(podEntries PodEntries) { + s.Lock() + defer s.Unlock() + + s.podEntries = podEntries.Clone() + generalLog.InfoS("updated network plugin pod resource entries", + "podEntries", podEntries.String()) +} + +func (s *networkPluginState) Delete(podUID, containerName string) { + s.Lock() + defer s.Unlock() + + if _, ok := s.podEntries[podUID]; !ok { + return + } + + delete(s.podEntries[podUID], containerName) + if len(s.podEntries[podUID]) == 0 { + delete(s.podEntries, podUID) + } + generalLog.InfoS("deleted container entry", "podUID", podUID, "containerName", containerName) +} + +func (s *networkPluginState) ClearState() { + s.Lock() + defer s.Unlock() + + s.machineState, _ = GenerateMachineState(s.qrmConf, s.nics, s.reservedBandwidth) + s.podEntries = make(PodEntries) + + generalLog.InfoS("cleared state") +} diff --git a/pkg/agent/qrm-plugins/network/state/util.go b/pkg/agent/qrm-plugins/network/state/util.go new file mode 100644 index 000000000..4631b446d --- /dev/null +++ b/pkg/agent/qrm-plugins/network/state/util.go @@ -0,0 +1,118 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "fmt" + + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" + "github.com/kubewharf/katalyst-core/pkg/util/general" + "github.com/kubewharf/katalyst-core/pkg/util/machine" +) + +// GenerateMachineState returns NICResourcesMap based on +// machine info and reserved resources +func GenerateMachineState(conf *qrm.QRMPluginsConfiguration, nics []machine.InterfaceInfo, reservation map[string]uint32) (NICMap, error) { + if len(nics) == 0 { + return nil, fmt.Errorf("GenerateMachineState got invalid nics") + } + + defaultMachineState := make(NICMap) + for _, iface := range nics { + reservedBandwidth := reservation[iface.Iface] + egressCapacity := uint32(float32(iface.Speed) * conf.EgressCapacityRate) + ingressCapacity := uint32(float32(iface.Speed) * conf.IngressCapacityRate) + // we do not differentiate egress reservation and ingress reservation for now. That is, they are supposed to be the same. 
+ if reservedBandwidth >= egressCapacity || reservedBandwidth >= ingressCapacity { + return nil, fmt.Errorf("invalid bandwidth reservation: %d on NIC: %s with capacity: [%d/%d]", reservedBandwidth, iface.Iface, egressCapacity, ingressCapacity) + } + + allocatableEgress := egressCapacity - reservedBandwidth + allocatableIngress := ingressCapacity - reservedBandwidth + + defaultMachineState[iface.Iface] = &NICState{ + EgressState: BandwidthInfo{ + Capacity: egressCapacity, + SysReservation: 0, + Reservation: reservedBandwidth, + Allocatable: allocatableEgress, + Allocated: 0, + Free: allocatableEgress, + }, + IngressState: BandwidthInfo{ + Capacity: ingressCapacity, + SysReservation: 0, + Reservation: reservedBandwidth, + Allocatable: allocatableIngress, + Allocated: 0, + Free: allocatableIngress, + }, + PodEntries: make(PodEntries), + } + } + + return defaultMachineState, nil +} + +// GenerateMachineStateFromPodEntries returns NICMap for bandwidth based on +// machine info and reserved resources along with existing pod entries +func GenerateMachineStateFromPodEntries(conf *qrm.QRMPluginsConfiguration, nics []machine.InterfaceInfo, + podEntries PodEntries, reservation map[string]uint32) (NICMap, error) { + machineState, err := GenerateMachineState(conf, nics, reservation) + if err != nil { + return nil, fmt.Errorf("GenerateMachineState failed with error: %v", err) + } + + for nicName, nicState := range machineState { + var allocatedEgressOnNIC, allocatedIngressOnNIC uint32 = 0, 0 + + for podUID, containerEntries := range podEntries { + for containerName, allocationInfo := range containerEntries { + if containerName != "" && allocationInfo != nil && allocationInfo.IfName == nicName { + allocatedEgressOnNIC += allocationInfo.Egress + allocatedIngressOnNIC += allocationInfo.Ingress + + nicState.SetAllocationInfo(podUID, containerName, allocationInfo) + } + } + } + + nicState.EgressState.Allocated = allocatedEgressOnNIC + nicState.IngressState.Allocated = allocatedIngressOnNIC + + generalLog := general.LoggerWithPrefix("GenerateMachineStateFromPodEntries", general.LoggingPKGFull) + if nicState.EgressState.Allocatable < nicState.EgressState.Allocated { + generalLog.Warningf("invalid allocated egress bandwidth: %d on NIC: %s"+ + " with allocatable bandwidth size: %d, total egress capacity size: %d, reserved bandwidth size: %d", + nicState.EgressState.Allocated, nicName, nicState.EgressState.Allocatable, nicState.EgressState.Capacity, nicState.EgressState.Reservation) + nicState.EgressState.Allocatable = nicState.EgressState.Allocated + } + nicState.EgressState.Free = nicState.EgressState.Allocatable - nicState.EgressState.Allocated + + if nicState.IngressState.Allocatable < nicState.IngressState.Allocated { + generalLog.Warningf("invalid allocated ingress bandwidth: %d on NIC: %s"+ + " with allocatable bandwidth size: %d, total ingress capacity size: %d, reserved bandwidth size: %d", + nicState.IngressState.Allocated, nicName, nicState.IngressState.Allocatable, nicState.IngressState.Capacity, nicState.IngressState.Reservation) + nicState.IngressState.Allocatable = nicState.IngressState.Allocated + } + nicState.IngressState.Free = nicState.IngressState.Allocatable - nicState.IngressState.Allocated + + machineState[nicName] = nicState + } + + return machineState, nil +} diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go index b6737835b..e11e64162 100644 --- 
a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go @@ -19,6 +19,8 @@ package staticpolicy import ( "context" "fmt" + "sort" + "strconv" "strings" "sync" "time" @@ -27,12 +29,16 @@ import ( pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" maputil "k8s.io/kubernetes/pkg/util/maps" + apinode "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" + "github.com/kubewharf/katalyst-api/pkg/plugins/skeleton" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/config" agentconfig "github.com/kubewharf/katalyst-core/pkg/config/agent" + "github.com/kubewharf/katalyst-core/pkg/config/agent/qrm" "github.com/kubewharf/katalyst-core/pkg/config/generic" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -48,6 +54,8 @@ const ( // NetworkResourcePluginPolicyNameStatic is the policy name of static network resource plugin NetworkResourcePluginPolicyNameStatic = "static" + NetworkPluginStateFileName = "network_plugin_state" + // IPsSeparator is used to split merged IPs string IPsSeparator = "," ) @@ -60,10 +68,12 @@ type StaticPolicy struct { stopCh chan struct{} started bool qosConfig *generic.QoSConfiguration + qrmConfig *qrm.QRMPluginsConfiguration emitter metrics.MetricEmitter metaServer *metaserver.MetaServer agentCtx *agent.GenericContext nics []machine.InterfaceInfo + state state.State CgroupV2Env bool qosLevelToNetClassMap map[string]uint32 @@ -86,12 +96,37 @@ func NewStaticPolicy(agentCtx *agent.GenericContext, conf *config.Configuration, Val: NetworkResourcePluginPolicyNameStatic, }) + // it is incorrect to reserve bandwidth on those disabled NICs. + // we only count active NICs as available network devices and allocate bandwidth on them + enabledNICs := filterNICsByAvailability(agentCtx.KatalystMachineInfo.ExtraNetworkInfo.Interface, nil, nil) + + // the NICs should be sorted by interface name so that we can adopt specific policies for bandwidth reservation or allocation + // e.g. reserve bandwidth for high-priority tasks on the first NIC + sort.SliceStable(enabledNICs, func(i, j int) bool { + return enabledNICs[i].Iface < enabledNICs[j].Iface + }) + + // we only support one spreading policy for now: reserve the bandwidth on the first NIC. 
+ // TODO: make the reservation policy configurable + reservation, err := getReservedBandwidth(enabledNICs, conf.ReservedBandwidth, FirstNIC) + if err != nil { + return false, agent.ComponentStub{}, fmt.Errorf("getReservedBandwidth failed with error: %v", err) + } + + stateImpl, err := state.NewCheckpointState(conf.QRMPluginsConfiguration, conf.GenericQRMPluginConfiguration.StateFileDirectory, NetworkPluginStateFileName, + NetworkResourcePluginPolicyNameStatic, agentCtx.MachineInfo, enabledNICs, reservation, conf.SkipNetworkStateCorruption) + if err != nil { + return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", err) + } + policyImplement := &StaticPolicy{ - nics: agentCtx.KatalystMachineInfo.ExtraNetworkInfo.Interface, + nics: enabledNICs, qosConfig: conf.QoSConfiguration, + qrmConfig: conf.QRMPluginsConfiguration, emitter: wrappedEmitter, metaServer: agentCtx.MetaServer, agentCtx: agentCtx, + state: stateImpl, stopCh: make(chan struct{}), name: fmt.Sprintf("%s_%s", agentName, NetworkResourcePluginPolicyNameStatic), qosLevelToNetClassMap: make(map[string]uint32), @@ -198,76 +233,6 @@ func (p *StaticPolicy) ResourceName() string { return string(apiconsts.ResourceNetBandwidth) } -func (p *StaticPolicy) calculateHints(req *pluginapi.ResourceRequest) (map[string]*pluginapi.ListOfTopologyHints, error) { - hints := map[string]*pluginapi.ListOfTopologyHints{ - p.ResourceName(): { - Hints: []*pluginapi.TopologyHint{}, - }, - } - - filteredNICs, err := filterAvailableNICsByReq(p.nics, req, p.agentCtx) - if err != nil { - return nil, fmt.Errorf("filterAvailableNICsByReq failed with error: %v", err) - } - - if len(filteredNICs) == 0 { - general.InfoS("filteredNICs is empty", - "podNamespace", req.PodNamespace, - "podName", req.PodName, - "containerName", req.ContainerName) - return hints, nil - } - - numasToHintMap := make(map[string]*pluginapi.TopologyHint) - for _, nic := range filteredNICs { - siblingNUMAs, err := machine.GetSiblingNUMAs(nic.NumaNode, p.agentCtx.CPUTopology) - if err != nil { - return nil, fmt.Errorf("get siblingNUMAs for nic: %s failed with error: %v", nic.Iface, err) - } - - // TODO: should be refined when involving bandwidth calculation - nicPreference, err := checkNICPreferenceOfReq(nic, req.Annotations) - if err != nil { - return nil, fmt.Errorf("checkNICPreferenceOfReq for nic: %s failed with error: %v", nic.Iface, err) - } - - siblingNUMAsStr := siblingNUMAs.String() - if numasToHintMap[siblingNUMAsStr] == nil { - numasToHintMap[siblingNUMAsStr] = &pluginapi.TopologyHint{ - Nodes: siblingNUMAs.ToSliceUInt64(), - } - } - - if nicPreference { - general.InfoS("set nic preferred to true", - "podNamespace", req.PodNamespace, - "podName", req.PodName, - "containerName", req.ContainerName, - "nic", nic.Iface) - numasToHintMap[siblingNUMAsStr].Preferred = nicPreference - } - } - - for _, hint := range numasToHintMap { - hints[p.ResourceName()].Hints = append(hints[p.ResourceName()].Hints, hint) - } - - if !isReqAffinityRestricted(req.Annotations) && !isReqNamespaceRestricted(req.Annotations) { - general.InfoS("add all NUMAs to hint to avoid affinity error", - "podNamespace", req.PodNamespace, - "podName", req.PodName, - "containerName", req.ContainerName, - req.Annotations[apiconsts.PodAnnotationNetworkEnhancementAffinityRestricted], - apiconsts.PodAnnotationNetworkEnhancementAffinityRestrictedTrue) - - hints[p.ResourceName()].Hints = append(hints[p.ResourceName()].Hints, &pluginapi.TopologyHint{ - Nodes: 
p.agentCtx.CPUDetails.NUMANodes().ToSliceUInt64(), - }) - } - - return hints, nil -} - // GetTopologyHints returns hints of corresponding resources func (p *StaticPolicy) GetTopologyHints(_ context.Context, req *pluginapi.ResourceRequest) (resp *pluginapi.ResourceHintsResponse, err error) { @@ -283,13 +248,19 @@ func (p *StaticPolicy) GetTopologyHints(_ context.Context, return nil, err } + reqInt, err := util.GetQuantityFromResourceReq(req) + if err != nil { + return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err) + } + general.InfoS("called", "podNamespace", req.PodNamespace, "podName", req.PodName, "containerName", req.ContainerName, "qosLevel", qosLevel, "resourceRequests", req.ResourceRequests, - "reqAnnotations", req.Annotations) + "reqAnnotations", req.Annotations, + "netBandwidthReq(Mbps)", reqInt) p.Lock() defer func() { @@ -323,31 +294,153 @@ func (p *StaticPolicy) RemovePod(_ context.Context, return nil, fmt.Errorf("RemovePod got nil req") } - if p.CgroupV2Env { - if err := p.removePod(req.PodUid); err != nil { - general.ErrorS(err, "remove pod failed with error", "podUID", req.PodUid) - return nil, err - } + p.Lock() + defer p.Unlock() + + if err := p.removePod(req.PodUid); err != nil { + general.ErrorS(err, "remove pod failed with error", "podUID", req.PodUid) + return nil, err } + return &pluginapi.RemovePodResponse{}, nil } // GetResourcesAllocation returns allocation results of corresponding resources func (p *StaticPolicy) GetResourcesAllocation(_ context.Context, _ *pluginapi.GetResourcesAllocationRequest) (*pluginapi.GetResourcesAllocationResponse, error) { + // no need to implement this function, because NeedReconcile is false return &pluginapi.GetResourcesAllocationResponse{}, nil } // GetTopologyAwareResources returns allocation results of corresponding resources as topology aware format func (p *StaticPolicy) GetTopologyAwareResources(_ context.Context, - _ *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) { - return &pluginapi.GetTopologyAwareResourcesResponse{}, nil + req *pluginapi.GetTopologyAwareResourcesRequest) (*pluginapi.GetTopologyAwareResourcesResponse, error) { + if req == nil { + return nil, fmt.Errorf("GetTopologyAwareResources got nil req") + } + + p.Lock() + defer p.Unlock() + + allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName) + if allocationInfo == nil { + return &pluginapi.GetTopologyAwareResourcesResponse{}, nil + } + + socket, err := p.getSocketIDByNIC(allocationInfo.IfName) + if err != nil { + return nil, fmt.Errorf("failed to find topologyNode for pod %s, container %s : %v", req.PodUid, req.ContainerName, err) + } + + nic := p.getNICByName(allocationInfo.IfName) + topologyAwareQuantityList := []*pluginapi.TopologyAwareQuantity{ + { + ResourceValue: float64(allocationInfo.Egress), + Node: uint64(socket), + Name: allocationInfo.IfName, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + apiconsts.ResourceAnnotationKeyResourceIdentifier: getResourceIdentifier(nic.NSName, allocationInfo.IfName), + }, + }, + } + resp := &pluginapi.GetTopologyAwareResourcesResponse{ + PodUid: allocationInfo.PodUid, + PodName: allocationInfo.PodName, + PodNamespace: allocationInfo.PodNamespace, + ContainerTopologyAwareResources: &pluginapi.ContainerTopologyAwareResources{ + ContainerName: allocationInfo.ContainerName, + }, + } + + if allocationInfo.CheckSideCar() { + 
resp.ContainerTopologyAwareResources.AllocatedResources = map[string]*pluginapi.TopologyAwareResource{ + string(apiconsts.ResourceNetBandwidth): { + IsNodeResource: true, + IsScalarResource: true, + AggregatedQuantity: 0, + OriginalAggregatedQuantity: 0, + TopologyAwareQuantityList: nil, + OriginalTopologyAwareQuantityList: nil, + }, + } + } else { + resp.ContainerTopologyAwareResources.AllocatedResources = map[string]*pluginapi.TopologyAwareResource{ + string(apiconsts.ResourceNetBandwidth): { + IsNodeResource: true, + IsScalarResource: true, + AggregatedQuantity: float64(allocationInfo.Egress), + OriginalAggregatedQuantity: float64(allocationInfo.Egress), + TopologyAwareQuantityList: topologyAwareQuantityList, + OriginalTopologyAwareQuantityList: topologyAwareQuantityList, + }, + } + } + + return resp, nil } // GetTopologyAwareAllocatableResources returns corresponding allocatable resources as topology aware format func (p *StaticPolicy) GetTopologyAwareAllocatableResources(_ context.Context, _ *pluginapi.GetTopologyAwareAllocatableResourcesRequest) (*pluginapi.GetTopologyAwareAllocatableResourcesResponse, error) { - return &pluginapi.GetTopologyAwareAllocatableResourcesResponse{}, nil + p.Lock() + defer p.Unlock() + + machineState := p.state.GetMachineState() + + topologyAwareAllocatableQuantityList := make([]*pluginapi.TopologyAwareQuantity, 0, len(machineState)) + topologyAwareCapacityQuantityList := make([]*pluginapi.TopologyAwareQuantity, 0, len(machineState)) + + var aggregatedAllocatableQuantity, aggregatedCapacityQuantity uint32 = 0, 0 + for _, iface := range p.nics { + nicState := machineState[iface.Iface] + if nicState == nil { + return nil, fmt.Errorf("nil nicState for NIC: %s", iface.Iface) + } + + topologyNode, err := p.getSocketIDByNIC(iface.Iface) + if err != nil { + return nil, fmt.Errorf("failed to find topologyNode: %v", err) + } + + resourceIdentifier := getResourceIdentifier(iface.NSName, iface.Iface) + topologyAwareAllocatableQuantityList = append(topologyAwareAllocatableQuantityList, &pluginapi.TopologyAwareQuantity{ + ResourceValue: float64(general.MinUInt32(nicState.EgressState.Allocatable, nicState.IngressState.Allocatable)), + Node: uint64(topologyNode), + Name: iface.Iface, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + apiconsts.ResourceAnnotationKeyResourceIdentifier: resourceIdentifier, + }, + }) + topologyAwareCapacityQuantityList = append(topologyAwareCapacityQuantityList, &pluginapi.TopologyAwareQuantity{ + ResourceValue: float64(general.MinUInt32(nicState.EgressState.Capacity, nicState.IngressState.Capacity)), + Node: uint64(topologyNode), + Name: iface.Iface, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + apiconsts.ResourceAnnotationKeyResourceIdentifier: resourceIdentifier, + }, + }) + aggregatedAllocatableQuantity += general.MinUInt32(nicState.EgressState.Allocatable, nicState.IngressState.Allocatable) + aggregatedCapacityQuantity += general.MinUInt32(nicState.EgressState.Capacity, nicState.IngressState.Capacity) + } + + return &pluginapi.GetTopologyAwareAllocatableResourcesResponse{ + AllocatableResources: map[string]*pluginapi.AllocatableTopologyAwareResource{ + string(apiconsts.ResourceNetBandwidth): { + IsNodeResource: true, + IsScalarResource: true, + AggregatedAllocatableQuantity: float64(aggregatedAllocatableQuantity), + TopologyAwareAllocatableQuantityList: 
topologyAwareAllocatableQuantityList, + AggregatedCapacityQuantity: float64(aggregatedCapacityQuantity), + TopologyAwareCapacityQuantityList: topologyAwareCapacityQuantityList, + }, + }, + }, nil } // GetResourcePluginOptions returns options to be communicated with Resource Manager @@ -360,38 +453,6 @@ func (p *StaticPolicy) GetResourcePluginOptions(context.Context, }, nil } -func (p *StaticPolicy) selectNICByReq(req *pluginapi.ResourceRequest) (machine.InterfaceInfo, error) { - filteredNICs, err := filterAvailableNICsByReq(p.nics, req, p.agentCtx) - if err != nil { - return machine.InterfaceInfo{}, fmt.Errorf("filterAvailableNICsByReq failed with error: %v", err) - } else if len(filteredNICs) == 0 { - return machine.InterfaceInfo{}, fmt.Errorf("filteredNICs is emptry") - } - - return getRandomNICs(filteredNICs), nil -} - -// TODO: fill resource allocation annotations with allocated bandwidth quantity -func (p *StaticPolicy) getResourceAllocationAnnotations(podAnnotations map[string]string, selectedNIC machine.InterfaceInfo) (map[string]string, error) { - netClsID, err := p.getNetClassID(podAnnotations, p.podLevelNetClassAnnoKey) - if err != nil { - return nil, fmt.Errorf("getNetClassID failed with error: %v", err) - } - - resourceAllocationAnnotations := map[string]string{ - p.ipv4ResourceAllocationAnnotationKey: strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV4), IPsSeparator), - p.ipv6ResourceAllocationAnnotationKey: strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV6), IPsSeparator), - p.netInterfaceNameResourceAllocationAnnotationKey: selectedNIC.Iface, - p.netClassIDResourceAllocationAnnotationKey: fmt.Sprintf("%d", netClsID), - } - - if len(selectedNIC.NSAbsolutePath) > 0 { - resourceAllocationAnnotations[p.netNSPathResourceAllocationAnnotationKey] = selectedNIC.NSAbsolutePath - } - - return resourceAllocationAnnotations, nil -} - // Allocate is called during pod admit so that the resource // plugin can allocate corresponding resource for the container // according to resource request @@ -413,13 +474,18 @@ func (p *StaticPolicy) Allocate(_ context.Context, return nil, err } + reqInt, err := util.GetQuantityFromResourceReq(req) + if err != nil { + return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err) + } + general.InfoS("called", "podNamespace", req.PodNamespace, "podName", req.PodName, "containerName", req.ContainerName, "qosLevel", qosLevel, - "resourceRequests", req.ResourceRequests, - "reqAnnotations", req.Annotations) + "reqAnnotations", req.Annotations, + "netBandwidthReq(Mbps)", reqInt) p.Lock() defer func() { @@ -429,35 +495,137 @@ func (p *StaticPolicy) Allocate(_ context.Context, } }() + emptyResponse := &pluginapi.ResourceAllocationResponse{ + PodUid: req.PodUid, + PodNamespace: req.PodNamespace, + PodName: req.PodName, + ContainerName: req.ContainerName, + ContainerType: req.ContainerType, + ContainerIndex: req.ContainerIndex, + PodRole: req.PodRole, + PodType: req.PodType, + ResourceName: p.ResourceName(), + Labels: general.DeepCopyMap(req.Labels), + Annotations: general.DeepCopyMap(req.Annotations), + } + // currently, not to deal with init containers if req.ContainerType == pluginapi.ContainerType_INIT { - return &pluginapi.ResourceAllocationResponse{ - PodUid: req.PodUid, - PodNamespace: req.PodNamespace, - PodName: req.PodName, - ContainerName: req.ContainerName, - ContainerType: req.ContainerType, - ContainerIndex: req.ContainerIndex, - PodRole: req.PodRole, - PodType: req.PodType, - ResourceName: p.ResourceName(), - Labels: 
general.DeepCopyMap(req.Labels), - Annotations: general.DeepCopyMap(req.Annotations), - }, nil + return emptyResponse, nil } else if req.ContainerType == pluginapi.ContainerType_SIDECAR { - // not to deal with sidcars, and return a trivial allocationResult to avoid re-allocating - return packAllocationResponse(req, p.ResourceName(), 0, nil) + // not to deal with sidecars, and return a trivial allocationResult to avoid re-allocating + return packAllocationResponse(req, &state.AllocationInfo{}, nil, nil) + } + + // check allocationInfo is nil or not + podEntries := p.state.GetPodEntries() + allocationInfo := p.state.GetAllocationInfo(req.PodUid, req.ContainerName) + + if allocationInfo != nil { + if allocationInfo.Egress >= uint32(reqInt) && allocationInfo.Ingress >= uint32(reqInt) { + general.InfoS("already allocated and meet requirement", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName, + "bandwidthReq(Mbps)", reqInt, + "currentResult(Mbps)", allocationInfo.Egress) + + resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, allocationInfo) + if err != nil { + err = fmt.Errorf("getResourceAllocationAnnotations for pod: %s/%s, container: %s failed with error: %v", + req.PodNamespace, req.PodName, req.ContainerName, err) + general.Errorf("%s", err.Error()) + return nil, err + } + + resp, packErr := packAllocationResponse(req, allocationInfo, req.Hint, resourceAllocationAnnotations) + if packErr != nil { + general.Errorf("pod: %s/%s, container: %s packAllocationResponse failed with error: %v", + req.PodNamespace, req.PodName, req.ContainerName, packErr) + return nil, fmt.Errorf("packAllocationResponse failed with error: %v", packErr) + } + return resp, nil + } else { + general.InfoS("not meet requirement, clear record and re-allocate", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName, + "bandwidthReq(Mbps)", reqInt, + "currentResult(Mbps)", allocationInfo.Egress) + delete(podEntries, req.PodUid) + + _, stateErr := state.GenerateMachineStateFromPodEntries(p.qrmConfig, p.nics, podEntries, p.state.GetReservedBandwidth()) + if stateErr != nil { + general.ErrorS(stateErr, "generateNetworkMachineStateByPodEntries failed", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName, + "bandwidthReq(Mbps)", reqInt, + "currentResult(Mbps)", allocationInfo.Egress) + return nil, fmt.Errorf("generateNetworkMachineStateByPodEntries failed with error: %v", stateErr) + } + } } - selectedNIC, err := p.selectNICByReq(req) + candidateNICs, err := p.selectNICsByReq(req) if err != nil { - err = fmt.Errorf("selectNICByReq for pod: %s/%s, container: %s failed with error: %v", - req.PodNamespace, req.PodName, req.ContainerName, err) + err = fmt.Errorf("selectNICsByReq for pod: %s/%s, container: %s, reqInt: %d, failed with error: %v", + req.PodNamespace, req.PodName, req.ContainerName, reqInt, err) general.Errorf("%s", err.Error()) return nil, err } - resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, selectedNIC) + if len(candidateNICs) == 0 { + general.ErrorS(err, "insufficient bandwidth on this node to satisfy the request", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName, + "netBandwidthReq(Mbps)", reqInt, + "nicState", p.state.GetMachineState().String()) + return nil, fmt.Errorf("failed to meet the bandwidth requirement of %d Mbps", reqInt) + } + + // we 
only support one policy and hard code it for now + // TODO: make the policy configurable + selectedNIC := selectOneNIC(candidateNICs, RandomOne) + general.Infof("select NIC %s to allocate bandwidth (%dMbps)", selectedNIC.Iface, reqInt) + + siblingNUMAs, err := machine.GetSiblingNUMAs(selectedNIC.NumaNode, p.agentCtx.CPUTopology) + if err != nil { + general.Errorf("get siblingNUMAs for nic: %s failed with error: %v. Incorrect NumaNodes in machineState allocationInfo", selectedNIC.Iface, err) + } + + // generate the response hint + // it could be different from the req.Hint if the affinitive NIC does not have sufficient bandwidth + nicPreference, err := checkNICPreferenceOfReq(selectedNIC, req.Annotations) + if err != nil { + return nil, fmt.Errorf("checkNICPreferenceOfReq for nic: %s failed with error: %v", selectedNIC.Iface, err) + } + + respHint := &pluginapi.TopologyHint{ + Nodes: siblingNUMAs.ToSliceUInt64(), + Preferred: nicPreference, + } + + // generate allocationInfo and update the checkpoint accordingly + newAllocation := &state.AllocationInfo{ + PodUid: req.PodUid, + PodNamespace: req.PodNamespace, + PodName: req.PodName, + ContainerName: req.ContainerName, + ContainerType: req.ContainerType.String(), + ContainerIndex: req.ContainerIndex, + PodRole: req.PodRole, + PodType: req.PodType, + Egress: uint32(reqInt), + Ingress: uint32(reqInt), + IfName: selectedNIC.Iface, + NumaNodes: siblingNUMAs, + Labels: general.DeepCopyMap(req.Labels), + Annotations: general.DeepCopyMap(req.Annotations), + } + + resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, newAllocation) if err != nil { err = fmt.Errorf("getResourceAllocationAnnotations for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) @@ -465,8 +633,24 @@ func (p *StaticPolicy) Allocate(_ context.Context, return nil, err } - // TODO fill it with allocated bandwidth quantity - return packAllocationResponse(req, p.ResourceName(), 0, resourceAllocationAnnotations) + // update PodEntries + p.state.SetAllocationInfo(req.PodUid, req.ContainerName, newAllocation) + + machineState, stateErr := state.GenerateMachineStateFromPodEntries(p.qrmConfig, p.nics, p.state.GetPodEntries(), p.state.GetReservedBandwidth()) + if stateErr != nil { + general.ErrorS(stateErr, "generateNetworkMachineStateByPodEntries failed", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName, + "bandwidthReq(Mbps)", reqInt, + "currentResult(Mbps)", allocationInfo.Egress) + return nil, fmt.Errorf("generateNetworkMachineStateByPodEntries failed with error: %v", stateErr) + } + + // update state cache + p.state.SetMachineState(machineState) + + return packAllocationResponse(req, newAllocation, respHint, resourceAllocationAnnotations) } // PreStartContainer is called, if indicated by resource plugin during registration phase, @@ -546,25 +730,211 @@ func (p *StaticPolicy) applyNetClass() { } } -func (p *StaticPolicy) removePod(podUID string) error { - cgIDList, err := p.metaServer.ExternalManager.ListCgroupIDsForPod(podUID) +func (p *StaticPolicy) filterAvailableNICsByBandwidth(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, _ *agent.GenericContext) []machine.InterfaceInfo { + filteredNICs := make([]machine.InterfaceInfo, 0, len(nics)) + + if req == nil { + general.Infof("filterNICsByBandwidth got nil req") + return nil + } + + reqInt, err := util.GetQuantityFromResourceReq(req) if err != nil { - if general.IsErrNotFound(err) { - 
general.Warningf("cgroup ids for pod not found") - return nil + general.Errorf("getReqQuantityFromResourceReq failed with error: %v", err) + return nil + } + + machineState := p.state.GetMachineState() + if len(machineState) == 0 || len(nics) == 0 { + general.Errorf("filterNICsByBandwidth with 0 NIC") + return nil + } + + // filter NICs by available bandwidth + for _, iface := range nics { + if machineState[iface.Iface].EgressState.Free >= uint32(reqInt) && machineState[iface.Iface].IngressState.Free >= uint32(reqInt) { + filteredNICs = append(filteredNICs, iface) + } + } + + // no nic meets the bandwidth request + if len(filteredNICs) == 0 { + general.InfoS("nic list returned by filtereNICsByBandwidth is empty", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName) + } + + return filteredNICs +} + +func (p *StaticPolicy) calculateHints(req *pluginapi.ResourceRequest) (map[string]*pluginapi.ListOfTopologyHints, error) { + // resp.hints: 1) empty, means no resource (i.e. NIC) meeting requirements found; 2) nil, does not care about the hints + // since NIC is a kind of topology-aware resource, it is incorrect to return nil + hints := map[string]*pluginapi.ListOfTopologyHints{ + p.ResourceName(): { + Hints: []*pluginapi.TopologyHint{}, + }, + } + + candidateNICs, err := p.selectNICsByReq(req) + if err != nil { + return hints, fmt.Errorf("failed to select available NICs: %v", err) + } + + if len(candidateNICs) == 0 { + general.InfoS("candidateNICs is empty", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName) + // if the req.NS asks to allocate on the 1st NIC which does not have sufficient bandwidth, candidateNICs is empty. + // however, we should not return directly here. To indicate the option of the 2nd NIC if no restricted affinity or ns requested, we return [0,1,2,3] instead. 
+ } + + numasToHintMap := make(map[string]*pluginapi.TopologyHint) + for _, nic := range candidateNICs { + siblingNUMAs, err := machine.GetSiblingNUMAs(nic.NumaNode, p.agentCtx.CPUTopology) + if err != nil { + return nil, fmt.Errorf("get siblingNUMAs for nic: %s failed with error: %v", nic.Iface, err) + } + + nicPreference, err := checkNICPreferenceOfReq(nic, req.Annotations) + if err != nil { + return nil, fmt.Errorf("checkNICPreferenceOfReq for nic: %s failed with error: %v", nic.Iface, err) + } + + siblingNUMAsStr := siblingNUMAs.String() + if numasToHintMap[siblingNUMAsStr] == nil { + numasToHintMap[siblingNUMAsStr] = &pluginapi.TopologyHint{ + Nodes: siblingNUMAs.ToSliceUInt64(), + } + } + + if nicPreference { + general.InfoS("set nic preferred to true", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName, + "nic", nic.Iface) + numasToHintMap[siblingNUMAsStr].Preferred = nicPreference } - return fmt.Errorf("[NetworkStaticPolicy.removePod] list cgroup ids of pod: %s failed with error: %v", podUID, err) } - for _, cgID := range cgIDList { - go func(cgID uint64) { - if err := p.metaServer.ExternalManager.ClearNetClass(cgID); err != nil { - general.Errorf("delete net class failed, cgID: %v, err: %v", cgID, err) - return + for _, hint := range numasToHintMap { + hints[p.ResourceName()].Hints = append(hints[p.ResourceName()].Hints, hint) + } + + // check if restricted affinity or ns requested + if !isReqAffinityRestricted(req.Annotations) && !isReqNamespaceRestricted(req.Annotations) { + general.InfoS("add all NUMAs to hint to avoid affinity error", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName, + req.Annotations[apiconsts.PodAnnotationNetworkEnhancementAffinityRestricted], + apiconsts.PodAnnotationNetworkEnhancementAffinityRestrictedTrue) + + hints[p.ResourceName()].Hints = append(hints[p.ResourceName()].Hints, &pluginapi.TopologyHint{ + Nodes: p.agentCtx.CPUDetails.NUMANodes().ToSliceUInt64(), + }) + } + + return hints, nil +} + +/* +The NIC selection depends on the following three aspects: available Bandwidth on each NIC, Namespace parameter in request, and req.Hints. +1) The availability of sufficient bandwidth on the NIC is a prerequisite for determining whether the card can be selected. +If there is insufficient bandwidth on a NIC, it cannot be included in the candidate list. + +2) We may put NICs into separate net namespaces in order to use both NICs simultaneously (Host network mode). +If a container wants to request a specific NIC through the namespace parameter, this requirement must also be met. +If the specified NIC has insufficient bandwidth, it cannot be included in the candidate list. + +3) The req.Hints parameter represents the affinity of a NIC. For example, a socket container running on a specific socket +may use req.Hints to prioritize the selection of a NIC connected to that socket. However, this requirement is only satisfied as much as possible. +If the NIC connected to the socket has sufficient bandwidth, only this NIC is returned. Otherwise, other cards with sufficient bandwidth will be returned. 
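+
+A concrete (hypothetical) walk-through: assume eth0 is attached to NUMA 0 with 5000Mbps free, eth2 is attached to NUMA 2
+with 20000Mbps free, and a container requests 10000Mbps with req.Hints pointing at NUMA 0/1 and no namespace restriction.
+Step 1) drops eth0 for insufficient free bandwidth, step 2) keeps eth2 because no specific net namespace is demanded,
+and step 3) still returns eth2 although it does not match the hint, since hint affinity is only satisfied best-effort.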
+*/ +func (p *StaticPolicy) selectNICsByReq(req *pluginapi.ResourceRequest) ([]machine.InterfaceInfo, error) { + nicFilters := []NICFilter{ + p.filterAvailableNICsByBandwidth, + filterNICsByNamespaceType, + filterNICsByHint, + } + + candidateNICs, err := filterAvailableNICsByReq(p.nics, req, p.agentCtx, nicFilters) + if err != nil { + return nil, fmt.Errorf("filterAvailableNICsByReq failed with error: %v", err) + } + + // this node can not meet the combined requests + if len(candidateNICs) == 0 { + general.InfoS("nic list returned by filterAvailableNICsByReq is empty", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName) + } + + return candidateNICs, nil +} + +func (p *StaticPolicy) getResourceAllocationAnnotations(podAnnotations map[string]string, allocation *state.AllocationInfo) (map[string]string, error) { + netClsID, err := p.getNetClassID(podAnnotations, p.podLevelNetClassAnnoKey) + if err != nil { + return nil, fmt.Errorf("getNetClassID failed with error: %v", err) + } + + selectedNIC := p.getNICByName(allocation.IfName) + + resourceAllocationAnnotations := map[string]string{ + p.ipv4ResourceAllocationAnnotationKey: strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV4), IPsSeparator), + p.ipv6ResourceAllocationAnnotationKey: strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV6), IPsSeparator), + p.netInterfaceNameResourceAllocationAnnotationKey: selectedNIC.Iface, + p.netClassIDResourceAllocationAnnotationKey: fmt.Sprintf("%d", netClsID), + // TODO: support differentiated Egress/Ingress bandwidth later + p.netBandwidthResourceAllocationAnnotationKey: strconv.Itoa(int(allocation.Egress)), + } + + if len(selectedNIC.NSAbsolutePath) > 0 { + resourceAllocationAnnotations[p.netNSPathResourceAllocationAnnotationKey] = selectedNIC.NSAbsolutePath + } + + return resourceAllocationAnnotations, nil +} + +func (p *StaticPolicy) removePod(podUID string) error { + if p.CgroupV2Env { + cgIDList, err := p.metaServer.ExternalManager.ListCgroupIDsForPod(podUID) + if err != nil { + if general.IsErrNotFound(err) { + general.Warningf("cgroup ids for pod not found") + return nil } - }(cgID) + return fmt.Errorf("[NetworkStaticPolicy.removePod] list cgroup ids of pod: %s failed with error: %v", podUID, err) + } + + for _, cgID := range cgIDList { + go func(cgID uint64) { + if err := p.metaServer.ExternalManager.ClearNetClass(cgID); err != nil { + general.Errorf("delete net class failed, cgID: %v, err: %v", cgID, err) + return + } + }(cgID) + } + } + + // update state cache + podEntries := p.state.GetPodEntries() + delete(podEntries, podUID) + + machineState, err := state.GenerateMachineStateFromPodEntries(p.qrmConfig, p.nics, podEntries, p.state.GetReservedBandwidth()) + if err != nil { + general.Errorf("pod: %s, GenerateMachineStateFromPodEntries failed with error: %v", podUID, err) + return fmt.Errorf("calculate machineState by updated pod entries failed with error: %v", err) } + p.state.SetPodEntries(podEntries) + p.state.SetMachineState(machineState) + return nil } @@ -591,3 +961,29 @@ func (p *StaticPolicy) getNetClassIDByQoSLevel(qosLevel string) (uint32, error) return 0, fmt.Errorf("netClsID for qosLevel: %s isn't found", qosLevel) } } + +func (p *StaticPolicy) getNICByName(ifName string) machine.InterfaceInfo { + for idx := range p.nics { + if p.nics[idx].Iface == ifName { + return p.nics[idx] + } + } + + return machine.InterfaceInfo{} +} + +// return the Socket id/index that the specified NIC attached to +func (p *StaticPolicy) 
getSocketIDByNIC(ifName string) (int, error) { + for _, iface := range p.nics { + if iface.Iface == ifName { + socketIDs := p.agentCtx.KatalystMachineInfo.CPUDetails.SocketsInNUMANodes(iface.NumaNode) + if socketIDs.Size() == 0 { + return -1, fmt.Errorf("failed to find the associated socket ID for the specified NIC %s", ifName) + } + + return socketIDs.ToSliceInt()[0], nil + } + } + + return -1, fmt.Errorf("invalid NIC name - failed to find a matched NIC") +} diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go b/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go index f28570fd3..40c0ee3b2 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go @@ -19,10 +19,13 @@ package staticpolicy import ( "context" "fmt" + "io/ioutil" "net" + "os" "sort" "testing" + info "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -31,10 +34,13 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" + apinode "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" "github.com/kubewharf/katalyst-api/pkg/consts" + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" katalyst_base "github.com/kubewharf/katalyst-core/cmd/base" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/metaserver" @@ -69,6 +75,9 @@ const ( testEth0NSAbsolutePath = "" testEth0NSName = "" + testEth1Name = "eth1" + testEth1AffinitiveNUMANode = 1 + testEth2Name = "eth2" testEth2AffinitiveNUMANode = 2 testEth2NSAbsolutePath = "/var/run/ns2" @@ -119,8 +128,37 @@ func makeStaticPolicy(t *testing.T) *StaticPolicy { Val: NetworkResourcePluginPolicyNameStatic, }) + cpuTopology, err := machine.GenerateDummyCPUTopology(16, 2, 4) + assert.NoError(t, err) + agentCtx.KatalystMachineInfo.CPUTopology = cpuTopology + + mockQrmConfig := generateTestConfiguration(t).QRMPluginsConfiguration + mockQrmConfig.ReservedBandwidth = 4000 + mockQrmConfig.EgressCapacityRate = 0.9 + mockQrmConfig.IngressCapacityRate = 0.85 + + nics := makeNICs() + availableNICs := filterNICsByAvailability(nics, nil, nil) + assert.Len(t, availableNICs, 2) + + expectedReservation := map[string]uint32{ + testEth0Name: 4000, + } + reservation, err := getReservedBandwidth(availableNICs, mockQrmConfig.ReservedBandwidth, FirstNIC) + assert.NoError(t, err) + assert.Equal(t, expectedReservation, reservation) + + tmpDir, err := ioutil.TempDir("", "checkpoint") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + + stateImpl, err := state.NewCheckpointState(mockQrmConfig, tmpDir, NetworkPluginStateFileName, + NetworkResourcePluginPolicyNameStatic, &info.MachineInfo{}, availableNICs, reservation, false) + assert.NoError(t, err) + return &StaticPolicy{ qosConfig: generateTestConfiguration(t).QoSConfiguration, + qrmConfig: mockQrmConfig, emitter: wrappedEmitter, metaServer: agentCtx.MetaServer, stopCh: make(chan struct{}), @@ -130,13 +168,14 @@ func makeStaticPolicy(t *testing.T) *StaticPolicy { consts.PodAnnotationQoSLevelReclaimedCores: testDefaultReclaimedNetClsId, consts.PodAnnotationQoSLevelDedicatedCores: testDefaultDedicatedNetClsId, }, - 
agentCtx: agentCtx, - nics: makeNICs(), - podLevelNetClassAnnoKey: consts.PodAnnotationNetClassKey, - podLevelNetAttributesAnnoKeys: []string{}, - ipv4ResourceAllocationAnnotationKey: testIPv4ResourceAllocationAnnotationKey, - ipv6ResourceAllocationAnnotationKey: testIPv6ResourceAllocationAnnotationKey, - netNSPathResourceAllocationAnnotationKey: testNetNSPathResourceAllocationAnnotationKey, + agentCtx: agentCtx, + nics: availableNICs, + state: stateImpl, + podLevelNetClassAnnoKey: consts.PodAnnotationNetClassKey, + podLevelNetAttributesAnnoKeys: []string{}, + ipv4ResourceAllocationAnnotationKey: testIPv4ResourceAllocationAnnotationKey, + ipv6ResourceAllocationAnnotationKey: testIPv6ResourceAllocationAnnotationKey, + netNSPathResourceAllocationAnnotationKey: testNetNSPathResourceAllocationAnnotationKey, netInterfaceNameResourceAllocationAnnotationKey: testNetInterfaceNameResourceAllocationAnnotationKey, netClassIDResourceAllocationAnnotationKey: testNetClassIDResourceAllocationAnnotationKey, netBandwidthResourceAllocationAnnotationKey: testNetBandwidthResourceAllocationAnnotationKey, @@ -150,6 +189,7 @@ func makeNICs() []machine.InterfaceInfo { return []machine.InterfaceInfo{ { Iface: testEth0Name, + Speed: 25000, NumaNode: testEth0AffinitiveNUMANode, Enable: true, Addr: &machine.IfaceAddr{ @@ -158,8 +198,16 @@ func makeNICs() []machine.InterfaceInfo { NSAbsolutePath: testEth0NSAbsolutePath, NSName: testEth0NSName, }, + { + Iface: testEth1Name, + Speed: 25000, + NumaNode: testEth1AffinitiveNUMANode, + Enable: false, + Addr: &machine.IfaceAddr{}, + }, { Iface: testEth2Name, + Speed: 25000, NumaNode: testEth2AffinitiveNUMANode, Enable: true, Addr: &machine.IfaceAddr{ @@ -174,12 +222,24 @@ func makeNICs() []machine.InterfaceInfo { func TestNewStaticPolicy(t *testing.T) { t.Parallel() - neetToRun, policy, err := NewStaticPolicy(makeTestGenericContext(t), generateTestConfiguration(t), nil, NetworkResourcePluginPolicyNameStatic) + agentCtx := makeTestGenericContext(t) + agentCtx.KatalystMachineInfo.ExtraNetworkInfo.Interface = makeNICs() + agentCtx.MachineInfo = &info.MachineInfo{} + + conf := generateTestConfiguration(t) + conf.QRMPluginsConfiguration.ReservedBandwidth = 4000 + conf.QRMPluginsConfiguration.EgressCapacityRate = 0.9 + conf.QRMPluginsConfiguration.IngressCapacityRate = 0.85 + + tmpDir, err := ioutil.TempDir("", "checkpoint") + assert.NoError(t, err) + defer os.RemoveAll(tmpDir) + conf.GenericQRMPluginConfiguration.StateFileDirectory = tmpDir + + neetToRun, policy, err := NewStaticPolicy(agentCtx, conf, nil, NetworkResourcePluginPolicyNameStatic) assert.NoError(t, err) assert.NotNil(t, policy) assert.True(t, neetToRun) - - return } func TestRemovePod(t *testing.T) { @@ -188,14 +248,69 @@ func TestRemovePod(t *testing.T) { policy := makeStaticPolicy(t) assert.NotNil(t, policy) - req := &pluginapi.RemovePodRequest{ - PodUid: string(uuid.NewUUID()), + podID := string(uuid.NewUUID()) + testName := "test" + var bwReq float64 = 5000 + + // create a new Pod with bandwidth request + addReq := &pluginapi.ResourceRequest{ + PodUid: podID, + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + Hint: &pluginapi.TopologyHint{ + Nodes: []uint64{0, 1}, + Preferred: true, + }, + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): bwReq, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: 
consts.PodAnnotationQoSLevelSharedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationNetClassKey: testSharedNetClsId, + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + consts.PodAnnotationNetworkEnhancementKey: testHostPreferEnhancementValue, + }, + } + + resp, err := policy.Allocate(context.Background(), addReq) + + // verify the state + allocationInfo := policy.state.GetAllocationInfo(podID, testName) + machineState := policy.state.GetMachineState() + podEntries := policy.state.GetPodEntries() + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, allocationInfo.IfName, testEth0Name) + assert.Equal(t, allocationInfo.Egress, uint32(bwReq)) + assert.Equal(t, allocationInfo.Ingress, uint32(bwReq)) + assert.Len(t, machineState, 2) + assert.Len(t, machineState[testEth0Name].PodEntries, 1) + assert.EqualValues(t, machineState[testEth0Name].PodEntries[podID][testName], allocationInfo) + assert.Len(t, podEntries, 1) + assert.EqualValues(t, podEntries, machineState[testEth0Name].PodEntries) + + // remove the pod + delReq := &pluginapi.RemovePodRequest{ + PodUid: podID, } - _, err := policy.RemovePod(context.TODO(), req) + _, err = policy.RemovePod(context.TODO(), delReq) assert.NoError(t, err) - return + // verify the state again + allocationInfo = policy.state.GetAllocationInfo(podID, testName) + machineState = policy.state.GetMachineState() + podEntries = policy.state.GetPodEntries() + assert.Nil(t, allocationInfo) + assert.Len(t, machineState, 2) + assert.Len(t, machineState[testEth0Name].PodEntries, 0) + assert.Len(t, podEntries, 0) } func TestAllocate(t *testing.T) { @@ -205,20 +320,24 @@ func TestAllocate(t *testing.T) { testCases := []struct { description string + noError bool req *pluginapi.ResourceRequest expectedResp *pluginapi.ResourceAllocationResponse }{ { description: "req for init container", + noError: true, req: &pluginapi.ResourceRequest{ - PodUid: string(uuid.NewUUID()), - PodNamespace: testName, - PodName: testName, - ContainerName: testName, - ContainerType: pluginapi.ContainerType_INIT, - ContainerIndex: 0, - ResourceName: string(consts.ResourceNetBandwidth), - ResourceRequests: make(map[string]float64), + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_INIT, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Labels: map[string]string{ consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, }, @@ -244,6 +363,7 @@ func TestAllocate(t *testing.T) { }, { description: "req for shared_cores main container with host netns preference", + noError: true, req: &pluginapi.ResourceRequest{ PodUid: string(uuid.NewUUID()), PodNamespace: testName, @@ -256,7 +376,9 @@ func TestAllocate(t *testing.T) { Nodes: []uint64{0, 1}, Preferred: true, }, - ResourceRequests: make(map[string]float64), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Labels: map[string]string{ consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, }, @@ -276,14 +398,16 @@ func TestAllocate(t *testing.T) { AllocationResult: &pluginapi.ResourceAllocation{ ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ string(consts.ResourceNetBandwidth): { - IsNodeResource: false, + IsNodeResource: true, IsScalarResource: true, - AllocatedQuantity: 0, + AllocatedQuantity: 5000, + 
AllocationResult: machine.NewCPUSet(0, 1).String(), Annotations: map[string]string{ testIPv4ResourceAllocationAnnotationKey: testEth0IPv4, testIPv6ResourceAllocationAnnotationKey: "", testNetInterfaceNameResourceAllocationAnnotationKey: testEth0Name, testNetClassIDResourceAllocationAnnotationKey: testSharedNetClsId, + testNetBandwidthResourceAllocationAnnotationKey: "5000", }, ResourceHints: &pluginapi.ListOfTopologyHints{ Hints: []*pluginapi.TopologyHint{ @@ -307,6 +431,7 @@ func TestAllocate(t *testing.T) { }, { description: "req for reclaimed_cores main container with not host netns preference", + noError: true, req: &pluginapi.ResourceRequest{ PodUid: string(uuid.NewUUID()), PodNamespace: testName, @@ -319,7 +444,9 @@ func TestAllocate(t *testing.T) { Nodes: []uint64{2, 3}, Preferred: true, }, - ResourceRequests: make(map[string]float64), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Annotations: map[string]string{ consts.PodAnnotationNetClassKey: testReclaimedNetClsId, consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, @@ -339,15 +466,17 @@ func TestAllocate(t *testing.T) { AllocationResult: &pluginapi.ResourceAllocation{ ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ string(consts.ResourceNetBandwidth): { - IsNodeResource: false, + IsNodeResource: true, IsScalarResource: true, - AllocatedQuantity: 0, + AllocatedQuantity: 5000, + AllocationResult: machine.NewCPUSet(2, 3).String(), Annotations: map[string]string{ testIPv4ResourceAllocationAnnotationKey: "", testIPv6ResourceAllocationAnnotationKey: testEth2IPv6, testNetNSPathResourceAllocationAnnotationKey: testEth2NSAbsolutePath, testNetInterfaceNameResourceAllocationAnnotationKey: testEth2Name, testNetClassIDResourceAllocationAnnotationKey: testReclaimedNetClsId, + testNetBandwidthResourceAllocationAnnotationKey: "5000", }, ResourceHints: &pluginapi.ListOfTopologyHints{ Hints: []*pluginapi.TopologyHint{ @@ -371,6 +500,7 @@ func TestAllocate(t *testing.T) { }, { description: "req for dedicated_cores main container with host netns guarantee", + noError: true, req: &pluginapi.ResourceRequest{ PodUid: string(uuid.NewUUID()), PodNamespace: testName, @@ -383,7 +513,9 @@ func TestAllocate(t *testing.T) { Nodes: []uint64{0, 1}, Preferred: true, }, - ResourceRequests: make(map[string]float64), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Annotations: map[string]string{ consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, consts.PodAnnotationNetworkEnhancementKey: testHostEnhancementValue, @@ -402,14 +534,16 @@ func TestAllocate(t *testing.T) { AllocationResult: &pluginapi.ResourceAllocation{ ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ string(consts.ResourceNetBandwidth): { - IsNodeResource: false, + IsNodeResource: true, IsScalarResource: true, - AllocatedQuantity: 0, + AllocatedQuantity: 5000, + AllocationResult: machine.NewCPUSet(0, 1).String(), Annotations: map[string]string{ testIPv4ResourceAllocationAnnotationKey: testEth0IPv4, testIPv6ResourceAllocationAnnotationKey: "", testNetInterfaceNameResourceAllocationAnnotationKey: testEth0Name, testNetClassIDResourceAllocationAnnotationKey: fmt.Sprintf("%d", testDefaultDedicatedNetClsId), + testNetBandwidthResourceAllocationAnnotationKey: "5000", }, ResourceHints: &pluginapi.ListOfTopologyHints{ Hints: []*pluginapi.TopologyHint{ @@ -431,19 +565,119 @@ func TestAllocate(t *testing.T) { }, }, }, + { + description: "req 
for dedicated_cores main container with host netns guarantee and exceeded bandwidth over the 1st NIC", + noError: false, + req: &pluginapi.ResourceRequest{ + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + Hint: &pluginapi.TopologyHint{ + Nodes: []uint64{0, 1}, + Preferred: true, + }, + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 20000, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationNetworkEnhancementKey: testHostEnhancementValue, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + expectedResp: nil, + }, + { + description: "req for dedicated_cores main container with host netns guarantee and exceeded bandwidth over the 1st NIC which is preferred", + noError: false, + req: &pluginapi.ResourceRequest{ + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + Hint: &pluginapi.TopologyHint{ + Nodes: []uint64{0, 1}, + Preferred: true, + }, + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 20000, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationNetworkEnhancementKey: testHostPreferEnhancementValue, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + expectedResp: &pluginapi.ResourceAllocationResponse{ + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + AllocationResult: &pluginapi.ResourceAllocation{ + ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ + string(consts.ResourceNetBandwidth): { + IsNodeResource: true, + IsScalarResource: true, + AllocatedQuantity: 20000, + AllocationResult: machine.NewCPUSet(2, 3).String(), + Annotations: map[string]string{ + testIPv4ResourceAllocationAnnotationKey: testEth2IPv6, + testIPv6ResourceAllocationAnnotationKey: "", + testNetInterfaceNameResourceAllocationAnnotationKey: testEth2Name, + testNetClassIDResourceAllocationAnnotationKey: fmt.Sprintf("%d", testDefaultDedicatedNetClsId), + testNetBandwidthResourceAllocationAnnotationKey: "20000", + }, + ResourceHints: &pluginapi.ListOfTopologyHints{ + Hints: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{2, 3}, + Preferred: false, + }, + }, + }, + }, + }, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationNetworkEnhancementNamespaceType: consts.PodAnnotationNetworkEnhancementNamespaceTypeHostPrefer, + }, + }, + }, } for _, tc := range testCases { staticPolicy := makeStaticPolicy(t) resp, err := staticPolicy.Allocate(context.Background(), tc.req) - assert.NoError(t, err) - assert.NotNil(t, resp) - - tc.expectedResp.PodUid = tc.req.PodUid - t.Logf("expect: %v", tc.expectedResp.AllocationResult) - t.Logf("actucal: %v", 
resp.AllocationResult) - assert.Equalf(t, tc.expectedResp, resp, "failed in test case: %s", tc.description) + if tc.noError { + assert.NoError(t, err) + assert.NotNil(t, resp) + + tc.expectedResp.PodUid = tc.req.PodUid + t.Logf("expect: %v", tc.expectedResp.AllocationResult) + t.Logf("actucal: %v", resp.AllocationResult) + assert.Equalf(t, tc.expectedResp, resp, "failed in test case: %s", tc.description) + } else { + assert.Error(t, err) + assert.Nil(t, resp) + } } } @@ -580,14 +814,16 @@ func TestGetTopologyHints(t *testing.T) { { description: "req for init container", req: &pluginapi.ResourceRequest{ - PodUid: string(uuid.NewUUID()), - PodNamespace: testName, - PodName: testName, - ContainerName: testName, - ContainerType: pluginapi.ContainerType_INIT, - ContainerIndex: 0, - ResourceName: string(consts.ResourceNetBandwidth), - ResourceRequests: make(map[string]float64), + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_INIT, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Labels: map[string]string{ consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, }, @@ -616,14 +852,16 @@ func TestGetTopologyHints(t *testing.T) { { description: "req for shared_cores main container with host netns preference", req: &pluginapi.ResourceRequest{ - PodUid: string(uuid.NewUUID()), - PodNamespace: testName, - PodName: testName, - ContainerName: testName, - ContainerType: pluginapi.ContainerType_MAIN, - ContainerIndex: 0, - ResourceName: string(consts.ResourceNetBandwidth), - ResourceRequests: make(map[string]float64), + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Labels: map[string]string{ consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, }, @@ -670,14 +908,16 @@ func TestGetTopologyHints(t *testing.T) { { description: "req for reclaimed_cores main container with not host netns preference", req: &pluginapi.ResourceRequest{ - PodUid: string(uuid.NewUUID()), - PodNamespace: testName, - PodName: testName, - ContainerName: testName, - ContainerType: pluginapi.ContainerType_MAIN, - ContainerIndex: 0, - ResourceName: string(consts.ResourceNetBandwidth), - ResourceRequests: make(map[string]float64), + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Annotations: map[string]string{ consts.PodAnnotationNetClassKey: testReclaimedNetClsId, consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelReclaimedCores, @@ -724,14 +964,16 @@ func TestGetTopologyHints(t *testing.T) { { description: "req for dedicated_cores main container with host netns guarantee", req: &pluginapi.ResourceRequest{ - PodUid: string(uuid.NewUUID()), - PodNamespace: testName, - PodName: testName, - ContainerName: testName, - ContainerType: pluginapi.ContainerType_MAIN, - ContainerIndex: 0, - ResourceName: string(consts.ResourceNetBandwidth), - 
ResourceRequests: make(map[string]float64), + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 5000, + }, Annotations: map[string]string{ consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, consts.PodAnnotationNetworkEnhancementKey: testHostEnhancementValue, @@ -766,6 +1008,99 @@ func TestGetTopologyHints(t *testing.T) { }, }, }, + { + description: "req for dedicated_cores main container with exceeded bandwidth over the 1st NIC which is preferred", + req: &pluginapi.ResourceRequest{ + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 20000, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationNetworkEnhancementKey: testHostPreferEnhancementValue, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + expectedResp: &pluginapi.ResourceHintsResponse{ + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceHints: map[string]*pluginapi.ListOfTopologyHints{ + string(consts.ResourceNetBandwidth): { + Hints: []*pluginapi.TopologyHint{ + { + Nodes: []uint64{2, 3}, + Preferred: false, + }, + { + Nodes: []uint64{0, 1, 2, 3}, + Preferred: false, + }, + }, + }, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationNetworkEnhancementNamespaceType: consts.PodAnnotationNetworkEnhancementNamespaceTypeHostPrefer, + }, + }, + }, + { + description: "req for dedicated_cores main container with exceeded bandwidth over the 1st NIC which is required", + req: &pluginapi.ResourceRequest{ + PodUid: string(uuid.NewUUID()), + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): 20000, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationNetworkEnhancementKey: testHostEnhancementValue, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + }, + }, + expectedResp: &pluginapi.ResourceHintsResponse{ + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + ResourceHints: map[string]*pluginapi.ListOfTopologyHints{ + string(consts.ResourceNetBandwidth): { + Hints: []*pluginapi.TopologyHint{}, + }, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: 
consts.PodAnnotationQoSLevelDedicatedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationNetworkEnhancementNamespaceType: consts.PodAnnotationNetworkEnhancementNamespaceTypeHost, + }, + }, + }, } for _, tc := range testCases { @@ -835,13 +1170,63 @@ func TestGetTopologyAwareResources(t *testing.T) { policy := makeStaticPolicy(t) assert.NotNil(t, policy) + podID := string(uuid.NewUUID()) + testName := "test" + var bwReq float64 = 5000 + + // create a new Pod with bandwidth request + addReq := &pluginapi.ResourceRequest{ + PodUid: podID, + PodNamespace: testName, + PodName: testName, + ContainerName: testName, + ContainerType: pluginapi.ContainerType_MAIN, + ContainerIndex: 0, + ResourceName: string(consts.ResourceNetBandwidth), + Hint: &pluginapi.TopologyHint{ + Nodes: []uint64{0, 1}, + Preferred: true, + }, + ResourceRequests: map[string]float64{ + string(consts.ResourceNetBandwidth): bwReq, + }, + Labels: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + Annotations: map[string]string{ + consts.PodAnnotationNetClassKey: testSharedNetClsId, + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + consts.PodAnnotationNetworkEnhancementKey: testHostPreferEnhancementValue, + }, + } + + _, err := policy.Allocate(context.Background(), addReq) + assert.NoError(t, err) + req := &pluginapi.GetTopologyAwareResourcesRequest{ - PodUid: string(uuid.NewUUID()), - ContainerName: "test-container-name", + PodUid: podID, + ContainerName: testName, } - _, err := policy.GetTopologyAwareResources(context.TODO(), req) + resp, err := policy.GetTopologyAwareResources(context.TODO(), req) assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Len(t, resp.ContainerTopologyAwareResources.AllocatedResources, 1) + assert.Equal(t, resp.ContainerTopologyAwareResources.AllocatedResources[string(apiconsts.ResourceNetBandwidth)].AggregatedQuantity, bwReq) + assert.Len(t, resp.ContainerTopologyAwareResources.AllocatedResources[string(apiconsts.ResourceNetBandwidth)].TopologyAwareQuantityList, 1) + + expectedTopologyAwareQuantity := pluginapi.TopologyAwareQuantity{ + ResourceValue: bwReq, + Node: uint64(0), + Name: testEth0Name, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + // testEth0NSName is empty, so remove the prefix + apiconsts.ResourceAnnotationKeyResourceIdentifier: testEth0Name, + }, + } + assert.Equal(t, *resp.ContainerTopologyAwareResources.AllocatedResources[string(apiconsts.ResourceNetBandwidth)].TopologyAwareQuantityList[0], expectedTopologyAwareQuantity) } func TestGetTopologyAwareAllocatableResources(t *testing.T) { @@ -850,8 +1235,63 @@ func TestGetTopologyAwareAllocatableResources(t *testing.T) { policy := makeStaticPolicy(t) assert.NotNil(t, policy) - _, err := policy.GetTopologyAwareAllocatableResources(context.TODO(), &pluginapi.GetTopologyAwareAllocatableResourcesRequest{}) + resp, err := policy.GetTopologyAwareAllocatableResources(context.TODO(), &pluginapi.GetTopologyAwareAllocatableResourcesRequest{}) + assert.NotNil(t, resp) assert.NoError(t, err) + assert.Equal(t, resp.AllocatableResources[string(apiconsts.ResourceNetBandwidth)].AggregatedAllocatableQuantity, float64(38500)) + assert.Equal(t, resp.AllocatableResources[string(apiconsts.ResourceNetBandwidth)].AggregatedCapacityQuantity, float64(42500)) + assert.Len(t, 
resp.AllocatableResources[string(apiconsts.ResourceNetBandwidth)].TopologyAwareAllocatableQuantityList, 2) + assert.Len(t, resp.AllocatableResources[string(apiconsts.ResourceNetBandwidth)].TopologyAwareCapacityQuantityList, 2) + + expectedTopologyAwareAllocatableQuantityList := []*pluginapi.TopologyAwareQuantity{ + { + ResourceValue: float64(17250), + Node: uint64(0), + Name: testEth0Name, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + // testEth0NSName is empty, so remove the prefix + apiconsts.ResourceAnnotationKeyResourceIdentifier: testEth0Name, + }, + }, + { + ResourceValue: float64(21250), + Node: uint64(1), + Name: testEth2Name, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + apiconsts.ResourceAnnotationKeyResourceIdentifier: fmt.Sprintf("%s-%s", testEth2NSName, testEth2Name), + }, + }, + } + assert.Equal(t, resp.AllocatableResources[string(apiconsts.ResourceNetBandwidth)].TopologyAwareAllocatableQuantityList, expectedTopologyAwareAllocatableQuantityList) + + expectedTopologyAwareCapacityQuantityList := []*pluginapi.TopologyAwareQuantity{ + { + ResourceValue: float64(21250), + Node: uint64(0), + Name: testEth0Name, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + // testEth0NSName is empty, so remove the prefix + apiconsts.ResourceAnnotationKeyResourceIdentifier: testEth0Name, + }, + }, + { + ResourceValue: float64(21250), + Node: uint64(1), + Name: testEth2Name, + Type: string(apinode.TopologyTypeNIC), + TopologyLevel: pluginapi.TopologyLevel_SOCKET, + Annotations: map[string]string{ + apiconsts.ResourceAnnotationKeyResourceIdentifier: fmt.Sprintf("%s-%s", testEth2NSName, testEth2Name), + }, + }, + } + assert.Equal(t, resp.AllocatableResources[string(apiconsts.ResourceNetBandwidth)].TopologyAwareCapacityQuantityList, expectedTopologyAwareCapacityQuantityList) } func TestGetResourcePluginOptions(t *testing.T) { diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/util.go b/pkg/agent/qrm-plugins/network/staticpolicy/util.go index 71e5bc87d..ddaad20f1 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/util.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/util.go @@ -25,17 +25,24 @@ import ( "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" ) -type NICFilter func(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, agentCtx *agent.GenericContext) []machine.InterfaceInfo +type ReservationPolicy string +type NICSelectionPoligy string -var nicFilters = []NICFilter{ - filterNICsByAvailability, - filterNICsByNamespaceType, - filterNICsByHint, -} +const ( + FirstNIC ReservationPolicy = "first" + EvenDistribution ReservationPolicy = "even" + + RandomOne NICSelectionPoligy = "random" + FirstOne NICSelectionPoligy = "first" + LastOne NICSelectionPoligy = "last" +) + +type NICFilter func(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, agentCtx *agent.GenericContext) []machine.InterfaceInfo // isReqAffinityRestricted returns true if allocated network interface must have affinity with allocated numa func isReqAffinityRestricted(reqAnnotations map[string]string) bool { @@ -91,7 +98,7 @@ func 
checkNICPreferenceOfReq(nic machine.InterfaceInfo, reqAnnotations map[strin } // filterAvailableNICsByReq walks through nicFilters to select the targeted network interfaces -func filterAvailableNICsByReq(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, agentCtx *agent.GenericContext) ([]machine.InterfaceInfo, error) { +func filterAvailableNICsByReq(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, agentCtx *agent.GenericContext, nicFilters []NICFilter) ([]machine.InterfaceInfo, error) { if req == nil { return nil, fmt.Errorf("filterAvailableNICsByReq got nil req") } else if agentCtx == nil { @@ -105,7 +112,7 @@ func filterAvailableNICsByReq(nics []machine.InterfaceInfo, req *pluginapi.Resou return filteredNICs, nil } -func filterNICsByAvailability(nics []machine.InterfaceInfo, _ *pluginapi.ResourceRequest, _ *agent.GenericContext) []machine.InterfaceInfo { +func filterNICsByAvailability(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, _ *agent.GenericContext) []machine.InterfaceInfo { filteredNICs := make([]machine.InterfaceInfo, 0, len(nics)) for _, nic := range nics { if !nic.Enable { @@ -119,6 +126,13 @@ func filterNICsByAvailability(nics []machine.InterfaceInfo, _ *pluginapi.Resourc filteredNICs = append(filteredNICs, nic) } + if len(filteredNICs) == 0 { + general.InfoS("nic list returned by filterNICsByAvailability is empty", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName) + } + return filteredNICs } @@ -149,6 +163,13 @@ func filterNICsByNamespaceType(nics []machine.InterfaceInfo, req *pluginapi.Reso } } + if len(filteredNICs) == 0 { + general.InfoS("nic list returned by filterNICsByNamespaceType is empty", + "podNamespace", req.PodNamespace, + "podName", req.PodName, + "containerName", req.ContainerName) + } + return filteredNICs } @@ -179,7 +200,6 @@ func filterNICsByHint(nics []machine.InterfaceInfo, req *pluginapi.ResourceReque } if siblingNUMAs.Equals(hintNUMASet) { - // TODO: if multi-nics meets the hint, we need to choose best one according to left bandwidth or other properties if exactlyMatchNIC == nil { general.InfoS("add hint exactly matched nic", "podNamespace", req.PodNamespace, @@ -214,10 +234,31 @@ func getRandomNICs(nics []machine.InterfaceInfo) machine.InterfaceInfo { return nics[rand.Intn(len(nics))] } +func selectOneNIC(nics []machine.InterfaceInfo, policy NICSelectionPoligy) machine.InterfaceInfo { + if len(nics) == 0 { + general.Errorf("no NIC to select") + return machine.InterfaceInfo{} + } + + switch policy { + case RandomOne: + return getRandomNICs(nics) + case FirstOne: + // since we only pass filtered nics, always picking the first or the last one actually indicates a kind of binpacking + return nics[0] + case LastOne: + return nics[len(nics)-1] + } + + // use LastOne as default + return nics[len(nics)-1] +} + // packAllocationResponse fills pluginapi.ResourceAllocationResponse with information from AllocationInfo and pluginapi.ResourceRequest -func packAllocationResponse(req *pluginapi.ResourceRequest, resourceName string, - allocatedQuantity float64, resourceAllocationAnnotations map[string]string) (*pluginapi.ResourceAllocationResponse, error) { - if req == nil { +func packAllocationResponse(req *pluginapi.ResourceRequest, allocationInfo *state.AllocationInfo, respHint *pluginapi.TopologyHint, resourceAllocationAnnotations map[string]string) (*pluginapi.ResourceAllocationResponse, error) { + if allocationInfo == nil { + return nil, fmt.Errorf("packAllocationResponse 
got nil allocationInfo") + } else if req == nil { return nil, fmt.Errorf("packAllocationResponse got nil request") } @@ -230,17 +271,18 @@ func packAllocationResponse(req *pluginapi.ResourceRequest, resourceName string, ContainerIndex: req.ContainerIndex, PodRole: req.PodRole, PodType: req.PodType, - ResourceName: resourceName, + ResourceName: req.ResourceName, AllocationResult: &pluginapi.ResourceAllocation{ ResourceAllocation: map[string]*pluginapi.ResourceAllocationInfo{ - resourceName: { - IsNodeResource: false, + string(consts.ResourceNetBandwidth): { + IsNodeResource: true, IsScalarResource: true, // to avoid re-allocating - AllocatedQuantity: allocatedQuantity, + AllocatedQuantity: float64(allocationInfo.Egress), + AllocationResult: allocationInfo.NumaNodes.String(), Annotations: resourceAllocationAnnotations, ResourceHints: &pluginapi.ListOfTopologyHints{ Hints: []*pluginapi.TopologyHint{ - req.Hint, + respHint, }, }, }, @@ -250,3 +292,38 @@ func packAllocationResponse(req *pluginapi.ResourceRequest, resourceName string, Annotations: general.DeepCopyMap(req.Annotations), }, nil } + +// getReservedBandwidth is used to spread total reserved bandwidth into per-nic level. +func getReservedBandwidth(nics []machine.InterfaceInfo, reservation uint32, policy ReservationPolicy) (map[string]uint32, error) { + nicCount := len(nics) + + if nicCount == 0 { + return nil, fmt.Errorf("getReservedBandwidth got invalid NICs") + } + + general.Infof("reservedBanwidth: %d, nicCount: %d, policy: %s, ", + reservation, nicCount, policy) + + reservedBandwidth := make(map[string]uint32) + + switch policy { + case FirstNIC: + reservedBandwidth[nics[0].Iface] = reservation + case EvenDistribution: + for _, iface := range nics { + reservedBandwidth[iface.Iface] = reservation / uint32(nicCount) + } + default: + return nil, fmt.Errorf("unsupported network bandwidth reservation policy: %s", policy) + } + + return reservedBandwidth, nil +} + +func getResourceIdentifier(ifaceNS, ifaceName string) string { + if len(ifaceNS) > 0 { + return fmt.Sprintf("%s-%s", ifaceNS, ifaceName) + } + + return ifaceName +} diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go index a41e61f4f..f8280f82c 100644 --- a/pkg/agent/qrm-plugins/util/util.go +++ b/pkg/agent/qrm-plugins/util/util.go @@ -51,7 +51,7 @@ func GetQuantityFromResourceReq(req *pluginapi.ResourceRequest) (int, error) { return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), nil case string(consts.ReclaimedResourceMilliCPU): return general.Max(int(math.Ceil(req.ResourceRequests[key]/1000.0)), 0), nil - case string(v1.ResourceMemory), string(consts.ReclaimedResourceMemory): + case string(v1.ResourceMemory), string(consts.ReclaimedResourceMemory), string(consts.ResourceNetBandwidth): return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), nil default: return 0, fmt.Errorf("invalid request resource name: %s", key) diff --git a/pkg/config/agent/qrm/network_plugin.go b/pkg/config/agent/qrm/network_plugin.go index b50dc1136..3a748195e 100644 --- a/pkg/config/agent/qrm/network_plugin.go +++ b/pkg/config/agent/qrm/network_plugin.go @@ -19,8 +19,18 @@ package qrm // NetworkQRMPluginConfig is the config of network QRM plugin type NetworkQRMPluginConfig struct { // PolicyName is used to switch between several strategies - PolicyName string - NetClass NetClassConfig + PolicyName string + NetClass NetClassConfig + // Reserved network bandwidth in unit of Mbps for business-critical jobs (e.g. online services). 
+ // In phase 1, we only support the reservation for business-critical jobs. The system component reservation might be added later. + // Also, we do not differentiate the egress and ingress reservation for now. That is, the reserved bandwidth on egress and ingress is supposed to be same + ReservedBandwidth uint32 + // The ratio of available capacity to NIC line speed. For example, a 25Gbps NIC's max bandwidth is around 23.5Gbps. + // Please note, the ingress rate throttling may need additional virtual device like ifb, which results in lower capacity than egress + EgressCapacityRate float32 + IngressCapacityRate float32 + // skip network state corruption and it will be used after updating state properties + SkipNetworkStateCorruption bool PodLevelNetClassAnnoKey string PodLevelNetAttributesAnnoKeys string IPv4ResourceAllocationAnnotationKey string diff --git a/pkg/util/general/common.go b/pkg/util/general/common.go index a5e5a5022..9aa928c30 100644 --- a/pkg/util/general/common.go +++ b/pkg/util/general/common.go @@ -90,6 +90,14 @@ func MinUInt64(a, b uint64) uint64 { } } +func MinUInt32(a, b uint32) uint32 { + if a <= b { + return a + } else { + return b + } +} + // IsNameEnabled check if a specified name enabled or not. func IsNameEnabled(name string, disabledByDefault sets.String, enableNames []string) bool { hasStar := false
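To make the bandwidth accounting introduced above concrete, the following standalone sketch reproduces the numbers asserted in TestGetTopologyAwareAllocatableResources: two enabled 25000Mbps NICs, EgressCapacityRate=0.9, IngressCapacityRate=0.85, and a 4000Mbps reservation placed on the first NIC (FirstNIC policy). The capacity figures in the test match the smaller (ingress) rate, so the min() below is an inference from those expectations rather than a statement about the plugin's internal formula; the allocatable() helper is illustrative only and does not exist in the repository.

// sketch.go - assumed per-NIC bandwidth accounting, derived from the test fixtures above
package main

import "fmt"

// allocatable derives a NIC's reported capacity and allocatable bandwidth from its line speed,
// the configured capacity rates, and the bandwidth reserved on that NIC.
func allocatable(lineSpeedMbps uint32, egressRate, ingressRate float32, reservedMbps uint32) (capacity, alloc uint32) {
	// assumption: the reported capacity follows the smaller of the two rates
	rate := egressRate
	if ingressRate < egressRate {
		rate = ingressRate
	}
	capacity = uint32(float32(lineSpeedMbps) * rate)
	if reservedMbps > capacity {
		return capacity, 0
	}
	return capacity, capacity - reservedMbps
}

func main() {
	// Values taken from the test fixtures: two enabled 25000Mbps NICs, EgressCapacityRate=0.9,
	// IngressCapacityRate=0.85, ReservedBandwidth=4000 applied to eth0 only (FirstNIC policy).
	for _, nic := range []struct {
		name     string
		reserved uint32
	}{{"eth0", 4000}, {"eth2", 0}} {
		c, a := allocatable(25000, 0.9, 0.85, nic.reserved)
		fmt.Printf("%s: capacity=%d Mbps, allocatable=%d Mbps\n", nic.name, c, a)
	}
	// Prints eth0: capacity=21250, allocatable=17250 and eth2: capacity=21250, allocatable=21250,
	// i.e. the aggregated 42500/38500 Mbps asserted by TestGetTopologyAwareAllocatableResources.
}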