Skip to content

Commit

Permalink
GH-1662 Reduce the number of thread hops
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Sep 30, 2023
1 parent 25bb157 commit ac4684a
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 22 deletions.
31 changes: 28 additions & 3 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,38 @@ class priority_queue_executor {
static constexpr uint16_t minimum_runtime_ms = 3;

// inform how many read_threads will be calling read_only/read_exclusive queues
// Currently only used to assert if exec_queue::read_exclusive is used without any read threads
// expected to only be called at program startup, not thread safe, not safe to call after startup
void init_read_threads(size_t num_read_threads) {
pri_queue_.init_read_threads(num_read_threads);
}

// not thread safe, see init_read_threads comment
size_t get_read_threads() const {
return pri_queue_.get_read_threads();
}

// not thread safe, only call at program startup from the main thread (thread that calls app().exec()
void init_main_thread_id() {
main_thread_id_ = std::this_thread::get_id();
}

// not thread safe, but as long as set_main_thread_id() only called at program startup from main thread before
// other threads are created then this will be safe to access.
std::thread::id get_main_thread_id() const {
assert(main_thread_id_ != std::thread::id());
return main_thread_id_;
}

template <typename Func>
auto post( int priority, exec_queue q, Func&& func ) {
return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward<Func>(func)));
void post( int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
// no reason to post to io_service which then places this in the read_exclusive_handlers queue.
// read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
pri_queue_.add(priority, q, --order_, std::forward<Func>(func));
} else {
// post to io_service as the main thread may be blocked on io_service.run_one() in application::exec()
boost::asio::post(io_serv_, pri_queue_.wrap(priority, q, --order_, std::forward<Func>(func)));
}
}

// Legacy and deprecated. To be removed after cleaning up its uses in base appbase
Expand Down Expand Up @@ -125,6 +149,7 @@ class priority_queue_executor {

// members are ordered taking into account that the last one is destructed first
private:
std::thread::id main_thread_id_;
boost::asio::io_service io_serv_;
appbase::exec_pri_queue pri_queue_;
std::atomic<std::size_t> order_{ std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in all queues within priority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ class exec_pri_queue : public boost::asio::execution_context
public:

// inform how many read_threads will be calling read_only/read_exclusive queues
// expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_
void init_read_threads(size_t num_read_threads) {
assert(!lock_enabled_);
num_read_threads_ = num_read_threads;
}

// not strictly thread safe, see init_read_threads comment
size_t get_read_threads() const {
return num_read_threads_;
}

void stop() {
std::lock_guard g( mtx_ );
exiting_blocking_ = true;
Expand All @@ -60,7 +67,7 @@ class exec_pri_queue : public boost::asio::execution_context
assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
prio_queue& que = priority_que(q);
std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(priority, order, std::move(function)));
if (lock_enabled_) {
if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive
std::lock_guard g( mtx_ );
que.push( std::move( handler ) );
if (num_waiting_)
Expand Down
4 changes: 2 additions & 2 deletions libraries/custom_appbase/tests/custom_appbase_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ std::thread start_app_thread(appbase::scoped_app& app) {
BOOST_CHECK(app->initialize(sizeof(argv) / sizeof(char*), const_cast<char**>(argv)));
app->startup();
std::thread app_thread( [&]() {
app->executor().init_main_thread_id();
app->exec();
} );
return app_thread;
Expand Down Expand Up @@ -343,7 +344,6 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;

auto app_thread = start_app_thread(app);
std::thread::id app_thread_id = app_thread.get_id();

// set to run functions from read_only & read_exclusive queues only
app->executor().init_read_threads(3);
Expand Down Expand Up @@ -408,7 +408,7 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
const auto run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; });
const auto run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; });
const auto run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; });
const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; });
const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app->executor().get_main_thread_id(); });

BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected);
BOOST_CHECK(run_on_1 > 0);
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_api_plugin/chain_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ void chain_api_plugin::plugin_startup() {
CHAIN_RO_CALL(get_required_keys, 200, http_params_types::params_required),
CHAIN_RO_CALL(get_transaction_id, 200, http_params_types::params_required),
// transaction related APIs will be posted to read_write queue after keys are recovered, they are safe to run in parallel until they post to the read_write queue
CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required),
CHAIN_RO_CALL_ASYNC(compute_transaction, chain_apis::read_only::compute_transaction_results, 200, http_params_types::params_required),
CHAIN_RW_CALL_ASYNC(push_transaction, chain_apis::read_write::push_transaction_results, 202, http_params_types::params_required),
CHAIN_RW_CALL_ASYNC(push_transactions, chain_apis::read_write::push_transactions_results, 202, http_params_types::params_required),
Expand All @@ -170,6 +169,8 @@ void chain_api_plugin::plugin_startup() {
}

_http_plugin.add_async_api({
// chain_plugin send_read_only_transaction will post to read_exclusive queue
CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required),
CHAIN_RO_CALL_WITH_400(get_raw_block, 200, http_params_types::params_required),
CHAIN_RO_CALL_WITH_400(get_block_header, 200, http_params_types::params_required)
});
Expand Down
9 changes: 8 additions & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2567,12 +2567,19 @@ void read_only::compute_transaction(compute_transaction_params params, next_func
}

