From 91bfca483d9006bd173b832cc279502c9c5c65c4 Mon Sep 17 00:00:00 2001 From: Alex Margolin Date: Sun, 18 Oct 2020 18:23:09 +0300 Subject: [PATCH 1/2] UCG: backport to recent UCX tags without UCG-specific extensions Signed-off-by: Alex Margolin --- api/ucg_plan_component.h | 10 +++ base/ucg_context.c | 111 ++++++++++++++++++++++++---------- base/ucg_plan.c | 32 ++++++++++ builtin/builtin.c | 19 ++++-- builtin/ops/builtin_control.c | 45 ++++++++++++-- builtin/ops/builtin_data.c | 2 + builtin/ops/builtin_ops.h | 8 +-- builtin/ops/builtin_pack.c | 4 ++ configure.m4 | 39 ++++++++---- 9 files changed, 211 insertions(+), 59 deletions(-) diff --git a/api/ucg_plan_component.h b/api/ucg_plan_component.h index adbedb6..b673af1 100644 --- a/api/ucg_plan_component.h +++ b/api/ucg_plan_component.h @@ -19,6 +19,7 @@ #include #include #include +#include BEGIN_C_DECLS @@ -307,6 +308,15 @@ ucs_status_t ucg_ft_propagate(ucg_group_h group, const ucg_group_params_t *params, uct_ep_h new_ep); + +#ifndef UCS_ALLOC_CHECK +#define UCS_ALLOC_CHECK(size, name) ({ \ + void* ptr = ucs_malloc(size, name); \ + if (ptr == 0) return UCS_ERR_NO_MEMORY; \ + ptr; \ +}) +#endif + END_C_DECLS #endif diff --git a/base/ucg_context.c b/base/ucg_context.c index a246ea4..19dbb12 100644 --- a/base/ucg_context.c +++ b/base/ucg_context.c @@ -31,11 +31,48 @@ ucs_config_field_t ucg_config_table[] = { UCS_CONFIG_REGISTER_TABLE(ucg_config_table, "UCG context", NULL, ucg_config_t) +#ifndef HAVE_UCP_EXTENSIONS +typedef struct ucp_workaround { + uct_am_callback_t wrapper_cb; + uct_am_callback_t cb; + void *arg; +} ucp_workaround_t; + +extern ucp_workaround_t ucp_workaround_cb[]; + +static ucs_status_t ucg_context_worker_wrapperA_am_cb(void *arg, void *data, + size_t length, + unsigned flags) +{ + return ucp_workaround_cb[0].cb(ucp_workaround_cb[0].arg, data, length, flags); +} + +static ucs_status_t ucg_context_worker_wrapperB_am_cb(void *arg, void *data, + size_t length, + unsigned flags) +{ + return
ucp_workaround_cb[1].cb(ucp_workaround_cb[1].arg, data, length, flags); +} + +static ucs_status_t ucg_context_worker_wrapperC_am_cb(void *arg, void *data, + size_t length, + unsigned flags) +{ + return ucp_workaround_cb[2].cb(ucp_workaround_cb[2].arg, data, length, flags); +} + +static unsigned ucp_workaround_cnt = 0; +ucp_workaround_t ucp_workaround_cb[] = { + { .wrapper_cb = ucg_context_worker_wrapperA_am_cb }, + { .wrapper_cb = ucg_context_worker_wrapperB_am_cb }, + { .wrapper_cb = ucg_context_worker_wrapperC_am_cb } +}; +#endif + ucs_status_t ucg_context_set_am_handler(ucg_plan_ctx_h plan_ctx, uint8_t id, uct_am_callback_t cb, uct_am_tracer_t tracer) { -#ifdef HAVE_UCP_EXTENSIONS /* * Set the Active Message handler (before creating the UCT interfaces) * @@ -46,22 +83,22 @@ ucs_status_t ucg_context_set_am_handler(ucg_plan_ctx_h plan_ctx, uint8_t id, * guarantees once init is finished on any of the processes - the others are * aware of this ID and messages can be sent. */ - ucp_am_handler_t* am_handler = ucp_am_handlers + id; - am_handler->features = UCP_FEATURE_GROUPS; - am_handler->cb = cb; - am_handler->tracer = (ucp_am_tracer_t)tracer; - am_handler->flags = UCT_CB_FLAG_ALT_ARG; - am_handler->alt_arg = plan_ctx; + ucp_am_handler_t* am_handler = ucp_am_handlers + id; + am_handler->tracer = (ucp_am_tracer_t)tracer; +#ifdef HAVE_UCP_EXTENSIONS + am_handler->features = UCP_FEATURE_GROUPS; + am_handler->flags = UCT_CB_FLAG_ALT_ARG; + am_handler->cb = cb; + am_handler->alt_arg = plan_ctx; #else - for (i = 0; i < worker->num_ifaces; i++) { - ucs_status_t status = uct_iface_set_am_handler(worker->ifaces[i]->iface, - assigned_am_id, - am_cb, worker, 0); - if (status != UCS_OK) { - return status; - } - } - /* TODO: need some synchronization to avoid AM_ID race condition */ + unsigned wa_idx = ucp_workaround_cnt++; + am_handler->cb = ucp_workaround_cb[wa_idx].wrapper_cb; + ucp_workaround_cb[wa_idx].cb = cb; /* handler the wrapper invokes */ + ucp_workaround_cb[wa_idx].arg = plan_ctx; /* first argument the wrapper forwards */ +
am_handler->features = 0; + am_handler->flags = 0; + + ucs_assert_always(ucp_workaround_cnt <= ucs_array_size(ucp_workaround_cb)); #endif return UCS_OK; @@ -93,15 +130,6 @@ static ucs_status_t ucg_context_init(void *groups_ctx) goto cleanup_pctx; } -#if ENABLE_FAULT_TOLERANCE - /* Initialize the fault-tolerance context for the entire UCG layer */ - status = ucg_ft_init(&worker->async, new_group, ucg_base_am_id + idx, - ucg_group_fault_cb, ctx, &ctx->ft_ctx); - if (status != UCS_OK) { - goto cleanup_pctx; - } -#endif - ucs_list_head_init(&ctx->groups_head); return UCS_OK; @@ -125,17 +153,12 @@ static void ucg_context_cleanup(void *groups_ctx) } } -#if ENABLE_FAULT_TOLERANCE - if (ucs_list_is_empty(&group->list)) { - ucg_ft_cleanup(&gctx->ft_ctx); - } -#endif - ucg_plan_finalize(ctx->planners, ctx->num_planners, ctx->planners_ctx); ucs_free(ctx->planners_ctx); ucs_free(ctx->planners); } +#ifdef HAVE_UCP_EXTENSIONS static void ucg_context_copy_used_ucp_params(ucp_params_t *dst, const ucp_params_t *src) { @@ -191,6 +214,10 @@ static void ucg_context_copy_used_ucp_params(ucp_params_t *dst, memcpy(dst, src, ucp_params_size); } +#else +#define ucs_count_leading_zero_bits(_n) \ + ((sizeof(_n) <= 4) ? 
__builtin_clz((uint32_t)(_n)) : __builtin_clzl(_n)) +#endif static void ucg_context_copy_used_ucg_params(ucg_params_t *dst, const ucg_params_t *src) @@ -277,9 +304,14 @@ ucs_status_t ucg_init_version(unsigned ucg_api_major_version, /* Store the UCG params in a global location, for easy access */ ucg_context_copy_used_ucg_params(&ucg_global_params, params); +#ifndef HAVE_UCP_EXTENSIONS + const ucp_params_t *ucp_params_arg = params->super; +#else /* Avoid overwriting the headroom value by copying all UCP params aside */ ucp_params_t ucp_params; + const ucp_params_t *ucp_params_arg = &ucp_params; ucg_context_copy_used_ucp_params(&ucp_params, params->super); + /* * Note: This appears to be an overkill, but the reason is we want to change * the value UCG passes to UCP during initialization, without changing @@ -295,16 +327,31 @@ ucs_status_t ucg_init_version(unsigned ucg_api_major_version, ucp_params.field_mask |= UCP_PARAM_FIELD_CONTEXT_HEADROOM; ucp_params.context_headroom += ucs_offsetof(ucg_context_t, ucp_ctx); +#endif ucg_global_params.super = NULL; /* Should never be accessed again */ /* Create the UCP context, which should have room for UCG in its headroom */ status = ucp_init_version(ucp_api_major_version, ucp_api_minor_version, - &ucp_params, config->ucp_config, + ucp_params_arg, config->ucp_config, (ucp_context_h*)context_p); if (status != UCS_OK) { goto err_config; } +#ifndef HAVE_UCP_EXTENSIONS + /* Below is a somewhat "hacky" workaround, involving moving UCP context */ + size_t headroom = ucs_offsetof(ucg_context_t, ucp_ctx); + ucg_context_h tmp = ucs_realloc(*context_p, + headroom + sizeof(ucp_context_t), + "ucg_context"); + if (tmp == NULL) { + ucp_cleanup(*(ucp_context_h*)context_p); + goto err_config; + } + + *context_p = memmove(UCS_PTR_BYTE_OFFSET(tmp, headroom), tmp, sizeof(ucp_context_t)); /* memmove returns the destination: track the relocated UCP context, not the pre-realloc pointer */ +#endif + *context_p = ucs_container_of(*context_p, ucg_context_t, ucp_ctx); status = ucg_context_init(*context_p); if (status != UCS_OK) { diff --git a/base/ucg_plan.c
b/base/ucg_plan.c index 7a23960..2fade87 100644 --- a/base/ucg_plan.c +++ b/base/ucg_plan.c @@ -149,6 +149,22 @@ ucs_status_t ucg_plan_init(ucg_plan_desc_t *descs, unsigned desc_cnt, continue; } +#ifndef HAVE_UCP_EXTENSIONS + /* + * Find an unused AM_ID between 0 and UCP_AM_ID_LAST, because UCP will + * disregard any value above that (since UCP_AM_ID_MAX isn't there). + */ + if (am_id == UCP_AM_ID_LAST) { + am_id = 1; /* AM ID #0 would complicate debugging */ + } + + while ((am_id < UCP_AM_ID_LAST) && (ucp_am_handlers[am_id].cb != NULL)) { /* stop at the limit instead of indexing past it */ + am_id++; + } + + ucs_assert_always(am_id < UCP_AM_ID_LAST); +#endif + plan_config->am_id = &am_id; status = comp->init(plan, plan_config); @@ -163,6 +179,15 @@ ucs_status_t ucg_plan_init(ucg_plan_desc_t *descs, unsigned desc_cnt, total_size += comp->per_group_ctx_size; } +#if ENABLE_FAULT_TOLERANCE + /* Initialize the fault-tolerance context for the entire UCG layer */ + status = ucg_ft_init(&worker->async, new_group, ucg_base_am_id + idx, + ucg_group_fault_cb, ctx, &ctx->ft_ctx); + if (status != UCS_OK) { + goto cleanup_pctx; + } +#endif + *per_group_ctx_size = total_size; return UCS_OK; @@ -171,6 +196,13 @@ ucs_status_t ucg_plan_init(ucg_plan_desc_t *descs, unsigned desc_cnt, void ucg_plan_finalize(ucg_plan_desc_t *descs, unsigned desc_cnt, ucg_plan_ctx_h plan) { + +#if ENABLE_FAULT_TOLERANCE + if (ucs_list_is_empty(&group->list)) { + ucg_ft_cleanup(&gctx->ft_ctx); + } +#endif + void *dummy_group = NULL; ucg_plan_foreach(descs, desc_cnt, plan, dummy_group) { comp->finalize(plan); diff --git a/builtin/builtin.c b/builtin/builtin.c index 156e6d0..b08ff5b 100644 --- a/builtin/builtin.c +++ b/builtin/builtin.c @@ -16,9 +16,12 @@ /* Backport to UCX v1.6.0 */ #ifndef UCS_MEMUNITS_INF #define UCS_MEMUNITS_INF UCS_CONFIG_MEMUNITS_INF -#define CONDITIONAL_NULL -#else +#endif + +#ifdef HAVE_UCP_EXTENSIONS #define CONDITIONAL_NULL ,NULL +#else +#define CONDITIONAL_NULL #endif #define UCG_BUILTIN_PARAM_MASK (UCG_GROUP_PARAM_FIELD_ID |\ @@ -165,12 +168,14 @@
UCS_PROFILE_FUNC(ucs_status_t, ucg_builtin_am_handler, header->group_id, header->msg.coll_id, slot->req.latest.coll_id, header->msg.step_idx, slot->req.latest.step_idx); +#ifdef HAVE_UCT_COLLECTIVES /* In case of a stride - the stored length is actually longer */ if (am_flags & UCT_CB_PARAM_FLAG_STRIDE) { length = sizeof(ucg_builtin_header_t) + (length - sizeof(ucg_builtin_header_t)) * (gctx->group_params->member_count - 1); } +#endif /* Store the message (if the relevant step has not been reached) */ ucp_recv_desc_t *rdesc; @@ -521,6 +526,7 @@ void ucg_builtin_print_flags(ucg_builtin_op_step_t *step) flag = ((step->flags & UCG_BUILTIN_OP_STEP_FLAG_TEMP_BUFFER_USED) != 0); printf("\n\tTEMP_BUFFER_USED:\t%i", flag); +#ifdef HAVE_UCP_EXTENSIONS flag = ((step->flags & UCG_BUILTIN_OP_STEP_FLAG_PACKED_DTYPE_MODE) != 0); printf("\n\tPACKED_DTYPE_MODE:\t%i", flag); if (flag) { @@ -557,6 +563,7 @@ void ucg_builtin_print_flags(ucg_builtin_op_step_t *step) break; } } +#endif printf("\n\tData aggregation:\t"); switch (step->comp_aggregation) { @@ -816,15 +823,19 @@ ucs_status_t ucg_builtin_connect(ucg_builtin_group_ctx_t *ctx, #endif if (is_mock) { // TODO: allocate mock attributes according to flags (and later free it) - unsigned dtype_support = UCS_MASK(UCT_COLL_DTYPE_MODE_LAST); memset(&mock_ep_attr, 0, sizeof(mock_ep_attr)); - mock_ep_attr.cap.am.max_short = SIZE_MAX; +#ifdef HAVE_UCT_COLLECTIVES + unsigned dtype_support = UCS_MASK(UCT_COLL_DTYPE_MODE_LAST); mock_ep_attr.cap.flags = UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_INCAST | UCT_IFACE_FLAG_BCAST; mock_ep_attr.cap.am.coll_mode_flags = dtype_support; mock_ep_attr.cap.coll_mode.short_flags = dtype_support; +#else + mock_ep_attr.cap.flags = UCT_IFACE_FLAG_AM_SHORT; +#endif + mock_ep_attr.cap.am.max_short = SIZE_MAX; phase->iface_attr = &mock_ep_attr; phase->md = NULL; diff --git a/builtin/ops/builtin_control.c b/builtin/ops/builtin_control.c index 546ed91..9219b1e 100644 --- a/builtin/ops/builtin_control.c +++ 
b/builtin/ops/builtin_control.c @@ -217,7 +217,19 @@ void ucg_builtin_print_pack_cb_name(uct_pack_callback_t pack_single_cb) } } -static void ucg_builtin_step_am_zcopy_comp_step_check_cb(uct_completion_t *self) +/* + * Note: Change-ID 8da6a5be2e changed the UCT API w.r.t. uct_completion_callback_t: + * + * After: typedef void (*uct_completion_callback_t)(uct_completion_t *self); + * Before: typedef void (*uct_completion_callback_t)(uct_completion_t *self, + * ucs_status_t status); + */ +static void ucg_builtin_step_am_zcopy_comp_step_check_cb(uct_completion_t *self +#ifdef HAVE_UCT_COMP_CB_STATUS_ARG + , ucs_status_t status) +#else + ) +#endif { ucg_builtin_zcomp_t *zcomp = ucs_container_of(self, ucg_builtin_zcomp_t, comp); ucg_builtin_request_t *req = zcomp->req; @@ -378,11 +390,19 @@ static inline ucs_status_t ucg_builtin_step_send_flags(ucg_builtin_op_step_t *step, ucg_builtin_plan_phase_t *phase, const ucg_collective_params_t *params, +#ifdef HAVE_UCT_COLLECTIVES uct_coll_dtype_mode_t mode, +#endif uint64_t *send_flag) { size_t dt_len = params->send.dt_len; size_t length = step->buffer_length; + +#ifndef HAVE_UCT_COLLECTIVES + int supports_short = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT); + int supports_bcopy = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY); + int supports_zcopy = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_ZCOPY); +#else int supports_short = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) && ((phase->iface_attr->cap.coll_mode.short_flags & UCS_BIT(mode)) || (mode == UCT_COLL_DTYPE_MODE_PADDED)); @@ -395,6 +415,7 @@ ucg_builtin_step_send_flags(ucg_builtin_op_step_t *step, ucs_assert((mode == UCT_COLL_DTYPE_MODE_PADDED) || (phase->iface_attr->cap.am.coll_mode_flags & mode)); +#endif /* * Short messages @@ -409,12 +430,16 @@ ucg_builtin_step_send_flags(ucg_builtin_op_step_t *step, return UCS_OK; } +#ifdef HAVE_UCT_COLLECTIVES size_t max_bcopy = phase->iface_attr->cap.am.max_bcopy; size_t short_msg_count =
length / max_short + ((length % max_short) != 0); size_t bcopy_msg_count = supports_bcopy ? (length / max_bcopy + ((length % max_bcopy) != 0)) : SIZE_MAX; int is_short_best = (short_msg_count * phase->iface_attr->overhead_short) < (bcopy_msg_count * phase->iface_attr->overhead_bcopy); +#else + int is_short_best = 1; +#endif if (is_short_best || (!supports_bcopy && !supports_zcopy)) { /* Short send - multiple messages */ @@ -499,7 +524,9 @@ void ucg_builtin_step_select_packers(const ucg_collective_params_t *params, (ucg_global_params.type_info.mpi_is_sum_f != NULL) && (ucg_global_params.type_info.mpi_is_sum_f(params->send.op_ext))) { int is_single = (params->send.count == 1); +#ifdef HAVE_UCT_COLLECTIVES step->uct_flags |= (UCT_SEND_FLAG_PACK_LOCK << UCT_COLL_DTYPE_MODE_BITS); +#endif switch (params->send.dt_len) { case 1: step->bcopy.pack_single_cb = is_single ? @@ -619,8 +646,10 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, (int8_t*)params->send.buf; } - uct_coll_dtype_mode_t mode; + uint64_t send_flags; int is_concat = modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_CONCATENATE; +#ifdef HAVE_UCT_COLLECTIVES + uct_coll_dtype_mode_t mode; if (is_concat) { if (modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC) { mode = UCT_COLL_DTYPE_MODE_VAR_DTYPE; @@ -634,9 +663,12 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, } /* Decide how the messages are sent (regardless of my role) */ - uint64_t send_flags; ucs_status_t status = ucg_builtin_step_send_flags(step, phase, params, mode, &send_flags); +#else + ucs_status_t status = ucg_builtin_step_send_flags(step, phase, params, + &send_flags); +#endif if (ucs_unlikely(status != UCS_OK)) { return status; } @@ -652,7 +684,6 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, * follow-up, by filling the missing step with remote key exchange. 
*/ if ((send_flags & UCT_IFACE_FLAG_AM_ZCOPY) && !(*zcopy_step_skip)) { - printf("ZCOPY!\n\n"); *zcopy_step_skip = 1; step++; goto zcopy_redo; @@ -665,6 +696,7 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, "ucg_builtin_step_pipelining"); } +#ifdef HAVE_UCT_COLLECTIVES if ((phase->method == UCG_PLAN_METHOD_SEND_TO_SM_ROOT) && (phase->iface_attr->cap.flags & (UCT_IFACE_FLAG_INCAST | UCT_IFACE_FLAG_BCAST))) { @@ -681,6 +713,7 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, //step->zcopy.iov[1].buffer = (void*)params->send.displs; } } +#endif /* Do any special assignment w.r.t. the src/dst buffers in this step */ int is_send = 0; @@ -847,7 +880,11 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, } /* Choose the data-related action to be taken by the incoming AM handler */ +#ifdef HAVE_UCT_COLLECTIVES int is_incast_used = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_INCAST) != 0; +#else + int is_incast_used = 0; +#endif if (is_barrier || !(step->flags & UCG_BUILTIN_STEP_RECV_FLAGS)) { step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_NOP; } else if ((send_flags & UCT_IFACE_FLAG_AM_ZCOPY) && (zcopy_step_skip)) { diff --git a/builtin/ops/builtin_data.c b/builtin/ops/builtin_data.c index bb222ce..5a5bc95 100644 --- a/builtin/ops/builtin_data.c +++ b/builtin/ops/builtin_data.c @@ -95,9 +95,11 @@ ucg_builtin_step_am_short_max(ucg_builtin_request_t *req, UCG_BUILTIN_ASSERT_SEND(step, AM_SHORT); ucs_assert(step->iter_offset != UCG_BUILTIN_OFFSET_PIPELINE_READY); ucs_assert(step->iter_offset != UCG_BUILTIN_OFFSET_PIPELINE_PENDING); +#ifdef HAVE_UCT_COLLECTIVES ucs_assert(frag_size == (is_packed ? 
UCT_COLL_DTYPE_MODE_UNPACK_VALUE(step->fragment_length) : step->fragment_length)); +#endif /* send every fragment but the last */ if (ucs_likely(buffer_iter < buffer_iter_limit)) { diff --git a/builtin/ops/builtin_ops.h b/builtin/ops/builtin_ops.h index c684cb4..83c77be 100644 --- a/builtin/ops/builtin_ops.h +++ b/builtin/ops/builtin_ops.h @@ -14,12 +14,8 @@ #include #include -#ifdef HAVE_UCP_EXTENSIONS -#define UCS_ALLOC_CHECK(size, name) ({ \ - void* ptr = ucs_malloc(size, name); \ - if (ptr == 0) return UCS_ERR_NO_MEMORY; \ - ptr; \ -}) +#ifndef HAVE_UCP_EXTENSIONS +#define UCT_COLL_DTYPE_MODE_BITS (0) #endif BEGIN_C_DECLS diff --git a/builtin/ops/builtin_pack.c b/builtin/ops/builtin_pack.c index 9998496..1c80f66 100644 --- a/builtin/ops/builtin_pack.c +++ b/builtin/ops/builtin_pack.c @@ -7,6 +7,10 @@ #include +#ifndef HAVE_UCT_COLLECTIVES +#define UCT_PACK_CALLBACK_REDUCE ((uintptr_t)-1) +#endif + int ucg_builtin_atomic_reduce_full(ucg_builtin_request_t *req, void *src, void *dst, size_t length); int ucg_builtin_atomic_reduce_part(ucg_builtin_request_t *req, diff --git a/configure.m4 b/configure.m4 index 15914eb..11bdae5 100644 --- a/configure.m4 +++ b/configure.m4 @@ -3,20 +3,33 @@ # See file LICENSE for terms. 
# - # - # Enable fault-tolerance - # - AC_ARG_ENABLE([fault-tolerance], - AS_HELP_STRING([--enable-fault-tolerance], - [Enable fault-tolerance, default: NO]), - [], - [enable_fault_tolerance=no]) - AS_IF([test "x$enable_fault_tolerance" = xyes], - [AS_MESSAGE([enabling with fault-tolerance]) - AC_DEFINE([ENABLE_FAULT_TOLERANCE], [1], [Enable fault-tolerance])], - [:] - ) +# +# Enable fault-tolerance +# +AC_ARG_ENABLE([fault-tolerance], + [AS_HELP_STRING([--enable-fault-tolerance], + [Enable fault-tolerance, default: NO])], + [], + [enable_fault_tolerance=no]) + +AS_IF([test "x$enable_fault_tolerance" = xyes], + [AS_MESSAGE([enabling with fault-tolerance]) + AC_DEFINE([ENABLE_FAULT_TOLERANCE], [1], [Enable fault-tolerance])], + [:]) +# +# Detect some UCT API incompatibility (see UCX Change-ID 8da6a5be2e) +# +SAVE_CPPFLAGS="$CPPFLAGS" +CPPFLAGS="-Isrc/ $CPPFLAGS" +AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[#include "uct/api/uct.h"]], + [[uct_completion_callback_t func = NULL;] + [func(NULL, UCS_OK);]])], + [AC_MSG_RESULT([uct_completion_callback_t has a status argument]) + AC_DEFINE([HAVE_UCT_COMP_CB_STATUS_ARG], [], + [Does uct_completion_callback_t have a "status" argument])], + [AC_MSG_RESULT([uct_completion_callback_t has no status argument])]) +CPPFLAGS="$SAVE_CPPFLAGS" ucg_modules=":builtin" m4_include([src/ucg/base/configure.m4]) From dea6d48efe784aeb62ec4a4ecd26b3256ab7f19d Mon Sep 17 00:00:00 2001 From: Alex Margolin Date: Thu, 22 Oct 2020 00:56:08 +0300 Subject: [PATCH 2/2] UCG: non-contiguous datatype support Signed-off-by: Alex Margolin --- api/ucg.h | 5 +++- builtin/builtin.c | 4 +++ builtin/ops/builtin_comp_step.inl | 10 ++++++- builtin/ops/builtin_control.c | 49 +++++++++++++++++++------------ builtin/ops/builtin_ops.h | 13 +++++++- builtin/ops/builtin_pack.c | 26 ++++++++++++++-- 6 files changed, 84 insertions(+), 23 deletions(-) diff --git a/api/ucg.h b/api/ucg.h index 99e3338..f008b24 100644 --- a/api/ucg.h +++ b/api/ucg.h @@ -139,6 +139,9 @@ typedef
struct ucg_params { /* Basic MPI Operation type and data-type information - using callbacks */ struct { + /* Check to determine if an MPI datatype (array) is contiguous */ + int (*mpi_is_contig_f)(void *mpi_dtype, int count); + /* Check to determine if an MPI datatype is an integer (of any length) */ int (*mpi_is_int_f)(void *mpi_dtype); @@ -199,7 +202,7 @@ enum ucg_collective_modifiers { /* Buffer/Data Management Considerations */ UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE_STABLE = UCS_BIT( 8), /* stable reduction */ - UCG_GROUP_COLLECTIVE_MODIFIER_IN_PLACE = UCS_BIT( 9), /* otherwise two buffers */ + UCG_GROUP_COLLECTIVE_MODIFIER_NONCONTIG_DATATYPE = UCS_BIT( 9), /* some may be non-contiguous */ UCG_GROUP_COLLECTIVE_MODIFIER_PERSISTENT = UCS_BIT(10), /* otherwise destroy coll_h */ UCG_GROUP_COLLECTIVE_MODIFIER_SYMMETRIC = UCS_BIT(11), /* persistent on all ranks */ UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER = UCS_BIT(12), /* prevent others from starting */ diff --git a/builtin/builtin.c b/builtin/builtin.c index b08ff5b..cd697f9 100644 --- a/builtin/builtin.c +++ b/builtin/builtin.c @@ -575,6 +575,10 @@ void ucg_builtin_print_flags(ucg_builtin_op_step_t *step) printf("write"); break; + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED: + printf("write (unpacked)"); + break; + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL: printf("gather (terminal)"); break; diff --git a/builtin/ops/builtin_comp_step.inl b/builtin/ops/builtin_comp_step.inl index 6f99598..8384cd3 100644 --- a/builtin/ops/builtin_comp_step.inl +++ b/builtin/ops/builtin_comp_step.inl @@ -154,7 +154,7 @@ static void ucg_builtin_comp_gather(uint8_t *recv_buffer, uint8_t *data, memcpy(recv_buffer + first_part + length, data, length - first_part); } -static UCS_F_ALWAYS_INLINE void +static UCS_F_ALWAYS_INLINE ucs_status_t ucg_builtin_comp_unpack_rkey(ucg_builtin_request_t *req, uint64_t remote_addr, uint8_t *packed_remote_key) { @@ -168,6 +168,8 @@ ucg_builtin_comp_unpack_rkey(ucg_builtin_request_t 
*req, uint64_t remote_addr, if (ucs_unlikely(status != UCS_OK)) { ucg_builtin_comp_last_step_cb(req, status); } + + return status; } static int UCS_F_ALWAYS_INLINE @@ -210,6 +212,12 @@ ucg_builtin_step_recv_handle_data(ucg_builtin_request_t *req, uint64_t offset, memcpy(step->recv_buffer + offset, data, length); break; + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED: + /* Note: worker is NULL since it isn't required for host-based memory */ + ucp_dt_unpack_only(NULL, step->recv_buffer + offset, step->unpack_count, + step->bcopy.datatype, UCS_MEMORY_TYPE_HOST, data, length, 0); + break; + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL: ucg_builtin_comp_gather(step->recv_buffer, data, offset, step->buffer_length, length, diff --git a/builtin/ops/builtin_control.c b/builtin/ops/builtin_control.c index 9219b1e..d1eba92 100644 --- a/builtin/ops/builtin_control.c +++ b/builtin/ops/builtin_control.c @@ -558,23 +558,29 @@ void ucg_builtin_step_select_packers(const ucg_collective_params_t *params, } } - int is_variadic = (params->type.modifiers & - UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC); - - step->bcopy.pack_full_cb = is_reduce ? - UCG_BUILTIN_PACKER_NAME(_reducing_, full) : - is_variadic ? UCG_BUILTIN_PACKER_NAME(_variadic_, full) : - UCG_BUILTIN_PACKER_NAME(_, full); - - step->bcopy.pack_part_cb = is_reduce ? - UCG_BUILTIN_PACKER_NAME(_reducing_, part) : - is_variadic ? UCG_BUILTIN_PACKER_NAME(_variadic_, part) : - UCG_BUILTIN_PACKER_NAME(_, part); - - step->bcopy.pack_single_cb = is_reduce ? - UCG_BUILTIN_PACKER_NAME(_reducing_, single) : - is_variadic ? 
UCG_BUILTIN_PACKER_NAME(_variadic_, single) : - UCG_BUILTIN_PACKER_NAME(_, single); + int is_variadic = (params->type.modifiers & + UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC); + int is_src_contig = ((ucg_global_params.type_info.mpi_is_contig_f != NULL) && + (ucg_global_params.type_info.mpi_is_contig_f(params->send.dt_ext, + params->send.count))); + + if (is_reduce) { + step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, full); + step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, part); + step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, single); + } else if (is_variadic) { + step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_variadic_, full); + step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_variadic_, part); + step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_variadic_, single); + } else if (!is_src_contig) { + step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, full); + step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, part); + step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, single); + } else { + step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_, full); + step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_, part); + step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_, single); + } } #define UCG_BUILTIN_STEP_RECV_FLAGS (UCG_BUILTIN_OP_STEP_FLAG_RECV_AFTER_SEND |\ @@ -880,6 +886,9 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, } /* Choose the data-related action to be taken by the incoming AM handler */ + int is_dst_contig = ((ucg_global_params.type_info.mpi_is_contig_f != NULL) && + (ucg_global_params.type_info.mpi_is_contig_f(params->recv.dt_ext, + params->recv.count))); #ifdef HAVE_UCT_COLLECTIVES int is_incast_used = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_INCAST) != 0; #else @@ -902,8 +911,12 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan, } else { step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE; } - 
} else { + } else if (is_dst_contig) { step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE; + } else { + step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED; + /* If we pack/unpack - we care more about the count than the length */ + step->unpack_count = params->recv.count; } /* Choose a completion criteria to be checked by the incoming AM handler */ diff --git a/builtin/ops/builtin_ops.h b/builtin/ops/builtin_ops.h index 83c77be..7eb5e7f 100644 --- a/builtin/ops/builtin_ops.h +++ b/builtin/ops/builtin_ops.h @@ -11,6 +11,8 @@ #endif #include "../plan/builtin_plan.h" + +#include #include #include @@ -97,6 +99,7 @@ enum ucg_builtin_op_step_comp_aggregate { /* Aggregation of short (Active-)messages */ UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE, + UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED, UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL, UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_WAYPOINT, UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE, @@ -160,7 +163,10 @@ typedef struct ucg_builtin_op_step { ucg_builtin_plan_phase_t *phase; int8_t *send_buffer; - size_t buffer_length; + union { + size_t buffer_length; + uint64_t unpack_count; + }; ucg_builtin_header_t am_header; uct_iface_h uct_iface; @@ -187,6 +193,8 @@ typedef struct ucg_builtin_op_step { uct_pack_callback_t pack_full_cb; uct_pack_callback_t pack_part_cb; uct_pack_callback_t pack_single_cb; + ucp_dt_state_t pack_state; + ucp_datatype_t datatype; } bcopy; struct { uct_mem_h memh; /* Data buffer memory handle */ @@ -307,6 +315,9 @@ UCG_BUILTIN_PACKER_DECLARE(_reducing_, part); UCG_BUILTIN_PACKER_DECLARE(_variadic_, single); UCG_BUILTIN_PACKER_DECLARE(_variadic_, full); UCG_BUILTIN_PACKER_DECLARE(_variadic_, part); +UCG_BUILTIN_PACKER_DECLARE(_datatype_, single); +UCG_BUILTIN_PACKER_DECLARE(_datatype_, full); +UCG_BUILTIN_PACKER_DECLARE(_datatype_, part); UCG_BUILTIN_PACKER_DECLARE(_atomic_single_, 8); UCG_BUILTIN_PACKER_DECLARE(_atomic_single_, 16); 
UCG_BUILTIN_PACKER_DECLARE(_atomic_single_, 32); diff --git a/builtin/ops/builtin_pack.c b/builtin/ops/builtin_pack.c index 1c80f66..9b4cf5d 100644 --- a/builtin/ops/builtin_pack.c +++ b/builtin/ops/builtin_pack.c @@ -93,7 +93,6 @@ UCG_BUILTIN_VARIADIC_PACK_CB(step->iter_offset, step->buffer_length - uint##_integer_bits##_t *ptr = (uint##_integer_bits##_t *)(header + 1); \ ucs_atomic_add##_integer_bits (ptr, \ *(uint##_integer_bits##_t *)step->send_buffer); \ -printf("UCG_BUILTIN_ATOMIC_SINGLE_PACK_CB\n");\ return sizeof(uint##_integer_bits##_t); \ } @@ -110,7 +109,6 @@ printf("UCG_BUILTIN_ATOMIC_SINGLE_PACK_CB\n");\ ucs_atomic_add##_integer_bits (ptr, \ *(uint##_integer_bits##_t *)step->send_buffer); \ } \ -printf("UCG_BUILTIN_ATOMIC_MULTIPLE_PACK_CB\n");\ return length; \ } @@ -137,3 +135,27 @@ UCG_BUILTIN_ATOMIC_SINGLE_PACK_CB(64) UCG_BUILTIN_PACKER_DECLARE(_atomic_multiple_, 64) UCG_BUILTIN_ATOMIC_MULTIPLE_PACK_CB(64) + +#define UCG_BUILTIN_DATATYPE_PACK_CB(_offset, _length) { \ + ucg_builtin_header_t *header = (ucg_builtin_header_t*)dest; \ + ucg_builtin_request_t *req = (ucg_builtin_request_t*)arg; \ + ucg_builtin_op_step_t *step = req->step; \ + size_t buffer_length = (_length); \ + header->header = step->am_header.header; \ + ucs_assert(((uintptr_t)arg & UCT_PACK_CALLBACK_REDUCE) == 0); \ + /* Note: worker is NULL since it isn't required for host-based memory */ \ + return sizeof(*header) + ucp_dt_pack(NULL, step->bcopy.datatype, \ + UCS_MEMORY_TYPE_HOST, header + 1, \ + step->send_buffer + (_offset), \ + &step->bcopy.pack_state, \ + buffer_length); \ +} + +UCG_BUILTIN_PACKER_DECLARE(_datatype_, single) +UCG_BUILTIN_DATATYPE_PACK_CB(0, step->buffer_length) + +UCG_BUILTIN_PACKER_DECLARE(_datatype_, full) +UCG_BUILTIN_DATATYPE_PACK_CB(step->iter_offset, step->fragment_length) + +UCG_BUILTIN_PACKER_DECLARE(_datatype_, part) +UCG_BUILTIN_DATATYPE_PACK_CB(step->iter_offset, step->buffer_length - step->iter_offset)