diff --git a/include/dlaf/factorization/cholesky/impl.h b/include/dlaf/factorization/cholesky/impl.h index 5f26f0020e..88ca01327a 100644 --- a/include/dlaf/factorization/cholesky/impl.h +++ b/include/dlaf/factorization/cholesky/impl.h @@ -42,14 +42,22 @@ namespace dlaf::factorization::internal { +#ifdef PIKA_HAVE_APEX +#define ANNOTATE(NAME) (priority == pika::execution::thread_priority::high ? "HP_" #NAME : #NAME) +#else +#define ANNOTATE(name) nullptr +#endif + namespace cholesky_l { + template void potrfDiagTile(pika::execution::thread_priority priority, MatrixTileSender&& matrix_tile) { using pika::execution::thread_stacksize; pika::execution::experimental::start_detached( dlaf::internal::whenAllLift(blas::Uplo::Lower, std::forward(matrix_tile)) | - tile::potrf(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::potrf(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(potrfDiagTile)))); } template @@ -63,7 +71,8 @@ void trsmPanelTile(pika::execution::thread_priority priority, KKTileSender&& kk_ blas::Diag::NonUnit, ElementType(1.0), std::forward(kk_tile), std::forward(matrix_tile)) | - tile::trsm(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::trsm(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(trsmPanelTile)))); } template @@ -76,7 +85,8 @@ void herkTrailingDiagTile(pika::execution::thread_priority priority, PanelTileSe dlaf::internal::whenAllLift(blas::Uplo::Lower, blas::Op::NoTrans, BaseElementType(-1.0), std::forward(panel_tile), BaseElementType(1.0), std::forward(matrix_tile)) | - tile::herk(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::herk(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(herkTrailingDiagTile)))); } template @@ -90,7 +100,8 @@ void gemmTrailingMatrixTile(pika::execution::thread_priority priority, PanelTile std::forward(panel_tile), std::forward(col_panel), ElementType(1.0), std::forward(matrix_tile)) | - tile::gemm(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::gemm(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(gemmTrailingMatrixTile)))); } } @@ -101,7 +112,8 @@ void potrfDiagTile(pika::execution::thread_priority priority, MatrixTileSender&& pika::execution::experimental::start_detached( dlaf::internal::whenAllLift(blas::Uplo::Upper, std::forward(matrix_tile)) | - tile::potrf(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::potrf(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(potrfDiagTile)))); } template @@ -115,7 +127,8 @@ void trsmPanelTile(pika::execution::thread_priority priority, KKTileSender&& kk_ blas::Diag::NonUnit, ElementType(1.0), std::forward(kk_tile), std::forward(matrix_tile)) | - tile::trsm(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::trsm(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(trsmPanelTile)))); } template @@ -128,7 +141,8 @@ void herkTrailingDiagTile(pika::execution::thread_priority priority, PanelTileSe dlaf::internal::whenAllLift(blas::Uplo::Upper, blas::Op::ConjTrans, base_element_type(-1.0), std::forward(panel_tile), base_element_type(1.0), std::forward(matrix_tile)) | - tile::herk(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::herk(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(herkTrailingDiagTile)))); } template @@ -142,7 +156,8 @@ void gemmTrailingMatrixTile(pika::execution::thread_priority priority, PanelTile std::forward(panel_tile), std::forward(col_panel), ElementType(1.0), std::forward(matrix_tile)) | - tile::gemm(dlaf::internal::Policy(priority, thread_stacksize::nostack))); + tile::gemm(dlaf::internal::Policy(priority, thread_stacksize::nostack, + ANNOTATE(gemmTrailingMatrixTile)))); } } diff --git a/include/dlaf/init.h b/include/dlaf/init.h index 0e50ad7fec..a79b255167 100644 --- a/include/dlaf/init.h +++ b/include/dlaf/init.h @@ -41,7 +41,6 @@ struct configuration { std::size_t num_gpu_lapack_handles = 16; std::size_t umpire_host_memory_pool_initial_bytes = 1 << 30; std::size_t umpire_device_memory_pool_initial_bytes = 1 << 30; - std::string mpi_pool = "mpi"; }; std::ostream& operator<<(std::ostream& os, const configuration& cfg); @@ -98,10 +97,4 @@ struct [[nodiscard]] ScopedInitializer { ScopedInitializer& operator=(ScopedInitializer&&) = delete; ScopedInitializer& operator=(const ScopedInitializer&) = delete; }; - -/// Initialize the MPI pool. -/// -/// -void initResourcePartitionerHandler(pika::resource::partitioner& rp, - const pika::program_options::variables_map& vm); } diff --git a/include/dlaf/schedulers.h b/include/dlaf/schedulers.h index 1b6aae04fa..34cf6850fb 100644 --- a/include/dlaf/schedulers.h +++ b/include/dlaf/schedulers.h @@ -12,6 +12,7 @@ /// @file #include +#include #include #include @@ -50,6 +51,6 @@ auto getBackendScheduler( inline auto getMPIScheduler() { return pika::execution::experimental::thread_pool_scheduler{ - &pika::resource::get_thread_pool(getConfiguration().mpi_pool)}; + &pika::resource::get_thread_pool(pika::mpi::experimental::get_pool_name())}; } } // namespace dlaf::internal diff --git a/include/dlaf/sender/policy.h b/include/dlaf/sender/policy.h index 5e8617e9ea..794bb048ff 100644 --- a/include/dlaf/sender/policy.h +++ b/include/dlaf/sender/policy.h @@ -25,14 +25,17 @@ class Policy { private: const pika::execution::thread_priority priority_ = pika::execution::thread_priority::normal; const pika::execution::thread_stacksize stacksize_ = pika::execution::thread_stacksize::default_; + const char* annotation_ = nullptr; public: Policy() = default; explicit Policy( pika::execution::thread_priority priority, - pika::execution::thread_stacksize stacksize = pika::execution::thread_stacksize::default_) - : priority_(priority), stacksize_(stacksize) {} - explicit Policy(pika::execution::thread_stacksize stacksize) : stacksize_(stacksize) {} + pika::execution::thread_stacksize stacksize = pika::execution::thread_stacksize::default_, + const char* annotation = nullptr) + : priority_(priority), stacksize_(stacksize), annotation_(annotation) {} + explicit Policy(pika::execution::thread_stacksize stacksize, const char* annotation = nullptr) + : stacksize_(stacksize), annotation_(annotation) {} Policy(Policy&&) = default; Policy(const Policy&) = default; Policy& operator=(Policy&&) = default; @@ -45,6 +48,10 @@ class Policy { pika::execution::thread_stacksize stacksize() const noexcept { return stacksize_; } + + const char* annotation() const noexcept { + return annotation_; + } }; } } diff --git a/include/dlaf/sender/transform.h b/include/dlaf/sender/transform.h index 78b66eb001..712edd4170 100644 --- a/include/dlaf/sender/transform.h +++ b/include/dlaf/sender/transform.h @@ -56,14 +56,19 @@ template (policy.priority(), policy.stacksize()); - auto transfer_sender = transfer(std::forward(sender), std::move(scheduler)); using dlaf::common::internal::ConsumeRvalues; using dlaf::common::internal::Unwrapping; if constexpr (B == Backend::MC) { + if (policy.annotation()) { + scheduler = with_annotation(scheduler, policy.annotation()); + } + auto transfer_sender = transfer(std::forward(sender), std::move(scheduler)); + return then(std::move(transfer_sender), ConsumeRvalues{Unwrapping{std::forward(f)}}) | drop_operation_state(); } @@ -73,6 +78,10 @@ template (sender), std::move(scheduler)); if constexpr (Tag == TransformDispatchType::Plain) { return then_with_stream(std::move(transfer_sender), ConsumeRvalues{Unwrapping{std::forward(f)}}) | diff --git a/include/dlaf/sender/transform_mpi.h b/include/dlaf/sender/transform_mpi.h index edfbd7d419..b0c040c36f 100644 --- a/include/dlaf/sender/transform_mpi.h +++ b/include/dlaf/sender/transform_mpi.h @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -20,9 +21,18 @@ #include #include #include +// +#include +// +#ifdef EXTRA_MPI_TYPES_DEBUGGING +#include +#endif namespace dlaf::comm::internal { +template +static pika::debug::detail::print_threshold dla_debug("DLA_MPI"); + /// This helper "consumes" a CommunicatorPipelineExclusiveWrapper ensuring that after this call /// the one passed as argument gets destroyed. All other types left as they are /// by the second overload. @@ -45,17 +55,12 @@ void consumeCommunicatorWrapper(T&) {} /// least until version 12 fails with an internal compiler error with a trailing /// decltype for SFINAE. GCC has no problems with a lambda. template -struct MPICallHelper { +struct MPIYieldWhileCallHelper { std::decay_t f; template - auto operator()(Ts&&... ts) -> decltype(std::move(f)(dlaf::common::internal::unwrap(ts)..., - std::declval())) { + auto operator()(Ts&&... ts) { + namespace mpid = pika::mpi::experimental::detail; MPI_Request req; - auto is_request_completed = [&req] { - int flag; - MPI_Test(&req, &flag, MPI_STATUS_IGNORE); - return flag == 0; - }; // Note: // Callables passed to transformMPI have their arguments passed by reference, but doing so @@ -71,17 +76,41 @@ struct MPICallHelper { if constexpr (std::is_void_v) { std::move(f)(dlaf::common::internal::unwrap(ts)..., &req); (internal::consumeCommunicatorWrapper(ts), ...); - pika::util::yield_while(is_request_completed); + pika::util::yield_while([req]() { return !mpid::poll_request(req); }); + } + else { + /*auto r = */ std::move(f)(dlaf::common::internal::unwrap(ts)..., &req); + (internal::consumeCommunicatorWrapper(ts), ...); + pika::util::yield_while([req]() { return !mpid::poll_request(req); }); + } + } +}; + +/// Helper type for wrapping MPI calls. +template +struct MPICallHelper { + std::decay_t f; + + template + auto operator()(Ts&&... ts) -> decltype(std::move(f)(dlaf::common::internal::unwrap(ts)...)) { + using namespace pika::debug::detail; + PIKA_DETAIL_DP(dla_debug<5>, debug(str<>("MPICallHelper"), pika::debug::print_type(", "))); + using result_type = decltype(std::move(f)(dlaf::common::internal::unwrap(ts)...)); + if constexpr (std::is_void_v) { + std::move(f)(dlaf::common::internal::unwrap(ts)...); + (internal::consumeCommunicatorWrapper(ts), ...); } else { - auto r = std::move(f)(dlaf::common::internal::unwrap(ts)..., &req); + auto r = std::move(f)(dlaf::common::internal::unwrap(ts)...); (internal::consumeCommunicatorWrapper(ts), ...); - pika::util::yield_while(is_request_completed); return r; } } }; +template +MPIYieldWhileCallHelper(F&&) -> MPIYieldWhileCallHelper>; + template MPICallHelper(F&&) -> MPICallHelper>; @@ -90,10 +119,26 @@ template >> [[nodiscard]] decltype(auto) transformMPI(F&& f, Sender&& sender) { namespace ex = pika::execution::experimental; - - return ex::transfer(std::forward(sender), dlaf::internal::getMPIScheduler()) | - ex::then(dlaf::common::internal::ConsumeRvalues{MPICallHelper{std::forward(f)}}) | - ex::drop_operation_state(); + namespace mpi = pika::mpi::experimental; + namespace mpid = pika::mpi::experimental::detail; + +#ifdef EXTRA_MPI_TYPES_DEBUGGING + auto snd1 = + std::forward(sender) | + ex::let_value([=, f = std::move(f)](LArgs&&... largs) { + PIKA_DETAIL_DP(dla_debug<2>, debug(str<>("Args to MPI fn\n"), + pika::debug::print_type(", "), "\nValues\n")); + return ex::just(std::move(largs)...) | + mpi::transform_mpi(dlaf::common::internal::ConsumeRvalues{MPICallHelper{std::move(f)}}); + }); + return ex::make_unique_any_sender(std::move(snd1)); +#else + PIKA_DETAIL_DP(dla_debug<5>, debug(str<>("MPI fn\n"))); + auto snd1 = + std::forward(sender) | + mpi::transform_mpi(dlaf::common::internal::ConsumeRvalues{MPICallHelper{std::forward(f)}}); + return ex::make_unique_any_sender(std::move(snd1)); +#endif } /// Fire-and-forget transformMPI. This submits the work and returns void. @@ -148,29 +193,6 @@ class PartialTransformMPI : private PartialTransformMPIBase { template PartialTransformMPI(F&& f) -> PartialTransformMPI>; -/// A partially applied transformMPIDetach, with the callable object given, but -/// the predecessor sender missing. The predecessor sender is applied when -/// calling the operator| overload. -template -class PartialTransformMPIDetach : private PartialTransformMPIBase { -public: - template - PartialTransformMPIDetach(F_&& f) : PartialTransformMPIBase{std::forward(f)} {} - PartialTransformMPIDetach(PartialTransformMPIDetach&&) = default; - PartialTransformMPIDetach(const PartialTransformMPIDetach&) = default; - PartialTransformMPIDetach& operator=(PartialTransformMPIDetach&&) = default; - PartialTransformMPIDetach& operator=(const PartialTransformMPIDetach&) = default; - - template - friend auto operator|(Sender&& sender, PartialTransformMPIDetach pa) { - return pika::execution::experimental::start_detached(transformMPI(std::move(pa.f_), - std::forward(sender))); - } -}; - -template -PartialTransformMPIDetach(F&& f) -> PartialTransformMPIDetach>; - /// \overload transformMPI /// /// This overload partially applies the MPI transform for later use with @@ -180,12 +202,4 @@ template return PartialTransformMPI{std::forward(f)}; } -/// \overload transformMPIDetach -/// -/// This overload partially applies transformMPIDetach for later use with -/// operator| with a sender on the left-hand side. -template -[[nodiscard]] decltype(auto) transformMPIDetach(F&& f) { - return PartialTransformMPIDetach{std::forward(f)}; -} } diff --git a/miniapp/miniapp_band_to_tridiag.cpp b/miniapp/miniapp_band_to_tridiag.cpp index bace372c48..b1ced27259 100644 --- a/miniapp/miniapp_band_to_tridiag.cpp +++ b/miniapp/miniapp_band_to_tridiag.cpp @@ -209,6 +209,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_bt_band_to_tridiag.cpp b/miniapp/miniapp_bt_band_to_tridiag.cpp index 0b89769198..5a6f14b8c1 100644 --- a/miniapp/miniapp_bt_band_to_tridiag.cpp +++ b/miniapp/miniapp_bt_band_to_tridiag.cpp @@ -219,6 +219,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_bt_reduction_to_band.cpp b/miniapp/miniapp_bt_reduction_to_band.cpp index 8761bcd91c..6d10992788 100644 --- a/miniapp/miniapp_bt_reduction_to_band.cpp +++ b/miniapp/miniapp_bt_reduction_to_band.cpp @@ -238,6 +238,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_cholesky.cpp b/miniapp/miniapp_cholesky.cpp index 947835f955..23e7420f5e 100644 --- a/miniapp/miniapp_cholesky.cpp +++ b/miniapp/miniapp_cholesky.cpp @@ -228,7 +228,6 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_communication.cpp b/miniapp/miniapp_communication.cpp index dd147d6d26..21bfdb6570 100644 --- a/miniapp/miniapp_communication.cpp +++ b/miniapp/miniapp_communication.cpp @@ -606,6 +606,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_eigensolver.cpp b/miniapp/miniapp_eigensolver.cpp index 11abddf5d5..86fe40447e 100644 --- a/miniapp/miniapp_eigensolver.cpp +++ b/miniapp/miniapp_eigensolver.cpp @@ -257,7 +257,6 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_gen_eigensolver.cpp b/miniapp/miniapp_gen_eigensolver.cpp index fe27104fbf..2771b5d6d6 100644 --- a/miniapp/miniapp_gen_eigensolver.cpp +++ b/miniapp/miniapp_gen_eigensolver.cpp @@ -286,7 +286,6 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_gen_to_std.cpp b/miniapp/miniapp_gen_to_std.cpp index 8fa3a8c70d..de136168bb 100644 --- a/miniapp/miniapp_gen_to_std.cpp +++ b/miniapp/miniapp_gen_to_std.cpp @@ -216,6 +216,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_reduction_to_band.cpp b/miniapp/miniapp_reduction_to_band.cpp index 299af230b0..3b6e92e088 100644 --- a/miniapp/miniapp_reduction_to_band.cpp +++ b/miniapp/miniapp_reduction_to_band.cpp @@ -257,6 +257,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_triangular_multiplication.cpp b/miniapp/miniapp_triangular_multiplication.cpp index 93e07fc7b4..923c0edf82 100644 --- a/miniapp/miniapp_triangular_multiplication.cpp +++ b/miniapp/miniapp_triangular_multiplication.cpp @@ -225,6 +225,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_triangular_solver.cpp b/miniapp/miniapp_triangular_solver.cpp index 3ebdb9440e..d96fe73d91 100644 --- a/miniapp/miniapp_triangular_solver.cpp +++ b/miniapp/miniapp_triangular_solver.cpp @@ -244,6 +244,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/miniapp/miniapp_tridiag_solver.cpp b/miniapp/miniapp_tridiag_solver.cpp index 3457748c62..e61a1e5f4a 100644 --- a/miniapp/miniapp_tridiag_solver.cpp +++ b/miniapp/miniapp_tridiag_solver.cpp @@ -225,6 +225,5 @@ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; return pika::init(pika_main, argc, argv, p); } diff --git a/src/c_api/init.cpp b/src/c_api/init.cpp index 0a36a70226..bd34b57ccf 100644 --- a/src/c_api/init.cpp +++ b/src/c_api/init.cpp @@ -26,7 +26,6 @@ void dlaf_initialize(int argc_pika, const char** argv_pika, int argc_dlaf, // pika initialization pika::init_params params; - params.rp_callback = dlaf::initResourcePartitionerHandler; params.desc_cmdline = desc; // After pika 0.21.0 pika::start reports errors only by exception and returns void #if PIKA_VERSION_FULL >= 0x001500 diff --git a/src/init.cpp b/src/init.cpp index 7e3c3072e4..ba9c3b34b1 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -37,7 +38,7 @@ std::ostream& operator<<(std::ostream& os, const configuration& cfg) { << std::endl; os << " umpire_device_memory_pool_initial_bytes = " << cfg.umpire_device_memory_pool_initial_bytes << std::endl; - os << " mpi_pool = " << cfg.mpi_pool << std::endl; + os << " mpi_pool = " << pika::mpi::experimental::get_pool_name() << std::endl; return os; } @@ -59,9 +60,15 @@ template <> struct Init { static void initialize(const configuration& cfg) { memory::internal::initializeUmpireHostAllocator(cfg.umpire_host_memory_pool_initial_bytes); + if (pika::mpi::detail::environment::is_mpi_initialized()) { + pika::mpi::experimental::start_polling(pika::mpi::experimental::exception_mode::no_handler); + } } static void finalize() { + if (pika::mpi::detail::environment::is_mpi_initialized()) { + pika::mpi::experimental::stop_polling(); + } memory::internal::finalizeUmpireHostAllocator(); } }; @@ -106,6 +113,10 @@ template <> struct Init { static void initialize(const configuration& cfg) { const int device = 0; + + if (pika::mpi::detail::environment::is_mpi_initialized()) { + pika::mpi::experimental::start_polling(pika::mpi::experimental::exception_mode::no_handler); + } memory::internal::initializeUmpireDeviceAllocator(cfg.umpire_device_memory_pool_initial_bytes); initializeGpuPool(device, cfg.num_np_gpu_streams_per_thread, cfg.num_hp_gpu_streams_per_thread, cfg.num_gpu_blas_handles, cfg.num_gpu_lapack_handles); @@ -113,6 +124,9 @@ struct Init { } static void finalize() { + if (pika::mpi::detail::environment::is_mpi_initialized()) { + pika::mpi::experimental::stop_polling(); + } memory::internal::finalizeUmpireDeviceAllocator(); finalizeGpuPool(); } @@ -230,21 +244,21 @@ void updateConfiguration(const pika::program_options::variables_map& vm, configu updateConfigurationValue(vm, cfg.umpire_device_memory_pool_initial_bytes, "UMPIRE_DEVICE_MEMORY_POOL_INITIAL_BYTES", "umpire-device-memory-pool-initial-bytes"); - cfg.mpi_pool = (pika::resource::pool_exists("mpi")) ? "mpi" : "default"; + // cfg.mpi_pool = (pika::resource::pool_exists("mpi")) ? "mpi" : "default"; // Warn if not using MPI pool without --dlaf:no-mpi-pool - int mpi_initialized; - DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized)); - if (mpi_initialized) { - int ntasks; - DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks)); - if (ntasks != 1 && cfg.mpi_pool == "default" && !vm["dlaf:no-mpi-pool"].as()) { - std::cerr << "Warning! DLA-Future is not using the \"mpi\" pika thread pool for " - "MPI communication but --dlaf:no-mpi-pool is not set. This may " - "indicate a bug in DLA-Future or pika. Performance may be degraded." - << std::endl; - } - } + // int mpi_initialized; + // DLAF_MPI_CHECK_ERROR(MPI_Initialized(&mpi_initialized)); + // if (mpi_initialized) { + // int ntasks; + // DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks)); + // if (ntasks != 1 && cfg.mpi_pool == "default" && !vm["dlaf:no-mpi-pool"].as()) { + // std::cerr << "Warning! DLA-Future is not using the \"mpi\" pika thread pool for " + // "MPI communication but --dlaf:no-mpi-pool is not set. This may " + // "indicate a bug in DLA-Future or pika. Performance may be degraded." + // << std::endl; + // } + // } // update tune parameters // @@ -421,31 +435,4 @@ ScopedInitializer::ScopedInitializer(int argc, const char* const argv[], const c ScopedInitializer::~ScopedInitializer() { finalize(); } - -void initResourcePartitionerHandler(pika::resource::partitioner& rp, - const pika::program_options::variables_map& vm) { - // Don't create the MPI pool if the user disabled it - if (vm["dlaf:no-mpi-pool"].as()) - return; - - // Don't create the MPI pool if there is a single process - int ntasks; - DLAF_MPI_CHECK_ERROR(MPI_Comm_size(MPI_COMM_WORLD, &ntasks)); - if (ntasks == 1) - return; - - // Disable idle backoff on the MPI pool - using pika::threads::scheduler_mode; - auto mode = scheduler_mode::default_mode; - mode = scheduler_mode(mode & ~scheduler_mode::enable_idle_backoff); - - // Create a thread pool with a single core that we will use for all - // communication related tasks - rp.create_thread_pool("mpi", pika::resource::scheduling_policy::static_priority, mode); -#if PIKA_VERSION_FULL >= 0x001C00 // >= 0.28.0 - rp.add_resource(rp.sockets()[0].cores()[0].pus()[0], "mpi"); -#else - rp.add_resource(rp.numa_domains()[0].cores()[0].pus()[0], "mpi"); -#endif -} } diff --git a/test/src/gtest_mpipika_main.cpp b/test/src/gtest_mpipika_main.cpp index a7f9b23e09..2c40f6e14b 100644 --- a/test/src/gtest_mpipika_main.cpp +++ b/test/src/gtest_mpipika_main.cpp @@ -103,7 +103,6 @@ GTEST_API_ int main(int argc, char** argv) { pika::init_params p; p.desc_cmdline = desc_commandline; - p.rp_callback = dlaf::initResourcePartitionerHandler; // Initialize pika auto ret = pika::init(test_main, argc, argv, p); diff --git a/test/unit/communication/test_comm_sender.cpp b/test/unit/communication/test_comm_sender.cpp index 3083301660..d03867e696 100644 --- a/test/unit/communication/test_comm_sender.cpp +++ b/test/unit/communication/test_comm_sender.cpp @@ -43,10 +43,7 @@ void test_transform_mpi() { auto send = just(send_buf.data(), size, dtype, send_rank, tag, comm) | transformMPI(MPI_Isend); auto recv = just(recv_buf.data(), size, dtype, recv_rank, tag, comm) | transformMPI(MPI_Irecv); - sync_wait(when_all(std::move(send), std::move(recv)) | then([](int e1, int e2) { - DLAF_MPI_CHECK_ERROR(e1); - DLAF_MPI_CHECK_ERROR(e2); - })); + sync_wait(when_all(std::move(send), std::move(recv)) | then([]() {})); std::vector expected_recv_buf(static_cast(size), recv_rank); @@ -65,8 +62,7 @@ TEST(Bcast, Polling) { double val = (comm.rank() == root_rank) ? 4.2 : 1.2; std::vector buf(static_cast(size), val); - sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast) | - then([](int e) { DLAF_MPI_CHECK_ERROR(e); })); + sync_wait(just(buf.data(), size, dtype, root_rank, comm) | transformMPI(MPI_Ibcast) | then([]() {})); std::vector expected_buf(static_cast(size), 4.2); ASSERT_TRUE(expected_buf == buf); diff --git a/test/unit/communication/test_transform_mpi.cpp b/test/unit/communication/test_transform_mpi.cpp index bf9f857e0d..28f67fa28e 100644 --- a/test/unit/communication/test_transform_mpi.cpp +++ b/test/unit/communication/test_transform_mpi.cpp @@ -65,11 +65,7 @@ TEST_F(TransformMPITest, PromiseGuardManagement) { int message; whenAllLift(&message, 1, MPI_INT, 1, 0, chain.exclusive()) | transformMPI(MPI_Irecv) | - ex::then([&sent_guard](auto mpi_err_code) { - EXPECT_EQ(MPI_SUCCESS, mpi_err_code); - sent_guard = true; - }) | - ex::ensure_started(); + ex::then([&sent_guard](/*auto mpi_err_code*/) { sent_guard = true; }) | ex::ensure_started(); // Note: // At this point IRecv is (getting) posted but it won't complete until this Rank 0 will trigger