void read_only::send_read_only_transaction(send_read_only_transaction_params params, next_function<send_read_only_transaction_results> next) {
static bool read_only_enabled = app().executor().get_read_threads() > 0;
EOS_ASSERT( read_only_enabled, unsupported_feature,
"read-only transactions execution not enabled on API node. Set read-only-threads > 0" );

send_transaction_params_t gen_params { .return_failure_trace = false,
.retry_trx = false,
.retry_trx_num_blocks = std::nullopt,
.trx_type = transaction_metadata::trx_type::read_only,
.transaction = std::move(params.transaction) };
return send_transaction_gen(*this, std::move(gen_params), std::move(next));
// run read-only trx exclusively on read-only threads
app().executor().post(priority::low, exec_queue::read_exclusive, [this, gen_params{std::move(gen_params)}, next{std::move(next)}]() mutable {
send_transaction_gen(*this, std::move(gen_params), std::move(next));
});
}

read_only::get_transaction_id_result read_only::get_transaction_id( const read_only::get_transaction_id_params& params, const fc::time_point& ) const {
Expand Down
6 changes: 3 additions & 3 deletions plugins/http_plugin/include/eosio/http_plugin/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ struct http_plugin_state {
*/
inline auto make_http_response_handler(http_plugin_state& plugin_state, detail::abstract_conn_ptr session_ptr, http_content_type content_type) {
return [&plugin_state,
session_ptr{std::move(session_ptr)}, content_type](int code, std::optional<fc::variant> response) {
session_ptr{std::move(session_ptr)}, content_type](int code, std::optional<fc::variant> response) mutable {
auto payload_size = detail::in_flight_sizeof(response);
if(auto error_str = session_ptr->verify_max_bytes_in_flight(payload_size); !error_str.empty()) {
session_ptr->send_busy_response(std::move(error_str));
Expand All @@ -164,8 +164,8 @@ inline auto make_http_response_handler(http_plugin_state& plugin_state, detail::
plugin_state.bytes_in_flight += payload_size;

// post back to an HTTP thread to allow the response handler to be called from any thread
boost::asio::post(plugin_state.thread_pool.get_executor(),
[&plugin_state, session_ptr, code, payload_size, response = std::move(response), content_type]() {
boost::asio::dispatch(plugin_state.thread_pool.get_executor(),
[&plugin_state, session_ptr{std::move(session_ptr)}, code, payload_size, response = std::move(response), content_type]() {
try {
plugin_state.bytes_in_flight -= payload_size;
if (response.has_value()) {
Expand Down
19 changes: 8 additions & 11 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,14 +774,12 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
bool return_failure_traces,
next_function<transaction_trace_ptr> next) {
if (trx_type == transaction_metadata::trx_type::read_only) {
EOS_ASSERT( _ro_thread_pool_size > 0, unsupported_feature,
"read-only transactions execution not enabled on API node. Set read-only-threads > 0" );
assert(_ro_thread_pool_size > 0); // enforced by chain_plugin
assert(app().executor().get_main_thread_id() != std::this_thread::get_id()); // should only be called from read only threads

// Post all read only trxs to read_exclusive queue for execution.
auto trx_metadata = transaction_metadata::create_no_recover_keys(trx, transaction_metadata::trx_type::read_only);
app().executor().post(priority::low, exec_queue::read_exclusive, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable {
push_read_only_transaction(std::move(trx), std::move(next));
});
push_read_only_transaction(std::move(trx_metadata), std::move(next));
return;
}

Expand Down Expand Up @@ -2659,11 +2657,8 @@ void producer_plugin::log_failed_transaction(const transaction_id_type& trx_i

// Called from only one read_only thread
void producer_plugin_impl::switch_to_write_window() {
if (_log.is_enabled(fc::log_level::debug)) {
auto now = fc::time_point::now();
fc_dlog(_log, "Read-only threads ${n}, read window ${r}us, total all threads ${t}us",
("n", _ro_thread_pool_size)("r", now - _ro_read_window_start_time)("t", _ro_all_threads_exec_time_us.load()));
}
fc_ilog(_log, "Read-only threads ${n}, read window ${r}us, total all threads ${t}us",
("n", _ro_thread_pool_size)("r", fc::time_point::now() - _ro_read_window_start_time)("t", _ro_all_threads_exec_time_us.load()));

chain::controller& chain = chain_plug->chain();

Expand Down Expand Up @@ -2714,10 +2709,12 @@ void producer_plugin_impl::switch_to_read_window() {

// we are in write window, so no read-only trx threads are processing transactions.
app().get_io_service().poll(); // make sure we schedule any ready
if (app().executor().read_only_queue_empty() && app().executor().read_exclusive_queue_empty()) { // no read-only tasks to process. stay in write window
if (app().executor().read_exclusive_queue_empty() && app().executor().read_only_queue_empty()) { // no read-only tasks to process. stay in write window
start_write_window(); // restart write window timer for next round
return;
}
fc_dlog(_log, "Read only queue size ${s1}, read exclusive size ${s2}",
("s1", app().executor().read_only_queue_size())("s2", app().executor().read_exclusive_queue_size()));

uint32_t pending_block_num = chain.head_block_num() + 1;
_ro_read_window_start_time = fc::time_point::now();
Expand Down
1 change: 1 addition & 0 deletions programs/nodeos/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ int main(int argc, char** argv)
ilog("${name} using configuration file ${c}", ("name", nodeos::config::node_executable_name)("c", app->full_config_file_path().string()));
ilog("${name} data directory is ${d}", ("name", nodeos::config::node_executable_name)("d", app->data_dir().string()));
::detail::log_non_default_options(app->get_parsed_options());
app->executor().init_main_thread_id();
app->startup();
app->set_thread_priority_max();
app->exec();
Expand Down

0 comments on commit ac4684a

Please sign in to comment.