Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix single row prediction performance in a multi-threaded environment #6024

Merged
merged 25 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e8c316f
Fix single row prediction performance in a multi-threaded environment
Ten0 Aug 6, 2023
fe31d4e
fix missing file change
Ten0 Aug 6, 2023
feaf3dc
fix lint
Ten0 Aug 7, 2023
f41755b
check whether freeze is due to booster shared lock being held by the …
Ten0 Aug 7, 2023
c52a7d7
what you get for having everything as an int
Ten0 Aug 7, 2023
601316b
Merge branch 'master' into 6021-fix_single_row_contention
shiyu1994 Aug 15, 2023
f4cf79d
Merge branch 'master' into 6021-fix_single_row_contention
jameslamb Sep 3, 2023
c7e9d2e
Merge branch 'master' into 6021-fix_single_row_contention
jameslamb Sep 6, 2023
6d949a0
Add TODO about FastConfig naming needing to be updated
Ten0 Jan 11, 2024
1fcbc3f
Add a test showcasing fast single row prediction in multi-thread
Ten0 Jan 11, 2024
4a80c8c
Add a workaround for #6142
Ten0 Jan 12, 2024
a864333
Merge branch '6021-fix_single_row_contention_v4' into 6021-fix_single…
Ten0 Jan 12, 2024
6db33b3
Merge branch 'master' into 6021-fix_single_row_contention
Ten0 Jan 12, 2024
49d7217
add more cleanup comment
Ten0 Jan 12, 2024
1a4f1aa
cleanup some unnecessary includes in the test
Ten0 Jan 12, 2024
bf5fc5f
Merge branch '6021-fix_single_row_contention_v4' into 6021-fix_single…
Ten0 Jan 12, 2024
e00191a
make windows cpp compiler happy
Ten0 Jan 12, 2024
46ca8e6
Merge branch '6021-fix_single_row_contention_v4' into 6021-fix_single…
Ten0 Jan 12, 2024
bfefc20
hopefully make static analysis happy
Ten0 Jan 22, 2024
e0d7913
Merge branch '6021-fix_single_row_contention_v4' into 6021-fix_single…
Ten0 Jan 22, 2024
ff613ad
Turn TODO into a regular comment
Ten0 Jan 24, 2024
063008a
Merge branch '6021-fix_single_row_contention_v4' into 6021-fix_single…
Ten0 Jan 24, 2024
a01916a
Merge remote-tracking branch 'upstream/master' into 6021-fix_single_r…
Ten0 Jan 26, 2024
99140fa
Merge branch 'master' into 6021-fix_single_row_contention
jameslamb Feb 3, 2024
4e29833
Merge branch 'master' into 6021-fix_single_row_contention
jameslamb Mar 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/LightGBM/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const int kDefaultNumLeaves = 31;

