Skip to content

Commit

Permalink
GH-1662 Assume application created from the main thread. Rework tests…
Browse files Browse the repository at this point in the history
… so that application is created on the main thread.
  • Loading branch information
heifner committed Oct 4, 2023
1 parent 7cddb45 commit 2856e13
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 50 deletions.
11 changes: 2 additions & 9 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,8 @@ class priority_queue_executor {
return pri_queue_.get_read_threads();
}

// not thread safe, only call at program startup from the main thread (thread that calls app().exec()
void init_main_thread_id() {
main_thread_id_ = std::this_thread::get_id();
}

// not thread safe, but as long as set_main_thread_id() only called at program startup from main thread before
// other threads are created then this will be safe to access.
// assume application is started on the main thread
std::thread::id get_main_thread_id() const {
assert(main_thread_id_ != std::thread::id());
return main_thread_id_;
}

Expand Down Expand Up @@ -149,7 +142,7 @@ class priority_queue_executor {

// members are ordered taking into account that the last one is destructed first
private:
std::thread::id main_thread_id_;
std::thread::id main_thread_id_{ std::this_thread::get_id() };
boost::asio::io_service io_serv_;
appbase::exec_pri_queue pri_queue_;
std::atomic<std::size_t> order_{ std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in all queues within priority
Expand Down
112 changes: 72 additions & 40 deletions libraries/custom_appbase/tests/custom_appbase_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,61 @@ using namespace appbase;

BOOST_AUTO_TEST_SUITE(custom_appbase_tests)

std::thread start_app_thread(appbase::scoped_app& app) {
const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() };
BOOST_CHECK(app->initialize(sizeof(argv) / sizeof(char*), const_cast<char**>(argv)));
app->startup();
std::thread app_thread( [&]() {
app->executor().init_main_thread_id();
app->exec();
} );
return app_thread;
}
class scoped_app_thread {
public:
explicit scoped_app_thread(bool delay_exec = false) {
app_thread_ = start_app_thread();
if (!delay_exec) {
start_exec();
}
}
~scoped_app_thread() { // destroy app instance so next instance gets a clean one
application::reset_app_singleton();
}

scoped_app_thread(const scoped_app_thread&) = delete;
scoped_app_thread& operator=(const scoped_app_thread&) = delete;

appbase::application* operator->() {
return app_;
}
const appbase::application* operator->() const {
return app_;
}

void start_exec() {
start_exec_.set_value();
}

void join() {
app_thread_.join();
}

private:
std::thread start_app_thread() {
std::promise<void> start_complete;
std::thread app_thread( [&]() {
assert(application::null_app_singleton());
app_ = &appbase::app();
const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() };
BOOST_CHECK(app_->initialize(sizeof(argv) / sizeof(char*), const_cast<char**>(argv)));
app_->startup();
start_complete.set_value();
start_exec_.get_future().get();
app_->exec();
} );
start_complete.get_future().get();
return app_thread;
}

private:
std::thread app_thread_;
std::promise<void> start_exec_;
appbase::application* app_;
};


std::thread start_read_thread(appbase::scoped_app& app) {
std::thread start_read_thread(scoped_app_thread& app) {
static int num = 0;
std::thread read_thread( [&]() {
std::string name ="read-" + std::to_string(num++);
Expand All @@ -38,9 +81,8 @@ std::thread start_read_thread(appbase::scoped_app& app) {

// verify functions from both queues (read_only,read_write) are executed when execution window is not explicitly set
BOOST_AUTO_TEST_CASE( default_exec_window ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// post functions
std::map<int, int> rslts {};
int seq_num = 0;
Expand All @@ -52,19 +94,17 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) {
app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } );
app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } );
app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[9]=seq_num; ++seq_num; } );

// Stop app. Use the lowest priority to make sure this function to execute the last
app->executor().post( priority::lowest, exec_queue::read_only, [&]() {
// read_only_queue should only contain the current lambda function,
// and read_write_queue should have executed all its functions
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u );
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u );
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -87,9 +127,8 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) {

// verify functions only from read_only queue are processed during read window on the main thread
BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// set to run functions from read_only queue only
app->executor().init_read_threads(1);
app->executor().set_to_read_window([](){return false;});
Expand All @@ -116,7 +155,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 4u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -135,9 +174,8 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {

// verify no functions are executed during read window if read_only & read_exclusive queue is empty
BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// set to run functions from read_only & read_exclusive queues only
app->executor().init_read_threads(1);
app->executor().set_to_read_window([](){return false;});
Expand All @@ -164,7 +202,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 10u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -177,8 +215,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {

// verify functions from both queues (read_only, read_write) are processed in write window, but not read_exclusive
BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);
scoped_app_thread app;

// set to run functions from both queues
app->executor().is_write_window();
Expand All @@ -198,20 +235,17 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } );
app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } );
app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts[12]=seq_num; ++seq_num; } );
app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts[13]=seq_num; ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts[14]=seq_num; ++seq_num; } );

// stop application. Use lowest at the end to make sure this executes the last
app->executor().post( priority::lowest, exec_queue::read_only, [&]() {
// read_queue should have current function and write_queue's functions are all executed
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 3u);
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u);
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
app->quit();
} );

app_thread.join();
app.join();

// queues are emptied after exec
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand Down Expand Up @@ -246,7 +280,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {

// verify tasks from both queues (read_only, read_exclusive) are processed in read window
BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;
scoped_app_thread app(true);

app->executor().init_read_threads(3);
// set to run functions from read_only & read_exclusive queues only
Expand Down Expand Up @@ -290,7 +324,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

auto app_thread = start_app_thread(app);
app.start_exec();
constexpr int num_expected = 13; // 16 - 3 read_write

auto read_thread1 = start_read_thread(app);
Expand All @@ -308,7 +342,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
};
work.reset();
app->quit();
app_thread.join();
app.join();

// queues are emptied after exec
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand Down Expand Up @@ -341,9 +375,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {

// verify tasks from both queues (read_only, read_exclusive) are processed in read window
BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;

auto app_thread = start_app_thread(app);
scoped_app_thread app;

// set to run functions from read_only & read_exclusive queues only
app->executor().init_read_threads(3);
Expand Down Expand Up @@ -400,7 +432,7 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
};

app->quit();
app_thread.join();
app.join();

// exactly number of posts processed
BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){ return v != std::thread::id(); }), num_expected );
Expand Down
1 change: 0 additions & 1 deletion programs/nodeos/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ int main(int argc, char** argv)
ilog("${name} using configuration file ${c}", ("name", nodeos::config::node_executable_name)("c", app->full_config_file_path().string()));
ilog("${name} data directory is ${d}", ("name", nodeos::config::node_executable_name)("d", app->data_dir().string()));
::detail::log_non_default_options(app->get_parsed_options());
app->executor().init_main_thread_id();
app->startup();
app->set_thread_priority_max();
app->exec();
Expand Down

0 comments on commit 2856e13

Please sign in to comment.