Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a bypass scheduler to improve mpi continuations #1195

Draft
wants to merge 40 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1092ab3
Remove accidentally duplicated init of mpi
biddisco Jun 18, 2024
c617fdd
Fix bug fetching non existant mpi pool
biddisco Jun 17, 2024
a4c4202
Ensure completion mode commmand line and ENV var defaults match
biddisco Jun 17, 2024
a8246e2
Fix mpi algorithm test
biddisco Jun 17, 2024
7f86541
Fix inshpect violation
biddisco Jun 19, 2024
e4aecd2
Fix a warning (that is not valid, but annoying)
biddisco Jun 19, 2024
fab14c9
Remove debug message left in by mistake
biddisco Jun 19, 2024
7adc4f6
Stop command line completion mode from clobbering $ENV var if set
biddisco Jun 19, 2024
c907efb
Fix review comments
biddisco Jun 19, 2024
980b5b4
Get fallback MPI completion mode from runtime configuration
msimberg Jul 24, 2024
27e5ba7
Fix review comments: remove incorrect doc/comments
biddisco Jul 17, 2024
f3a3b07
Fix (intentional) error msg detection in mpi algorithm test
biddisco Jul 25, 2024
058e27d
Fix pika::util::detail::function lookup fail
biddisco Jul 25, 2024
eed85ed
Detect MPIx continuations at configure time to improve testing
biddisco Jul 16, 2024
2148ea8
Fix copyright header on new material
biddisco Jul 25, 2024
a58f844
use try_compile (instead of try run) test for mpi continuations detec…
biddisco Jul 26, 2024
b7b61f4
Move cmake tests/check for openmpi to cmake folder
biddisco Jul 26, 2024
ac45f96
Disable sanitizer checks for troublesome error handler tests
biddisco Jul 29, 2024
2128b42
add repeat-until-fail to tests to better check mpi testing branch
biddisco Jul 26, 2024
f9b583c
Update cmake/tests/check_openmpi_continuations.cpp
biddisco Jul 29, 2024
14024b0
clang-format cleanup
biddisco Jul 31, 2024
e8d51df
Use MPIX_CONT_POLL_ONLY in mpi continuation polling to better control…
biddisco Jul 31, 2024
bfc0a4e
Simplify the exception error msg generation (asan flagging problems)
biddisco Aug 2, 2024
1740ade
make sure transfer is always after trigger to ensure correct behaviour
biddisco Aug 2, 2024
c398315
Add new MPIX continuation mode that executes continuations directly
biddisco Aug 2, 2024
6941f7a
Remove old MPIx suspend mode and use continuation mode by default
biddisco Aug 5, 2024
6a01cbc
Cleanup namespace "detail" usage (remove unnecessary uses)
biddisco Aug 2, 2024
d08e547
Add a new bypass scheduler to execute tasks inliine on a pika thread
biddisco Oct 17, 2022
9f0d830
Fix a thread create error - do not schedule was being ignored
biddisco Jun 25, 2024
85cd74e
Adjust transform_mpi to use bypass scheduler and fix some wrinkles
biddisco Jun 25, 2024
fad487d
Clean up bypass scheduler, remove debug, extraneous code
biddisco Jun 26, 2024
548f2d5
Adding more (shared scheduler) debug info - especially threadinfo
biddisco Jun 26, 2024
2fa4aed
Fix inshpect/spelling violations
biddisco Jun 30, 2024
53b29da
Disable debug messages
biddisco Jun 30, 2024
1cb5a17
Fix CMakeLists header check violation
biddisco Jul 1, 2024
5f581dc
Hide dbg() lambda inside PIKA_DEBUG ifdef
biddisco Jul 31, 2024
cc1941d
Use thread id check to decide if bypass scheduler is needed
biddisco Jul 31, 2024
fcbf6bb
Fix a bad error code use
biddisco Jul 31, 2024
de64414
Fix some bypass scheduler issues when using dla-future
biddisco Aug 6, 2024
7867f22
Remove obsolete cout message
biddisco Aug 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/linux_asan_ubsan_lsan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ jobs:
export UBSAN_OPTIONS=print_stacktrace=1:suppressions=$PWD/tools/ubsan.supp
cd build
ctest \
--repeat-until-fail 7 \
--timeout 120 \
--output-on-failure \
-E "$(${GITHUB_WORKSPACE}/.github/blacklist_to_ctest_regex.sh ${GITHUB_WORKSPACE}/.github/workflows/linux_asan_ubsan_lsan_blacklist.txt)"
Expand All @@ -98,6 +99,7 @@ jobs:
export UBSAN_OPTIONS=print_stacktrace=1:suppressions=$PWD/tools/ubsan.supp
cd build
ctest \
--repeat-until-fail 7 \
--timeout 120 \
--output-on-failure \
-R "$(${GITHUB_WORKSPACE}/.github/blacklist_to_ctest_regex.sh ${GITHUB_WORKSPACE}/.github/workflows/linux_asan_ubsan_lsan_blacklist.txt)"
9 changes: 9 additions & 0 deletions cmake/pika_add_config_test.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,12 @@ function(pika_check_for_stdexec_sender_receiver_concepts)
FILE ${ARGN}
)
endfunction()

# ##################################################################################################
function(pika_check_for_mpix_continuations)
pika_add_config_test(
PIKA_WITH_MPIX_CONTINUATIONS
SOURCE cmake/tests/check_openmpi_continuations.cpp
FILE ${ARGN}
)
endfunction()
16 changes: 16 additions & 0 deletions cmake/tests/check_openmpi_continuations.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2021 ETH Zurich
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// simple compile test to see if mpix continuations are present

#include <mpi-ext.h>
#include <mpi.h>

#if !defined(OMPI_HAVE_MPI_EXT_CONTINUE)
static_assert(false);
#endif

int main() {}
11 changes: 10 additions & 1 deletion libs/pika/async_mpi/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2020 The STE||AR-Group
# Copyright (c) 2021 ETH Zurich
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -11,6 +11,15 @@ if(NOT ${PIKA_WITH_MPI})
return()
endif()

pika_check_for_mpix_continuations(PIKA_WITH_MPIX_CONTINUATIONS)
if(PIKA_WITH_MPIX_CONTINUATIONS)
message(STATUS "MPIx Continuations detected")
set(PIKA_MPI_MODES_LOOP_COUNT 79)
else()
message(STATUS "MPIx Continuations not detected")
set(PIKA_MPI_MODES_LOOP_COUNT 63)
endif()

# Default location is $PIKA_ROOT/libs/mpi/include
set(async_mpi_headers
pika/async_mpi/mpi_exception.hpp pika/async_mpi/mpi_helpers.hpp pika/async_mpi/mpi_polling.hpp
Expand Down
11 changes: 7 additions & 4 deletions libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ namespace pika::mpi::experimental::detail {
friend constexpr void
tag_invoke(ex::set_value_t, dispatch_mpi_receiver r, Ts&&... ts) noexcept
{
// init a request
MPI_Request request{MPI_REQUEST_NULL};
int status = MPI_SUCCESS;

pika::detail::try_catch_exception_ptr(
[&]() mutable {
using invoke_result_type = mpi_request_invoke_result_t<F, Ts...>;
Expand All @@ -124,9 +128,6 @@ namespace pika::mpi::experimental::detail {
#ifdef PIKA_HAVE_APEX
apex::scoped_timer apex_post("pika::mpi::post");
#endif
// init a request
MPI_Request request;
int status = MPI_SUCCESS;
// execute the mpi function call, passing in the request object
if constexpr (std::is_void_v<invoke_result_type>)
{
Expand Down Expand Up @@ -166,6 +167,8 @@ namespace pika::mpi::experimental::detail {
else { ex::set_value(PIKA_MOVE(r.op_state.receiver), request); }
},
[&](std::exception_ptr ep) {
PIKA_DETAIL_DP(mpi_tran<7>,
error(str<>("dispatch_mpi_recv"), "exception", ptr(request)));
ex::set_error(PIKA_MOVE(r.op_state.receiver), PIKA_MOVE(ep));
});
}
Expand Down Expand Up @@ -193,7 +196,7 @@ namespace pika::mpi::experimental::detail {
, f(PIKA_FORWARD(F_, f))
, op_state(ex::connect(PIKA_FORWARD(Sender_, sender), dispatch_mpi_receiver{*this}))
{
PIKA_DETAIL_DP(mpi_tran<5>, debug(str<>("operation_state")));
PIKA_DETAIL_DP(mpi_tran<5>, debug(str<>("dispatch_mpi_sender"), "operation_state"));
}

friend constexpr auto tag_invoke(ex::start_t, operation_state& os) noexcept
Expand Down
38 changes: 25 additions & 13 deletions libs/pika/async_mpi/include/pika/async_mpi/mpi_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <pika/functional/detail/tag_fallback_invoke.hpp>
#include <pika/functional/invoke.hpp>
#include <pika/mpi_base/mpi.hpp>
#include <pika/runtime/runtime.hpp>

#include <exception>
#include <type_traits>
Expand Down Expand Up @@ -54,24 +55,22 @@ namespace pika::mpi::experimental::detail {
std::invoke_result_t<F, std::add_lvalue_reference_t<std::decay_t<Ts>>..., MPI_Request*>>;

// -----------------------------------------------------------------
// return a scheduler on the mpi pool, with or without stack
inline auto mpi_pool_scheduler(execution::thread_priority p, bool stack = true)
// return a scheduler on the default pool with added priority if requested
inline auto default_pool_scheduler(execution::thread_priority p)
{
ex::thread_pool_scheduler sched{&resource::get_thread_pool(get_pool_name())};
if (!stack)
{
sched = ex::with_stacksize(std::move(sched), execution::thread_stacksize::nostack);
}
sched = ex::with_priority(std::move(sched), p);
return sched;
return ex::with_priority(
ex::thread_pool_scheduler{
&pika::detail::get_runtime_ptr()->get_thread_manager().default_pool()},
p);
}

// -----------------------------------------------------------------
// return a scheduler on the default pool with added priority if requested
inline auto default_pool_scheduler(execution::thread_priority p)
// return a scheduler on the mpi pool
inline auto mpi_pool_scheduler(execution::thread_priority p)
{
if (!pool_exists()) return default_pool_scheduler(p);
return ex::with_priority(
ex::thread_pool_scheduler{&resource::get_thread_pool("default")}, p);
ex::thread_pool_scheduler{&resource::get_thread_pool(get_pool_name())}, p);
}

// -----------------------------------------------------------------
Expand Down Expand Up @@ -165,7 +164,7 @@ namespace pika::mpi::experimental::detail {
// handler_method::mpix_continuation - signature is
/// typedef int (MPIX_Continue_cb_function)(int rc, void *cb_data);
template <typename OperationState>
int mpix_callback([[maybe_unused]] int rc, void* cb_data)
int mpix_callback_resume([[maybe_unused]] int rc, void* cb_data)
{
PIKA_DETAIL_DP(mpi_tran<1>, debug(str<>("MPIX"), "callback triggered"));
auto& op_state = *static_cast<OperationState*>(cb_data);
Expand All @@ -180,4 +179,17 @@ namespace pika::mpi::experimental::detail {
return MPI_SUCCESS;
}

// -----------------------------------------------------------------
// handler_method::mpix_continuation2 - signature is
/// typedef int (MPIX_Continue_cb_function)(int rc, void *cb_data);
template <typename OperationState>
int mpix_callback_continuation([[maybe_unused]] int rc, void* cb_data)
{
PIKA_DETAIL_DP(mpi_tran<1>, debug(str<>("MPIX"), "callback triggered"));
auto& op_state = *static_cast<OperationState*>(cb_data);
set_value_error_helper(op_state.status, PIKA_MOVE(op_state.receiver));
// tell mpix that we handled it ok, error is passed into set_error in mpi_trigger
return MPI_SUCCESS;
}

} // namespace pika::mpi::experimental::detail
39 changes: 24 additions & 15 deletions libs/pika/async_mpi/include/pika/async_mpi/mpi_polling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,25 @@ namespace pika::mpi::experimental {
enum class handler_method : std::uint32_t
{
/// enable the use of a dedicated pool for polling mpi messages.
use_pool = 0b0000'0001, // 1
use_pool = 0b0000'0001, // 01

/// this bit enables the inline invocation of the mpi request, when set
/// the calling thread performs the mpi operation, when unset, a transfer
/// is made so that the invocation happens on a new task that
/// would normally be on a dedicated pool if/when it exists
request_inline = 0b0000'0010, // 2
request_inline = 0b0000'0010, // 02

/// this bit enables the inline execution of the completion handler for the
/// request, when unset a transfer is made to move the completion handler
/// from the polling thread onto a new one
completion_inline = 0b0000'0100, // 4
completion_inline = 0b0000'0100, // 04

/// this bit enables the use of a high priority task flag
/// 1) requests are boosted to high priority if they are passed the the mpi-pool
/// to ensure they execute before other polling tasks (reduce latency)
/// 2) completions are boosted to high priority when sent to the main thread pool
/// so that the continuation is executed as quickly as possible
high_priority = 0b0000'1000, // 8
high_priority = 0b0000'1000, // 08

/// 3 bits control the handler method,
method_mask = 0b0111'0000, // 70
Expand All @@ -112,44 +112,52 @@ namespace pika::mpi::experimental {
///
/// * unspecified : reserved for development purposes or for customization by an
/// application using pika
yield_while = 0b0000'0000, // 0x00, 00 ... 15
suspend_resume = 0b0001'0000, // 0x10, 16 ... 31
new_task = 0b0010'0000, // 0x20, 32 ... 47
continuation = 0b0011'0000, // 0x30, 48 ... 63
mpix_continuation = 0b0100'0000, // 0x40, 64 ... 79
unspecified = 0b0101'0000, // 0x50, 80 ...
yield_while = 0b0000'0000, // 0x00, 00 ... 15
suspend_resume = 0b0001'0000, // 0x10, 16 ... 31
new_task = 0b0010'0000, // 0x20, 32 ... 47
continuation = 0b0011'0000, // 0x30, 48 ... 63
mpix_continuation = 0b0100'0000, // 0x40, 64 ... 79
unspecified = mpix_continuation + 16, // 0x50, ...

/// Default flags are to invoke inline, but transfer completion using a dedicated pool
default_mode = use_pool | request_inline | high_priority | new_task,
};

/// 3 bits define continuation mode
inline handler_method get_handler_method(std::underlying_type_t<handler_method> flags)
inline handler_method get_handler_method(const std::underlying_type_t<handler_method> flags)
{
return static_cast<handler_method>(
flags & pika::detail::to_underlying(handler_method::method_mask));
}

/// 1 bit defines high priority boost mode for pool transfers
inline bool use_priority_boost(int mode)
inline bool use_priority_boost(const int mode)
{
return (mode & pika::detail::to_underlying(handler_method::high_priority)) != 0;
}
/// 1 bit defines inline or transfer completion
inline bool use_inline_completion(int mode)
inline bool use_inline_completion(const int mode)
{
return (mode & pika::detail::to_underlying(handler_method::completion_inline)) != 0;
}
/// 1 bit defines inline or transfer mpi invocation
inline bool use_inline_request(int mode)
inline bool use_inline_request(const int mode)
{
return (mode & pika::detail::to_underlying(handler_method::request_inline)) != 0;
}
/// 1 bit defines whether we use a pool or not
inline bool use_pool(int mode)
inline bool use_pool(const int mode)
{
return (mode & pika::detail::to_underlying(handler_method::use_pool)) != 0;
}
/// Convenience fn to test if mode supports inline continuation
inline bool inline_ready(const int mode)
{
handler_method h = get_handler_method(mode);
/// these task modes always trigger continuations on a pika task and can be safely inlined
return (h == handler_method::yield_while) ||
(h == handler_method::suspend_resume) | (h == handler_method::new_task);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(h == handler_method::suspend_resume) | (h == handler_method::new_task);
(h == handler_method::suspend_resume) || (h == handler_method::new_task);

}

/// used for debugging to show mode type in messages, should be removed
inline const char* mode_string(int flags)
Expand Down Expand Up @@ -224,6 +232,7 @@ namespace pika::mpi::experimental {
: pool_name_(pool_name)
{
mpi::experimental::init(false, init_errorhandler);
mpi::experimental::register_polling();
}

~enable_user_polling() { mpi::experimental::finalize(pool_name_); }
Expand Down
56 changes: 47 additions & 9 deletions libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <pika/execution_base/any_sender.hpp>
#include <pika/execution_base/receiver.hpp>
#include <pika/execution_base/sender.hpp>
#include <pika/executors/thread_pool_scheduler_queue_bypass.hpp>
#include <pika/functional/detail/tag_fallback_invoke.hpp>
#include <pika/functional/invoke.hpp>
#include <pika/mpi_base/mpi.hpp>
Expand Down Expand Up @@ -50,14 +51,15 @@ namespace pika::mpi::experimental {
using execution::thread_priority;
using pika::execution::experimental::just;
using pika::execution::experimental::let_value;
using pika::execution::experimental::thread_pool_scheduler_queue_bypass;
using pika::execution::experimental::transfer;
using pika::execution::experimental::transfer_just;
using pika::execution::experimental::unique_any_sender;

// get mpi completion mode settings
auto mode = get_completion_mode();
bool inline_com = use_inline_completion(mode);
bool inline_req = use_inline_request(mode);
bool completions_inline = use_inline_completion(mode);
bool requests_inline = use_inline_request(mode);

#ifdef PIKA_DEBUG
// ----------------------------------------------------------
Expand All @@ -77,29 +79,65 @@ namespace pika::mpi::experimental {
execution::thread_priority::boost :
execution::thread_priority::normal;

#ifdef PIKA_DEBUG
auto dgb = []() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short for degub? Could you guard this with PIKA_DEBUG, otherwise it'll be an unused variable in release builds (with corresponding warnings)? And perhaps give it a slightly longer name?

PIKA_DETAIL_DP(mpi_tran<1>, debug(str<>("transform_mpi"), "complete"));
};
#endif
auto completion_snd = [=](MPI_Request request) -> unique_any_sender<> {
if (!inline_com)
if (!completions_inline) // not inline : a transfer is required
{
if (request == MPI_REQUEST_NULL)
{
return transfer_just(default_pool_scheduler(p));
}
return transfer_just(default_pool_scheduler(p), request) | trigger_mpi(mode);
return just(request) | trigger_mpi(mode) | transfer(default_pool_scheduler(p));
}
if (threads::detail::get_self_id() == threads::detail::invalid_thread_id)
{
if (request == MPI_REQUEST_NULL)
{
return just() |
transfer(ex::with_annotation(
ex::thread_pool_scheduler_queue_bypass{}, "transform_mpi"));
}
return just(request) | trigger_mpi(mode) |
transfer(ex::with_annotation(
ex::thread_pool_scheduler_queue_bypass{}, "transform_mpi"));
}
else
{
if (request == MPI_REQUEST_NULL) { return just(); }
return just(request) | trigger_mpi(mode);
}
if (request == MPI_REQUEST_NULL) { return just(); }
return just(request) | trigger_mpi(mode);
};

if (inline_req)
if (requests_inline)
{
return dispatch_mpi_sender<Sender, F>{PIKA_MOVE(sender), PIKA_FORWARD(F, f)} |
if (threads::detail::get_self_id() == threads::detail::invalid_thread_id)
{
auto snd0 = PIKA_FORWARD(Sender, sender) |
transfer(ex::with_annotation(
ex::thread_pool_scheduler_queue_bypass{}, "transform_mpi"));
return dispatch_mpi(PIKA_MOVE(snd0), PIKA_FORWARD(F, f)) |
let_value(completion_snd);
}
return dispatch_mpi(PIKA_MOVE(sender), PIKA_FORWARD(F, f)) |
#ifdef PIKA_DEBUG
let_value(completion_snd) | ex::then(dgb);
#else
let_value(completion_snd);
#endif
}
else
{
auto snd0 = PIKA_FORWARD(Sender, sender) | transfer(mpi_pool_scheduler(p));
return dispatch_mpi_sender<decltype(snd0), F>{PIKA_MOVE(snd0), PIKA_FORWARD(F, f)} |
return dispatch_mpi(PIKA_MOVE(snd0), PIKA_FORWARD(F, f)) |
#ifdef PIKA_DEBUG
let_value(completion_snd) | ex::then(dgb);
#else
let_value(completion_snd);
#endif
}
}

Expand Down
Loading
Loading