From 7834fc62301da1ee10a20ec9e93afe822e5c4958 Mon Sep 17 00:00:00 2001 From: Ruihang Lai Date: Wed, 27 Mar 2024 13:06:59 -0400 Subject: [PATCH] [Disco] Set worker CPU affinity with env variable This PR enables setting the CPU affinity of disco workers in MLC, following the support in apache/tvm#16807. The purpose is to try reduce the CPU core switch overhead brought to disco workers which may cause extra bubble times in disco workers before/during tasks. We use a macro `MLC_DISCO_WORKER_CPU_BINDING` to specify the CPU affinities of workers. This is by default not used. To enable it, you can run the command like ```shell MLC_DISCO_WORKER_CPU_BINDING=64,65,66,67 python some_mlc_app.py ``` to specify the four CPU core ids for the four workers. --- cpp/serve/function_table.cc | 31 +++++++++++++++++++++++++++++++ cpp/support/utils.h | 24 ++++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 cpp/support/utils.h diff --git a/cpp/serve/function_table.cc b/cpp/serve/function_table.cc index f4466c875b..a7f878c1ba 100644 --- a/cpp/serve/function_table.cc +++ b/cpp/serve/function_table.cc @@ -13,17 +13,44 @@ #include #include +#include #include #include #include #include "../support/load_bytes_from_file.h" +#include "../support/utils.h" #include "sampler/sampler.h" namespace mlc { namespace llm { namespace serve { +Optional GetDiscoWorkerCPUBinding(int num_workers) { + const char* raw_cpu_binding = std::getenv("MLC_DISCO_WORKER_CPU_BINDING"); + if (raw_cpu_binding == nullptr) { + return NullOpt; + } + + std::string cpu_binding_str(raw_cpu_binding); + std::vector cpu_ids_str = Split(cpu_binding_str, ','); + std::vector cpu_ids; + for (const std::string& cpu_id_str : cpu_ids_str) { + try { + cpu_ids.push_back(std::stol(cpu_id_str)); + } catch (std::invalid_argument const& ex) { + LOG(FATAL) << "Invalid MLC_DISCO_WORKER_CPU_BINDING \"" << cpu_binding_str << "\""; + } + } + if (static_cast(cpu_ids.size()) < num_workers) { + LOG(FATAL) << "Insufficient number of specified CPU workers in MLC_DISCO_WORKER_CPU_BINDING, " + "expecting at least " + << num_workers << "CPU ids but only " << cpu_ids.size() << " are given."; + } + + return IntTuple{cpu_ids}; +} + PackedFunc FunctionTable::SessionFuncAsPackedFunc(Session sess, DRef sess_func, String name) { return PackedFunc([sess, func = std::move(sess_func), name = std::move(name)]( TVMArgs args, TVMRetValue* rv) -> void { @@ -100,6 +127,10 @@ void FunctionTable::Init(TVMArgValue reload_lib, Device device, picojson::object } return SessionFuncAsPackedFunc(sess, func, name); }; + if (Optional cpu_ids = GetDiscoWorkerCPUBinding(/*num_workers=*/num_shards)) { + IntTuple cpu_ids_value = cpu_ids.value(); + sess->CallPacked(sess->GetGlobalFunc("runtime.disco.bind_worker_to_cpu_core"), cpu_ids_value); + } this->get_global_func = [this](const std::string& name) -> PackedFunc { return SessionFuncAsPackedFunc(sess, sess->GetGlobalFunc(name), name); }; diff --git a/cpp/support/utils.h b/cpp/support/utils.h new file mode 100644 index 0000000000..5360f0496c --- /dev/null +++ b/cpp/support/utils.h @@ -0,0 +1,24 @@ +/*! + * Copyright (c) 2023 by Contributors + * \file utils.h + * \brief Utility functions. + */ +#include +#include +#include + +namespace mlc { +namespace llm { + +inline std::vector Split(const std::string& str, char delim) { + std::string item; + std::istringstream is(str); + std::vector ret; + while (std::getline(is, item, delim)) { + ret.push_back(item); + } + return ret; +} + +} // namespace llm +} // namespace mlc