diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index 1d510a996d..bbe76aa3d7 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -30,14 +30,38 @@ class priority_queue_executor { static constexpr uint16_t minimum_runtime_ms = 3; // inform how many read_threads will be calling read_only/read_exclusive queues - // Currently only used to assert if exec_queue::read_exclusive is used without any read threads + // expected to only be called at program startup, not thread safe, not safe to call after startup void init_read_threads(size_t num_read_threads) { pri_queue_.init_read_threads(num_read_threads); } + // not thread safe, see init_read_threads comment + size_t get_read_threads() const { + return pri_queue_.get_read_threads(); + } + + // not thread safe, only call at program startup from the main thread (thread that calls app().exec() + void init_main_thread_id() { + main_thread_id_ = std::this_thread::get_id(); + } + + // not thread safe, but as long as set_main_thread_id() only called at program startup from main thread before + // other threads are created then this will be safe to access. + std::thread::id get_main_thread_id() const { + assert(main_thread_id_ != std::thread::id()); + return main_thread_id_; + } + template - auto post( int priority, exec_queue q, Func&& func ) { - return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward(func))); + void post( int priority, exec_queue q, Func&& func ) { + if (q == exec_queue::read_exclusive) { + // no reason to post to io_service which then places this in the read_exclusive_handlers queue. + // read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue. + pri_queue_.add(priority, q, --order_, std::forward(func)); + } else { + // post to io_service as the main thread may be blocked on io_service.run_one() in application::exec() + boost::asio::post(io_serv_, pri_queue_.wrap(priority, q, --order_, std::forward(func))); + } } // Legacy and deprecated. To be removed after cleaning up its uses in base appbase @@ -125,6 +149,7 @@ class priority_queue_executor { // members are ordered taking into account that the last one is destructed first private: + std::thread::id main_thread_id_; boost::asio::io_service io_serv_; appbase::exec_pri_queue pri_queue_; std::atomic order_{ std::numeric_limits::max() }; // to maintain FIFO ordering in all queues within priority diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 15f7664eab..6ef03f4b73 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -31,10 +31,17 @@ class exec_pri_queue : public boost::asio::execution_context public: // inform how many read_threads will be calling read_only/read_exclusive queues + // expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_ void init_read_threads(size_t num_read_threads) { + assert(!lock_enabled_); num_read_threads_ = num_read_threads; } + // not strictly thread safe, see init_read_threads comment + size_t get_read_threads() const { + return num_read_threads_; + } + void stop() { std::lock_guard g( mtx_ ); exiting_blocking_ = true; @@ -60,7 +67,7 @@ class exec_pri_queue : public boost::asio::execution_context assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive); prio_queue& que = priority_que(q); std::unique_ptr handler(new queued_handler(priority, order, std::move(function))); - if (lock_enabled_) { + if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive std::lock_guard g( mtx_ ); que.push( std::move( handler ) ); if (num_waiting_) diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index f574a89b9f..1e12f85796 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -17,6 +17,7 @@ std::thread start_app_thread(appbase::scoped_app& app) { BOOST_CHECK(app->initialize(sizeof(argv) / sizeof(char*), const_cast(argv))); app->startup(); std::thread app_thread( [&]() { + app->executor().init_main_thread_id(); app->exec(); } ); return app_thread; @@ -343,7 +344,6 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); - std::thread::id app_thread_id = app_thread.get_id(); // set to run functions from read_only & read_exclusive queues only app->executor().init_read_threads(3); @@ -408,7 +408,7 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { const auto run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; }); const auto run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; }); const auto run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; }); - const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; }); + const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app->executor().get_main_thread_id(); }); BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected); BOOST_CHECK(run_on_1 > 0); diff --git a/plugins/chain_api_plugin/chain_api_plugin.cpp b/plugins/chain_api_plugin/chain_api_plugin.cpp index e862199ae0..ee9fadfdc4 100644 --- a/plugins/chain_api_plugin/chain_api_plugin.cpp +++ b/plugins/chain_api_plugin/chain_api_plugin.cpp @@ -150,7 +150,6 @@ void chain_api_plugin::plugin_startup() { CHAIN_RO_CALL(get_required_keys, 200, http_params_types::params_required), CHAIN_RO_CALL(get_transaction_id, 200, http_params_types::params_required), // transaction related APIs will be posted to read_write queue after keys are recovered, they are safe to run in parallel until they post to the read_write queue - CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required), CHAIN_RO_CALL_ASYNC(compute_transaction, chain_apis::read_only::compute_transaction_results, 200, http_params_types::params_required), CHAIN_RW_CALL_ASYNC(push_transaction, chain_apis::read_write::push_transaction_results, 202, http_params_types::params_required), CHAIN_RW_CALL_ASYNC(push_transactions, chain_apis::read_write::push_transactions_results, 202, http_params_types::params_required), @@ -170,6 +169,8 @@ void chain_api_plugin::plugin_startup() { } _http_plugin.add_async_api({ + // chain_plugin send_read_only_transaction will post to read_exclusive queue + CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required), CHAIN_RO_CALL_WITH_400(get_raw_block, 200, http_params_types::params_required), CHAIN_RO_CALL_WITH_400(get_block_header, 200, http_params_types::params_required) }); diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index e4beb113da..a6c7d97108 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -2567,12 +2567,19 @@ void read_only::compute_transaction(compute_transaction_params params, next_func } void read_only::send_read_only_transaction(send_read_only_transaction_params params, next_function next) { + static bool read_only_enabled = app().executor().get_read_threads() > 0; + EOS_ASSERT( read_only_enabled, unsupported_feature, + "read-only transactions execution not enabled on API node. Set read-only-threads > 0" ); + send_transaction_params_t gen_params { .return_failure_trace = false, .retry_trx = false, .retry_trx_num_blocks = std::nullopt, .trx_type = transaction_metadata::trx_type::read_only, .transaction = std::move(params.transaction) }; - return send_transaction_gen(*this, std::move(gen_params), std::move(next)); + // run read-only trx exclusively on read-only threads + app().executor().post(priority::low, exec_queue::read_exclusive, [this, gen_params{std::move(gen_params)}, next{std::move(next)}]() mutable { + send_transaction_gen(*this, std::move(gen_params), std::move(next)); + }); } read_only::get_transaction_id_result read_only::get_transaction_id( const read_only::get_transaction_id_params& params, const fc::time_point& ) const { diff --git a/plugins/http_plugin/include/eosio/http_plugin/common.hpp b/plugins/http_plugin/include/eosio/http_plugin/common.hpp index 5f12134086..090464f068 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/common.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/common.hpp @@ -154,7 +154,7 @@ struct http_plugin_state { */ inline auto make_http_response_handler(http_plugin_state& plugin_state, detail::abstract_conn_ptr session_ptr, http_content_type content_type) { return [&plugin_state, - session_ptr{std::move(session_ptr)}, content_type](int code, std::optional response) { + session_ptr{std::move(session_ptr)}, content_type](int code, std::optional response) mutable { auto payload_size = detail::in_flight_sizeof(response); if(auto error_str = session_ptr->verify_max_bytes_in_flight(payload_size); !error_str.empty()) { session_ptr->send_busy_response(std::move(error_str)); @@ -164,8 +164,8 @@ inline auto make_http_response_handler(http_plugin_state& plugin_state, detail:: plugin_state.bytes_in_flight += payload_size; // post back to an HTTP thread to allow the response handler to be called from any thread - boost::asio::post(plugin_state.thread_pool.get_executor(), - [&plugin_state, session_ptr, code, payload_size, response = std::move(response), content_type]() { + boost::asio::dispatch(plugin_state.thread_pool.get_executor(), + [&plugin_state, session_ptr{std::move(session_ptr)}, code, payload_size, response = std::move(response), content_type]() { try { plugin_state.bytes_in_flight -= payload_size; if (response.has_value()) { diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index ccba614c65..577e43bb55 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -774,14 +774,12 @@ class producer_plugin_impl : public std::enable_shared_from_this next) { if (trx_type == transaction_metadata::trx_type::read_only) { - EOS_ASSERT( _ro_thread_pool_size > 0, unsupported_feature, - "read-only transactions execution not enabled on API node. Set read-only-threads > 0" ); + assert(_ro_thread_pool_size > 0); // enforced by chain_plugin + assert(app().executor().get_main_thread_id() != std::this_thread::get_id()); // should only be called from read only threads // Post all read only trxs to read_exclusive queue for execution. auto trx_metadata = transaction_metadata::create_no_recover_keys(trx, transaction_metadata::trx_type::read_only); - app().executor().post(priority::low, exec_queue::read_exclusive, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable { - push_read_only_transaction(std::move(trx), std::move(next)); - }); + push_read_only_transaction(std::move(trx_metadata), std::move(next)); return; } @@ -2659,11 +2657,8 @@ void producer_plugin::log_failed_transaction(const transaction_id_type& trx_i // Called from only one read_only thread void producer_plugin_impl::switch_to_write_window() { - if (_log.is_enabled(fc::log_level::debug)) { - auto now = fc::time_point::now(); - fc_dlog(_log, "Read-only threads ${n}, read window ${r}us, total all threads ${t}us", - ("n", _ro_thread_pool_size)("r", now - _ro_read_window_start_time)("t", _ro_all_threads_exec_time_us.load())); - } + fc_ilog(_log, "Read-only threads ${n}, read window ${r}us, total all threads ${t}us", + ("n", _ro_thread_pool_size)("r", fc::time_point::now() - _ro_read_window_start_time)("t", _ro_all_threads_exec_time_us.load())); chain::controller& chain = chain_plug->chain(); @@ -2714,10 +2709,12 @@ void producer_plugin_impl::switch_to_read_window() { // we are in write window, so no read-only trx threads are processing transactions. app().get_io_service().poll(); // make sure we schedule any ready - if (app().executor().read_only_queue_empty() && app().executor().read_exclusive_queue_empty()) { // no read-only tasks to process. stay in write window + if (app().executor().read_exclusive_queue_empty() && app().executor().read_only_queue_empty()) { // no read-only tasks to process. stay in write window start_write_window(); // restart write window timer for next round return; } + fc_dlog(_log, "Read only queue size ${s1}, read exclusive size ${s2}", + ("s1", app().executor().read_only_queue_size())("s2", app().executor().read_exclusive_queue_size())); uint32_t pending_block_num = chain.head_block_num() + 1; _ro_read_window_start_time = fc::time_point::now(); diff --git a/programs/nodeos/main.cpp b/programs/nodeos/main.cpp index 6e2feeba91..cd291920be 100644 --- a/programs/nodeos/main.cpp +++ b/programs/nodeos/main.cpp @@ -190,6 +190,7 @@ int main(int argc, char** argv) ilog("${name} using configuration file ${c}", ("name", nodeos::config::node_executable_name)("c", app->full_config_file_path().string())); ilog("${name} data directory is ${d}", ("name", nodeos::config::node_executable_name)("d", app->data_dir().string())); ::detail::log_non_default_options(app->get_parsed_options()); + app->executor().init_main_thread_id(); app->startup(); app->set_thread_priority_max(); app->exec();