Skip to content

Commit

Permalink
refactor(reporter): report device allocations through addContainerDev…
Browse files Browse the repository at this point in the history
…ices

Signed-off-by: caohe <[email protected]>
  • Loading branch information
caohe authored and waynepeking348 committed Nov 20, 2023
1 parent c296bfd commit b6e790d
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 102 deletions.
9 changes: 5 additions & 4 deletions cmd/katalyst-agent/app/options/reporter/kubelet_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type KubeletPluginOptions struct {
PodResourcesServerEndpoints []string
KubeletResourcePluginPaths []string
EnableReportTopologyPolicy bool
ResourceNameToZoneNameMap map[string]string
ResourceNameToZoneTypeMap map[string]string
}

func NewKubeletPluginOptions() *KubeletPluginOptions {
Expand All @@ -39,7 +39,7 @@ func NewKubeletPluginOptions() *KubeletPluginOptions {
pluginapi.ResourcePluginPath,
},
EnableReportTopologyPolicy: false,
ResourceNameToZoneNameMap: make(map[string]string),
ResourceNameToZoneTypeMap: make(map[string]string),
}
}

Expand All @@ -52,14 +52,15 @@ func (o *KubeletPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"the path of kubelet resource plugin")
fs.BoolVar(&o.EnableReportTopologyPolicy, "enable-report-topology-policy", o.EnableReportTopologyPolicy,
"whether to report topology policy")
fs.Var(cliflag.NewMapStringString(&o.ResourceNameToZoneNameMap), "resource-name-to-zone-name-map", "a map that stores the mapping relationship between resource names to zone names in KCNR (e.g. nvidia.com/gpu=GPU,...)")
fs.StringToStringVar(&o.ResourceNameToZoneTypeMap, "resource-name-to-zone-type-map", o.ResourceNameToZoneTypeMap,
"a map that stores the mapping relationship between resource names to zone types in KCNR (e.g. nvidia.com/gpu=GPU,...)")
}

// ApplyTo copies every kubelet plugin option held by o onto the same-named
// field of the reporter configuration c. It performs no validation and always
// returns nil.
func (o *KubeletPluginOptions) ApplyTo(c *reporter.KubeletPluginConfiguration) error {
c.PodResourcesServerEndpoints = o.PodResourcesServerEndpoints
c.KubeletResourcePluginPaths = o.KubeletResourcePluginPaths
c.EnableReportTopologyPolicy = o.EnableReportTopologyPolicy
c.ResourceNameToZoneNameMap = o.ResourceNameToZoneNameMap
c.ResourceNameToZoneTypeMap = o.ResourceNameToZoneTypeMap

return nil
}
2 changes: 1 addition & 1 deletion pkg/agent/resourcemanager/fetcher/kubelet/kubeletplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewKubeletReporterPlugin(emitter metrics.MetricEmitter, metaServer *metaser
}

topologyStatusAdapter, err := topology.NewPodResourcesServerTopologyAdapter(metaServer,
conf.PodResourcesServerEndpoints, conf.KubeletResourcePluginPaths, conf.ResourceNameToZoneNameMap,
conf.PodResourcesServerEndpoints, conf.KubeletResourcePluginPaths, conf.ResourceNameToZoneTypeMap,
nil, p.getNumaInfo, nil, podresources.GetV1Client)
if err != nil {
return nil, err
Expand Down
110 changes: 17 additions & 93 deletions pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ type topologyAdapterImpl struct {
// kubeletResourcePluginPaths is the path of kubelet resource plugin
kubeletResourcePluginPaths []string

// resourceNameToZoneNameMap is a map that stores the mapping relationship between resource names to zone names
resourceNameToZoneNameMap map[string]string
// resourceNameToZoneTypeMap maps resource names to the zone types used for their device zones
resourceNameToZoneTypeMap map[string]string
}

// NewPodResourcesServerTopologyAdapter creates a topology adapter which uses pod resources server
func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, endpoints []string,
kubeletResourcePluginPaths []string, resourceNameToZoneNameMap map[string]string, skipDeviceNames sets.String,
kubeletResourcePluginPaths []string, resourceNameToZoneTypeMap map[string]string, skipDeviceNames sets.String,
numaInfoGetter NumaInfoGetter, podResourcesFilter PodResourcesFilter, getClientFunc podresources.GetClientFunc) (Adapter, error) {
numaInfo, err := numaInfoGetter()
if err != nil {
Expand All @@ -115,7 +115,7 @@ func NewPodResourcesServerTopologyAdapter(metaServer *metaserver.MetaServer, end
skipDeviceNames: skipDeviceNames,
getClientFunc: getClientFunc,
podResourcesFilter: podResourcesFilter,
resourceNameToZoneNameMap: resourceNameToZoneNameMap,
resourceNameToZoneTypeMap: resourceNameToZoneTypeMap,
}, nil
}

Expand Down Expand Up @@ -191,12 +191,6 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no
return nil, errors.Wrap(err, "get device zone topology failed")
}