struct Config {
public:
Config() {}
explicit Config(std::unordered_map<std::string, std::string> parameters_map) {
Set(parameters_map);
}
std::string ToString() const;
/*!
* \brief Get string value by specific name of key
Expand Down
145 changes: 85 additions & 60 deletions src/c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ yamc::shared_lock<yamc::alternate::shared_mutex> lock(&mtx);
const int PREDICTOR_TYPES = 4;

// Single row predictor to abstract away caching logic
class SingleRowPredictor {
class SingleRowPredictorInner {
public:
PredictFunction predict_function;
int64_t num_pred_in_one_row;

SingleRowPredictor(int predict_type, Boosting* boosting, const Config& config, int start_iter, int num_iter) {
SingleRowPredictorInner(int predict_type, Boosting* boosting, const Config& config, int start_iter, int num_iter) {
bool is_predict_leaf = false;
bool is_raw_score = false;
bool predict_contrib = false;
Expand All @@ -85,7 +85,7 @@ class SingleRowPredictor {
num_total_model_ = boosting->NumberOfTotalModel();
}

~SingleRowPredictor() {}
~SingleRowPredictorInner() {}

bool IsPredictorEqual(const Config& config, int iter, Boosting* boosting) {
return early_stop_ == config.pred_early_stop &&
Expand All @@ -104,6 +104,60 @@ class SingleRowPredictor {
int num_total_model_;
};

/*!
* \brief Object to store resources meant for single-row Fast Predict methods.
*
* For legacy reasons this is called `FastConfig` in the public C API.
*
* Meant to be used by the *Fast* predict methods only.
* It stores the configuration and prediction resources for reuse across predictions.
*/
struct SingleRowPredictor {
public:
SingleRowPredictor(yamc::alternate::shared_mutex *booster_mutex,
const char *parameters,
const int data_type,
const int32_t num_cols,
int predict_type,
Boosting *boosting,
int start_iter,
int num_iter) : config(Config::Str2Map(parameters)), data_type(data_type), num_cols(num_cols), single_row_predictor_inner(predict_type, boosting, config, start_iter, num_iter), booster_mutex(booster_mutex) {
if (!config.predict_disable_shape_check && num_cols != boosting->MaxFeatureIdx() + 1) {
Log::Fatal("The number of features in data (%d) is not the same as it was in training data (%d).\n"\
"You can set ``predict_disable_shape_check=true`` to discard this error, but please be aware what you are doing.", num_cols, boosting->MaxFeatureIdx() + 1);
}
}

void Predict(std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
double* out_result, int64_t* out_len) const {
UNIQUE_LOCK(single_row_predictor_mutex)
yamc::shared_lock<yamc::alternate::shared_mutex> booster_shared_lock(booster_mutex);

auto one_row = get_row_fun(0);
single_row_predictor_inner.predict_function(one_row, out_result);

*out_len = single_row_predictor_inner.num_pred_in_one_row;
}

public:
Config config;
const int data_type;
const int32_t num_cols;

private:
SingleRowPredictorInner single_row_predictor_inner;

// Prevent the booster from being modified while we have a predictor relying on it during prediction
yamc::alternate::shared_mutex *booster_mutex;

// If several threads try to predict at the same time using the same SingleRowPredictor
// we want them to still provide correct values, so the mutex is necessary due to the shared
// resources in the predictor.
// However the recommended approach is to instantiate one SingleRowPredictor per thread,
// to avoid contention here.
mutable yamc::alternate::shared_mutex single_row_predictor_mutex;
};

class Booster {
public:
explicit Booster(const char* filename) {
Expand Down Expand Up @@ -373,15 +427,20 @@ class Booster {
boosting_->RollbackOneIter();
}

void SetSingleRowPredictor(int start_iteration, int num_iteration, int predict_type, const Config& config) {
void SetSingleRowPredictorInner(int start_iteration, int num_iteration, int predict_type, const Config& config) {
UNIQUE_LOCK(mutex_)
if (single_row_predictor_[predict_type].get() == nullptr ||
!single_row_predictor_[predict_type]->IsPredictorEqual(config, num_iteration, boosting_.get())) {
single_row_predictor_[predict_type].reset(new SingleRowPredictor(predict_type, boosting_.get(),
single_row_predictor_[predict_type].reset(new SingleRowPredictorInner(predict_type, boosting_.get(),
config, start_iteration, num_iteration));
}
}

std::unique_ptr<SingleRowPredictor> InitSingleRowPredictor(int predict_type, int start_iteration, int num_iteration, int data_type, int32_t num_cols, const char *parameters) {
return std::unique_ptr<SingleRowPredictor>(new SingleRowPredictor(
&mutex_, parameters, data_type, num_cols, predict_type, boosting_.get(), start_iteration, num_iteration));
}

void PredictSingleRow(int predict_type, int ncol,
std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
const Config& config,
Expand Down Expand Up @@ -814,7 +873,7 @@ class Booster {
private:
const Dataset* train_data_;
std::unique_ptr<Boosting> boosting_;
std::unique_ptr<SingleRowPredictor> single_row_predictor_[PREDICTOR_TYPES];
std::unique_ptr<SingleRowPredictorInner> single_row_predictor_[PREDICTOR_TYPES];

/*! \brief All configs */
Config config_;
Expand Down Expand Up @@ -847,6 +906,7 @@ using LightGBM::Log;
using LightGBM::Network;
using LightGBM::Random;
using LightGBM::ReduceScatterFunction;
using LightGBM::SingleRowPredictor;

// some help functions used to convert data

Expand Down Expand Up @@ -2053,35 +2113,11 @@ int LGBM_BoosterCalcNumPredict(BoosterHandle handle,
API_END();
}

/*!
* \brief Object to store resources meant for single-row Fast Predict methods.
*
* Meant to be used as a basic struct by the *Fast* predict methods only.
* It stores the configuration resources for reuse during prediction.
*
* Even the row function is stored. We score the instance at the same memory
* address all the time. One just replaces the feature values at that address
* and scores again with the *Fast* methods.
*/
struct FastConfig {
FastConfig(Booster *const booster_ptr,
const char *parameter,
const int predict_type_,
const int data_type_,
const int32_t num_cols) : booster(booster_ptr), predict_type(predict_type_), data_type(data_type_), ncol(num_cols) {
config.Set(Config::Str2Map(parameter));
}

Booster* const booster;
Config config;
const int predict_type;
const int data_type;
const int32_t ncol;
};

int LGBM_FastConfigFree(FastConfigHandle fastConfig) {
Ten0 marked this conversation as resolved.
Show resolved Hide resolved
API_BEGIN();
delete reinterpret_cast<FastConfig*>(fastConfig);
delete reinterpret_cast<SingleRowPredictor*>(fastConfig);
API_END();
}

Expand Down Expand Up @@ -2229,7 +2265,7 @@ int LGBM_BoosterPredictForCSRSingleRow(BoosterHandle handle,
OMP_SET_NUM_THREADS(config.num_threads);
Booster* ref_booster = reinterpret_cast<Booster*>(handle);
auto get_row_fun = RowFunctionFromCSR<int>(indptr, indptr_type, indices, data, data_type, nindptr, nelem);
ref_booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, config);
ref_booster->SetSingleRowPredictorInner(start_iteration, num_iteration, predict_type, config);
ref_booster->PredictSingleRow(predict_type, static_cast<int32_t>(num_col), get_row_fun, config, out_result, out_len);
API_END();
}
Expand All @@ -2249,18 +2285,14 @@ int LGBM_BoosterPredictForCSRSingleRowFastInit(BoosterHandle handle,
Log::Fatal("The number of columns should be smaller than INT32_MAX.");
}

auto fastConfig_ptr = std::unique_ptr<FastConfig>(new FastConfig(
reinterpret_cast<Booster*>(handle),
parameter,
predict_type,
data_type,
static_cast<int32_t>(num_col)));
Booster* ref_booster = reinterpret_cast<Booster*>(handle);

OMP_SET_NUM_THREADS(fastConfig_ptr->config.num_threads);
std::unique_ptr<SingleRowPredictor> single_row_predictor =
ref_booster->InitSingleRowPredictor(start_iteration, num_iteration, predict_type, data_type, static_cast<int32_t>(num_col), parameter);

fastConfig_ptr->booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, fastConfig_ptr->config);
OMP_SET_NUM_THREADS(single_row_predictor->config.num_threads);

*out_fastConfig = fastConfig_ptr.release();
*out_fastConfig = single_row_predictor.release();
API_END();
}

Expand All @@ -2274,10 +2306,9 @@ int LGBM_BoosterPredictForCSRSingleRowFast(FastConfigHandle fastConfig_handle,
int64_t* out_len,
double* out_result) {
API_BEGIN();
FastConfig *fastConfig = reinterpret_cast<FastConfig*>(fastConfig_handle);
auto get_row_fun = RowFunctionFromCSR<int>(indptr, indptr_type, indices, data, fastConfig->data_type, nindptr, nelem);
fastConfig->booster->PredictSingleRow(fastConfig->predict_type, fastConfig->ncol,
get_row_fun, fastConfig->config, out_result, out_len);
SingleRowPredictor *single_row_predictor = reinterpret_cast<SingleRowPredictor*>(fastConfig_handle);
auto get_row_fun = RowFunctionFromCSR<int>(indptr, indptr_type, indices, data, single_row_predictor->data_type, nindptr, nelem);
single_row_predictor->Predict(get_row_fun, out_result, out_len);
API_END();
}

Expand Down Expand Up @@ -2392,7 +2423,7 @@ int LGBM_BoosterPredictForMatSingleRow(BoosterHandle handle,
OMP_SET_NUM_THREADS(config.num_threads);
Booster* ref_booster = reinterpret_cast<Booster*>(handle);
auto get_row_fun = RowPairFunctionFromDenseMatric(data, 1, ncol, data_type, is_row_major);
ref_booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, config);
ref_booster->SetSingleRowPredictorInner(start_iteration, num_iteration, predict_type, config);
ref_booster->PredictSingleRow(predict_type, ncol, get_row_fun, config, out_result, out_len);
API_END();
}
Expand All @@ -2406,18 +2437,14 @@ int LGBM_BoosterPredictForMatSingleRowFastInit(BoosterHandle handle,
const char* parameter,
FastConfigHandle *out_fastConfig) {
API_BEGIN();
auto fastConfig_ptr = std::unique_ptr<FastConfig>(new FastConfig(
reinterpret_cast<Booster*>(handle),
parameter,
predict_type,
data_type,
ncol));
Booster* ref_booster = reinterpret_cast<Booster*>(handle);

OMP_SET_NUM_THREADS(fastConfig_ptr->config.num_threads);
std::unique_ptr<SingleRowPredictor> single_row_predictor =
ref_booster->InitSingleRowPredictor(predict_type, start_iteration, num_iteration, data_type, ncol, parameter);

fastConfig_ptr->booster->SetSingleRowPredictor(start_iteration, num_iteration, predict_type, fastConfig_ptr->config);
OMP_SET_NUM_THREADS(single_row_predictor->config.num_threads);

*out_fastConfig = fastConfig_ptr.release();
*out_fastConfig = single_row_predictor.release();
API_END();
}

Expand All @@ -2426,12 +2453,10 @@ int LGBM_BoosterPredictForMatSingleRowFast(FastConfigHandle fastConfig_handle,
int64_t* out_len,
double* out_result) {
API_BEGIN();
FastConfig *fastConfig = reinterpret_cast<FastConfig*>(fastConfig_handle);
SingleRowPredictor *single_row_predictor = reinterpret_cast<SingleRowPredictor*>(fastConfig_handle);
// Single row in row-major format:
auto get_row_fun = RowPairFunctionFromDenseMatric(data, 1, fastConfig->ncol, fastConfig->data_type, 1);
fastConfig->booster->PredictSingleRow(fastConfig->predict_type, fastConfig->ncol,
get_row_fun, fastConfig->config,
out_result, out_len);
auto get_row_fun = RowPairFunctionFromDenseMatric(data, 1, single_row_predictor->num_cols, single_row_predictor->data_type, 1);
single_row_predictor->Predict(get_row_fun, out_result, out_len);
API_END();
}

Expand Down