Skip to content

Commit

Permalink
#12961: Allow CQ reader and worker thread to be on different cores
Browse files Browse the repository at this point in the history
  • Loading branch information
tt-asaigal committed Oct 18, 2024
1 parent e902289 commit fb42828
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 40 deletions.
4 changes: 2 additions & 2 deletions tt_metal/impl/device/device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace tt {
namespace tt_metal {

Device::Device(
chip_id_t device_id, const uint8_t num_hw_cqs, size_t l1_small_size, size_t trace_region_size, const std::vector<uint32_t> &l1_bank_remap, bool minimal, uint32_t worker_core) :
id_(device_id), worker_thread_core(worker_core), work_executor(worker_core, device_id) {
chip_id_t device_id, const uint8_t num_hw_cqs, size_t l1_small_size, size_t trace_region_size, const std::vector<uint32_t> &l1_bank_remap, bool minimal, uint32_t worker_core, uint32_t completion_queue_reader_core) :
id_(device_id), worker_thread_core(worker_core), completion_queue_reader_core(completion_queue_reader_core), work_executor(worker_core, device_id) {
ZoneScoped;
tunnel_device_dispatch_workers_ = {};
this->initialize(num_hw_cqs, l1_small_size, trace_region_size, l1_bank_remap, minimal);
Expand Down
4 changes: 3 additions & 1 deletion tt_metal/impl/device/device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class Device {
std::size_t trace_region_size,
const std::vector<uint32_t> &l1_bank_remap = {},
bool minimal = false,
uint32_t worker_core = 0);
uint32_t worker_core = 0,
uint32_t completion_queue_reader_core = 0);

~Device();

Expand Down Expand Up @@ -291,6 +292,7 @@ class Device {
// all tasks scheduled on this device
WorkExecutor work_executor;
uint32_t worker_thread_core;
uint32_t completion_queue_reader_core;
std::unique_ptr<SystemMemoryManager> sysmem_manager_;
LaunchMessageRingBufferState worker_launch_message_buffer_state;
uint8_t num_hw_cqs_;
Expand Down
101 changes: 67 additions & 34 deletions tt_metal/impl/device/device_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,87 @@ std::unordered_map<int, std::vector<uint32_t>> get_cpu_cores_per_numa_node(std::
return cpu_cores_per_numa_node;
}

int get_cpu_core_for_device_worker_thread(
std::pair<int, int> get_cpu_cores_for_dispatch_threads(
int mmio_controlled_device_id,
const std::unordered_map<int, std::vector<uint32_t>>& cpu_cores_per_numa_node,
std::unordered_set<uint32_t>& free_cores) {
int core_assigned_to_device = 0;
if (numa_available() != -1) {
// Get NUMA node that the current device is mapped to through UMD
int numa_node_for_device = tt::Cluster::instance().get_numa_node_for_device(mmio_controlled_device_id);
if (cpu_cores_per_numa_node.find(numa_node_for_device) != cpu_cores_per_numa_node.end()) {
// NUMA node reported by UMD exists on host. Choose a core on this numa-node using round robin policy
int num_cores_in_numa_node = cpu_cores_per_numa_node.at(numa_node_for_device).size();
core_assigned_to_device =
cpu_cores_per_numa_node.at(numa_node_for_device).at(mmio_controlled_device_id % num_cores_in_numa_node);
std::unordered_set<uint32_t>& free_cores,
uint32_t num_devices,
bool use_separate_procs) {

int core_assigned_to_device_worker_thread = 0;
int core_assigned_to_device_completion_queue_reader = 0;
uint32_t num_online_processors = sysconf(_SC_NPROCESSORS_ONLN);
// Get NUMA node that the current device is mapped to through UMD
int numa_node_for_device = tt::Cluster::instance().get_numa_node_for_device(mmio_controlled_device_id);

if (numa_available() != -1 and cpu_cores_per_numa_node.find(numa_node_for_device) != cpu_cores_per_numa_node.end()) {
// NUMA node reported by UMD exists on host. Choose a core on this numa-node using round robin policy
const auto& cpu_core_for_numa_node = cpu_cores_per_numa_node.at(numa_node_for_device);
int num_cores_in_numa_node = cpu_core_for_numa_node.size();
core_assigned_to_device_worker_thread =
cpu_core_for_numa_node.at(mmio_controlled_device_id % num_cores_in_numa_node);
if (use_separate_procs) {
core_assigned_to_device_completion_queue_reader =
cpu_core_for_numa_node.at((mmio_controlled_device_id + num_devices) % num_cores_in_numa_node);
} else {
// NUMA node reported by UMD does not exist on host. Use round-robin binding policy for this worker thread.
log_warning(
tt::LogMetal,
"NUMA node {} for device {} does not exist on host.",
numa_node_for_device,
mmio_controlled_device_id);
core_assigned_to_device = mmio_controlled_device_id % sysconf(_SC_NPROCESSORS_ONLN);
core_assigned_to_device_completion_queue_reader = core_assigned_to_device_worker_thread;
}
} else {
// System does not use NUMA. Use-round robin binding strategy.
core_assigned_to_device = mmio_controlled_device_id % sysconf(_SC_NPROCESSORS_ONLN);
// NUMA node reported by UMD does not exist on host. Use round-robin binding policy for this worker thread.
log_warning(
tt::LogMetal,
"NUMA node {} for device {} does not exist on host or NUMA is not available.",
numa_node_for_device,
mmio_controlled_device_id);
core_assigned_to_device_worker_thread = mmio_controlled_device_id % num_online_processors;
if (use_separate_procs) {
core_assigned_to_device_completion_queue_reader = (mmio_controlled_device_id + num_devices) % num_online_processors;
} else {
core_assigned_to_device_completion_queue_reader = core_assigned_to_device_worker_thread;
}
}

free_cores.erase(core_assigned_to_device_worker_thread);
if (use_separate_procs) {
free_cores.erase(core_assigned_to_device_completion_queue_reader);
}
free_cores.erase(core_assigned_to_device);
return core_assigned_to_device;
return std::make_pair(core_assigned_to_device_worker_thread, core_assigned_to_device_completion_queue_reader);
}

std::unordered_map<uint32_t, uint32_t> get_device_id_to_core_map(
const std::vector<chip_id_t>& device_ids,
std::unordered_set<uint32_t>& free_cores,
bool use_numa_node_based_thread_binding) {
std::unordered_map<uint32_t, uint32_t> device_to_core_map = {};
bool use_numa_node_based_thread_binding,
const uint8_t num_hw_cqs,
std::unordered_map<uint32_t, uint32_t>& completion_queue_reader_to_cpu_core_map) {
uint32_t num_online_processors = sysconf(_SC_NPROCESSORS_ONLN);
constexpr uint32_t max_num_procs_per_device = 2;
// When using multiple command queues, assign separate CPU cores to worker and completion queue reader threads,
// if enough processors exist on host. Atleast one core is given to the main thread.
bool separate_procs_for_worker_and_reader = (num_hw_cqs > 1) && (max_num_procs_per_device * device_ids.size() <= num_online_processors - 1);
std::unordered_map<uint32_t, uint32_t> worker_thread_to_cpu_core_map = {};
if (use_numa_node_based_thread_binding) {
auto cpu_cores_per_numa_node = device_cpu_allocator::get_cpu_cores_per_numa_node(free_cores);
for (const auto& device_id : device_ids) {
device_to_core_map.insert(
{device_id,
device_cpu_allocator::get_cpu_core_for_device_worker_thread(
device_id, cpu_cores_per_numa_node, free_cores)});
auto [worker_thread_core, completion_queue_reader_core] = device_cpu_allocator::get_cpu_cores_for_dispatch_threads(
device_id, cpu_cores_per_numa_node, free_cores, device_ids.size(), separate_procs_for_worker_and_reader);
worker_thread_to_cpu_core_map.insert({device_id, worker_thread_core});
completion_queue_reader_to_cpu_core_map.insert({device_id, completion_queue_reader_core});
}
} else {
// Round Robin CPU assignment for worker and completion queue reader threads
for (const auto& device_id : device_ids) {
device_to_core_map.insert({device_id, device_id % sysconf(_SC_NPROCESSORS_ONLN)});
uint32_t worker_thread_proc = device_id % num_online_processors;
worker_thread_to_cpu_core_map.insert({device_id, worker_thread_proc});
if (separate_procs_for_worker_and_reader) {
uint32_t completion_queue_reader_proc = (device_id + device_ids.size()) % num_online_processors;
completion_queue_reader_to_cpu_core_map.insert({device_id, completion_queue_reader_proc});
} else {
completion_queue_reader_to_cpu_core_map.insert({device_id, worker_thread_proc});
}
}
}
return device_to_core_map;
return worker_thread_to_cpu_core_map;
}

void bind_current_thread_to_free_cores(const std::unordered_set<uint32_t>& free_cores) {
Expand Down Expand Up @@ -225,9 +257,10 @@ void DevicePool::activate_device(chip_id_t id) {
}
if (this->devices[id] == nullptr) {
log_debug(tt::LogMetal, "DevicePool new device {}", id);
int core_assigned_to_device = this->device_to_core_map.at(id);
int worker_core_thread_core = this->worker_thread_to_cpu_core_map.at(id);
int completion_queue_reader_core = this->completion_queue_reader_to_cpu_core_map.at(id);
auto dev =
new Device(id, this->num_hw_cqs, this->l1_small_size, this->trace_region_size, this->l1_bank_remap, false, core_assigned_to_device);
new Device(id, this->num_hw_cqs, this->l1_small_size, this->trace_region_size, this->l1_bank_remap, false, worker_core_thread_core, completion_queue_reader_core);
dev->update_dispatch_cores_for_multi_cq_eth_dispatch();
if (!this->firmware_built_keys.contains(dev->build_key())) {
dev->build_firmware();
Expand Down Expand Up @@ -364,8 +397,8 @@ DevicePool::DevicePool(
all_device_ids.emplace_back((chip_id_t)i);
}
std::unordered_set<uint32_t> free_cores = {};
this->device_to_core_map =
device_cpu_allocator::get_device_id_to_core_map(all_device_ids, free_cores, use_numa_node_based_thread_binding);
this->worker_thread_to_cpu_core_map =
device_cpu_allocator::get_device_id_to_core_map(all_device_ids, free_cores, use_numa_node_based_thread_binding, num_hw_cqs, this->completion_queue_reader_to_cpu_core_map);
if (use_numa_node_based_thread_binding) {
// Bind main thread to cores not being used by workers
device_cpu_allocator::bind_current_thread_to_free_cores(free_cores);
Expand Down
4 changes: 2 additions & 2 deletions tt_metal/impl/device/device_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class DevicePool {
std::unordered_set<uint32_t> firmware_built_keys;

// Determine which CPU cores the worker threads need to be placed on for each device
std::unordered_map<uint32_t, uint32_t> device_to_core_map;

std::unordered_map<uint32_t, uint32_t> worker_thread_to_cpu_core_map;
std::unordered_map<uint32_t, uint32_t> completion_queue_reader_to_cpu_core_map;
void init_firmware_on_active_devices() const;
void init_profiler_devices() const;
void activate_device(chip_id_t id);
Expand Down
2 changes: 1 addition & 1 deletion tt_metal/impl/dispatch/command_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,7 @@ HWCommandQueue::HWCommandQueue(Device* device, uint32_t id, NOC noc_index) :
std::thread completion_queue_thread = std::thread(&HWCommandQueue::read_completion_queue, this);
this->completion_queue_thread = std::move(completion_queue_thread);
// Set the affinity of the completion queue reader.
set_device_thread_affinity(this->completion_queue_thread, device->worker_thread_core);
set_device_thread_affinity(this->completion_queue_thread, device->completion_queue_reader_core);
this->expected_num_workers_completed = 0;
}

Expand Down

0 comments on commit fb42828

Please sign in to comment.