Skip to content

Commit

Permalink
scheduler: support for device-aware numa scheduling (#1760) (#23837)
Browse files Browse the repository at this point in the history
(CE backport of ENT 59433d56c7215c0b8bf33764f41b57d9bd30160f (without ent files))

* scheduler: enhance numa aware scheduling with support for devices

* cr: add comments
  • Loading branch information
shoenig authored Aug 20, 2024
1 parent 5fcec1f commit 8b093a6
Show file tree
Hide file tree
Showing 22 changed files with 982 additions and 154 deletions.
9 changes: 9 additions & 0 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package api

import (
"slices"
"strconv"
)

Expand Down Expand Up @@ -115,6 +116,10 @@ func (r *Resources) Merge(other *Resources) {
// NUMAResource holds the numa scheduling options: an affinity mode and an
// optional list of devices. Both fields are read from HCL as optional
// attributes.
type NUMAResource struct {
// Affinity must be one of "none", "prefer", "require".
Affinity string `hcl:"affinity,optional"`

// Devices is the subset of devices requested by the task that must share
// the same numa node, along with the task's reserved cpu cores.
Devices []string `hcl:"devices,optional"`
}

func (n *NUMAResource) Copy() *NUMAResource {
Expand All @@ -123,6 +128,7 @@ func (n *NUMAResource) Copy() *NUMAResource {
}
return &NUMAResource{
Affinity: n.Affinity,
Devices: slices.Clone(n.Devices),
}
}

Expand All @@ -133,6 +139,9 @@ func (n *NUMAResource) Canonicalize() {
if n.Affinity == "" {
n.Affinity = "none"
}
if len(n.Devices) == 0 {
n.Devices = nil
}
}

type Port struct {
Expand Down
7 changes: 7 additions & 0 deletions api/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ func TestNUMAResource_Copy(t *testing.T) {
r1 := &NUMAResource{Affinity: "none"}
r2 := r1.Copy()
r1.Affinity = "require"
r1.Devices = []string{"nvidia/gpu"}
must.Eq(t, "require", r1.Affinity)
must.Eq(t, "none", r2.Affinity)
must.Eq(t, []string{"nvidia/gpu"}, r1.Devices)
must.SliceEmpty(t, r2.Devices)
}

func TestNUMAResource_Canonicalize(t *testing.T) {
Expand All @@ -108,4 +111,8 @@ func TestNUMAResource_Canonicalize(t *testing.T) {
var n2 = &NUMAResource{Affinity: ""}
n2.Canonicalize()
must.Eq(t, &NUMAResource{Affinity: "none"}, n2)

var n3 = &NUMAResource{Affinity: "require", Devices: []string{}}
n3.Canonicalize()
must.Eq(t, &NUMAResource{Affinity: "require", Devices: nil}, n3)
}
7 changes: 7 additions & 0 deletions client/lib/idset/idset.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,18 @@ func From[T, U ID](slice []U) *Set[T] {
return result
}

// Difference builds a new Set holding every element of s that is absent
// from other.
func (s *Set[T]) Difference(other *Set[T]) *Set[T] {
	remaining := s.items.Difference(other.items).(*set.Set[T])
	return &Set[T]{items: remaining}
}

// Intersect builds a new Set holding only the elements present in both s
// and other.
func (s *Set[T]) Intersect(other *Set[T]) *Set[T] {
	shared := s.items.Intersect(other.items).(*set.Set[T])
	return &Set[T]{items: shared}
}

// Contains returns whether the Set contains item.
func (s *Set[T]) Contains(item T) bool {
return s.items.Contains(item)
Expand Down
24 changes: 22 additions & 2 deletions client/lib/numalib/detect_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package numalib

import (
"fmt"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"

Expand Down Expand Up @@ -37,6 +39,7 @@ const (
cpuBaseFile = sysRoot + "/cpu/cpu%d/cpufreq/base_frequency"
cpuSocketFile = sysRoot + "/cpu/cpu%d/topology/physical_package_id"
cpuSiblingFile = sysRoot + "/cpu/cpu%d/topology/thread_siblings_list"
deviceFiles = "/sys/bus/pci/devices"
)

// pathReaderFn is a path reader function, injected into all value getters to
Expand All @@ -58,12 +61,29 @@ func (s *Sysfs) ScanSystem(top *Topology) {

// detect core performance data
s.discoverCores(top, os.ReadFile)

// detect pci device bus associativity
s.discoverPCI(top, os.ReadFile)
}

func (*Sysfs) available() bool {
return true
}

// discoverPCI resets st.BusAssociativity and fills it by walking deviceFiles.
//
// For every path visited, the "numa_node" file beneath it is read through
// readerFunc. When the value parses and is non-negative, it is recorded under
// the base name of the visited path. Paths whose file cannot be read or
// parsed, or that report a negative node, are skipped without error.
func (*Sysfs) discoverPCI(st *Topology, readerFunc pathReaderFn) {
	associativity := make(map[string]hw.NodeID)
	st.BusAssociativity = associativity

	visit := func(path string, _ fs.DirEntry, _ error) error {
		node, err := getNumeric[int](filepath.Join(path, "numa_node"), 64, readerFunc)
		if err != nil || node < 0 {
			// best effort: nothing usable for this entry, keep walking
			return nil
		}
		associativity[filepath.Base(path)] = hw.NodeID(node)
		return nil
	}

	// the walk result is intentionally discarded; detection is best effort
	_ = filepath.WalkDir(deviceFiles, visit)
}

func (*Sysfs) discoverOnline(st *Topology, readerFunc pathReaderFn) {
ids, err := getIDSet[hw.NodeID](nodeOnline, readerFunc)
if err == nil {
Expand Down Expand Up @@ -156,13 +176,13 @@ func getIDSet[T idset.ID](path string, readerFunc pathReaderFn, args ...any) (*i
return idset.Parse[T](string(s)), nil
}

func getNumeric[T int | idset.ID](path string, maxSize int, readerFunc pathReaderFn, args ...any) (T, error) {
func getNumeric[T int | idset.ID](path string, bitSize int, readerFunc pathReaderFn, args ...any) (T, error) {
path = fmt.Sprintf(path, args...)
s, err := readerFunc(path)
if err != nil {
return 0, err
}
i, err := strconv.ParseUint(strings.TrimSpace(string(s)), 10, maxSize)
i, err := strconv.ParseInt(strings.TrimSpace(string(s)), 10, bitSize)
if err != nil {
return 0, err
}
Expand Down
6 changes: 3 additions & 3 deletions client/lib/numalib/detect_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func goodSysData(path string) ([]byte, error) {
}

func TestSysfs_discoverOnline(t *testing.T) {
st := NewTopology(&idset.Set[hw.NodeID]{}, SLIT{}, []Core{})
st := MockTopology(&idset.Set[hw.NodeID]{}, SLIT{}, []Core{})
goodIDSet := idset.From[hw.NodeID]([]uint8{0, 1})
oneNode := idset.From[hw.NodeID]([]uint8{0})

Expand All @@ -91,7 +91,7 @@ func TestSysfs_discoverOnline(t *testing.T) {
}

func TestSysfs_discoverCosts(t *testing.T) {
st := NewTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{})
st := MockTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{})
twoNodes := idset.From[hw.NodeID]([]uint8{1, 3})

tests := []struct {
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestSysfs_discoverCosts(t *testing.T) {
}

func TestSysfs_discoverCores(t *testing.T) {
st := NewTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{})
st := MockTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{})
oneNode := idset.From[hw.NodeID]([]uint8{0})
twoNodes := idset.From[hw.NodeID]([]uint8{1, 3})

Expand Down
19 changes: 19 additions & 0 deletions client/lib/numalib/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package numalib

import (
"github.com/hashicorp/nomad/client/lib/idset"
"github.com/hashicorp/nomad/client/lib/numalib/hw"
)

// MockTopology builds a Topology from the given node IDs, distances, and
// cores, then applies the node IDs through SetNodes. It is only used in tests
// for mocking.
func MockTopology(nodeIDs *idset.Set[hw.NodeID], distances SLIT, cores []Core) *Topology {
	top := new(Topology)
	top.nodeIDs = nodeIDs
	top.Distances = distances
	top.Cores = cores
	top.SetNodes(nodeIDs)
	return top
}
29 changes: 19 additions & 10 deletions client/lib/numalib/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,30 @@ type Topology struct {
Distances SLIT
Cores []Core

// BusAssociativity maps the bus address of each PCI device to the numa node
// that the hardware associates the device with
//
// e.g. "0000:03:00.0" -> 1
//
// Note that the key may not exactly match the Locality.PciBusID from the
// fingerprint of the device with regard to the domain value.
//
//
// 0000:03:00.0
// ^ ^ ^ ^
// | | | |-- function (identifies functionality of device)
// | | |-- device (identifies the device number on the bus)
// | |
// | |-- bus (identifies which bus segment the device is connected to)
// |
// |-- domain (basically always 0, may be 0000 or 00000000)
BusAssociativity map[string]hw.NodeID

// explicit overrides from client configuration
OverrideTotalCompute hw.MHz
OverrideWitholdCompute hw.MHz
}

// NewTopology assembles a Topology from the given node IDs, distances, and
// cores, and then applies the node IDs through SetNodes. It is only used in
// tests for mocking.
func NewTopology(nodeIDs *idset.Set[hw.NodeID], distances SLIT, cores []Core) *Topology {
	var top Topology
	top.nodeIDs = nodeIDs
	top.Distances = distances
	top.Cores = cores
	top.SetNodes(nodeIDs)
	return &top
}

func (t *Topology) SetNodes(nodes *idset.Set[hw.NodeID]) {
t.nodeIDs = nodes
if !nodes.Empty() {
Expand Down
7 changes: 7 additions & 0 deletions nomad/mock/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func DrainNode() *structs.Node {
// NvidiaNode returns a node with two instances of an Nvidia GPU
func NvidiaNode() *structs.Node {
n := Node()
n.NodeResources.Processors.Topology = structs.MockWorkstationTopology()
n.NodeResources.Devices = []*structs.NodeDeviceResource{
{
Type: "gpu",
Expand All @@ -145,10 +146,16 @@ func NvidiaNode() *structs.Node {
{
ID: uuid.Generate(),
Healthy: true,
Locality: &structs.NodeDeviceLocality{
PciBusID: "0000:02:00.1", // node 0
},
},
{
ID: uuid.Generate(),
Healthy: true,
Locality: &structs.NodeDeviceLocality{
PciBusID: "0000:02:01.1", // node 0
},
},
},
},
Expand Down
29 changes: 29 additions & 0 deletions nomad/structs/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package structs

import "maps"

// DeviceAccounter is used to account for device usage on a node. It can detect
// when a node is oversubscribed and can be used for deciding what devices are
// free
Expand All @@ -22,6 +24,25 @@ type DeviceAccounterInstance struct {
Instances map[string]int
}

// GetLocality returns a copy of the NodeDeviceLocality belonging to the device
// instance whose ID equals instanceID.
//
// If no instance with that ID is found, nil is returned.
func (dai *DeviceAccounterInstance) GetLocality(instanceID string) *NodeDeviceLocality {
	instances := dai.Device.Instances
	for i := range instances {
		if candidate := instances[i]; candidate.ID == instanceID {
			return candidate.Locality.Copy()
		}
	}
	return nil
}

// Copy returns a copy of dai. The Device is duplicated through its own Copy
// method and the Instances map is cloned, so writes to the copy's map are not
// visible through dai.
//
// A nil receiver yields nil rather than panicking, so callers that copy
// optional values do not need their own guard.
func (dai *DeviceAccounterInstance) Copy() *DeviceAccounterInstance {
	if dai == nil {
		return nil
	}
	return &DeviceAccounterInstance{
		Device:    dai.Device.Copy(),
		Instances: maps.Clone(dai.Instances),
	}
}

// NewDeviceAccounter returns a new device accounter. The node is used to
// populate the set of available devices based on what healthy device instances
// exist on the node.
Expand Down Expand Up @@ -58,6 +79,14 @@ func NewDeviceAccounter(n *Node) *DeviceAccounter {
return d
}

// Copy returns a new DeviceAccounter whose Devices map holds, under the same
// keys, a Copy of every DeviceAccounterInstance tracked by d.
func (d *DeviceAccounter) Copy() *DeviceAccounter {
	clone := &DeviceAccounter{
		Devices: make(map[DeviceIdTuple]*DeviceAccounterInstance, len(d.Devices)),
	}
	for id, instance := range d.Devices {
		clone.Devices[id] = instance.Copy()
	}
	return clone
}

// AddAllocs takes a set of allocations and internally marks which devices are
// used. If a device is used more than once by the set of passed allocations,
// the collision will be returned as true.
Expand Down
83 changes: 83 additions & 0 deletions nomad/structs/devices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,86 @@ func TestDeviceAccounter_AddReserved_Collision(t *testing.T) {
res.DeviceIDs = []string{nvidiaDev0ID}
require.True(d.AddReserved(res))
}

// TestDeviceAccounterInstance_GetLocality checks that GetLocality finds the
// locality of a known instance ID and returns nil for an unknown one.
func TestDeviceAccounterInstance_GetLocality(t *testing.T) {
	ci.Parallel(t)

	// newInstance builds a device instance with only the fields under test.
	newInstance := func(id, busID string) *NodeDevice {
		return &NodeDevice{
			ID:       id,
			Locality: &NodeDeviceLocality{PciBusID: busID},
		}
	}

	dai := &DeviceAccounterInstance{
		Device: &NodeDeviceResource{
			Instances: []*NodeDevice{
				newInstance("GPU-001", "0000:01:01.1"),
				newInstance("GPU-002", "0000:01:02.1"),
				newInstance("GPU-003", "0000:01:03.1"),
			},
		},
	}

	t.Run("exists", func(t *testing.T) {
		found := dai.GetLocality("GPU-002")
		must.Eq(t, "0000:01:02.1", found.PciBusID)
	})

	t.Run("missing", func(t *testing.T) {
		absent := dai.GetLocality("GPU-004")
		must.Nil(t, absent)
	})
}

func TestDeviceAccounterInstance_Copy(t *testing.T) {
original := &DeviceAccounterInstance{
Device: &NodeDeviceResource{
Vendor: "nvidia",
Type: "gpu",
Name: "1080ti",
Instances: []*NodeDevice{
{
ID: "GPU-001",
Healthy: true,
HealthDescription: "healthy",
Locality: &NodeDeviceLocality{
PciBusID: "0000:01:01.1",
},
},
{
ID: "GPU-002",
Healthy: true,
HealthDescription: "healthy",
Locality: &NodeDeviceLocality{
PciBusID: "0000:01:02.1",
},
},
},
Attributes: map[string]*psstructs.Attribute{},
},
Instances: map[string]int{
"GPU-001": 1,
"GPU-002": 3,
},
}

clone := original.Copy()
clone.Device.Name = "name2"
clone.Device.Instances[0].ID = "GPU-003"
clone.Device.Instances[0].Locality.PciBusID = "0000:01:03.1"
clone.Instances["GPU-001"] = 2

must.Eq(t, "1080ti", original.Device.Name)
must.Eq(t, "GPU-001", original.Device.Instances[0].ID)
must.Eq(t, "0000:01:01.1", original.Device.Instances[0].Locality.PciBusID)
must.Eq(t, 1, original.Instances["GPU-001"])
}
Loading

0 comments on commit 8b093a6

Please sign in to comment.