deviceZoneAllocations, err := p.getDeviceZoneAllocations(podList, podResourcesList)
if err != nil {
return nil, errors.Wrap(err, "get device zone allocations failed")
}
mergeZoneAllocations(zoneAllocations, deviceZoneAllocations)

return topologyZoneGenerator.GenerateTopologyZoneStatus(zoneAllocations, zoneResources, zoneAttributes), nil
}

Expand Down Expand Up @@ -333,19 +327,15 @@ func (p *topologyAdapterImpl) addDeviceZoneNodes(generator *util.TopologyZoneGen
return fmt.Errorf("allocatable Resources is nil")
}
var errList []error
for targetResourceName, targetZoneName := range p.resourceNameToZoneNameMap {
for _, device := range allocatableResources.Devices {
if targetResourceName == device.ResourceName {
for _, deviceId := range device.DeviceIds {
deviceNode := util.GenerateDeviceZoneNode(deviceId, targetZoneName)
if len(device.Topology.Nodes) == 0 {
continue
}
numaZoneNode := util.GenerateNumaZoneNode(int(device.Topology.Nodes[0].ID))
for _, device := range allocatableResources.Devices {
if targetZoneType, ok := p.resourceNameToZoneTypeMap[device.ResourceName]; ok {
for _, deviceId := range device.DeviceIds {
deviceNode := util.GenerateDeviceZoneNode(deviceId, targetZoneType)
for _, numaNode := range device.Topology.Nodes {
numaZoneNode := util.GenerateNumaZoneNode(int(numaNode.ID))
err := generator.AddNode(&numaZoneNode, deviceNode)
if err != nil {
errList = append(errList, err)
continue
}
}
}
Expand Down Expand Up @@ -489,73 +479,6 @@ func (p *topologyAdapterImpl) getZoneAllocations(podList []*v1.Pod, podResources
return zoneAllocationsMap, nil
}

// getDeviceZoneAllocations builds a map from device zone node to the
// allocation that occupies it, for every device whose resource name is a key
// of p.resourceNameToZoneNameMap.
//
// podList supplies the pods known to the caller; podResourcesList supplies the
// per-pod device assignments. For each entry of podResourcesList:
//   - nil entries are skipped;
//   - an entry whose namespace/name is not found in podList records an error;
//   - entries whose pod is reported terminated by native.PodIsTerminated are
//     skipped;
//   - when p.podResourcesFilter is set, its result replaces the entry; a filter
//     error is recorded and a nil result means the pod is filtered out.
//
// Every matching device ID yields one allocation with a request of "1" of the
// device's resource name. Only the first consumer seen for a device node is
// kept; later pods referencing the same device ID do not overwrite it.
//
// If any error was recorded, the result map is discarded and the aggregated
// error is returned instead.
func (p *topologyAdapterImpl) getDeviceZoneAllocations(podList []*v1.Pod, podResourcesList []*podresv1.PodResources) (map[util.ZoneNode]util.ZoneAllocations, error) {
	var (
		err     error
		errList []error
	)

	podMap := native.GetPodNamespaceNameKeyMap(podList)
	zoneAllocationsMap := make(map[util.ZoneNode]util.ZoneAllocations)
	for _, podResources := range podResourcesList {
		if podResources == nil {
			continue
		}

		podKey := native.GenerateNamespaceNameKey(podResources.Namespace, podResources.Name)
		pod, ok := podMap[podKey]
		if !ok {
			errList = append(errList, fmt.Errorf("pod %s not found in metaserver", podKey))
			continue
		}

		if native.PodIsTerminated(pod) {
			continue
		}

		// the pod resource filter will filter out unwanted pods
		if p.podResourcesFilter != nil {
			podResources, err = p.podResourcesFilter(pod, podResources)
			if err != nil {
				errList = append(errList, err)
				continue
			}

			// if podResources is nil, it means that the pod is filtered out
			if podResources == nil {
				continue
			}
		}

		for _, c := range podResources.Containers {
			for _, device := range c.Devices {
				// Map keys are unique, so a direct lookup is equivalent to
				// scanning the whole map for a matching resource name.
				targetZoneName, mapped := p.resourceNameToZoneNameMap[device.ResourceName]
				if !mapped {
					continue
				}

				for _, deviceId := range device.DeviceIds {
					deviceNode := util.GenerateDeviceZoneNode(deviceId, targetZoneName)
					// first consumer wins: never overwrite an existing entry
					if _, exists := zoneAllocationsMap[deviceNode]; exists {
						continue
					}

					zoneAllocationsMap[deviceNode] = []*nodev1alpha1.Allocation{
						{
							Consumer: native.GenerateUniqObjectUIDKey(pod),
							Requests: &v1.ResourceList{
								v1.ResourceName(device.ResourceName): resource.MustParse("1"),
							},
						},
					}
				}
			}
		}
	}

	if len(errList) > 0 {
		return nil, utilerrors.NewAggregate(errList)
	}

	return zoneAllocationsMap, nil
}

// getZoneAttributes gets a map of zone node to zone attributes, which is generated from the annotation of
// topology aware quantity; socket and numa zones do not support attributes here
func (p *topologyAdapterImpl) getZoneAttributes(allocatableResources *podresv1.AllocatableResourcesResponse) (map[util.ZoneNode]util.ZoneAttributes, error) {
Expand Down Expand Up @@ -679,6 +602,13 @@ func (p *topologyAdapterImpl) addContainerDevices(zoneResources map[util.ZoneNod

zoneNode := util.GenerateNumaZoneNode(int(node.ID))
zoneResources = addZoneQuantity(zoneResources, zoneNode, resourceName, oneQuantity)

if zoneType, ok := p.resourceNameToZoneTypeMap[device.ResourceName]; ok {
for _, deviceId := range device.DeviceIds {
deviceNode := util.GenerateDeviceZoneNode(deviceId, zoneType)
zoneResources = addZoneQuantity(zoneResources, deviceNode, resourceName, oneQuantity)
}
}
}
}

Expand Down Expand Up @@ -866,9 +796,3 @@ func filterAllocatedPodResourcesList(podResourcesList []*podresv1.PodResources)

return allocatedPodResourcesList
}

// mergeZoneAllocations copies every entry of zone2 into zone1 in place.
// When both maps contain the same zone node, the allocations from zone2
// replace the ones already stored in zone1; zone2 itself is left untouched.
func mergeZoneAllocations(zone1, zone2 map[util.ZoneNode]util.ZoneAllocations) {
	for node, allocs := range zone2 {
		zone1[node] = allocs
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,14 @@ func Test_podResourcesServerTopologyAdapterImpl_GetTopologyZones_ReportRDMATopol
{
Type: nodev1alpha1.TopologyTypeNIC,
Name: "eth0",
Resources: nodev1alpha1.Resources{
Capacity: &v1.ResourceList{
"resource.katalyst.kubewharf.io/rdma": resource.MustParse("1"),
},
Allocatable: &v1.ResourceList{
"resource.katalyst.kubewharf.io/rdma": resource.MustParse("1"),
},
},
Allocations: []*nodev1alpha1.Allocation{
{
Consumer: "default/pod-2/pod-2-uid",
Expand Down Expand Up @@ -941,6 +949,14 @@ func Test_podResourcesServerTopologyAdapterImpl_GetTopologyZones_ReportRDMATopol
{
Type: nodev1alpha1.TopologyTypeNIC,
Name: "eth1",
Resources: nodev1alpha1.Resources{
Capacity: &v1.ResourceList{
"resource.katalyst.kubewharf.io/rdma": resource.MustParse("1"),
},
Allocatable: &v1.ResourceList{
"resource.katalyst.kubewharf.io/rdma": resource.MustParse("1"),
},
},
},
},
},
Expand All @@ -962,7 +978,7 @@ func Test_podResourcesServerTopologyAdapterImpl_GetTopologyZones_ReportRDMATopol
},
},
numaSocketZoneNodeMap: tt.fields.numaSocketZoneNodeMap,
resourceNameToZoneNameMap: map[string]string{
resourceNameToZoneTypeMap: map[string]string{
"resource.katalyst.kubewharf.io/rdma": "NIC",
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/agent/reporter/kubelet_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type KubeletPluginConfiguration struct {
PodResourcesServerEndpoints []string
KubeletResourcePluginPaths []string
EnableReportTopologyPolicy bool
ResourceNameToZoneNameMap map[string]string
ResourceNameToZoneTypeMap map[string]string
}

func NewKubeletPluginConfiguration() *KubeletPluginConfiguration {
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/cnr.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,10 @@ func GenerateSocketZoneNode(socketID int) ZoneNode {
}

// GenerateDeviceZoneNode generates device zone node through device id, which must be unique
func GenerateDeviceZoneNode(deviceId, zoneName string) ZoneNode {
func GenerateDeviceZoneNode(deviceId, zoneType string) ZoneNode {
return ZoneNode{
Meta: ZoneMeta{
Type: nodev1alpha1.TopologyType(zoneName),
Type: nodev1alpha1.TopologyType(zoneType),
Name: deviceId,
},
}
Expand Down

0 comments on commit b6e790d

Please sign in to comment.