Skip to content

Commit

Permalink
[Disco] Set worker CPU affinity with env variable
Browse files Browse the repository at this point in the history
This PR enables setting the CPU affinity of disco workers in
MLC, following the support in apache/tvm#16807. The purpose is
to try reduce the CPU core switch overhead brought to disco workers
which may cause extra bubble times in disco workers before/during
tasks.

We use a macro `MLC_DISCO_WORKER_CPU_BINDING` to specify the CPU
affinities of workers. This is by default not used. To enable it,
you can run the command like

```shell
MLC_DISCO_WORKER_CPU_BINDING=64,65,66,67 python some_mlc_app.py
```

to specify the four CPU core ids for the four workers.
  • Loading branch information
MasterJH5574 committed Apr 1, 2024
1 parent 8a82f93 commit 7c281c9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
31 changes: 31 additions & 0 deletions cpp/serve/function_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,44 @@
#include <tvm/runtime/packed_func.h>
#include <tvm/runtime/registry.h>

#include <cstdlib>
#include <filesystem>
#include <string>
#include <vector>

#include "../support/load_bytes_from_file.h"
#include "../support/utils.h"
#include "sampler/sampler.h"

namespace mlc {
namespace llm {
namespace serve {

Optional<IntTuple> GetDiscoWorkerCPUBinding(int num_workers) {
const char* raw_cpu_binding = std::getenv("MLC_DISCO_WORKER_CPU_BINDING");
if (raw_cpu_binding == nullptr) {
return NullOpt;
}

std::string cpu_binding_str(raw_cpu_binding);
std::vector<std::string> cpu_ids_str = Split(cpu_binding_str, ',');
std::vector<int64_t> cpu_ids;
for (const std::string& cpu_id_str : cpu_ids_str) {
try {
cpu_ids.push_back(std::stol(cpu_id_str));
} catch (std::invalid_argument const& ex) {
LOG(FATAL) << "Invalid MLC_DISCO_WORKER_CPU_BINDING \"" << cpu_binding_str << "\"";
}
}
if (static_cast<int>(cpu_ids.size()) < num_workers) {
LOG(FATAL) << "Insufficient number of specified CPU workers in MLC_DISCO_WORKER_CPU_BINDING, "
"expecting at least "
<< num_workers << "CPU ids but only " << cpu_ids.size() << " are given.";
}

return IntTuple{cpu_ids};
}

PackedFunc FunctionTable::SessionFuncAsPackedFunc(Session sess, DRef sess_func, String name) {
return PackedFunc([sess, func = std::move(sess_func), name = std::move(name)](
TVMArgs args, TVMRetValue* rv) -> void {
Expand Down Expand Up @@ -100,6 +127,10 @@ void FunctionTable::Init(TVMArgValue reload_lib, Device device, picojson::object
}
return SessionFuncAsPackedFunc(sess, func, name);
};
if (Optional<IntTuple> cpu_ids = GetDiscoWorkerCPUBinding(/*num_workers=*/num_shards)) {
IntTuple cpu_ids_value = cpu_ids.value();
sess->CallPacked(sess->GetGlobalFunc("runtime.disco.bind_worker_to_cpu_core"), cpu_ids_value);
}
this->get_global_func = [this](const std::string& name) -> PackedFunc {
return SessionFuncAsPackedFunc(sess, sess->GetGlobalFunc(name), name);
};
Expand Down
24 changes: 24 additions & 0 deletions cpp/support/utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*!
* Copyright (c) 2023 by Contributors
* \file utils.h
* \brief Utility functions.
*/
#include <sstream>
#include <string>
#include <vector>

namespace mlc {
namespace llm {

inline std::vector<std::string> Split(const std::string& str, char delim) {
std::string item;
std::istringstream is(str);
std::vector<std::string> ret;
while (std::getline(is, item, delim)) {
ret.push_back(item);
}
return ret;
}

} // namespace llm
} // namespace mlc

0 comments on commit 7c281c9

Please sign in to comment.