From ae1a89cf559edc4c23ddfa1b612fe524a36a3ee7 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 15 Sep 2023 13:55:08 -0500 Subject: [PATCH 01/20] GH-1639 Change default max-transaction-time from 30 to 499 --- .../03_plugins/producer_plugin/index.md | 8 +++--- plugins/producer_plugin/producer_plugin.cpp | 25 ++++++++----------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/docs/01_nodeos/03_plugins/producer_plugin/index.md b/docs/01_nodeos/03_plugins/producer_plugin/index.md index 2008de7d6f..07aa7b1067 100644 --- a/docs/01_nodeos/03_plugins/producer_plugin/index.md +++ b/docs/01_nodeos/03_plugins/producer_plugin/index.md @@ -27,10 +27,10 @@ Config Options for eosio::producer_plugin: chain is stale. -x [ --pause-on-startup ] Start this node in a state where production is paused - --max-transaction-time arg (=30) Limits the maximum time (in - milliseconds) that is allowed a pushed - transaction's code to execute before - being considered invalid + --max-transaction-time arg (=499) Locally lowers the max_transaction_cpu_ + usage limit (in milliseconds) that an + input transaction is allowed to execute + before being considered invalid --max-irreversible-block-age arg (=-1) Limits the maximum age (in seconds) of the DPOS Irreversible Block for a chain diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index bffdf69e4d..25ac6db4e6 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -1047,8 +1047,8 @@ void producer_plugin::set_program_options( producer_options.add_options() ("enable-stale-production,e", boost::program_options::bool_switch()->notifier([this](bool e){my->_production_enabled = e;}), "Enable block production, even if the chain is stale.") ("pause-on-startup,x", boost::program_options::bool_switch()->notifier([this](bool p){my->_pause_production = p;}), "Start this node in a state where production is paused") - ("max-transaction-time", bpo::value()->default_value(30), - "Limits the maximum time (in milliseconds) that is allowed a pushed transaction's code to execute before being considered invalid") + ("max-transaction-time", bpo::value()->default_value(config::block_interval_ms-1), + "Locally lowers the max_transaction_cpu_usage limit (in milliseconds) that an input transaction is allowed to execute before being considered invalid") ("max-irreversible-block-age", bpo::value()->default_value( -1 ), "Limits the maximum age (in seconds) of the DPOS Irreversible Block for a chain this node will produce blocks on (use negative value to indicate unlimited)") ("producer-name,p", boost::program_options::value>()->composing()->multitoken(), @@ -1270,28 +1270,25 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia ("read", _ro_read_window_time_us)("min", _ro_read_window_minimum_time_us)); _ro_read_window_effective_time_us = _ro_read_window_time_us - _ro_read_window_minimum_time_us; - // Make sure a read-only transaction can finish within the read - // window if scheduled at the very beginning of the window. - // Add _ro_read_window_minimum_time_us for safety margin. - if (_max_transaction_time_ms.load() > 0) { - EOS_ASSERT( - _ro_read_window_time_us > (fc::milliseconds(_max_transaction_time_ms.load()) + _ro_read_window_minimum_time_us), - plugin_config_exception, - "read-only-read-window-time-us (${read} us) must be greater than max-transaction-time (${trx_time} us) " - "plus ${min} us, required: ${read} us > (${trx_time} us + ${min} us).", - ("read", _ro_read_window_time_us)("trx_time", _max_transaction_time_ms.load() * 1000)("min", _ro_read_window_minimum_time_us)); - } ilog("read-only-write-window-time-us: ${ww} us, read-only-read-window-time-us: ${rw} us, effective read window time to be used: ${w} us", ("ww", _ro_write_window_time_us)("rw", _ro_read_window_time_us)("w", _ro_read_window_effective_time_us)); } - // Make sure _ro_max_trx_time_us is alwasys set. + // Make sure _ro_max_trx_time_us is always set. + // Make sure a read-only transaction can finish within the read + // window if scheduled at the very beginning of the window. + // Add _ro_read_window_minimum_time_us for safety margin. if (_max_transaction_time_ms.load() > 0) { _ro_max_trx_time_us = fc::milliseconds(_max_transaction_time_ms.load()); } else { // max-transaction-time can be set to negative for unlimited time _ro_max_trx_time_us = fc::microseconds::maximum(); } + if (_ro_max_trx_time_us > _ro_read_window_effective_time_us) { + _ro_max_trx_time_us = _ro_read_window_effective_time_us; + } + ilog("Read-only max transaction time ${rot}us set to fit in the effective read-only window ${row}us.", + ("rot", _ro_max_trx_time_us)("row", _ro_read_window_effective_time_us)); ilog("read-only-threads ${s}, max read-only trx time to be enforced: ${t} us", ("s", _ro_thread_pool_size)("t", _ro_max_trx_time_us)); _incoming_block_sync_provider = app().get_method().register_provider( From 494fbae3f577f152ba79ae28e6e418e84437f349 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 10:33:08 -0500 Subject: [PATCH 02/20] GH-1639 Execute read-only trxs only on read-only threads --- .../include/eosio/chain/application.hpp | 38 ++++++++++++++----- .../include/eosio/chain/exec_pri_queue.hpp | 15 +++++++- plugins/producer_plugin/producer_plugin.cpp | 33 ++++++---------- tests/read_only_trx_test.py | 1 + tests/test_read_only_trx.cpp | 12 ------ 5 files changed, 55 insertions(+), 44 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index b630e16ce9..3871c01342 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -23,15 +23,21 @@ enum class exec_window { enum class exec_queue { read_only, // the queue storing tasks which are safe to execute - // in parallel with other read-only tasks in the read-only + // in parallel with other read-only & read_exclusive tasks in the read-only // thread pool as well as on the main app thread. // Multi-thread safe as long as nothing is executed from the read_write queue. - read_write // the queue storing tasks which can be only executed + read_write, // the queue storing tasks which can be only executed // on the app thread while read-only tasks are // not being executed in read-only threads. Single threaded. + read_exclusive // the queue storing tasks which should only be executed + // in parallel with other read_exclusive or read_only tasks in the + // read-only thread pool. Should never be executed on the main thread. + // If no read-only thread pool is available this queue grows unbounded + // as tasks will never execute. User is responsible for not queueing + // read_exclusive tasks if no read-only thread pool is available. }; -class two_queue_executor { +class three_queue_executor { public: // Trade off on returning to appbase exec() loop as the overhead of poll/run can be measurable for small running tasks. @@ -42,8 +48,10 @@ class two_queue_executor { auto post( int priority, exec_queue q, Func&& func ) { if ( q == exec_queue::read_write ) return boost::asio::post(io_serv_, read_write_queue_.wrap(priority, --order_, std::forward(func))); - else + else if ( q == exec_queue::read_only ) return boost::asio::post( io_serv_, read_only_queue_.wrap( priority, --order_, std::forward( func))); + else + return boost::asio::post( io_serv_, read_exclusive_queue_.wrap( priority, --order_, std::forward( func))); } // Legacy and deprecated. To be removed after cleaning up its uses in base appbase @@ -56,6 +64,7 @@ class two_queue_executor { boost::asio::io_service& get_io_service() { return io_serv_; } + // called from main thread bool execute_highest() { // execute for at least minimum runtime const auto end = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(minimum_runtime_ms); @@ -81,13 +90,20 @@ class two_queue_executor { return more; } - bool execute_highest_read_only() { + bool execute_highest_read() { // execute for at least minimum runtime const auto end = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(minimum_runtime_ms); bool more = false; while (true) { - more = read_only_queue_.execute_highest_locked( true ); + std::optional exec_read_only_queue = read_only_queue_.compare_queues_locked(read_exclusive_queue_); + if (!exec_read_only_queue) { + more = read_only_queue_.execute_highest_locked( true ); + } else if (*exec_read_only_queue) { + more = read_only_queue_.execute_highest_locked( false ); + } else { + more = read_exclusive_queue_.execute_highest_locked( false ); + } if (!more || std::chrono::high_resolution_clock::now() > end) break; } @@ -106,16 +122,19 @@ class two_queue_executor { void clear() { read_only_queue_.clear(); read_write_queue_.clear(); + read_exclusive_queue_.clear(); } void set_to_read_window(uint32_t num_threads, std::function should_exit) { exec_window_ = exec_window::read; - read_only_queue_.enable_locking(num_threads, std::move(should_exit)); + read_only_queue_.enable_locking(num_threads, should_exit); + read_exclusive_queue_.enable_locking(num_threads, std::move(should_exit)); } void set_to_write_window() { exec_window_ = exec_window::write; read_only_queue_.disable_locking(); + read_exclusive_queue_.disable_locking(); } bool is_read_window() const { @@ -127,19 +146,20 @@ class two_queue_executor { } auto& read_only_queue() { return read_only_queue_; } - auto& read_write_queue() { return read_write_queue_; } + auto& read_exclusive_queue() { return read_exclusive_queue_; } // members are ordered taking into account that the last one is destructed first private: boost::asio::io_service io_serv_; appbase::exec_pri_queue read_only_queue_; appbase::exec_pri_queue read_write_queue_; + appbase::exec_pri_queue read_exclusive_queue_; std::atomic order_ { std::numeric_limits::max() }; // to maintain FIFO ordering in both queues within priority exec_window exec_window_ { exec_window::write }; }; -using application = application_t; +using application = application_t; } #include diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 984adc8d60..4e2f0263ad 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -13,6 +13,11 @@ class exec_pri_queue : public boost::asio::execution_context { public: + ~exec_pri_queue() { + exiting_blocking_ = true; + cond_.notify_all(); + } + void enable_locking(uint32_t num_threads, std::function should_exit) { assert(num_threads > 0 && num_waiting_ == 0); lock_enabled_ = true; @@ -105,6 +110,14 @@ class exec_pri_queue : public boost::asio::execution_context // Only call when locking disabled const auto& top() const { return handlers_.top(); } + // return empty optional if both queues empty. + // true if this queue has the highest priority task to execute + std::optional compare_queues_locked( const exec_pri_queue& rhs ) { + std::scoped_lock g(mtx_, rhs.mtx_); + if (empty() && rhs.empty()) return {}; + return !empty() && (rhs.empty() || *rhs.top() < *top()); + } + class executor { public: @@ -220,7 +233,7 @@ class exec_pri_queue : public boost::asio::execution_context }; bool lock_enabled_ = false; - std::mutex mtx_; + mutable std::mutex mtx_; std::condition_variable cond_; uint32_t num_waiting_{0}; uint32_t max_waiting_{0}; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 25ac6db4e6..d9272351c7 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -807,9 +807,12 @@ class producer_plugin_impl : public std::enable_shared_from_this next) { if (trx_type == transaction_metadata::trx_type::read_only) { + EOS_ASSERT( _ro_thread_pool_size > 0, unsupported_feature, + "read-only transactions execution not enabled on API node. Set read-only-threads > 0" ); + // Post all read only trxs to read_only queue for execution. auto trx_metadata = transaction_metadata::create_no_recover_keys(trx, transaction_metadata::trx_type::read_only); - app().executor().post(priority::low, exec_queue::read_only, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable { + app().executor().post(priority::low, exec_queue::read_exclusive, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable { push_read_only_transaction(std::move(trx), std::move(next)); }); return; @@ -2838,7 +2841,7 @@ void producer_plugin_impl::switch_to_read_window() { _time_tracker.pause(); // we are in write window, so no read-only trx threads are processing transactions. - if (app().executor().read_only_queue().empty()) { // no read-only tasks to process. stay in write window + if (app().executor().read_only_queue().empty() && app().executor().read_exclusive_queue().empty()) { // no read-only tasks to process. stay in write window start_write_window(); // restart write window timer for next round return; } @@ -2889,7 +2892,7 @@ bool producer_plugin_impl::read_only_execution_task(uint32_t pending_block_num) // 2. net_plugin receives a block // 3. no read-only tasks to execute while (fc::time_point::now() < _ro_window_deadline && _received_block < pending_block_num) { - bool more = app().executor().execute_highest_read_only(); // blocks until all read only threads are idle + bool more = app().executor().execute_highest_read(); // blocks until all read only threads are idle if (!more) { break; } @@ -2906,7 +2909,7 @@ bool producer_plugin_impl::read_only_execution_task(uint32_t pending_block_num) // last thread post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first ro_trx_t t; while (_ro_exhausted_trx_queue.pop_front(t)) { - app().executor().post(priority::low + 1, exec_queue::read_only, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable { + app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable { push_read_only_transaction(std::move(trx), std::move(next)); }); } @@ -2924,7 +2927,7 @@ void producer_plugin_impl::repost_exhausted_transactions(const fc::time_point& d // post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first ro_trx_t t; while (!should_interrupt_start_block(deadline, pending_block_num) && _ro_exhausted_trx_queue.pop_front(t)) { - app().executor().post(priority::low + 1, exec_queue::read_only, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable { + app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable { push_read_only_transaction(std::move(trx), std::move(next)); }); } @@ -2944,21 +2947,10 @@ bool producer_plugin_impl::push_read_only_transaction(transaction_metadata_ptr t return true; } - // When executing a read-only trx on the main thread while in the write window, - // need to switch db mode to read only. - auto db_read_only_mode_guard = fc::make_scoped_exit([&] { - if (chain.is_write_window()) - chain.unset_db_read_only_mode(); - }); + assert(!chain.is_write_window()); - std::optional trx_tracker; - if ( chain.is_write_window() ) { - chain.set_db_read_only_mode(); - trx_tracker.emplace(_time_tracker.start_trx(true, start)); - } - - // use read-window/write-window deadline if there are read/write windows, otherwise use block_deadline if only the app thead - auto window_deadline = (_ro_thread_pool_size != 0) ? _ro_window_deadline : _pending_block_deadline; + // use read-window/write-window deadline + auto window_deadline = _ro_window_deadline; // Ensure the trx to finish by the end of read-window or write-window or block_deadline depending on auto trace = chain.push_transaction(trx, window_deadline, _ro_max_trx_time_us, 0, false, 0); @@ -2976,9 +2968,6 @@ bool producer_plugin_impl::push_read_only_transaction(transaction_metadata_ptr t _ro_exhausted_trx_queue.push_front({std::move(trx), std::move(next)}); } - if ( chain.is_write_window() && !pr.failed ) { - trx_tracker->trx_success(); - } } catch (const guard_exception& e) { chain_plugin::handle_guard_exception(e); } catch (boost::interprocess::bad_alloc&) { diff --git a/tests/read_only_trx_test.py b/tests/read_only_trx_test.py index 91f92a8a0b..543f1eea1c 100755 --- a/tests/read_only_trx_test.py +++ b/tests/read_only_trx_test.py @@ -110,6 +110,7 @@ def startCluster(): specificExtraNodeosArgs[pnodes]+=" 1 " # set small so there is churn specificExtraNodeosArgs[pnodes]+=" --read-only-threads " specificExtraNodeosArgs[pnodes]+=str(args.read_only_threads) + specificExtraNodeosArgs[pnodes]+=" --contracts-console " if args.eos_vm_oc_enable: if platform.system() != "Linux": Print("OC not run on Linux. Skip the test") diff --git a/tests/test_read_only_trx.cpp b/tests/test_read_only_trx.cpp index b2e615ef14..ffe9fd7e41 100644 --- a/tests/test_read_only_trx.cpp +++ b/tests/test_read_only_trx.cpp @@ -73,12 +73,6 @@ BOOST_AUTO_TEST_CASE(read_only_on_producer) { test_configs_common(specific_args, app_init_status::failed); } -// read_window_time must be greater than max_transaction_time + 10ms -BOOST_AUTO_TEST_CASE(invalid_read_window_time) { - std::vector specific_args = { "--read-only-threads", "2", "--max-transaction-time", "10", "--read-only-write-window-time-us", "50000", "--read-only-read-window-time-us", "20000" }; // 20000 not greater than --max-transaction-time (10ms) + 10000us (minimum margin) - test_configs_common(specific_args, app_init_status::failed); -} - // if --read-only-threads is not configured, read-only trx related configs should // not be checked BOOST_AUTO_TEST_CASE(not_check_configs_if_no_read_only_threads) { @@ -184,12 +178,6 @@ void test_trxs_common(std::vector& specific_args, bool test_disable } FC_LOG_AND_RETHROW() } -// test read-only trxs on main thread (no --read-only-threads) -BOOST_AUTO_TEST_CASE(no_read_only_threads) { - std::vector specific_args = { "-p", "eosio", "-e", "--abi-serializer-max-time-ms=999" }; - test_trxs_common(specific_args); -} - // test read-only trxs on 1 threads (with --read-only-threads) BOOST_AUTO_TEST_CASE(with_1_read_only_threads) { std::vector specific_args = { "-p", "eosio", "-e", From fd232b21ebfb2b0e6bc67d781befb00bf9f45c8d Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 10:47:33 -0500 Subject: [PATCH 03/20] GH-1639 Add include --- libraries/custom_appbase/include/eosio/chain/application.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index 3871c01342..0e984d16f4 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include /* From 274925604cc40aaf005f7d59f59c3d3c01f57d5b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 10:56:37 -0500 Subject: [PATCH 04/20] GH-1639 Add include --- libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 4e2f0263ad..41716284af 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -3,6 +3,7 @@ #include #include +#include #include namespace appbase { From f038d6554397dd61de68ea5079f25bf006cbeb58 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 12:00:05 -0500 Subject: [PATCH 05/20] GH-1639 Enable contracts-console on all nodes --- tests/read_only_trx_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/read_only_trx_test.py b/tests/read_only_trx_test.py index 543f1eea1c..b4fd1a8cc4 100755 --- a/tests/read_only_trx_test.py +++ b/tests/read_only_trx_test.py @@ -110,7 +110,6 @@ def startCluster(): specificExtraNodeosArgs[pnodes]+=" 1 " # set small so there is churn specificExtraNodeosArgs[pnodes]+=" --read-only-threads " specificExtraNodeosArgs[pnodes]+=str(args.read_only_threads) - specificExtraNodeosArgs[pnodes]+=" --contracts-console " if args.eos_vm_oc_enable: if platform.system() != "Linux": Print("OC not run on Linux. Skip the test") @@ -120,7 +119,7 @@ def startCluster(): if args.wasm_runtime: specificExtraNodeosArgs[pnodes]+=" --wasm-runtime " specificExtraNodeosArgs[pnodes]+=args.wasm_runtime - extraNodeosArgs=" --http-max-response-time-ms 990000 --disable-subjective-api-billing false " + extraNodeosArgs=" --http-max-response-time-ms 990000 --disable-subjective-api-billing false --contracts-console " if cluster.launch(pnodes=pnodes, totalNodes=total_nodes, topo=topo, delay=delay, specificExtraNodeosArgs=specificExtraNodeosArgs, extraNodeosArgs=extraNodeosArgs ) is False: errorExit("Failed to stand up eos cluster.") From 3e40394a2f9f0038b555fc8b670d13ed324a8435 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 12:19:13 -0500 Subject: [PATCH 06/20] GH-1639 Revert changes to test --- tests/read_only_trx_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/read_only_trx_test.py b/tests/read_only_trx_test.py index b4fd1a8cc4..91f92a8a0b 100755 --- a/tests/read_only_trx_test.py +++ b/tests/read_only_trx_test.py @@ -119,7 +119,7 @@ def startCluster(): if args.wasm_runtime: specificExtraNodeosArgs[pnodes]+=" --wasm-runtime " specificExtraNodeosArgs[pnodes]+=args.wasm_runtime - extraNodeosArgs=" --http-max-response-time-ms 990000 --disable-subjective-api-billing false --contracts-console " + extraNodeosArgs=" --http-max-response-time-ms 990000 --disable-subjective-api-billing false " if cluster.launch(pnodes=pnodes, totalNodes=total_nodes, topo=topo, delay=delay, specificExtraNodeosArgs=specificExtraNodeosArgs, extraNodeosArgs=extraNodeosArgs ) is False: errorExit("Failed to stand up eos cluster.") From 1a6631a0b1da8adbb3ec3b5d912f9cd491a2125c Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 14:02:35 -0500 Subject: [PATCH 07/20] GH-1639 read-only trxs only allowed when read-only-threads > 0 --- tests/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 233824062b..701fab3c74 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -131,7 +131,7 @@ add_test(NAME nodeos_protocol_feature_test COMMAND tests/nodeos_protocol_feature set_property(TEST nodeos_protocol_feature_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME compute_transaction_test COMMAND tests/compute_transaction_test.py -v -p 2 -n 3 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST compute_transaction_test PROPERTY LABELS nonparallelizable_tests) -add_test(NAME read-only-trx-basic-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --read-only-threads 0 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +add_test(NAME read-only-trx-basic-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --read-only-threads 1 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST read-only-trx-basic-test PROPERTY LABELS nonparallelizable_tests) add_test(NAME read-only-trx-parallel-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --read-only-threads 128 --num-test-runs 3 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST read-only-trx-parallel-test PROPERTY LABELS nonparallelizable_tests) From 7a7aa583d4760558b9c622c1aa9b0a747370cb69 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 15:47:48 -0500 Subject: [PATCH 08/20] GH-1639 Remove unneeded dependency on producer_plugin --- tests/test_snapshot_information.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_snapshot_information.cpp b/tests/test_snapshot_information.cpp index 5c45b38ed4..49aa25c431 100644 --- a/tests/test_snapshot_information.cpp +++ b/tests/test_snapshot_information.cpp @@ -2,8 +2,8 @@ #include #include #include "snapshot_suites.hpp" -#include #include +#include #include #include @@ -18,7 +18,7 @@ namespace { BOOST_AUTO_TEST_SUITE(producer_snapshot_tests) -using next_t = eosio::producer_plugin::next_function; +using next_t = pending_snapshot::next_t; BOOST_AUTO_TEST_CASE_TEMPLATE(test_snapshot_information, SNAPSHOT_SUITE, snapshot_suites) { tester chain; From 1368ecd0b72f1387f0545c6deef9bd494b1aa7fe Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 15:48:56 -0500 Subject: [PATCH 09/20] GH-1639 Fix producer_plugin shutdown of read only threads to prevent SEGFAULT and deadlock. --- .../custom_appbase/include/eosio/chain/application.hpp | 6 ++++++ .../custom_appbase/include/eosio/chain/exec_pri_queue.hpp | 3 ++- plugins/producer_plugin/producer_plugin.cpp | 4 ++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index 0e984d16f4..d4939fc418 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -119,6 +119,12 @@ class three_queue_executor { else return read_only_queue_.wrap( priority, --order_, std::forward( func)); } + + void stop() { + read_only_queue_.stop(); + read_write_queue_.stop(); + read_exclusive_queue_.stop(); + } void clear() { read_only_queue_.clear(); diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 41716284af..bb0a6b62d4 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -14,7 +14,8 @@ class exec_pri_queue : public boost::asio::execution_context { public: - ~exec_pri_queue() { + void stop() { + std::lock_guard g( mtx_ ); exiting_blocking_ = true; cond_.notify_all(); } diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index d9272351c7..e6122f1704 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -1427,6 +1427,10 @@ void producer_plugin::plugin_startup() { void producer_plugin_impl::plugin_shutdown() { boost::system::error_code ec; _timer.cancel(ec); + boost::system::error_code ro_ec; + _ro_timer.cancel(ro_ec); + app().executor().stop(); + _ro_thread_pool.stop(); _thread_pool.stop(); _unapplied_transactions.clear(); From 11df9d14bfc5c7cb42f038ad25fecf0ef7f7f8ee Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 18 Sep 2023 19:48:49 -0500 Subject: [PATCH 10/20] GH-1639 Decrease read-only-threads from 128 to 16 since ci/cd was timing out --- tests/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 701fab3c74..95bdd95706 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -133,9 +133,9 @@ add_test(NAME compute_transaction_test COMMAND tests/compute_transaction_test.py set_property(TEST compute_transaction_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME read-only-trx-basic-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --read-only-threads 1 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST read-only-trx-basic-test PROPERTY LABELS nonparallelizable_tests) -add_test(NAME read-only-trx-parallel-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --read-only-threads 128 --num-test-runs 3 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +add_test(NAME read-only-trx-parallel-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --read-only-threads 16 --num-test-runs 3 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST read-only-trx-parallel-test PROPERTY LABELS nonparallelizable_tests) -add_test(NAME read-only-trx-parallel-eos-vm-oc-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --eos-vm-oc-enable all --read-only-threads 128 --num-test-runs 3 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +add_test(NAME read-only-trx-parallel-eos-vm-oc-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --eos-vm-oc-enable all --read-only-threads 16 --num-test-runs 3 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST read-only-trx-parallel-eos-vm-oc-test PROPERTY LABELS nonparallelizable_tests) add_test(NAME read-only-trx-parallel-no-oc-test COMMAND tests/read_only_trx_test.py -v -p 2 -n 3 --eos-vm-oc-enable none --read-only-threads 6 --num-test-runs 2 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST read-only-trx-parallel-no-oc-test PROPERTY LABELS nonparallelizable_tests) From 36be3f79d461a3c60aee79a38e9361ff72ab0cc7 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 19 Sep 2023 11:53:02 -0500 Subject: [PATCH 11/20] GH-1639 Favor this over rhs for queue comparison. Also use C++20 spaceship comparison. --- .../include/eosio/chain/exec_pri_queue.hpp | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index bb0a6b62d4..6321fb8add 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -116,8 +116,9 @@ class exec_pri_queue : public boost::asio::execution_context // true if this queue has the highest priority task to execute std::optional compare_queues_locked( const exec_pri_queue& rhs ) { std::scoped_lock g(mtx_, rhs.mtx_); - if (empty() && rhs.empty()) return {}; - return !empty() && (rhs.empty() || *rhs.top() < *top()); + if (empty() && rhs.empty()) + return {}; + return !empty() && (rhs.empty() || *rhs.top() <= *top()); } class executor @@ -192,14 +193,10 @@ class exec_pri_queue : public boost::asio::execution_context virtual void execute() = 0; int priority() const { return priority_; } - // C++20 - // friend std::weak_ordering operator<=>(const queued_handler_base&, - // const queued_handler_base&) noexcept = default; - friend bool operator<(const queued_handler_base& a, - const queued_handler_base& b) noexcept - { - return std::tie( a.priority_, a.order_ ) < std::tie( b.priority_, b.order_ ); - } + + // comparison eval: (priority_, order_) + friend auto operator<=>(const queued_handler_base& a, + const queued_handler_base& b) noexcept = default; private: int priority_; From d2cd84c72fc1d209651ab0b48c160818c0301ea5 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 19 Sep 2023 12:57:30 -0500 Subject: [PATCH 12/20] GH-1639 Use canonical spaceship operator --- .../custom_appbase/include/eosio/chain/exec_pri_queue.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 6321fb8add..0bfe806bd4 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -195,8 +195,7 @@ class exec_pri_queue : public boost::asio::execution_context int priority() const { return priority_; } // comparison eval: (priority_, order_) - friend auto operator<=>(const queued_handler_base& a, - const queued_handler_base& b) noexcept = default; + auto operator<=>(const queued_handler_base& rhs) const noexcept = default; private: int priority_; From 6394011dfcedfe8ca37e640506033c8262854d60 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 19 Sep 2023 13:44:47 -0500 Subject: [PATCH 13/20] GH-1639 Revert using of spaceship operator as it appears to not work on GCC 10.5 --- .../include/eosio/chain/exec_pri_queue.hpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 0bfe806bd4..679737d3a0 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -193,9 +193,19 @@ class exec_pri_queue : public boost::asio::execution_context virtual void execute() = 0; int priority() const { return priority_; } - - // comparison eval: (priority_, order_) - auto operator<=>(const queued_handler_base& rhs) const noexcept = default; + // C++20 + // friend std::weak_ordering operator<=>(const queued_handler_base&, + // const queued_handler_base&) noexcept = default; + friend bool operator<(const queued_handler_base& a, + const queued_handler_base& b) noexcept + { + return std::tie( a.priority_, a.order_ ) < std::tie( b.priority_, b.order_ ); + } + friend bool operator<=(const queued_handler_base& a, + const queued_handler_base& b) noexcept + { + return std::tie( a.priority_, a.order_ ) <= std::tie( b.priority_, b.order_ ); + } private: int priority_; From e6b8493916a68e09d8e3cc4d65559b149fc77442 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 19 Sep 2023 14:53:25 -0500 Subject: [PATCH 14/20] GH-1639 Use same ec for both cancel calls --- plugins/producer_plugin/producer_plugin.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index e6122f1704..924b61bf92 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -1427,8 +1427,7 @@ void producer_plugin::plugin_startup() { void producer_plugin_impl::plugin_shutdown() { boost::system::error_code ec; _timer.cancel(ec); - boost::system::error_code ro_ec; - _ro_timer.cancel(ro_ec); + _ro_timer.cancel(ec); app().executor().stop(); _ro_thread_pool.stop(); _thread_pool.stop(); From 8d221a777c4649c5a19ba99948c61fbced25f99b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 20 Sep 2023 10:21:29 -0500 Subject: [PATCH 15/20] GH-1639 Modify exec_pri_queue to manage the 3 priority queues instead of three_queue_executor. Simplifies logic fixes issues with previous approach. Also add some poll() calls to make sure queues have latest tasks to execute. --- .../include/eosio/chain/application.hpp | 91 ++----- .../include/eosio/chain/exec_pri_queue.hpp | 164 ++++++++---- libraries/custom_appbase/tests/CMakeLists.txt | 2 +- .../tests/custom_appbase_tests.cpp | 249 +++++++++++++----- plugins/producer_plugin/producer_plugin.cpp | 3 +- 5 files changed, 326 insertions(+), 183 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index d4939fc418..e6fa906f68 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -22,23 +22,7 @@ enum class exec_window { // the main app thread is active. }; -enum class exec_queue { - read_only, // the queue storing tasks which are safe to execute - // in parallel with other read-only & read_exclusive tasks in the read-only - // thread pool as well as on the main app thread. - // Multi-thread safe as long as nothing is executed from the read_write queue. - read_write, // the queue storing tasks which can be only executed - // on the app thread while read-only tasks are - // not being executed in read-only threads. Single threaded. - read_exclusive // the queue storing tasks which should only be executed - // in parallel with other read_exclusive or read_only tasks in the - // read-only thread pool. Should never be executed on the main thread. - // If no read-only thread pool is available this queue grows unbounded - // as tasks will never execute. User is responsible for not queueing - // read_exclusive tasks if no read-only thread pool is available. -}; - -class three_queue_executor { +class priority_queue_executor { public: // Trade off on returning to appbase exec() loop as the overhead of poll/run can be measurable for small running tasks. @@ -47,12 +31,7 @@ class three_queue_executor { template auto post( int priority, exec_queue q, Func&& func ) { - if ( q == exec_queue::read_write ) - return boost::asio::post(io_serv_, read_write_queue_.wrap(priority, --order_, std::forward(func))); - else if ( q == exec_queue::read_only ) - return boost::asio::post( io_serv_, read_only_queue_.wrap( priority, --order_, std::forward( func))); - else - return boost::asio::post( io_serv_, read_exclusive_queue_.wrap( priority, --order_, std::forward( func))); + return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward(func))); } // Legacy and deprecated. To be removed after cleaning up its uses in base appbase @@ -60,12 +39,12 @@ class three_queue_executor { auto post( int priority, Func&& func ) { // safer to use read_write queue for unknown type of operation since operations // from read_write queue are not executed in parallel with read-only operations - return boost::asio::post(io_serv_, read_write_queue_.wrap(priority, --order_, std::forward(func))); + return boost::asio::post(io_serv_, pri_queue_.wrap(priority, exec_queue::read_write, --order_, std::forward(func))); } boost::asio::io_service& get_io_service() { return io_serv_; } - // called from main thread + // called from main thread, highest read_only and read_write bool execute_highest() { // execute for at least minimum runtime const auto end = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(minimum_runtime_ms); @@ -73,17 +52,11 @@ class three_queue_executor { bool more = false; while (true) { if ( exec_window_ == exec_window::write ) { - // During write window only main thread is accessing anything in two_queue_executor, no locking required - if( !read_write_queue_.empty() && (read_only_queue_.empty() || *read_only_queue_.top() < *read_write_queue_.top()) ) { - // read_write_queue_'s top function's priority greater than read_only_queue_'s top function's, or read_only_queue_ empty - read_write_queue_.execute_highest(); - } else if( !read_only_queue_.empty() ) { - read_only_queue_.execute_highest(); - } - more = !read_only_queue_.empty() || !read_write_queue_.empty(); + // During write window only main thread is accessing anything in priority_queue_executor, no locking required + more = pri_queue_.execute_highest(exec_queue::read_write, exec_queue::read_only); } else { - // When in read window, multiple threads including main app thread are accessing two_queue_executor, locking required - more = read_only_queue_.execute_highest_locked(false); + // When in read window, multiple threads including main app thread are accessing priority_queue_executor, locking required + more = pri_queue_.execute_highest_locked(exec_queue::read_only); } if (!more || std::chrono::high_resolution_clock::now() > end) break; @@ -97,14 +70,8 @@ class three_queue_executor { bool more = false; while (true) { - std::optional exec_read_only_queue = read_only_queue_.compare_queues_locked(read_exclusive_queue_); - if (!exec_read_only_queue) { - more = read_only_queue_.execute_highest_locked( true ); - } else if (*exec_read_only_queue) { - more = read_only_queue_.execute_highest_locked( false ); - } else { - more = read_exclusive_queue_.execute_highest_locked( false ); - } + get_io_service().poll(); // schedule any queued + more = pri_queue_.execute_highest_locked(exec_queue::read_only, exec_queue::read_exclusive, true); if (!more || std::chrono::high_resolution_clock::now() > end) break; } @@ -114,34 +81,25 @@ class three_queue_executor { template boost::asio::executor_binder wrap(int priority, exec_queue q, Function&& func ) { - if ( q == exec_queue::read_write ) - return read_write_queue_.wrap(priority, --order_, std::forward(func)); - else - return read_only_queue_.wrap( priority, --order_, std::forward( func)); + return pri_queue_.wrap(priority, q, --order_, std::forward(func)); } void stop() { - read_only_queue_.stop(); - read_write_queue_.stop(); - read_exclusive_queue_.stop(); + pri_queue_.stop(); } void clear() { - read_only_queue_.clear(); - read_write_queue_.clear(); - read_exclusive_queue_.clear(); + pri_queue_.clear(); } void set_to_read_window(uint32_t num_threads, std::function should_exit) { exec_window_ = exec_window::read; - read_only_queue_.enable_locking(num_threads, should_exit); - read_exclusive_queue_.enable_locking(num_threads, std::move(should_exit)); + pri_queue_.enable_locking(num_threads, std::move(should_exit)); } void set_to_write_window() { exec_window_ = exec_window::write; - read_only_queue_.disable_locking(); - read_exclusive_queue_.disable_locking(); + pri_queue_.disable_locking(); } bool is_read_window() const { @@ -152,21 +110,22 @@ class three_queue_executor { return exec_window_ == exec_window::write; } - auto& read_only_queue() { return read_only_queue_; } - auto& read_write_queue() { return read_write_queue_; } - auto& read_exclusive_queue() { return read_exclusive_queue_; } + size_t read_only_queue_size() { return pri_queue_.size(exec_queue::read_only); } + size_t read_write_queue_size() { return pri_queue_.size(exec_queue::read_write); } + size_t read_exclusive_queue_size() { return pri_queue_.size(exec_queue::read_exclusive); } + bool read_only_queue_empty() { return pri_queue_.empty(exec_queue::read_only); } + bool read_write_queue_empty() { return pri_queue_.empty(exec_queue::read_write); } + bool read_exclusive_queue_empty() { return pri_queue_.empty(exec_queue::read_exclusive); } // members are ordered taking into account that the last one is destructed first private: boost::asio::io_service io_serv_; - appbase::exec_pri_queue read_only_queue_; - appbase::exec_pri_queue read_write_queue_; - appbase::exec_pri_queue read_exclusive_queue_; - std::atomic order_ { std::numeric_limits::max() }; // to maintain FIFO ordering in both queues within priority - exec_window exec_window_ { exec_window::write }; + appbase::exec_pri_queue pri_queue_; + std::atomic order_{ std::numeric_limits::max() }; // to maintain FIFO ordering in all queues within priority + exec_window exec_window_{ exec_window::write }; }; -using application = application_t; +using application = application_t; } #include diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 679737d3a0..abec59e810 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -9,6 +9,22 @@ namespace appbase { // adapted from: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/example/cpp11/invocation/prioritised_handlers.cpp +enum class exec_queue { + read_only, // the queue storing tasks which are safe to execute + // in parallel with other read-only & read_exclusive tasks in the read-only + // thread pool as well as on the main app thread. + // Multi-thread safe as long as nothing is executed from the read_write queue. + read_write, // the queue storing tasks which can be only executed + // on the app thread while read-only tasks are + // not being executed in read-only threads. Single threaded. + read_exclusive // the queue storing tasks which should only be executed + // in parallel with other read_exclusive or read_only tasks in the + // read-only thread pool. Should never be executed on the main thread. + // If no read-only thread pool is available this queue grows unbounded + // as tasks will never execute. User is responsible for not queueing + // read_exclusive tasks if no read-only thread pool is available. +}; + // Locking has to be coordinated by caller, use with care. class exec_pri_queue : public boost::asio::execution_context { @@ -35,53 +51,74 @@ class exec_pri_queue : public boost::asio::execution_context // called from appbase::application_base::exec poll_one() or run_one() template - void add(int priority, size_t order, Function function) - { + void add(int priority, exec_queue q, size_t order, Function function) { + prio_queue& que = priority_que(q); std::unique_ptr handler(new queued_handler(priority, order, std::move(function))); if (lock_enabled_) { std::lock_guard g( mtx_ ); - handlers_.push( std::move( handler ) ); + que.push( std::move( handler ) ); if (num_waiting_) cond_.notify_one(); } else { - handlers_.push( std::move( handler ) ); + que.push( std::move( handler ) ); } } // only call when no lock required - void clear() - { - handlers_ = prio_queue(); + void clear() { + read_only_handlers_ = prio_queue(); + read_write_handlers_ = prio_queue(); + read_exclusive_handlers_ = prio_queue(); } // only call when no lock required - bool execute_highest() - { - if( !handlers_.empty() ) { - handlers_.top()->execute(); - handlers_.pop(); + bool execute_highest(exec_queue q) { + prio_queue& que = priority_que(q); + if( !que.empty() ) { + que.top()->execute(); + que.pop(); } - return !handlers_.empty(); + return !que.empty(); } -private: - // has to be defined before use, auto return type - auto pop() { - auto t = std::move(const_cast&>(handlers_.top())); - handlers_.pop(); - return t; + // only call when no lock required + bool execute_highest(exec_queue lhs, exec_queue rhs) { + prio_queue& lhs_que = priority_que(lhs); + prio_queue& rhs_que = priority_que(rhs); + size_t size = lhs_que.size() + rhs_que.size(); + if (size == 0) + return false; + exec_queue q = rhs; + if (!lhs_que.empty() && (rhs_que.empty() || *rhs_que.top() < *lhs_que.top())) + q = lhs; + prio_queue& que = priority_que(q); + que.top()->execute(); + que.pop(); + --size; + return size > 0; } -public: + bool execute_highest_locked(exec_queue q) { + prio_queue& que = priority_que(q); + std::unique_lock g(mtx_); + if (que.empty()) + return false; + auto t = pop(que); + g.unlock(); + t->execute(); + return true; + } - bool execute_highest_locked(bool should_block) { + bool execute_highest_locked(exec_queue lhs, exec_queue rhs, bool should_block) { + prio_queue& lhs_que = priority_que(lhs); + prio_queue& rhs_que = priority_que(rhs); std::unique_lock g(mtx_); if (should_block) { ++num_waiting_; - cond_.wait(g, [this](){ + cond_.wait(g, [&](){ bool exit = exiting_blocking_ || should_exit_(); - bool empty = handlers_.empty(); + bool empty = lhs_que.empty() && rhs_que.empty(); if (empty || exit) { if (((empty && num_waiting_ == max_waiting_) || exit) && !exiting_blocking_) { cond_.notify_all(); @@ -95,37 +132,32 @@ class exec_pri_queue : public boost::asio::execution_context if (exiting_blocking_ || should_exit_()) return false; } - if( handlers_.empty() ) + if (lhs_que.empty() && rhs_que.empty()) return false; - auto t = pop(); + exec_queue q = rhs; + if (!lhs_que.empty() && (rhs_que.empty() || *rhs_que.top() < *lhs_que.top())) + q = lhs; + auto t = pop(priority_que(q)); g.unlock(); t->execute(); return true; } // Only call when locking disabled - size_t size() const { return handlers_.size(); } + size_t size(exec_queue q) const { return priority_que(q).size(); } + size_t size() const { return read_only_handlers_.size() + read_write_handlers_.size() + read_exclusive_handlers_.size(); } // Only call when locking disabled - bool empty() const { return handlers_.empty(); } + bool empty(exec_queue q) const { return priority_que(q).empty(); } // Only call when locking disabled - const auto& top() const { return handlers_.top(); } - - // return empty optional if both queues empty. - // true if this queue has the highest priority task to execute - std::optional compare_queues_locked( const exec_pri_queue& rhs ) { - std::scoped_lock g(mtx_, rhs.mtx_); - if (empty() && rhs.empty()) - return {}; - return !empty() && (rhs.empty() || *rhs.top() <= *top()); - } + const auto& top(exec_queue q) const { return priority_que(q).top(); } class executor { public: - executor(exec_pri_queue& q, int p, size_t o) - : context_(q), priority_(p), order_(o) + executor(exec_pri_queue& q, int p, size_t o, exec_queue que) + : context_(q), que_(que), priority_(p), order_(o) { } @@ -137,19 +169,19 @@ class exec_pri_queue : public boost::asio::execution_context template void dispatch(Function f, const Allocator&) const { - context_.add(priority_, order_, std::move(f)); + context_.add(priority_, que_, order_, std::move(f)); } template void post(Function f, const Allocator&) const { - context_.add(priority_, order_, std::move(f)); + context_.add(priority_, que_, order_, std::move(f)); } template void defer(Function f, const Allocator&) const { - context_.add(priority_, order_, std::move(f)); + context_.add(priority_, que_, order_, std::move(f)); } void on_work_started() const noexcept {} @@ -157,7 +189,7 @@ class exec_pri_queue : public boost::asio::execution_context bool operator==(const executor& other) const noexcept { - return order_ == other.order_ && &context_ == &other.context_ && priority_ == other.priority_; + return order_ == other.order_ && priority_ == other.priority_ && que_ == other.que_ && &context_ == &other.context_; } bool operator!=(const executor& other) const noexcept @@ -167,15 +199,16 @@ class exec_pri_queue : public boost::asio::execution_context private: exec_pri_queue& context_; + exec_queue que_; int priority_; size_t order_; }; template boost::asio::executor_binder - wrap(int priority, size_t order, Function&& func) + wrap(int priority, exec_queue q, size_t order, Function&& func) { - return boost::asio::bind_executor( executor(*this, priority, order), std::forward(func) ); + return boost::asio::bind_executor( executor(*this, priority, order, q), std::forward(func) ); } private: @@ -201,11 +234,6 @@ class exec_pri_queue : public boost::asio::execution_context { return std::tie( a.priority_, a.order_ ) < std::tie( b.priority_, b.order_ ); } - friend bool operator<=(const queued_handler_base& a, - const queued_handler_base& b) noexcept - { - return std::tie( a.priority_, a.order_ ) <= std::tie( b.priority_, b.order_ ); - } private: int priority_; @@ -240,6 +268,37 @@ class exec_pri_queue : public boost::asio::execution_context } }; + using prio_queue = std::priority_queue, std::deque>, deref_less>; + + prio_queue& priority_que(exec_queue q) { + switch (q) { + case exec_queue::read_only: + return read_only_handlers_; + case exec_queue::read_write: + return read_write_handlers_; + case exec_queue::read_exclusive: + return read_exclusive_handlers_; + } + } + + const prio_queue& priority_que(exec_queue q) const { + switch (q) { + case exec_queue::read_only: + return read_only_handlers_; + case exec_queue::read_write: + return read_write_handlers_; + case exec_queue::read_exclusive: + return read_exclusive_handlers_; + } + } + + static std::unique_ptr pop(prio_queue& que) { + // work around std::priority_queue not having a pop() that returns value + auto t = std::move(const_cast&>(que.top())); + que.pop(); + return t; + } + bool lock_enabled_ = false; mutable std::mutex mtx_; std::condition_variable cond_; @@ -247,8 +306,9 @@ class exec_pri_queue : public boost::asio::execution_context uint32_t max_waiting_{0}; bool exiting_blocking_{false}; std::function should_exit_; // called holding mtx_ - using prio_queue = std::priority_queue, std::deque>, deref_less>; - prio_queue handlers_; + prio_queue read_only_handlers_; + prio_queue read_write_handlers_; + prio_queue read_exclusive_handlers_; }; } // appbase diff --git a/libraries/custom_appbase/tests/CMakeLists.txt b/libraries/custom_appbase/tests/CMakeLists.txt index 95f44b66d6..7df7d2a964 100644 --- a/libraries/custom_appbase/tests/CMakeLists.txt +++ b/libraries/custom_appbase/tests/CMakeLists.txt @@ -7,7 +7,7 @@ endif() file(GLOB UNIT_TESTS "*.cpp") add_executable( custom_appbase_test ${UNIT_TESTS} ) -target_link_libraries( custom_appbase_test appbase ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( custom_appbase_test appbase fc ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) target_include_directories( custom_appbase_test PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" "${CMAKE_CURRENT_SOURCE_DIR}/../../appbase/include" ) add_test( custom_appbase_test custom_appbase_test ) diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index 9ee5af40cd..97f2fcdc2b 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -1,10 +1,13 @@ #define BOOST_TEST_MODULE custom_appbase_tests #include -#include -#include #include +#include + +#include +#include + using namespace appbase; BOOST_AUTO_TEST_SUITE(custom_appbase_tests) @@ -19,7 +22,20 @@ std::thread start_app_thread(appbase::scoped_app& app) { return app_thread; } -// verify functions from both queues are executed when execution window is not explictly set +std::thread start_read_thread(appbase::scoped_app& app) { + static int num = 0; + std::thread read_thread( [&]() { + std::string name ="read-" + std::to_string(num++); + fc::set_thread_name(name); + bool more = true; + while (more) { + more = app->executor().execute_highest_read(); // blocks until all read only threads are idle + } + }); + return read_thread; +} + +// verify functions from both queues (read_only,read_write) are executed when execution window is not explicitly set BOOST_AUTO_TEST_CASE( default_exec_window ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); @@ -27,28 +43,32 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) { // post functions std::map rslts {}; int seq_num = 0; - app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[9]=seq_num; ++seq_num; } ); // Stop app. Use the lowest priority to make sure this function to execute the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_only_queue should only contain the current lambda function, // and read_write_queue should have executed all its functions - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 1u); // pop()s after execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 0u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u ); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); app->quit(); } ); app_thread.join(); - // both queues are cleared after execution - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // exactly number of both queues' functions processed BOOST_REQUIRE_EQUAL( rslts.size(), 8u ); @@ -64,8 +84,8 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) { BOOST_CHECK_LT( rslts[6], rslts[7] ); } -// verify functions only from read_only queue are processed during read window -BOOST_AUTO_TEST_CASE( execute_from_read_queue ) { +// verify functions only from read_only queue are processed during read window on the main thread +BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); @@ -75,46 +95,48 @@ BOOST_AUTO_TEST_CASE( execute_from_read_queue ) { // post functions std::map rslts {}; int seq_num = 0; - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[0]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[1]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[7]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[8]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[9]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[9]=seq_num; ++seq_num; } ); // stop application. Use lowest at the end to make sure this executes the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_queue should be empty (read window pops before execute) and write_queue should have all its functions - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 0u); // pop()s before execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 4u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 4u ); app->quit(); } ); app_thread.join(); - // both queues are cleared after execution - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // exactly number of posts processed - BOOST_REQUIRE_EQUAL( rslts.size(), 6u ); + BOOST_REQUIRE_EQUAL( rslts.size(), 4u ); - // same priority (high) of functions in read_queue executed by the post order + // same priority (high) of functions in read queues executed by the post order BOOST_CHECK_LT( rslts[1], rslts[3] ); - // higher priority posted earlier in read_queue executed earlier + // higher priority posted earlier in read queues executed earlier BOOST_CHECK_LT( rslts[3], rslts[4] ); } -// verify no functions are executed during read window if read_only queue is empty -BOOST_AUTO_TEST_CASE( execute_from_empty_read_queue ) { +// verify no functions are executed during read window if read_only & read_exclusive queue is empty +BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); - // set to run functions from read_only queue only + // set to run functions from read_only & read_exclusive queues only app->executor().set_to_read_window(1, [](){return false;}); // post functions @@ -134,22 +156,24 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_queue ) { // Stop application. Use lowest at the end to make sure this executes the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_queue should be empty (read window pops before execute) and write_queue should have all its functions - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 0u); // pop()s before execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 10u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 10u ); app->quit(); } ); app_thread.join(); - // both queues are cleared after execution - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // no results BOOST_REQUIRE_EQUAL( rslts.size(), 0u ); } -// verify functions from both queues are processed in write window -BOOST_AUTO_TEST_CASE( execute_from_both_queues ) { +// verify functions from both queues (read_only, read_write) are processed in write window, but not read_exclusive +BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); @@ -159,32 +183,37 @@ BOOST_AUTO_TEST_CASE( execute_from_both_queues ) { // post functions std::map rslts {}; int seq_num = 0; - app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); - app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); - app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[8]=seq_num; ++seq_num; } ); - app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } ); - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[8]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts[12]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts[13]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts[14]=seq_num; ++seq_num; } ); // stop application. Use lowest at the end to make sure this executes the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_queue should have current function and write_queue's functions are all executed - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 1u); // pop()s after execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 0u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 3u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); app->quit(); } ); app_thread.join(); - // queues are emptied after quit - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // queues are emptied after exec + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // exactly number of posts processed BOOST_REQUIRE_EQUAL( rslts.size(), 12u ); @@ -212,4 +241,98 @@ BOOST_AUTO_TEST_CASE( execute_from_both_queues ) { BOOST_CHECK_LT( rslts[6], rslts[11] ); } +// verify tasks from both queues (read_only, read_exclusive) are processed in read window +BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { + appbase::scoped_app app; + + // set to run functions from read_only & read_exclusive queues only + app->executor().set_to_read_window(3, [](){return false;}); + + // post functions + std::vector> rslts(16); + std::atomic seq_num = 0; + app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts.at(0)=1; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts.at(1)=2; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts.at(2)=3; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts.at(3)=4; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts.at(4)=5; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts.at(5)=6; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts.at(6)=7; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts.at(7)=8; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts.at(8)=9; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts.at(9)=10; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts.at(10)=11; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts.at(11)=12; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts.at(12)=13; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts.at(13)=14; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts.at(14)=15; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts.at(15)=16; ++seq_num; } ); + + // Use lowest at the end to make sure this executes the last + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 3u ); + } ); + + + std::optional work; + work.emplace(app->get_io_service()); + while( true ) { + app->get_io_service().poll(); + size_t s = app->executor().read_only_queue_size() + app->executor().read_exclusive_queue_size() + app->executor().read_write_queue_size(); + if (s == 17) + break; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + auto app_thread = start_app_thread(app); + constexpr size_t num_expected = 13u; // 16 - 3 read_write + + auto read_thread1 = start_read_thread(app); + auto read_thread2 = start_read_thread(app); + auto read_thread3 = start_read_thread(app); + read_thread1.join(); + read_thread2.join(); + read_thread3.join(); + + size_t num_sleeps = 0; + while (seq_num < num_expected) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (++num_sleeps > 10000) + break; + }; + work.reset(); + app->quit(); + app_thread.join(); + + // queues are emptied after exec + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); + + // exactly number of posts processed + BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){return v > 0; }), num_expected ); + + // all low must be processed the in order of posting + BOOST_CHECK_LT( rslts[4], rslts[15] ); + + // all medium must be processed the in order of posting + BOOST_CHECK_LT( rslts[0], rslts[1] ); + BOOST_CHECK_LT( rslts[1], rslts[11] ); + BOOST_CHECK_LT( rslts[11], rslts[14] ); + + // all functions posted after high before highest must be processed after high + BOOST_CHECK_LT( rslts[2], rslts[3] ); + BOOST_CHECK_LT( rslts[2], rslts[4] ); + BOOST_CHECK_LT( rslts[2], rslts[9] ); + + // all functions posted after highest must be processed after it + BOOST_CHECK_LT( rslts[6], rslts[8] ); + BOOST_CHECK_LT( rslts[6], rslts[9] ); + BOOST_CHECK_LT( rslts[6], rslts[11] ); + BOOST_CHECK_LT( rslts[6], rslts[12] ); + BOOST_CHECK_LT( rslts[6], rslts[14] ); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 924b61bf92..3d8ac08cd1 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -2844,7 +2844,8 @@ void producer_plugin_impl::switch_to_read_window() { _time_tracker.pause(); // we are in write window, so no read-only trx threads are processing transactions. - if (app().executor().read_only_queue().empty() && app().executor().read_exclusive_queue().empty()) { // no read-only tasks to process. stay in write window + app().get_io_service().poll(); // make sure we schedule any ready + if (app().executor().read_only_queue_empty() && app().executor().read_exclusive_queue_empty()) { // no read-only tasks to process. stay in write window start_write_window(); // restart write window timer for next round return; } From ae2424d685acb8beff7a19349cd49973167bab8d Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 20 Sep 2023 11:14:32 -0500 Subject: [PATCH 16/20] GH-1639 More simplification --- .../include/eosio/chain/application.hpp | 2 +- .../include/eosio/chain/exec_pri_queue.hpp | 63 ++++++++----------- 2 files changed, 26 insertions(+), 39 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index e6fa906f68..8d514b5bba 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -71,7 +71,7 @@ class priority_queue_executor { bool more = false; while (true) { get_io_service().poll(); // schedule any queued - more = pri_queue_.execute_highest_locked(exec_queue::read_only, exec_queue::read_exclusive, true); + more = pri_queue_.execute_highest_blocking_locked(exec_queue::read_only, exec_queue::read_exclusive); if (!more || std::chrono::high_resolution_clock::now() > end) break; } diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index abec59e810..0fdae83e34 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -71,15 +71,15 @@ class exec_pri_queue : public boost::asio::execution_context read_exclusive_handlers_ = prio_queue(); } - // only call when no lock required - bool execute_highest(exec_queue q) { + bool execute_highest_locked(exec_queue q) { prio_queue& que = priority_que(q); - if( !que.empty() ) { - que.top()->execute(); - que.pop(); - } - - return !que.empty(); + std::unique_lock g(mtx_); + if (que.empty()) + return false; + auto t = pop(que); + g.unlock(); + t->execute(); + return true; } // only call when no lock required @@ -99,39 +99,26 @@ class exec_pri_queue : public boost::asio::execution_context return size > 0; } - bool execute_highest_locked(exec_queue q) { - prio_queue& que = priority_que(q); - std::unique_lock g(mtx_); - if (que.empty()) - return false; - auto t = pop(que); - g.unlock(); - t->execute(); - return true; - } - - bool execute_highest_locked(exec_queue lhs, exec_queue rhs, bool should_block) { + bool execute_highest_blocking_locked(exec_queue lhs, exec_queue rhs) { prio_queue& lhs_que = priority_que(lhs); prio_queue& rhs_que = priority_que(rhs); std::unique_lock g(mtx_); - if (should_block) { - ++num_waiting_; - cond_.wait(g, [&](){ - bool exit = exiting_blocking_ || should_exit_(); - bool empty = lhs_que.empty() && rhs_que.empty(); - if (empty || exit) { - if (((empty && num_waiting_ == max_waiting_) || exit) && !exiting_blocking_) { - cond_.notify_all(); - exiting_blocking_ = true; - } - return exit || exiting_blocking_; // same as calling should_exit(), but faster + ++num_waiting_; + cond_.wait(g, [&](){ + bool exit = exiting_blocking_ || should_exit_(); + bool empty = lhs_que.empty() && rhs_que.empty(); + if (empty || exit) { + if (((empty && num_waiting_ == max_waiting_) || exit) && !exiting_blocking_) { + exiting_blocking_ = true; + cond_.notify_all(); } - return true; - }); - --num_waiting_; - if (exiting_blocking_ || should_exit_()) - return false; - } + return exit || exiting_blocking_; // same as calling should_exit(), but faster + } + return true; + }); + --num_waiting_; + if (exiting_blocking_ || should_exit_()) + return false; if (lhs_que.empty() && rhs_que.empty()) return false; exec_queue q = rhs; @@ -140,7 +127,7 @@ class exec_pri_queue : public boost::asio::execution_context auto t = pop(priority_que(q)); g.unlock(); t->execute(); - return true; + return true; // this should never return false unless all read threads should exit } // Only call when locking disabled From e6bbaafceb843a75ccb48a2820a902f11c1398a8 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 20 Sep 2023 14:13:01 -0500 Subject: [PATCH 17/20] GH-1639 fix gcc warning --- .../custom_appbase/include/eosio/chain/exec_pri_queue.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 0fdae83e34..671e6961a6 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -266,6 +266,8 @@ class exec_pri_queue : public boost::asio::execution_context case exec_queue::read_exclusive: return read_exclusive_handlers_; } + assert(false); + return read_only_handlers_; } const prio_queue& priority_que(exec_queue q) const { @@ -277,6 +279,8 @@ class exec_pri_queue : public boost::asio::execution_context case exec_queue::read_exclusive: return read_exclusive_handlers_; } + assert(false); + return read_only_handlers_; } static std::unique_ptr pop(prio_queue& que) { From a6465e6509efe09252724fbd2fb7aa2a83d9d521 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 20 Sep 2023 20:54:14 -0500 Subject: [PATCH 18/20] GH-1639 Additional test --- .../tests/custom_appbase_tests.cpp | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index 97f2fcdc2b..54b4c57688 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -335,4 +335,82 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { BOOST_CHECK_LT( rslts[6], rslts[14] ); } +// verify tasks from both queues (read_only, read_exclusive) are processed in read window +BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { + appbase::scoped_app app; + + auto app_thread = start_app_thread(app); + std::thread::id app_thread_id = app_thread.get_id(); + + // set to run functions from read_only & read_exclusive queues only + app->executor().set_to_read_window(3, [](){return false;}); + + // post functions + constexpr size_t num_expected = 600u; + std::vector> rslts(num_expected); + std::atomic seq_num = 0; + for (size_t i = 0; i < 200; i+=5) { + app->executor().post( priority::high, exec_queue::read_exclusive, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_only, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i+1)); } ); + } + auto read_thread1 = start_read_thread(app); + std::thread::id read_thread1_id = read_thread1.get_id(); + for (size_t i = 200; i < 400; i+=5) { + app->executor().post( priority::high, exec_queue::read_exclusive, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + } + auto read_thread2 = start_read_thread(app); + std::thread::id read_thread2_id = read_thread2.get_id(); + for (size_t i = 400; i < num_expected; i+=5) { + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; } ); + } + auto read_thread3 = start_read_thread(app); + std::thread::id read_thread3_id = read_thread3.get_id(); + + // Use lowest at the end to make sure this executes the last + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); + } ); + + read_thread1.join(); + read_thread2.join(); + read_thread3.join(); + + size_t num_sleeps = 0; + while (seq_num < num_expected) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (++num_sleeps > 10000) + break; + }; + + app->quit(); + app_thread.join(); + + // exactly number of posts processed + BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){ return v != std::thread::id(); }), num_expected ); + + size_t run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; }); + size_t run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; }); + size_t run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; }); + size_t run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; }); + + BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected); + BOOST_CHECK(run_on_1 > 0); + BOOST_CHECK(run_on_2 > 0); + BOOST_CHECK(run_on_3 > 0); + BOOST_CHECK(run_on_main > 0); +} + BOOST_AUTO_TEST_SUITE_END() From b077dc3e2a1fbc1f9d2da50a40226568173a4671 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 22 Sep 2023 09:19:51 -0500 Subject: [PATCH 19/20] GH-1639 Update max-transaction-time help description --- .../03_plugins/producer_plugin/index.md | 9 ++++---- .../include/eosio/chain/application.hpp | 10 +++++++-- .../include/eosio/chain/exec_pri_queue.hpp | 21 ++++++++++++------- .../tests/custom_appbase_tests.cpp | 12 +++++++---- plugins/producer_plugin/producer_plugin.cpp | 2 +- 5 files changed, 36 insertions(+), 18 deletions(-) diff --git a/docs/01_nodeos/03_plugins/producer_plugin/index.md b/docs/01_nodeos/03_plugins/producer_plugin/index.md index 07aa7b1067..de4c154fe4 100644 --- a/docs/01_nodeos/03_plugins/producer_plugin/index.md +++ b/docs/01_nodeos/03_plugins/producer_plugin/index.md @@ -27,10 +27,11 @@ Config Options for eosio::producer_plugin: chain is stale. -x [ --pause-on-startup ] Start this node in a state where production is paused - --max-transaction-time arg (=499) Locally lowers the max_transaction_cpu_ - usage limit (in milliseconds) that an - input transaction is allowed to execute - before being considered invalid + --max-transaction-time arg (=499) Setting this value (in milliseconds) + will restrict the allowed transaction + execution time to a value potentially + lower than the on-chain consensus + max_transaction_cpu_usage value. --max-irreversible-block-age arg (=-1) Limits the maximum age (in seconds) of the DPOS Irreversible Block for a chain diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index 8d514b5bba..1d510a996d 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -29,6 +29,12 @@ class priority_queue_executor { // This adds to the total time that the main thread can be busy when a high priority task is waiting. static constexpr uint16_t minimum_runtime_ms = 3; + // inform how many read_threads will be calling read_only/read_exclusive queues + // Currently only used to assert if exec_queue::read_exclusive is used without any read threads + void init_read_threads(size_t num_read_threads) { + pri_queue_.init_read_threads(num_read_threads); + } + template auto post( int priority, exec_queue q, Func&& func ) { return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward(func))); @@ -92,9 +98,9 @@ class priority_queue_executor { pri_queue_.clear(); } - void set_to_read_window(uint32_t num_threads, std::function should_exit) { + void set_to_read_window(std::function should_exit) { exec_window_ = exec_window::read; - pri_queue_.enable_locking(num_threads, std::move(should_exit)); + pri_queue_.enable_locking(std::move(should_exit)); } void set_to_write_window() { diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 671e6961a6..15f7664eab 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -19,10 +19,10 @@ enum class exec_queue { // not being executed in read-only threads. Single threaded. read_exclusive // the queue storing tasks which should only be executed // in parallel with other read_exclusive or read_only tasks in the - // read-only thread pool. Should never be executed on the main thread. - // If no read-only thread pool is available this queue grows unbounded - // as tasks will never execute. User is responsible for not queueing - // read_exclusive tasks if no read-only thread pool is available. + // read-only thread pool. Will never be executed on the main thread. + // If no read-only thread pool is available that calls one of the execute_* with + // read_exclusive then this queue grows unbounded. exec_pri_queue asserts + // if asked to queue a read_exclusive task when init'ed with 0 read-only threads. }; // Locking has to be coordinated by caller, use with care. @@ -30,16 +30,21 @@ class exec_pri_queue : public boost::asio::execution_context { public: + // inform how many read_threads will be calling read_only/read_exclusive queues + void init_read_threads(size_t num_read_threads) { + num_read_threads_ = num_read_threads; + } + void stop() { std::lock_guard g( mtx_ ); exiting_blocking_ = true; cond_.notify_all(); } - void enable_locking(uint32_t num_threads, std::function should_exit) { - assert(num_threads > 0 && num_waiting_ == 0); + void enable_locking(std::function should_exit) { + assert(num_read_threads_ > 0 && num_waiting_ == 0); lock_enabled_ = true; - max_waiting_ = num_threads; + max_waiting_ = num_read_threads_; should_exit_ = std::move(should_exit); exiting_blocking_ = false; } @@ -52,6 +57,7 @@ class exec_pri_queue : public boost::asio::execution_context // called from appbase::application_base::exec poll_one() or run_one() template void add(int priority, exec_queue q, size_t order, Function function) { + assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive); prio_queue& que = priority_que(q); std::unique_ptr handler(new queued_handler(priority, order, std::move(function))); if (lock_enabled_) { @@ -290,6 +296,7 @@ class exec_pri_queue : public boost::asio::execution_context return t; } + size_t num_read_threads_ = 0; bool lock_enabled_ = false; mutable std::mutex mtx_; std::condition_variable cond_; diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index 54b4c57688..77b75c43e8 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -90,7 +90,8 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) { auto app_thread = start_app_thread(app); // set to run functions from read_only queue only - app->executor().set_to_read_window(1, [](){return false;}); + app->executor().init_read_threads(1); + app->executor().set_to_read_window([](){return false;}); // post functions std::map rslts {}; @@ -137,7 +138,8 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) { auto app_thread = start_app_thread(app); // set to run functions from read_only & read_exclusive queues only - app->executor().set_to_read_window(1, [](){return false;}); + app->executor().init_read_threads(1); + app->executor().set_to_read_window([](){return false;}); // post functions std::map rslts {}; @@ -245,8 +247,9 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) { BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { appbase::scoped_app app; + app->executor().init_read_threads(3); // set to run functions from read_only & read_exclusive queues only - app->executor().set_to_read_window(3, [](){return false;}); + app->executor().set_to_read_window([](){return false;}); // post functions std::vector> rslts(16); @@ -343,7 +346,8 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { std::thread::id app_thread_id = app_thread.get_id(); // set to run functions from read_only & read_exclusive queues only - app->executor().set_to_read_window(3, [](){return false;}); + app->executor().init_read_threads(3); + app->executor().set_to_read_window([](){return false;}); // post functions constexpr size_t num_expected = 600u; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 3d8ac08cd1..789aaaf14b 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -1051,7 +1051,7 @@ void producer_plugin::set_program_options( ("enable-stale-production,e", boost::program_options::bool_switch()->notifier([this](bool e){my->_production_enabled = e;}), "Enable block production, even if the chain is stale.") ("pause-on-startup,x", boost::program_options::bool_switch()->notifier([this](bool p){my->_pause_production = p;}), "Start this node in a state where production is paused") ("max-transaction-time", bpo::value()->default_value(config::block_interval_ms-1), - "Locally lowers the max_transaction_cpu_usage limit (in milliseconds) that an input transaction is allowed to execute before being considered invalid") + "Setting this value (in milliseconds) will restrict the allowed transaction execution time to a value potentially lower than the on-chain consensus max_transaction_cpu_usage value.") ("max-irreversible-block-age", bpo::value()->default_value( -1 ), "Limits the maximum age (in seconds) of the DPOS Irreversible Block for a chain this node will produce blocks on (use negative value to indicate unlimited)") ("producer-name,p", boost::program_options::value>()->composing()->multitoken(), From b012cce7f5265de95b605ea6dfb3c5e5722cb963 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 22 Sep 2023 09:21:43 -0500 Subject: [PATCH 20/20] GH-1639 Add init_read_threads --- plugins/producer_plugin/producer_plugin.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 789aaaf14b..3185a13d24 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -810,7 +810,7 @@ class producer_plugin_impl : public std::enable_shared_from_this 0, unsupported_feature, "read-only transactions execution not enabled on API node. Set read-only-threads > 0" ); - // Post all read only trxs to read_only queue for execution. + // Post all read only trxs to read_exclusive queue for execution. auto trx_metadata = transaction_metadata::create_no_recover_keys(trx, transaction_metadata::trx_type::read_only); app().executor().post(priority::low, exec_queue::read_exclusive, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable { push_read_only_transaction(std::move(trx), std::move(next)); @@ -1276,6 +1276,7 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia ilog("read-only-write-window-time-us: ${ww} us, read-only-read-window-time-us: ${rw} us, effective read window time to be used: ${w} us", ("ww", _ro_write_window_time_us)("rw", _ro_read_window_time_us)("w", _ro_read_window_effective_time_us)); } + app().executor().init_read_threads(_ro_thread_pool_size); // Make sure _ro_max_trx_time_us is always set. // Make sure a read-only transaction can finish within the read @@ -2853,8 +2854,7 @@ void producer_plugin_impl::switch_to_read_window() { uint32_t pending_block_num = chain.head_block_num() + 1; _ro_read_window_start_time = fc::time_point::now(); _ro_window_deadline = _ro_read_window_start_time + _ro_read_window_effective_time_us; - app().executor().set_to_read_window( - _ro_thread_pool_size, [received_block = &_received_block, pending_block_num, ro_window_deadline = _ro_window_deadline]() { + app().executor().set_to_read_window([received_block = &_received_block, pending_block_num, ro_window_deadline = _ro_window_deadline]() { return fc::time_point::now() >= ro_window_deadline || (received_block->load() >= pending_block_num); // should_exit() }); chain.set_to_read_window(); @@ -2910,7 +2910,7 @@ bool producer_plugin_impl::read_only_execution_task(uint32_t pending_block_num) // will be executed from the main app thread because all read-only threads are idle now self->switch_to_write_window(); }); - // last thread post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first + // last thread post any exhausted back into read_exclusive queue with slightly higher priority (low+1) so they are executed first ro_trx_t t; while (_ro_exhausted_trx_queue.pop_front(t)) { app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable { @@ -2928,7 +2928,7 @@ void producer_plugin_impl::repost_exhausted_transactions(const fc::time_point& d if (!_ro_exhausted_trx_queue.empty()) { chain::controller& chain = chain_plug->chain(); uint32_t pending_block_num = chain.pending_block_num(); - // post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first + // post any exhausted back into read_exclusive queue with slightly higher priority (low+1) so they are executed first ro_trx_t t; while (!should_interrupt_start_block(deadline, pending_block_num) && _ro_exhausted_trx_queue.pop_front(t)) { app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable {