-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a bypass scheduler to improve mpi continuations #1195
base: main
Are you sure you want to change the base?
Conversation
Coverage summary from CodacySee diff coverage on Codacy
Coverage variation details
Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch: Diff coverage details
Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified: See your quality gate settings Change summary preferencesCodacy stopped sending the deprecated coverage status on June 5th, 2024. Learn more |
77f0eb5
to
36fedf3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few initial comments. This adds a lot of complexity and I'll need a bit more time to look at this.
@@ -32,6 +32,8 @@ | |||
#include <type_traits> | |||
#include <utility> | |||
|
|||
#define PIKA_MPI_ENABLE_EARLY_POLL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this a proper CMake configuration option, or a runtime configuration option?
handler_method h = get_handler_method(mode); | ||
/// these task modes always trigger continuations on a pika task and can be safely inlined | ||
return (h == handler_method::yield_while) || | ||
(h == handler_method::suspend_resume) | (h == handler_method::new_task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(h == handler_method::suspend_resume) | (h == handler_method::new_task); | |
(h == handler_method::suspend_resume) || (h == handler_method::new_task); |
@@ -77,29 +79,47 @@ namespace pika::mpi::experimental { | |||
execution::thread_priority::boost : | |||
execution::thread_priority::normal; | |||
|
|||
auto dgb = []() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short for degub
? Could you guard this with PIKA_DEBUG
, otherwise it'll be an unused variable in release builds (with corresponding warnings)? And perhaps give it a slightly longer name?
static print_threshold<Level, bplevel> bps_deb("SBYPASS"); | ||
} // namespace pika::debug::detail | ||
|
||
namespace pika { namespace execution { namespace experimental { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
namespace pika { namespace execution { namespace experimental { | |
namespace pika::execution::detail { |
if we're not intending to make this a user-facing scheduler (I think I'd prefer that anyway) in the short term?
// TODO: Can we simply dispatch to the default implementation? This is | ||
// disabled with the P2300 reference implementation because we don't | ||
// want to use implementation details of it. | ||
#if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was changed in March to be PIKA_HAVE_STDEXEC
.
throw std::runtime_error("Bypass scheduler - already on task thread"); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw std::runtime_error("Bypass scheduler - already on task thread"); | |
return; | |
PIKA_THROW_EXCEPTION(pika::error::invalid_status, "thread_pool_scheduler_queue_bypass::execute", "Already on a pika task"); | |
return; |
(with appropriate line wrapping)
if (!threaddata->set_state_tagged(thread_schedule_state::active, task_state, | ||
active_state, std::memory_order_relaxed)) | ||
{ | ||
throw std::runtime_error("Thread state cannot fail here"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw std::runtime_error("Thread state cannot fail here"); | |
PIKA_THROW_EXCEPTION(pika::error::no_success, "thread_pool_scheduler_queue_bypass::execute", "Setting thread state failed"); |
thread_state(task_return.first, task_state.state_ex(), task_state.tag() + 1); | ||
|
||
// if the threaddata->state still matches active_state update to new state | ||
// (could be stolen/changed by another thread here?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could it? I don't think it could be stolen if it's not in any queue...
case thread_schedule_state::pending: | ||
{ | ||
data.scheduler_base->schedule_thread(id, thread_schedule_hint(thread_num), | ||
false /*allow_fallback*/, execution::thread_priority::high); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should this be high priority?
// thread sits in thread map until resumed | ||
return; | ||
} | ||
default: throw std::runtime_error("fix this thread state type"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update all of the plain throws to use PIKA_THROW_EXCEPTION
with some appropriate error (no_success
if nothing else).
break; | ||
case thread_schedule_state::suspended: | ||
{ | ||
std::cout << "thread_schedule_state::suspended" << std::endl; // |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove or replace with our logging.
thread_description desc( | ||
f, (fallback_annotation != nullptr) ? fallback_annotation : "Bypass"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the fallback annotation should be non-null. Maybe replace with an assert and
thread_description desc( | |
f, (fallback_annotation != nullptr) ? fallback_annotation : "Bypass"); | |
thread_description desc(f, fallback_annotation); |
?
namespace pika::threads::detail { | ||
class thread_data; // forward declaration only | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we do a forward declaration here instead of including the header?
@@ -56,7 +56,7 @@ | |||
namespace pika::debug::detail { | |||
// a debug level of zero disables messages with a level>0 | |||
// a debug level of N shows messages with level 1..N | |||
constexpr int debug_level = 0; | |||
constexpr int debug_level = 9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be disabled?
namespace pika::debug::detail { | ||
// a debug level of zero disables messages with a level>0 | ||
// a debug level of N shows messages with level 1..N | ||
constexpr int bplevel = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
constexpr int bplevel = 0; | |
inline constexpr int bplevel = 0; |
If pika tries to use completion modes and the user did not setup a pool, the default pool must be returned to prevent accessing one that has not been created.
Ensure polling is started and handle MPI errors more generically Remove return type from mpi transform Changes to mpi internals broke the RAII polling setup.
To test continuations, we need to add 16 to the numeric mpi modes so detecting the presence at configure time is necessary
mpi transform tests using set_error are triggering stack corruption errors that might or might not be real, but disabling them (for now) will clean up the dashboard
Co-authored-by: Mikael Simberg <[email protected]>
The existing mode uses suspend/resume which can be problematic if threads suspend and block, but in principle we should directly execute continuations as function calls which this new mode does
Callbacks to sender set_value/error should not be triggered directly on a polling thread as continuations will be called on the polling thread. Instead the execution should be tranferred to a pika thread/task. Adding a transfer adaptor costs task creation, plus enqueuing/dequeuing of the task, so the bypass scheduler supports inline execution, but with the creation of a task wrapper around the callback so that it is safe to suspend/resume etc and no raw scheduler/polling threads are running user code.
Bypass scheduler must never be run on top of an existing pika task, otherwise coroutine self pointer is clobberred (thread local). Rearrange transfers in transform_mpi to better route inline continuations through the bypass scheduler and skip it when not needed
We do not need the RAII state restorer object as our thread creation is always on the executing thread and certain race conditions are not valid Improve comments and remove unsupported API features
The bypass needs to be added to the cuda polling, but this termporary fix inserts and extra bypass before issuing mpi requests if the thread id is invalid. Also, if the bypass scheduler is invoked on a thread that is already a task, it skips task creation and becomes a simple pass through execution.
@biddisco what's the plan for this PR? |
@biddisco what's the status of this PR? Should we keep it open or close it? |
I would like this to go through testing ASAP, so I'm pushing it, but it can be can be considered a draft PR for now
Relies on #1151 and #1180
Here, we add a stdexec style scheduler that executes a task/function - it behaves like the main pika scheduling loop - by takinf the function, wrapping it in a task, then switching context to it immediately, so that if the task later tries to suspend, then there are no problems and the task goes ontoo the queues as usual.
It is an error to call this on a pika thread and is only supported by certain modes of the mpi continuations