From 8b093a6a5d35d44a9f58f72fb4acfaf0cd4a3856 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Tue, 20 Aug 2024 07:53:04 -0500 Subject: [PATCH] scheduler: support for device - aware numa scheduling (#1760) (#23837) (CE backport of ENT 59433d56c7215c0b8bf33764f41b57d9bd30160f (without ent files)) * scheduler: enhance numa aware scheduling with support for devices * cr: add comments --- api/resources.go | 9 + api/resources_test.go | 7 + client/lib/idset/idset.go | 7 + client/lib/numalib/detect_linux.go | 24 +- client/lib/numalib/detect_linux_test.go | 6 +- client/lib/numalib/testing.go | 19 ++ client/lib/numalib/topology.go | 29 ++- nomad/mock/node.go | 7 + nomad/structs/devices.go | 29 +++ nomad/structs/devices_test.go | 83 +++++++ nomad/structs/numa.go | 35 ++- nomad/structs/numa_test.go | 7 +- nomad/structs/structs.go | 25 +- nomad/structs/structs_test.go | 45 +++- nomad/structs/testing.go | 24 +- scheduler/device.go | 86 ++++++- scheduler/device_test.go | 190 ++++++++++++++- scheduler/numa_ce.go | 14 +- scheduler/preemption.go | 28 ++- scheduler/preemption_test.go | 136 +++++++---- scheduler/rank.go | 295 +++++++++++++++++++----- scheduler/rank_test.go | 31 +-- 22 files changed, 982 insertions(+), 154 deletions(-) create mode 100644 client/lib/numalib/testing.go diff --git a/api/resources.go b/api/resources.go index 612a748f24f5..856b0d367f63 100644 --- a/api/resources.go +++ b/api/resources.go @@ -4,6 +4,7 @@ package api import ( + "slices" "strconv" ) @@ -115,6 +116,10 @@ func (r *Resources) Merge(other *Resources) { type NUMAResource struct { // Affinity must be one of "none", "prefer", "require". Affinity string `hcl:"affinity,optional"` + + // Devices is the subset of devices requested by the task that must share + // the same numa node, along with the tasks reserved cpu cores. + Devices []string `hcl:"devices,optional"` } func (n *NUMAResource) Copy() *NUMAResource { @@ -123,6 +128,7 @@ func (n *NUMAResource) Copy() *NUMAResource { } return &NUMAResource{ Affinity: n.Affinity, + Devices: slices.Clone(n.Devices), } } @@ -133,6 +139,9 @@ func (n *NUMAResource) Canonicalize() { if n.Affinity == "" { n.Affinity = "none" } + if len(n.Devices) == 0 { + n.Devices = nil + } } type Port struct { diff --git a/api/resources_test.go b/api/resources_test.go index cbc55e47b598..f8c1a6c50df9 100644 --- a/api/resources_test.go +++ b/api/resources_test.go @@ -94,8 +94,11 @@ func TestNUMAResource_Copy(t *testing.T) { r1 := &NUMAResource{Affinity: "none"} r2 := r1.Copy() r1.Affinity = "require" + r1.Devices = []string{"nvidia/gpu"} must.Eq(t, "require", r1.Affinity) must.Eq(t, "none", r2.Affinity) + must.Eq(t, []string{"nvidia/gpu"}, r1.Devices) + must.SliceEmpty(t, r2.Devices) } func TestNUMAResource_Canonicalize(t *testing.T) { @@ -108,4 +111,8 @@ func TestNUMAResource_Canonicalize(t *testing.T) { var n2 = &NUMAResource{Affinity: ""} n2.Canonicalize() must.Eq(t, &NUMAResource{Affinity: "none"}, n2) + + var n3 = &NUMAResource{Affinity: "require", Devices: []string{}} + n3.Canonicalize() + must.Eq(t, &NUMAResource{Affinity: "require", Devices: nil}, n3) } diff --git a/client/lib/idset/idset.go b/client/lib/idset/idset.go index 2b99ce8966ed..d6e8307f13fb 100644 --- a/client/lib/idset/idset.go +++ b/client/lib/idset/idset.go @@ -106,11 +106,18 @@ func From[T, U ID](slice []U) *Set[T] { return result } +// Difference returns the set of elements in s but not in other. 
func (s *Set[T]) Difference(other *Set[T]) *Set[T] { diff := s.items.Difference(other.items) return &Set[T]{items: diff.(*set.Set[T])} } +// Intersect returns the set of elements that are in both s and other. +func (s *Set[T]) Intersect(other *Set[T]) *Set[T] { + intersection := s.items.Intersect(other.items) + return &Set[T]{items: intersection.(*set.Set[T])} +} + // Contains returns whether the Set contains item. func (s *Set[T]) Contains(item T) bool { return s.items.Contains(item) diff --git a/client/lib/numalib/detect_linux.go b/client/lib/numalib/detect_linux.go index 0d6b6d582a82..1c697127953a 100644 --- a/client/lib/numalib/detect_linux.go +++ b/client/lib/numalib/detect_linux.go @@ -7,7 +7,9 @@ package numalib import ( "fmt" + "io/fs" "os" + "path/filepath" "strconv" "strings" @@ -37,6 +39,7 @@ const ( cpuBaseFile = sysRoot + "/cpu/cpu%d/cpufreq/base_frequency" cpuSocketFile = sysRoot + "/cpu/cpu%d/topology/physical_package_id" cpuSiblingFile = sysRoot + "/cpu/cpu%d/topology/thread_siblings_list" + deviceFiles = "/sys/bus/pci/devices" ) // pathReaderFn is a path reader function, injected into all value getters to @@ -58,12 +61,29 @@ func (s *Sysfs) ScanSystem(top *Topology) { // detect core performance data s.discoverCores(top, os.ReadFile) + + // detect pci device bus associativity + s.discoverPCI(top, os.ReadFile) } func (*Sysfs) available() bool { return true } +func (*Sysfs) discoverPCI(st *Topology, readerFunc pathReaderFn) { + st.BusAssociativity = make(map[string]hw.NodeID) + + filepath.WalkDir(deviceFiles, func(path string, de fs.DirEntry, err error) error { + device := filepath.Base(path) + numaFile := filepath.Join(path, "numa_node") + node, err := getNumeric[int](numaFile, 64, readerFunc) + if err == nil && node >= 0 { + st.BusAssociativity[device] = hw.NodeID(node) + } + return nil + }) +} + func (*Sysfs) discoverOnline(st *Topology, readerFunc pathReaderFn) { ids, err := getIDSet[hw.NodeID](nodeOnline, readerFunc) if err == nil { @@ -156,13 +176,13 @@ func getIDSet[T idset.ID](path string, readerFunc pathReaderFn, args ...any) (*i return idset.Parse[T](string(s)), nil } -func getNumeric[T int | idset.ID](path string, maxSize int, readerFunc pathReaderFn, args ...any) (T, error) { +func getNumeric[T int | idset.ID](path string, bitSize int, readerFunc pathReaderFn, args ...any) (T, error) { path = fmt.Sprintf(path, args...) 
s, err := readerFunc(path) if err != nil { return 0, err } - i, err := strconv.ParseUint(strings.TrimSpace(string(s)), 10, maxSize) + i, err := strconv.ParseInt(strings.TrimSpace(string(s)), 10, bitSize) if err != nil { return 0, err } diff --git a/client/lib/numalib/detect_linux_test.go b/client/lib/numalib/detect_linux_test.go index 9354f5a70740..e253cacb6222 100644 --- a/client/lib/numalib/detect_linux_test.go +++ b/client/lib/numalib/detect_linux_test.go @@ -69,7 +69,7 @@ func goodSysData(path string) ([]byte, error) { } func TestSysfs_discoverOnline(t *testing.T) { - st := NewTopology(&idset.Set[hw.NodeID]{}, SLIT{}, []Core{}) + st := MockTopology(&idset.Set[hw.NodeID]{}, SLIT{}, []Core{}) goodIDSet := idset.From[hw.NodeID]([]uint8{0, 1}) oneNode := idset.From[hw.NodeID]([]uint8{0}) @@ -91,7 +91,7 @@ func TestSysfs_discoverOnline(t *testing.T) { } func TestSysfs_discoverCosts(t *testing.T) { - st := NewTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{}) + st := MockTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{}) twoNodes := idset.From[hw.NodeID]([]uint8{1, 3}) tests := []struct { @@ -121,7 +121,7 @@ func TestSysfs_discoverCosts(t *testing.T) { } func TestSysfs_discoverCores(t *testing.T) { - st := NewTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{}) + st := MockTopology(idset.Empty[hw.NodeID](), SLIT{}, []Core{}) oneNode := idset.From[hw.NodeID]([]uint8{0}) twoNodes := idset.From[hw.NodeID]([]uint8{1, 3}) diff --git a/client/lib/numalib/testing.go b/client/lib/numalib/testing.go new file mode 100644 index 000000000000..de5c7820c743 --- /dev/null +++ b/client/lib/numalib/testing.go @@ -0,0 +1,19 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package numalib + +import ( + "github.com/hashicorp/nomad/client/lib/idset" + "github.com/hashicorp/nomad/client/lib/numalib/hw" +) + +// MockTopology is a constructor for the Topology object, only used in tests for +// mocking. +func MockTopology(nodeIDs *idset.Set[hw.NodeID], distances SLIT, cores []Core) *Topology { + t := &Topology{ + nodeIDs: nodeIDs, + Distances: distances, Cores: cores} + t.SetNodes(nodeIDs) + return t +} diff --git a/client/lib/numalib/topology.go b/client/lib/numalib/topology.go index 46906be68f75..1d1147c8b9a1 100644 --- a/client/lib/numalib/topology.go +++ b/client/lib/numalib/topology.go @@ -63,21 +63,30 @@ type Topology struct { Distances SLIT Cores []Core + // BusAssociativity maps the specific bus each PCI device is plugged into + // with its hardware associated numa node + // + // e.g. "0000:03:00.0" -> 1 + // + // Note that the key may not exactly match the Locality.PciBusID from the + // fingerprint of the device with regard to the domain value. + // + // + // 0000:03:00.0 + // ^ ^ ^ ^ + // | | | |-- function (identifies functionality of device) + // | | |-- device (identifies the device number on the bus) + // | | + // | |-- bus (identifies which bus segment the device is connected to) + // | + // |-- domain (basically always 0, may be 0000 or 00000000) + BusAssociativity map[string]hw.NodeID + // explicit overrides from client configuration OverrideTotalCompute hw.MHz OverrideWitholdCompute hw.MHz } -// NewTopology is a constructor for the Topology object, only used in tests for -// mocking. 
-func NewTopology(nodeIDs *idset.Set[hw.NodeID], distances SLIT, cores []Core) *Topology { - t := &Topology{ - nodeIDs: nodeIDs, - Distances: distances, Cores: cores} - t.SetNodes(nodeIDs) - return t -} - func (t *Topology) SetNodes(nodes *idset.Set[hw.NodeID]) { t.nodeIDs = nodes if !nodes.Empty() { diff --git a/nomad/mock/node.go b/nomad/mock/node.go index d696a01cceb8..0a1729673749 100644 --- a/nomad/mock/node.go +++ b/nomad/mock/node.go @@ -130,6 +130,7 @@ func DrainNode() *structs.Node { // NvidiaNode returns a node with two instances of an Nvidia GPU func NvidiaNode() *structs.Node { n := Node() + n.NodeResources.Processors.Topology = structs.MockWorkstationTopology() n.NodeResources.Devices = []*structs.NodeDeviceResource{ { Type: "gpu", @@ -145,10 +146,16 @@ func NvidiaNode() *structs.Node { { ID: uuid.Generate(), Healthy: true, + Locality: &structs.NodeDeviceLocality{ + PciBusID: "0000:02:00.1", // node 0 + }, }, { ID: uuid.Generate(), Healthy: true, + Locality: &structs.NodeDeviceLocality{ + PciBusID: "0000:02:01.1", // node 0 + }, }, }, }, diff --git a/nomad/structs/devices.go b/nomad/structs/devices.go index a88bf1912c3d..9ed3438dbfd3 100644 --- a/nomad/structs/devices.go +++ b/nomad/structs/devices.go @@ -3,6 +3,8 @@ package structs +import "maps" + // DeviceAccounter is used to account for device usage on a node. It can detect // when a node is oversubscribed and can be used for deciding what devices are // free @@ -22,6 +24,25 @@ type DeviceAccounterInstance struct { Instances map[string]int } +// Locality returns the NodeDeviceLocality of the instance of the specific deviceID. +// +// If no instance matching the deviceID is found, nil is returned. +func (dai *DeviceAccounterInstance) GetLocality(instanceID string) *NodeDeviceLocality { + for _, instance := range dai.Device.Instances { + if instance.ID == instanceID { + return instance.Locality.Copy() + } + } + return nil +} + +func (dai *DeviceAccounterInstance) Copy() *DeviceAccounterInstance { + return &DeviceAccounterInstance{ + Device: dai.Device.Copy(), + Instances: maps.Clone(dai.Instances), + } +} + // NewDeviceAccounter returns a new device accounter. The node is used to // populate the set of available devices based on what healthy device instances // exist on the node. @@ -58,6 +79,14 @@ func NewDeviceAccounter(n *Node) *DeviceAccounter { return d } +func (d *DeviceAccounter) Copy() *DeviceAccounter { + devices := make(map[DeviceIdTuple]*DeviceAccounterInstance, len(d.Devices)) + for k, v := range d.Devices { + devices[k] = v.Copy() + } + return &DeviceAccounter{Devices: devices} +} + // AddAllocs takes a set of allocations and internally marks which devices are // used. If a device is used more than once by the set of passed allocations, // the collision will be returned as true. 
diff --git a/nomad/structs/devices_test.go b/nomad/structs/devices_test.go index b0c248ec3082..8acee158cd1f 100644 --- a/nomad/structs/devices_test.go +++ b/nomad/structs/devices_test.go @@ -262,3 +262,86 @@ func TestDeviceAccounter_AddReserved_Collision(t *testing.T) { res.DeviceIDs = []string{nvidiaDev0ID} require.True(d.AddReserved(res)) } + +func TestDeviceAccounterInstance_GetLocality(t *testing.T) { + ci.Parallel(t) + + dai := &DeviceAccounterInstance{ + Device: &NodeDeviceResource{ + Instances: []*NodeDevice{ + { + ID: "GPU-001", + Locality: &NodeDeviceLocality{ + PciBusID: "0000:01:01.1", + }, + }, + { + ID: "GPU-002", + Locality: &NodeDeviceLocality{ + PciBusID: "0000:01:02.1", + }, + }, + { + ID: "GPU-003", + Locality: &NodeDeviceLocality{ + PciBusID: "0000:01:03.1", + }, + }, + }, + }, + } + + t.Run("exists", func(t *testing.T) { + locality := dai.GetLocality("GPU-002") + must.Eq(t, "0000:01:02.1", locality.PciBusID) + }) + + t.Run("missing", func(t *testing.T) { + locality := dai.GetLocality("GPU-004") + must.Nil(t, locality) + }) +} + +func TestDeviceAccounterInstance_Copy(t *testing.T) { + original := &DeviceAccounterInstance{ + Device: &NodeDeviceResource{ + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + Instances: []*NodeDevice{ + { + ID: "GPU-001", + Healthy: true, + HealthDescription: "healthy", + Locality: &NodeDeviceLocality{ + PciBusID: "0000:01:01.1", + }, + }, + { + ID: "GPU-002", + Healthy: true, + HealthDescription: "healthy", + Locality: &NodeDeviceLocality{ + PciBusID: "0000:01:02.1", + }, + }, + }, + Attributes: map[string]*psstructs.Attribute{}, + }, + Instances: map[string]int{ + "GPU-001": 1, + "GPU-002": 3, + }, + } + + clone := original.Copy() + clone.Device.Name = "name2" + clone.Device.Instances[0].ID = "GPU-003" + clone.Device.Instances[0].Locality.PciBusID = "0000:01:03.1" + clone.Instances["GPU-001"] = 2 + + must.Eq(t, "1080ti", original.Device.Name) + must.Eq(t, "GPU-001", original.Device.Instances[0].ID) + must.Eq(t, "0000:01:01.1", original.Device.Instances[0].Locality.PciBusID) + must.Eq(t, 1, original.Instances["GPU-001"]) +} diff --git a/nomad/structs/numa.go b/nomad/structs/numa.go index ff228ef21aa1..7eccb1b134a7 100644 --- a/nomad/structs/numa.go +++ b/nomad/structs/numa.go @@ -6,6 +6,7 @@ package structs import ( "errors" "fmt" + "slices" "github.com/hashicorp/nomad/client/lib/numalib" ) @@ -25,13 +26,44 @@ type NUMA struct { // Affinity is the numa affinity scheduling behavior. // One of "none", "prefer", "require". Affinity string + + // Devices is the set of devices requsted by the task that must share the + // same numa node, along with reserved cpu cores for the task. 
+ Devices []string +} + +func (n *NUMA) GetDevices() []string { + if n == nil { + return []string{} + } + return n.Devices +} + +func (n *NUMA) Canonicalize() { + if n == nil { + return + } + if n.Affinity == "" { + n.Affinity = NoneNUMA + } + if len(n.Devices) == 0 { + n.Devices = nil + } } func (n *NUMA) Equal(o *NUMA) bool { if n == nil || o == nil { return n == o } - return n.Affinity == o.Affinity + + switch { + case n.Affinity != o.Affinity: + return false + case !slices.Equal(n.Devices, o.Devices): + return false + default: + return true + } } func (n *NUMA) Copy() *NUMA { @@ -40,6 +72,7 @@ func (n *NUMA) Copy() *NUMA { } return &NUMA{ Affinity: n.Affinity, + Devices: slices.Clone(n.Devices), } } diff --git a/nomad/structs/numa_test.go b/nomad/structs/numa_test.go index 325a318acdcf..3451e2294188 100644 --- a/nomad/structs/numa_test.go +++ b/nomad/structs/numa_test.go @@ -21,9 +21,13 @@ func TestNUMA_Equal(t *testing.T) { must.StructEqual(t, &NUMA{ Affinity: "none", + Devices: []string{"nvidia/gpu"}, }, []must.Tweak[*NUMA]{{ Field: "Affinity", Apply: func(n *NUMA) { n.Affinity = "require" }, + }, { + Field: "Devices", + Apply: func(n *NUMA) { n.Devices = []string{"a/b", "c/d"} }, }}) } @@ -67,7 +71,8 @@ func TestNUMA_Validate(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { numa := &NUMA{ - tc.affinity, + Affinity: tc.affinity, + Devices: nil, } result := numa.Validate() must.Eq(t, tc.exp, result) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 18a5de39c4f3..5b8b57f4611f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2490,13 +2490,30 @@ func (r *Resources) Validate() error { mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level.")) } + // Ensure devices are valid + devices := set.New[string](len(r.Devices)) for i, d := range r.Devices { if err := d.Validate(); err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("device %d failed validation: %v", i+1, err)) } + devices.Insert(d.Name) } - // ensure memory_max is greater than memory, unless it is set to 0 or -1 which + // Ensure each numa bound device matches a device requested for task + if r.NUMA != nil { + for _, numaDevice := range r.NUMA.Devices { + if !devices.Contains(numaDevice) { + mErr.Errors = append(mErr.Errors, fmt.Errorf("numa device %q not requested as task resource", numaDevice)) + } + } + } + + // Ensure the numa block is valid + if err := r.NUMA.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + + // Ensure memory_max is greater than memory, unless it is set to 0 or -1 which // are both sentinel values if (r.MemoryMaxMB != 0 && r.MemoryMaxMB != memoryNoLimit) && r.MemoryMaxMB < r.MemoryMB { mErr.Errors = append(mErr.Errors, fmt.Errorf("MemoryMaxMB value (%d) should be larger than MemoryMB value (%d)", r.MemoryMaxMB, r.MemoryMB)) @@ -2623,6 +2640,8 @@ func (r *Resources) Canonicalize() { for _, n := range r.Networks { n.Canonicalize() } + + r.NUMA.Canonicalize() } // MeetsMinResources returns an error if the resources specified are less than @@ -3066,6 +3085,10 @@ type RequestedDevice struct { Affinities Affinities } +func (r *RequestedDevice) String() string { + return r.Name +} + func (r *RequestedDevice) Equal(o *RequestedDevice) bool { if r == o { return true diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 3f8a90f469b3..8406aa03b914 100644 --- a/nomad/structs/structs_test.go +++ 
b/nomad/structs/structs_test.go @@ -2274,17 +2274,52 @@ func TestTask_Validate_Resources(t *testing.T) { MemoryMaxMB: -1, }, }, + { + name: "numa devices do not match", + res: &Resources{ + CPU: 100, + MemoryMB: 200, + Devices: []*RequestedDevice{ + { + Name: "evilcorp/gpu", + Count: 2, + }, + { + Name: "fpga", + Count: 1, + }, + { + Name: "net/nic/model1", + Count: 1, + }, + }, + NUMA: &NUMA{ + Affinity: "require", + Devices: []string{"evilcorp/gpu", "bad/bad", "fpga"}, + }, + }, + err: "numa device \"bad/bad\" not requested as task resource", + }, + { + name: "numa affinity not valid", + res: &Resources{ + CPU: 100, + MemoryMB: 200, + NUMA: &NUMA{ + Affinity: "bad", + }, + }, + err: "numa affinity must be one of none, prefer, or require", + }, } - for i := range cases { - tc := cases[i] + for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { err := tc.res.Validate() if tc.err == "" { - require.NoError(t, err) + must.NoError(t, err) } else { - require.Error(t, err) - require.Contains(t, err.Error(), tc.err) + must.ErrorContains(t, err, tc.err) } }) } diff --git a/nomad/structs/testing.go b/nomad/structs/testing.go index e7be6e1ad5d8..63538dedbc4e 100644 --- a/nomad/structs/testing.go +++ b/nomad/structs/testing.go @@ -43,6 +43,7 @@ func NodeResourcesToAllocatedResources(n *NodeResources) *AllocatedResources { // - 1 socket, 1 NUMA node // - 4 cores @ 3500 MHz (14,000 MHz total) // - no client config overrides +// - no devices func MockBasicTopology() *numalib.Topology { cores := make([]numalib.Core, 4) for i := 0; i < 4; i++ { @@ -60,6 +61,7 @@ func MockBasicTopology() *numalib.Topology { Cores: cores, OverrideTotalCompute: 0, OverrideWitholdCompute: 0, + BusAssociativity: nil, } t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0})) return t @@ -67,10 +69,17 @@ func MockBasicTopology() *numalib.Topology { // MockWorkstationTopology returns a numalib.Topology that looks like a typical // workstation; -// - 2 socket, 2 NUMA node (200% penalty) -// - 16 cores / 32 threads @ 3000 MHz (96,000 MHz total) -// - node0: odd cores, node1: even cores -// - no client config overrides +// - 2 socket, 2 NUMA node (200% penalty) +// - 16 cores / 32 threads @ 3000 MHz (96,000 MHz total) +// - node0: odd cores, node1: even cores +// - no client config overrides +// - node0 devices: +// nvidia/gpu/t1000 - 0000:02:00.1 +// nvidia/gpu/t1000 - 0000:02:01.1 +// net/type1 - 0000:03:00.2 +// - node1 devices: +// nvidia/gpu/r600 - 0000:08:00.1 +// fpga/kv0 - 0000:09:01.0 func MockWorkstationTopology() *numalib.Topology { cores := make([]numalib.Core, 32) for i := 0; i < 32; i++ { @@ -86,6 +95,13 @@ func MockWorkstationTopology() *numalib.Topology { t := &numalib.Topology{ Distances: numalib.SLIT{[]numalib.Cost{10, 20}, {20, 10}}, Cores: cores, + BusAssociativity: map[string]hw.NodeID{ + "0000:02:00.1": 0, + "0000:02:01.1": 0, + "0000:03:00.2": 0, + "0000:08:00.1": 1, + "0000:09:01.0": 1, + }, } t.SetNodes(idset.From[hw.NodeID]([]hw.NodeID{0, 1})) return t diff --git a/scheduler/device.go b/scheduler/device.go index 982da8133208..32ee0bef323a 100644 --- a/scheduler/device.go +++ b/scheduler/device.go @@ -5,9 +5,12 @@ package scheduler import ( "fmt" + "strings" "math" + "github.com/hashicorp/go-set/v2" + "github.com/hashicorp/nomad/client/lib/numalib" "github.com/hashicorp/nomad/nomad/structs" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) @@ -30,10 +33,77 @@ func newDeviceAllocator(ctx Context, n *structs.Node) *deviceAllocator { } } -// AssignDevice takes a device request and returns an 
assignment as well as a -// score for the assignment. If no assignment could be made, an error is +func (da *deviceAllocator) Copy() *deviceAllocator { + accounter := da.DeviceAccounter.Copy() + allocator := &deviceAllocator{accounter, da.ctx} + return allocator +} + +type memoryNodeMatcher struct { + memoryNode int // the target memory node (-1 indicates don't care) + topology *numalib.Topology // the topology of the candidate node + devices *set.Set[string] // the set of devices requiring numa associativity +} + +// equalBusID will compare the instance specific device bus id values in a way +// that handles non-uniform domain strings (e.g. "0000" vs "00000000"). +// +// e.g. 0000:03:00.1 is equal to 00000000:03.00.1 +func equalBusID(a, b string) bool { + if a == b { + return true + } + noDomainA := strings.TrimLeft(a, "0") + noDomainB := strings.TrimLeft(b, "0") + return noDomainA == noDomainB +} + +// Matches returns whether the given device instance is on a PCI bus that is +// on the same NUMA node as the memory node of the matcher. +// +// instanceID is something like "GPU-6b5fa173-5fa6-2d38-54fe-d64c1fe4fe10" +// +// device is the grouping of device instance this instance belongs to and is +// how we find the pci bus locality. +func (m *memoryNodeMatcher) Matches(instanceID string, device *structs.NodeDeviceResource) bool { + // -1 is the sentinel value for not caring about the associated memory + // node, in which case we simply treat the device as a match + if m.memoryNode == -1 { + return true + } + + // if the device is not listed in the numa block of the task resources then + // we do not care about what node is is on + if !m.devices.Contains(device.ID().String()) { + return true + } + + // check if the hardware locality of the device matches the nume node of this + // memoryNodeMatcher instance. we do so by finding the specific device of + // the given instance id, looking at its locality, and comparing the locality + // using equalBusID because direct == equality does not work, due to + // differences in pci bus domain representations + for _, instance := range device.Instances { + if instance.ID == instanceID { + if instance.Locality != nil { + instanceBusID := instance.Locality.PciBusID + for busID, node := range m.topology.BusAssociativity { + if equalBusID(busID, instanceBusID) { + result := int(node) == m.memoryNode + return result + } + } + } + } + } + + return false +} + +// createOffer takes a device request and returns an assignment as well as a +// score for the assignment. If no assignment is possible, an error is // returned explaining why. 
-func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *structs.AllocatedDeviceResource, score float64, err error) { +func (d *deviceAllocator) createOffer(mem *memoryNodeMatcher, ask *structs.RequestedDevice) (out *structs.AllocatedDeviceResource, score float64, err error) { // Try to hot path if len(d.Devices) == 0 { return nil, 0.0, fmt.Errorf("no devices available") @@ -52,10 +122,14 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc for id, devInst := range d.Devices { // Check if we have enough unused instances to use this assignable := uint64(0) - for _, v := range devInst.Instances { - if v == 0 { - assignable++ + for instanceID, v := range devInst.Instances { + if v != 0 { + continue + } + if !mem.Matches(instanceID, devInst.Device) { + continue } + assignable++ } // This device doesn't have enough instances diff --git a/scheduler/device_test.go b/scheduler/device_test.go index 8d6fba161385..64d79819df65 100644 --- a/scheduler/device_test.go +++ b/scheduler/device_test.go @@ -6,7 +6,9 @@ package scheduler import ( "testing" + "github.com/hashicorp/go-set/v2" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/lib/numalib" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -15,6 +17,12 @@ import ( "github.com/stretchr/testify/require" ) +func anyMemoryNodeMatcher() *memoryNodeMatcher { + return &memoryNodeMatcher{ + memoryNode: -1, + } +} + // deviceRequest takes the name, count and potential constraints and affinities // and returns a device request. func deviceRequest(name string, count uint64, @@ -104,7 +112,8 @@ func TestDeviceAllocator_Allocate_GenericRequest(t *testing.T) { // Build the request ask := deviceRequest("gpu", 1, nil, nil) - out, score, err := d.AssignDevice(ask) + mem := anyMemoryNodeMatcher() + out, score, err := d.createOffer(mem, ask) require.NotNil(out) require.Zero(score) require.NoError(err) @@ -127,7 +136,8 @@ func TestDeviceAllocator_Allocate_FullyQualifiedRequest(t *testing.T) { // Build the request ask := deviceRequest("intel/fpga/F100", 1, nil, nil) - out, score, err := d.AssignDevice(ask) + mem := anyMemoryNodeMatcher() + out, score, err := d.createOffer(mem, ask) require.NotNil(out) require.Zero(score) require.NoError(err) @@ -150,12 +160,65 @@ func TestDeviceAllocator_Allocate_NotEnoughInstances(t *testing.T) { // Build the request ask := deviceRequest("gpu", 4, nil, nil) - out, _, err := d.AssignDevice(ask) + mem := anyMemoryNodeMatcher() + out, _, err := d.createOffer(mem, ask) require.Nil(out) require.Error(err) require.Contains(err.Error(), "no devices match request") } +func TestDeviceAllocator_Allocate_NUMA_available(t *testing.T) { + ci.Parallel(t) + + _, ctx := testContext(t) + n := devNode() + d := newDeviceAllocator(ctx, n) + + ask := deviceRequest("nvidia/gpu/1080ti", 2, nil, nil) + + mem := &memoryNodeMatcher{ + memoryNode: 0, + topology: structs.MockWorkstationTopology(), + devices: set.From([]string{"nvidia/gpu/1080ti"}), + } + out, _, err := d.createOffer(mem, ask) + must.NoError(t, err) + must.SliceLen(t, 2, out.DeviceIDs) // DeviceIDs are actually instance ids +} + +func TestDeviceAllocator_Allocate_NUMA_node1(t *testing.T) { + ci.Parallel(t) + + _, ctx := testContext(t) + n := devNode() + n.NodeResources.Devices = append(n.NodeResources.Devices, &structs.NodeDeviceResource{ + Type: "fpga", + Vendor: "xilinx", + Name: "7XA", + Instances: []*structs.NodeDevice{ + { + ID: uuid.Generate(), + 
Healthy: true, + Locality: &structs.NodeDeviceLocality{ + PciBusID: "00000000:09:01.0", + }, + }, + }, + }) + d := newDeviceAllocator(ctx, n) + + ask := deviceRequest("xilinx/fpga/7XA", 1, nil, nil) + + mem := &memoryNodeMatcher{ + memoryNode: 1, + topology: structs.MockWorkstationTopology(), + devices: set.From([]string{"xilinx/fpga/7XA"}), + } + out, _, err := d.createOffer(mem, ask) + must.NoError(t, err) + must.SliceLen(t, 1, out.DeviceIDs) +} + // Test that asking for a device with constraints works func TestDeviceAllocator_Allocate_Constraints(t *testing.T) { ci.Parallel(t) @@ -272,7 +335,8 @@ func TestDeviceAllocator_Allocate_Constraints(t *testing.T) { // Build the request ask := deviceRequest(c.Name, 1, c.Constraints, nil) - out, score, err := d.AssignDevice(ask) + mem := anyMemoryNodeMatcher() + out, score, err := d.createOffer(mem, ask) if c.NoPlacement { require.Nil(t, out) } else { @@ -378,7 +442,8 @@ func TestDeviceAllocator_Allocate_Affinities(t *testing.T) { // Build the request ask := deviceRequest(c.Name, 1, nil, c.Affinities) - out, score, err := d.AssignDevice(ask) + mem := anyMemoryNodeMatcher() + out, score, err := d.createOffer(mem, ask) require.NotNil(out) require.NoError(err) if c.ZeroScore { @@ -393,3 +458,118 @@ func TestDeviceAllocator_Allocate_Affinities(t *testing.T) { }) } } + +func Test_equalBusID(t *testing.T) { + must.True(t, equalBusID("0000:03:00.1", "00000000:03:00.1")) + must.False(t, equalBusID("0000:03:00.1", "0000:03:00.0")) +} + +func Test_memoryNodeMatcher(t *testing.T) { + ci.Parallel(t) + + cases := []struct { + name string + memoryNode int // memory node in consideration + topology *numalib.Topology // cpu cores and device bus associativity + taskNumaDevices *set.Set[string] // devices that require numa associativity + instance string // asking if this particular instance (id) satisfies the request + device *structs.NodeDeviceResource // device group that contains specifics about instance(s) + exp bool + }{ + { + name: "ws: single gpu match on node 0", + memoryNode: 0, + topology: structs.MockWorkstationTopology(), + taskNumaDevices: set.From([]string{"nvidia/gpu/t1000"}), + instance: "GPU-T1000-01", + device: &structs.NodeDeviceResource{ + Vendor: "nvidia", + Type: "gpu", + Name: "t1000", + Instances: []*structs.NodeDevice{ + { + ID: "GPU-T1000-01", + Locality: &structs.NodeDeviceLocality{ + PciBusID: "0000:02:00.1", + }, + }, + }, + }, + exp: true, + }, + { + name: "ws: single gpu no match on node 1", + memoryNode: 1, + topology: structs.MockWorkstationTopology(), + taskNumaDevices: set.From([]string{"nvidia/gpu/t1000"}), + instance: "GPU-T1000-01", + device: &structs.NodeDeviceResource{ + Vendor: "nvidia", + Type: "gpu", + Name: "t1000", + Instances: []*structs.NodeDevice{ + { + ID: "GPU-T1000-01", + Locality: &structs.NodeDeviceLocality{ + PciBusID: "0000:02:00.1", + }, + }, + }, + }, + exp: false, + }, + { + name: "ws: net card match on node 0", + memoryNode: 0, + topology: structs.MockWorkstationTopology(), + taskNumaDevices: set.From([]string{"nvidia/gpu/t1000", "net/type1"}), + instance: "NET-T1-01", + device: &structs.NodeDeviceResource{ + Type: "net", + Name: "nic100", + Instances: []*structs.NodeDevice{ + { + ID: "NET-T1-01", + Locality: &structs.NodeDeviceLocality{ + PciBusID: "0000:03:00.2", + }, + }, + }, + }, + exp: true, + }, + { + name: "ws: any memory node", + memoryNode: -1, + exp: true, + }, + { + name: "ws: device is not requested to be numa aware", + memoryNode: 0, + taskNumaDevices: set.From([]string{"amd/gpu/t1000"}), + 
instance: "NET-T2-01", + device: &structs.NodeDeviceResource{ + Type: "net", + Name: "nic200", + Instances: []*structs.NodeDevice{ + { + ID: "NET-T2-01", + }, + }, + }, + exp: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + m := &memoryNodeMatcher{ + memoryNode: tc.memoryNode, + topology: tc.topology, + devices: tc.taskNumaDevices, + } + result := m.Matches(tc.instance, tc.device) + must.Eq(t, tc.exp, result) + }) + } +} diff --git a/scheduler/numa_ce.go b/scheduler/numa_ce.go index 146ca9d859eb..fa23fbcca169 100644 --- a/scheduler/numa_ce.go +++ b/scheduler/numa_ce.go @@ -16,9 +16,10 @@ import ( ) type coreSelector struct { - topology *numalib.Topology - availableCores *idset.Set[hw.CoreID] - shuffle func([]numalib.Core) + topology *numalib.Topology + availableCores *idset.Set[hw.CoreID] + shuffle func([]numalib.Core) + deviceMemoryNode int } // Select returns a set of CoreIDs that satisfy the requested core reservations, @@ -41,3 +42,10 @@ func randomizeCores(cores []numalib.Core) { cores[x], cores[y] = cores[y], cores[x] }) } + +// candidateMemoryNodes return -1 on CE, indicating any memory node is acceptable +// +// (NUMA aware scheduling is an enterprise feature) +func (cs *coreSelector) candidateMemoryNodes(ask *structs.Resources) []int { + return []int{-1} +} diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 709f5d770264..96e1321dbf7d 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -4,9 +4,11 @@ package scheduler import ( + "maps" "math" "sort" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -25,6 +27,13 @@ type allocInfo struct { resources *structs.ComparableResources } +func (ai *allocInfo) Copy() *allocInfo { + return &allocInfo{ + maxParallel: ai.maxParallel, + resources: ai.resources.Copy(), + } +} + // PreemptionResource interface is implemented by different // types of resources. 
type PreemptionResource interface { @@ -134,6 +143,23 @@ func NewPreemptor(jobPriority int, ctx Context, jobID *structs.NamespacedID) *Pr } } +func (p *Preemptor) Copy() *Preemptor { + currentPreemptions := make(map[structs.NamespacedID]map[string]int) + for k, v := range p.currentPreemptions { + currentPreemptions[k] = maps.Clone(v) + } + + return &Preemptor{ + currentPreemptions: currentPreemptions, + allocDetails: helper.DeepCopyMap(p.allocDetails), + jobPriority: p.jobPriority, + jobID: p.jobID, + nodeRemainingResources: p.nodeRemainingResources.Copy(), + currentAllocs: helper.CopySlice(p.currentAllocs), + ctx: p.ctx, + } +} + // SetNode sets the node func (p *Preemptor) SetNode(node *structs.Node) { nodeRemainingResources := node.NodeResources.Comparable() @@ -473,12 +499,12 @@ func newAllocDeviceGroup() *deviceGroupAllocs { // PreemptForDevice tries to find allocations to preempt to meet devices needed // This is called once per device request when assigning devices to the task func (p *Preemptor) PreemptForDevice(ask *structs.RequestedDevice, devAlloc *deviceAllocator) []*structs.Allocation { - // Group allocations by device, tracking the number of // instances used in each device by alloc id deviceToAllocs := make(map[structs.DeviceIdTuple]*deviceGroupAllocs) for _, alloc := range p.currentAllocs { for _, tr := range alloc.AllocatedResources.Tasks { + // Ignore allocs that don't use devices if len(tr.Devices) == 0 { continue diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go index 265df4fb94ff..eb718f5dcd3d 100644 --- a/scheduler/preemption_test.go +++ b/scheduler/preemption_test.go @@ -5,10 +5,13 @@ package scheduler import ( "fmt" + "maps" "strconv" "testing" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/lib/numalib" + "github.com/hashicorp/nomad/client/lib/numalib/hw" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -146,7 +149,82 @@ func TestResourceDistance(t *testing.T) { } -func TestPreemption(t *testing.T) { +func makeNodeResources(devices []*structs.NodeDeviceResource, busAssociativity map[string]hw.NodeID) *structs.NodeResources { + makeCore := func(node hw.NodeID, id hw.CoreID) numalib.Core { + sockets := map[hw.NodeID]hw.SocketID{ + 0: 0, + 1: 0, + 2: 1, + 3: 1, + } + return numalib.Core{ + NodeID: node, + SocketID: sockets[node], + ID: id, + Grade: numalib.Performance, + BaseSpeed: 4000, + } + } + + // 2 socket, 4 numa node system, 2 cores per node + processors := structs.NodeProcessorResources{ + Topology: &numalib.Topology{ + Nodes: []uint8{0, 1, 2, 3}, + Distances: numalib.SLIT{ + []numalib.Cost{10, 12, 32, 32}, + []numalib.Cost{12, 10, 32, 32}, + []numalib.Cost{32, 32, 10, 12}, + []numalib.Cost{32, 32, 12, 10}, + }, + Cores: []numalib.Core{ + makeCore(0, 0), + makeCore(0, 1), + makeCore(1, 2), + makeCore(1, 3), + makeCore(2, 4), + makeCore(2, 5), + makeCore(3, 6), + makeCore(3, 7), + }, + }, + } + + defaultNodeResources := &structs.NodeResources{ + Processors: processors, + Memory: structs.NodeMemoryResources{ + MemoryMB: 8192, + }, + Disk: structs.NodeDiskResources{ + DiskMB: 100 * 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + Devices: devices, + } + + defaultNodeResources.Compatibility() + + defaultNodeResources.Processors.Topology.BusAssociativity = maps.Clone(busAssociativity) + + return defaultNodeResources +} + +func makeDeviceInstance(instanceID, busID 
string) *structs.NodeDevice { + return &structs.NodeDevice{ + ID: instanceID, + Healthy: true, + Locality: &structs.NodeDeviceLocality{ + PciBusID: busID, + }, + } +} + +func TestPreemption_Normal(t *testing.T) { ci.Parallel(t) type testCase struct { @@ -182,6 +260,7 @@ func TestPreemption(t *testing.T) { defaultNodeResources := &structs.NodeResources{ Processors: processorResources, Cpu: legacyCpuResources, + Memory: structs.NodeMemoryResources{ MemoryMB: 8192, }, @@ -207,22 +286,10 @@ func TestPreemption(t *testing.T) { "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), }, Instances: []*structs.NodeDevice{ - { - ID: deviceIDs[0], - Healthy: true, - }, - { - ID: deviceIDs[1], - Healthy: true, - }, - { - ID: deviceIDs[2], - Healthy: true, - }, - { - ID: deviceIDs[3], - Healthy: true, - }, + makeDeviceInstance(deviceIDs[0], "0000:00:00.0"), + makeDeviceInstance(deviceIDs[1], "0000:00:01.0"), + makeDeviceInstance(deviceIDs[2], "0000:00:02.0"), + makeDeviceInstance(deviceIDs[3], "0000:00:03.0"), }, }, { @@ -236,26 +303,11 @@ func TestPreemption(t *testing.T) { "memory_bandwidth": psstructs.NewIntAttribute(11, psstructs.UnitGBPerS), }, Instances: []*structs.NodeDevice{ - { - ID: deviceIDs[4], - Healthy: true, - }, - { - ID: deviceIDs[5], - Healthy: true, - }, - { - ID: deviceIDs[6], - Healthy: true, - }, - { - ID: deviceIDs[7], - Healthy: true, - }, - { - ID: deviceIDs[8], - Healthy: true, - }, + makeDeviceInstance(deviceIDs[4], "0000:00:04.0"), + makeDeviceInstance(deviceIDs[5], "0000:00:05.0"), + makeDeviceInstance(deviceIDs[6], "0000:00:06.0"), + makeDeviceInstance(deviceIDs[7], "0000:00:07.0"), + makeDeviceInstance(deviceIDs[8], "0000:00:08.0"), }, }, { @@ -266,14 +318,8 @@ func TestPreemption(t *testing.T) { "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB), }, Instances: []*structs.NodeDevice{ - { - ID: "fpga1", - Healthy: true, - }, - { - ID: "fpga2", - Healthy: false, - }, + makeDeviceInstance("fpga1", "0000:01:00.0"), + makeDeviceInstance("fpga2", "0000:02:01.0"), }, }, }, diff --git a/scheduler/rank.go b/scheduler/rank.go index f9a32c715587..e0634774f898 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -6,9 +6,12 @@ package scheduler import ( "fmt" "math" + "slices" + "github.com/hashicorp/go-set/v2" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib/hw" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/safemath" "github.com/hashicorp/nomad/nomad/structs" ) @@ -204,7 +207,8 @@ func (iter *BinPackIterator) SetSchedulerConfiguration(schedConfig *structs.Sche } func (iter *BinPackIterator) Next() *RankedNode { -OUTER: + +NEXTNODE: for { // Get the next potential option option := iter.source.Next() @@ -292,7 +296,7 @@ OUTER: } else { iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label)) netIdx.Release() - continue OUTER + continue NEXTNODE } } } @@ -303,7 +307,7 @@ OUTER: } else { iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label)) netIdx.Release() - continue OUTER + continue NEXTNODE } } } @@ -314,7 +318,7 @@ OUTER: iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("network: %s", err)) netIdx.Release() - continue OUTER + continue NEXTNODE } // Look for preemptible allocations to satisfy the network resource for this task @@ -326,7 +330,7 @@ OUTER: iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("network: 
%s", err)) netIdx.Release() - continue OUTER + continue NEXTNODE } allocsToPreempt = append(allocsToPreempt, netPreemptions...) @@ -345,7 +349,7 @@ OUTER: iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("network: %s", err)) netIdx.Release() - continue OUTER + continue NEXTNODE } } @@ -390,7 +394,7 @@ OUTER: iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("network: %s", err)) netIdx.Release() - continue OUTER + continue NEXTNODE } // Look for preemptible allocations to satisfy the network resource for this task @@ -402,7 +406,7 @@ OUTER: iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("network: %s", err)) netIdx.Release() - continue OUTER + continue NEXTNODE } allocsToPreempt = append(allocsToPreempt, netPreemptions...) @@ -421,7 +425,7 @@ OUTER: iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("network: %s", err)) netIdx.Release() - continue OUTER + continue NEXTNODE } } // Reserve this to prevent another task from colliding @@ -431,56 +435,240 @@ OUTER: taskResources.Networks = []*structs.NetworkResource{offer} } - // Check if we need to assign devices - for _, req := range task.Resources.Devices { - offer, sumAffinities, err := devAllocator.AssignDevice(req) - if offer == nil { - // If eviction is not enabled, mark this node as exhausted and continue - if !iter.evict { - iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) - continue OUTER - } + // Acquire devices - // Attempt preemption - preemptor.SetCandidates(proposed) - devicePreemptions := preemptor.PreemptForDevice(req, devAllocator) + // deviceMemoryNode will record which NUMA memory node our devices + // connected to, or -1 to indicate we did not care + deviceMemoryNode := -1 - if devicePreemptions == nil { - iter.ctx.Logger().Named("binpack").Debug("preemption not possible", "requested_device", req) - iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) - netIdx.Release() - continue OUTER - } - allocsToPreempt = append(allocsToPreempt, devicePreemptions...) + // if there are no devices, skip over device assignments + if len(task.Resources.Devices) == 0 { + goto SELECTCORES + } - // First subtract out preempted allocations - proposed = structs.RemoveAllocs(proposed, allocsToPreempt) + { + // Attempt device assignments without pre-emption. + // + // This block will attempt to assign devices using the available + // CPU cores and devices WITHOUT leveraging preemption to make + // things fit. If this fails we do this logic again below but + // with pre-emption logic. + // + // We do this so as to give priority to device allocation + // options that do not involve killing other tasks, while still + // ensuring we get the NUMA associativity the task is asking for. + + // set of already consumed cores on this node + consumedCores := idset.Empty[hw.CoreID]() + for _, alloc := range proposed { + allocCores := alloc.AllocatedResources.Comparable().Flattened.Cpu.ReservedCores + idset.InsertSlice(consumedCores, allocCores...) + } - // Reset the device allocator with new set of proposed allocs - devAllocator := newDeviceAllocator(iter.ctx, option.Node) - devAllocator.AddAllocs(proposed) + // add cores reserved for other tasks + for _, tr := range total.Tasks { + taskCores := tr.Cpu.ReservedCores + idset.InsertSlice(consumedCores, taskCores...) 
+ } - // Try offer again - offer, sumAffinities, err = devAllocator.AssignDevice(req) - if offer == nil { - iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create device offer after considering preemption", "error", err) - iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) - continue OUTER + nodeCores := option.Node.NodeResources.Processors.Topology.UsableCores() + + // usable cores not yet consumed for this node + availableCores := nodeCores.Difference(consumedCores) + + // the memory nodes with sufficient cores for the task + // resources, calculated by subtracting off all cores currently + // in use because we are not allowing preemption + candidateMemoryNodes := (&coreSelector{ + topology: option.Node.NodeResources.Processors.Topology, + availableCores: availableCores, + }).candidateMemoryNodes(task.Resources) + + // snapshot the current state of device allocation, which we + // will revert to each time we run into a problem while selecting + // devices with memory node limitations + devAllocatorSnapshot := devAllocator.Copy() + taskResourcesSnapshot := slices.Clone(taskResources.Devices) + sumMatchingAffinitiesSnapshot := sumMatchingAffinities + totalDeviceAffinityWeightSnapshot := totalDeviceAffinityWeight + + SELECT_BY_NUMA_WITHOUT_EVICT: + for _, candidateMemoryNode := range candidateMemoryNodes { + deviceMemoryNode = candidateMemoryNode + + // attempt to assign devices using the given target memory + // node + count := 0 + for _, device := range task.Resources.Devices { + memory := &memoryNodeMatcher{ + memoryNode: candidateMemoryNode, + topology: option.Node.NodeResources.Processors.Topology, + devices: set.From(task.Resources.NUMA.GetDevices()), + } + + offer, sumAffinities, err := devAllocator.createOffer(memory, device) + if offer == nil || err != nil { + devAllocator = devAllocatorSnapshot + taskResources.Devices = taskResourcesSnapshot + sumMatchingAffinities = sumMatchingAffinitiesSnapshot + totalDeviceAffinityWeight = totalDeviceAffinityWeightSnapshot + continue SELECT_BY_NUMA_WITHOUT_EVICT + } + + // assign the offer for this device to our allocator + devAllocator.AddReserved(offer) + taskResources.Devices = append(taskResources.Devices, offer) + + // Add the scores + if len(device.Affinities) != 0 { + for _, a := range device.Affinities { + totalDeviceAffinityWeight += math.Abs(float64(a.Weight)) + } + sumMatchingAffinities += sumAffinities + } + count++ + } + + if count == len(task.Resources.Devices) { + // We were able to allocate every device, no need to + // try again using preemption. Skip on down to the + // allocation of cpu cores. + goto SELECTCORES } + + // reset allocation attempt to snapshot before trying with + // next memory node option + devAllocator = devAllocatorSnapshot + taskResources.Devices = taskResourcesSnapshot + sumMatchingAffinities = sumMatchingAffinitiesSnapshot + totalDeviceAffinityWeight = totalDeviceAffinityWeightSnapshot } + } + + { + // Attempt device assignments with pre-emption. + // + // This block will attempt to assign devices using any CPU cores + // and devices WITH leveraging preemption. We will have already + // made attempts without preemption. - // Store the resource - devAllocator.AddReserved(offer) - taskResources.Devices = append(taskResources.Devices, offer) + // If preemption is not enabled, then this node is exhausted. 
+ if !iter.evict { + iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) + continue NEXTNODE + } - // Add the scores - if len(req.Affinities) != 0 { - for _, a := range req.Affinities { - totalDeviceAffinityWeight += math.Abs(float64(a.Weight)) + // get a list of available memory nodes, including cores currently + // in-use, which we can acquire by evicting tasks + candidateMemoryNodes := (&coreSelector{ + topology: option.Node.NodeResources.Processors.Topology, + availableCores: option.Node.NodeResources.Processors.Topology.UsableCores(), + }).candidateMemoryNodes(task.Resources) + + // snapshot the current state of device allocation, which we + // will revert to each time we run into a problem while selecting + // devices with memory node limitations + devAllocatorSnapshot := devAllocator.Copy() + taskResourcesSnapshot := slices.Clone(taskResources.Devices) + sumMatchingAffinitiesSnapshot := sumMatchingAffinities + totalDeviceAffinityWeightSnapshot := totalDeviceAffinityWeight + preemptorSnapshot := preemptor.Copy() + allocsToPreemptSnapshot := helper.CopySlice(allocsToPreempt) + proposedSnapshot := helper.CopySlice(proposed) + + var offerErr error = nil + + SELECT_BY_NUMA_WITH_EVICT: + for _, candidateMemoryNode := range candidateMemoryNodes { + deviceMemoryNode = candidateMemoryNode + + // attempt to assign devices using the given target memory + // node + count := 0 + for _, device := range task.Resources.Devices { + memory := &memoryNodeMatcher{ + memoryNode: candidateMemoryNode, + topology: option.Node.NodeResources.Processors.Topology, + devices: set.From(task.Resources.NUMA.GetDevices()), + } + + offer, sumAffinities, err := devAllocator.createOffer(memory, device) + if offer == nil { + offerErr = err + + // get the potential preemptions + preemptor.SetCandidates(proposed) // allocations + devicePreemptions := preemptor.PreemptForDevice(device, devAllocator) + + restoreSnapshots := func() { + devAllocator = devAllocatorSnapshot + taskResources.Devices = taskResourcesSnapshot + sumMatchingAffinities = sumMatchingAffinitiesSnapshot + totalDeviceAffinityWeight = totalDeviceAffinityWeightSnapshot + preemptor = preemptorSnapshot + allocsToPreempt = allocsToPreemptSnapshot + proposed = proposedSnapshot + } + + // not able to assign device even with preemption, + // reset to snapshots and try next memory node + if devicePreemptions == nil { + restoreSnapshots() + continue SELECT_BY_NUMA_WITH_EVICT + } + + allocsToPreempt = append(allocsToPreempt, devicePreemptions...) 
+ + // subtract out preempted allocations + proposed = structs.RemoveAllocs(proposed, allocsToPreempt) + + // use a device allocator with new set of proposed allocs + devAllocatorEvict := newDeviceAllocator(iter.ctx, option.Node) + devAllocatorEvict.AddAllocs(proposed) + + // attempt the offer again + offerEvict, sumAffinitiesEvict, err := devAllocatorEvict.createOffer(memory, device) + if offerEvict == nil || err != nil { + // we cannot acquire this device even with preemption + iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create device offer after considering preemption", "error", err) + iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err)) + continue NEXTNODE + } + + offer = offerEvict + sumAffinities = sumAffinitiesEvict + } + + // assign the offer for this device to our allocator + devAllocator.AddReserved(offer) + taskResources.Devices = append(taskResources.Devices, offer) + + // Add the scores + if len(device.Affinities) != 0 { + for _, a := range device.Affinities { + totalDeviceAffinityWeight += math.Abs(float64(a.Weight)) + } + sumMatchingAffinities += sumAffinities + } + count++ + } + + if count == len(task.Resources.Devices) { + // We were able to allocate every device. + goto SELECTCORES } - sumMatchingAffinities += sumAffinities } - } + + // We were not able to allocate every device, implying + // this node could not support the device ask. + iter.ctx.Logger().Named("binpack").Debug("preemption not possible") + iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", offerErr)) + netIdx.Release() + continue NEXTNODE + + } // preempt attempt + + SELECTCORES: // Handle CPU core reservations if wantedCores := task.Resources.Cores; wantedCores > 0 { @@ -506,21 +694,22 @@ OUTER: // mark the node as exhausted if not enough cores available if availableCores.Size() < wantedCores { iter.ctx.Metrics().ExhaustedNode(option.Node, "cores") - continue OUTER + continue NEXTNODE } // set the task's reserved cores cores, bandwidth := (&coreSelector{ - topology: option.Node.NodeResources.Processors.Topology, - availableCores: availableCores, - shuffle: randomizeCores, + topology: option.Node.NodeResources.Processors.Topology, + availableCores: availableCores, + shuffle: randomizeCores, + deviceMemoryNode: deviceMemoryNode, }).Select(task.Resources) // mark the node as exhausted if not enough cores available given // the NUMA preference if cores == nil { iter.ctx.Metrics().ExhaustedNode(option.Node, "numa-cores") - continue OUTER + continue NEXTNODE } // set the cores and bandwidth consumed by the task diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 4cb9b4d9ae2d..6dec2f777843 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -1904,11 +1905,14 @@ func TestBinPackIterator_Devices(t *testing.T) { for _, c := range cases { t.Run(c.Name, func(t *testing.T) { - require := require.New(t) - // Setup the context state, ctx := testContext(t) + // Canonicalize resources + for _, task := range c.TaskGroup.Tasks { + task.Resources.Canonicalize() + } + // Add the planned allocs if len(c.PlannedAllocs) != 0 { for _, alloc := range c.PlannedAllocs { @@ -1923,7 +1927,7 @@ func TestBinPackIterator_Devices(t *testing.T) { for _, alloc := range c.ExistingAllocs { alloc.NodeID = c.Node.ID } - 
require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, c.ExistingAllocs)) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, c.ExistingAllocs)) } static := NewStaticRankIterator(ctx, []*RankedNode{{Node: c.Node}}) @@ -1939,7 +1943,7 @@ func TestBinPackIterator_Devices(t *testing.T) { // Check we got the placements we are expecting for tname, devices := range c.ExpectedPlacements { tr, ok := out.TaskResources[tname] - require.True(ok) + must.True(t, ok) want := len(devices) got := 0 @@ -1947,20 +1951,20 @@ func TestBinPackIterator_Devices(t *testing.T) { got++ expected, ok := devices[*placed.ID()] - require.True(ok) - require.Equal(expected.Count, len(placed.DeviceIDs)) + must.True(t, ok) + must.Eq(t, expected.Count, len(placed.DeviceIDs)) for _, id := range expected.ExcludeIDs { - require.NotContains(placed.DeviceIDs, id) + must.SliceNotContains(t, placed.DeviceIDs, id) } } - require.Equal(want, got) + must.Eq(t, want, got) } // Check potential affinity scores if c.DeviceScore != 0.0 { - require.Len(out.Scores, 2) - require.Equal(c.DeviceScore, out.Scores[1]) + must.Len(t, 2, out.Scores) + must.Eq(t, c.DeviceScore, out.Scores[1]) } }) } @@ -2054,6 +2058,7 @@ func TestBinPackIterator_Device_Failure_With_Eviction(t *testing.T) { Count: 1, }, }, + NUMA: &structs.NUMA{Affinity: structs.NoneNUMA}, }, }, }, @@ -2067,13 +2072,11 @@ func TestBinPackIterator_Device_Failure_With_Eviction(t *testing.T) { scoreNorm := NewScoreNormalizationIterator(ctx, binp) out := collectRanked(scoreNorm) - require := require.New(t) // We expect a placement failure because we need 1 GPU device // and the other one is taken - - require.Len(out, 0) - require.Equal(1, ctx.metrics.DimensionExhausted["devices: no devices match request"]) + must.SliceEmpty(t, out) + must.Eq(t, 1, ctx.metrics.DimensionExhausted["devices: no devices match request"]) } func TestJobAntiAffinity_PlannedAlloc(t *testing.T) {
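
Reviewer note (not part of the patch): a minimal sketch of how the new numa.devices field is meant to be wired up, based on the structs.NUMA and Resources.Validate changes above. The package-main wrapper and the concrete core/memory/device values are illustrative only; the fields and methods used (Resources.Devices, NUMA.Affinity, NUMA.Devices, Resources.Validate, NUMA.GetDevices) are the ones added or touched in this change.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Task resources asking for two device types, where the GPU must share
	// a NUMA node with the task's reserved cores. Validate (extended in this
	// patch) rejects any numa.devices entry that is not also requested as a
	// task device.
	res := &structs.Resources{
		Cores:    4,
		MemoryMB: 2048,
		Devices: []*structs.RequestedDevice{
			{Name: "nvidia/gpu", Count: 1},
			{Name: "net/nic/model1", Count: 1},
		},
		NUMA: &structs.NUMA{
			Affinity: "require",
			Devices:  []string{"nvidia/gpu"}, // must be a subset of the requests above
		},
	}

	if err := res.Validate(); err != nil {
		fmt.Println("invalid resources:", err)
		return
	}
	fmt.Println("numa-bound devices:", res.NUMA.GetDevices())
}

At placement time the scheduler's memoryNodeMatcher compares each candidate device instance's PCI bus ID against Topology.BusAssociativity, normalizing the domain prefix so that "0000:03:00.1" and "00000000:03:00.1" refer to the same bus (see equalBusID above).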