From 9aa43b4098fc11c84a58581a6638b4c29027829a Mon Sep 17 00:00:00 2001 From: Alex Margolin Date: Fri, 23 Oct 2020 14:31:41 +0300 Subject: [PATCH] UCG: datatype-related API changes to reduce latency Signed-off-by: Alex Margolin --- api/ucg.h | 124 +++---- api/ucg_mpi.h | 235 +++++------- api/ucg_plan_component.h | 5 +- base/ucg_context.c | 8 +- base/ucg_group.c | 222 +++--------- base/ucg_group.h | 1 - builtin/builtin.c | 123 +++---- builtin/ops/builtin_comp_step.inl | 268 ++++++++------ builtin/ops/builtin_control.c | 579 +++++++++++++++++------------- builtin/ops/builtin_data.c | 24 +- builtin/ops/builtin_ops.h | 88 +++-- builtin/ops/builtin_pack.c | 26 +- builtin/plan/builtin_bruck.c | 2 +- builtin/plan/builtin_tree.c | 20 +- 14 files changed, 839 insertions(+), 886 deletions(-) diff --git a/api/ucg.h b/api/ucg.h index a7a37ba..8a94a50 100644 --- a/api/ucg.h +++ b/api/ucg.h @@ -66,8 +66,8 @@ enum ucg_params_field { UCG_PARAM_FIELD_JOB_UID = UCS_BIT(0), /**< Unique ID for this job */ UCG_PARAM_FIELD_ADDRESS_CB = UCS_BIT(1), /**< Peer address lookup */ UCG_PARAM_FIELD_NEIGHBORS_CB = UCS_BIT(2), /**< Neighborhood info */ - UCG_PARAM_FIELD_REDUCE_CB = UCS_BIT(3), /**< Callback for reduce ops */ - UCG_PARAM_FIELD_TYPE_INFO_CB = UCS_BIT(4), /**< Operation/datatype info */ + UCG_PARAM_FIELD_DATATYPE_CB = UCS_BIT(3), /**< Callback for datatypes */ + UCG_PARAM_FIELD_REDUCE_OP_CB = UCS_BIT(4), /**< Callback for reduce ops */ UCG_PARAM_FIELD_MPI_IN_PLACE = UCS_BIT(5), /**< MPI_IN_PLACE value */ UCG_PARAM_FIELD_HANDLE_FAULT = UCS_BIT(6) /**< Fault-tolerance support */ }; @@ -124,36 +124,39 @@ typedef struct ucg_params { ucg_group_member_index_t *out); } neighbors; - /* - * To support any type of reduction for an MPI implementation, this callback - * function can be called (when a new message arrives) to reduce the data - * into a buffer (which already contains a partial result). Below are some - * additional functions to detect the type of reduction, so that simple - * reductions (e.g. sum on integers) doesn't require using this callback. 
- */
-    void (*reduce_cb_f)(void *reduce_op,
-                        char *src,
-                        char *dst,
-                        unsigned count,
-                        void *datatype);
-
-    /* Basic MPI Operation type and data-type information - using callbacks */
+    /* Information about datatypes */
     struct {
-        /* Convert the opaque data-type into UCX's structure */
-        int (*datatype_convert)(void *datatype, ucp_datatype_t **ucp_datatype);
-
-        /* Check to determine if an MPI data-type is an integer (of any length) */
-        int (*datatype_is_integer_f)(void *datatype);
+        /* Convert the opaque data-type into UCX's structure (returns 0 on success) */
+        int (*convert)(void *datatype, ucp_datatype_t *ucp_datatype);

-        /* Check to determine if an MPI data-type is a floating-point (of any length) */
-        int (*datatype_is_floating_point_f)(void *datatype);
+        /* Check if the data-type is an integer (of any length) */
+        int (*is_integer_f)(void *datatype, int *is_signed);

-        /* Check to determine if an MPI reduction operation is MPI_SUM */
-        int (*reduce_op_is_sum_f)(void *reduce_op);
+        /* Check if the data-type is a floating-point (of any length) */
+        int (*is_floating_point_f)(void *datatype);
+    } datatype;

-        /* Check to determine if an MPI reduction operation is MPI_MINLOC/MAXLOC */
-        int (*reduce_op_is_loc_f)(void *reduce_op);
-    } type_info;
+    /* Information about reduction operations */
+    struct {
+        /*
+         * To support any type of reduction for an MPI implementation, this callback
+         * function can be called (when a new message arrives) to reduce the data
+         * into a buffer (which already contains a partial result). Below are some
+         * additional functions to detect the type of reduction, so that simple
+         * reductions (e.g. sum on integers) don't require using this callback.
+         */
+        int (*reduce_cb_f)(void *reduce_op, char *src, char *dst,
+                           unsigned count, void *datatype);
+
+        /* Check if the reduction operation is a summation (e.g. MPI_SUM) */
+        int (*is_sum_f)(void *reduce_op);
+
+        /* Check if the reduction also expects a location (e.g. MPI_MINLOC) */
+        int (*is_loc_expected_f)(void *reduce_op);
+
+        /* Check if the reduction operation is commutative (e.g. MPI_SUM) */
+        int (*is_commutative_f)(void *reduce_op);
+    } reduce_op;

     /* The value of MPI_IN_PLACE, which can replace send or receive buffers */
     void* mpi_in_place;
@@ -207,8 +210,6 @@ enum ucg_collective_modifiers {
     UCG_GROUP_COLLECTIVE_MODIFIER_SYMMETRIC   = UCS_BIT(11), /* persistent on all ranks */
     UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER     = UCS_BIT(12), /* prevent others from starting */
     UCG_GROUP_COLLECTIVE_MODIFIER_MOCK_EPS    = UCS_BIT(13), /* information gathering only */
-    UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND = UCS_BIT(14), /* recv-only optimization */
-    UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_RECV = UCS_BIT(15)  /* send-only optimization */
 };

 /**
@@ -321,53 +322,36 @@ typedef struct ucg_group_params {
  * are accessed during run-time.
  */
 typedef struct ucg_collective {
-    ucg_collective_type_t     type;    /**< type and root of the collective */
-    ucg_collective_callback_t comp_cb; /**< collective completion callback */
-    /*
-     * Note: this callback will be called after UCG has finished all the work
-     *       in the scope of this collective, the outcome is available in the
-     *       output buffers (if applicable), and the user's request structure
-     *       contains the resulting UCG status code. However, the "COMPLETED"
-     *       flag in the user's request structure will only be set after this
-     *       callback function has returned (if not NULL), so the user should
-     *       not rely on the "flags" field in the request structure inside the
-     *       callback context.
-     */
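+    /*
+     * For illustration, a caller could fill the reworked layout below for an
+     * allreduce-style collective roughly as follows (a minimal sketch, using
+     * only the fields introduced in this patch and the UCG_PARAM_* accessor
+     * macros defined further below; "sbuf", "rbuf", "mpi_dtype" and "mpi_op"
+     * are placeholders for the caller's own values):
+     *
+     *     ucg_collective_params_t params = {0};
+     *     UCG_PARAM_TYPE(&params).modifiers = modifiers;
+     *     UCG_PARAM_TYPE(&params).root      = root;
+     *     params.send.buffer = (void*)sbuf;
+     *     params.send.count  = count;
+     *     params.send.dtype  = mpi_dtype;
+     *     params.recv.buffer = rbuf;
+     *     params.recv.count  = count;
+     *     params.recv.dtype  = mpi_dtype;
+     *     UCG_PARAM_OP(&params) = mpi_op;
+     */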
-    struct {
-        void *buffer;               /**< buffer location to use */
     union {
-        int64_t count;              /**< item count (not int - to consume space) */
-        const int *counts;          /**< item count array */
-    };
-    union {
-        size_t dt_len;              /**< external datatype length */
-        size_t *dts_len;            /**< external datatype length array */
+        /* only in "send" (see @ref UCG_PARAM_TYPE ) */
+        ucg_collective_type_t type; /**< type and root of the collective */
+
+        /* only in "recv" (see @ref UCG_PARAM_OP , @ref UCG_PARAM_DISPLS ) */
+        void *op;                   /**< external reduce operation handle */
+        const int *displs;          /**< item displacement array */
     };
+    void *buffer;                   /**< buffer location to use */
     union {
-        void *dt_ext;               /**< external datatype context */
-        void *dts_ext;              /**< external datatype context array */
+        int64_t count;              /**< item count (not int - for OSHMEM) */
+        const int *counts;          /**< item count array */
     };
     union {
-        size_t stride;              /**< item stride */
-        const int *displs;          /**< item displacement array */
-        void *op_ext;               /**< external reduce operation handle */
+        void *dtype;                /**< external data-type context */
+        void *dtypes;               /**< external data-type context array */
+        /*
+         * Note: if UCG_PARAM_FIELD_DATATYPE_CB is not passed during UCG
+         *       initialization, UCG will assume that dtype is already a
+         *       UCP datatype (will perform a static cast to ucp_datatype_t)
+         *       and that dtypes points to an array of such UCP datatypes.
+         */
     };
-    uint64_t reserved;              /**< unused except padding, must be zero */
 } send, recv;
-
-    /*
-     * Below is an optimization: only for "recv-only" collectives, where @ref
-     * UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND is set, the fields below
-     * override the value of the original tyep and completion collback. If the
-     * flag is not set - both must be set to zero.
-     */
-    struct {
-        ucg_collective_type_t type;
-        ucg_collective_callback_t comp_cb;
-    } recv_only;
 } UCS_S_PACKED UCS_V_ALIGNED(64) ucg_collective_params_t;
-/* Note: UCS_SYS_CACHE_LINE_SIZE is not visible here, so 64 is a good guess */
+
+#define UCG_PARAM_TYPE(_params)   (_params)->send.type
+#define UCG_PARAM_OP(_params)     (_params)->recv.op
+#define UCG_PARAM_DISPLS(_params) (_params)->recv.displs

 /**
  * @ingroup UCG_GROUP
@@ -390,8 +374,8 @@ enum ucg_request_common_flags {
  * are accessed during run-time.
*/ typedef struct ucg_request { - volatile uint32_t flags; /**< @ref ucg_request_common_flags */ - volatile ucs_status_t status; /**< Operation status */ + volatile uint32_t flags; /**< @ref ucg_request_common_flags */ + ucs_status_t status; /**< Operation status */ } ucg_request_t; diff --git a/api/ucg_mpi.h b/api/ucg_mpi.h index 412d8a1..fcad8cb 100644 --- a/api/ucg_mpi.h +++ b/api/ucg_mpi.h @@ -37,9 +37,7 @@ enum ucg_predefined { static uint16_t ucg_predefined_modifiers[] = { [UCG_PRIMITIVE_BARRIER] = UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE | UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST | - UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER | - UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND | - UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_RECV, + UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER, [UCG_PRIMITIVE_REDUCE] = UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE | UCG_GROUP_COLLECTIVE_MODIFIER_SINGLE_DESTINATION, [UCG_PRIMITIVE_GATHER] = UCG_GROUP_COLLECTIVE_MODIFIER_CONCATENATE | @@ -69,175 +67,104 @@ static uint16_t ucg_predefined_modifiers[] = { UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC, }; -static UCS_F_ALWAYS_INLINE ucs_status_t ucg_coll_barrier_init( \ - ucg_collective_callback_t cb, ucg_group_h group, ucg_coll_h *coll_p) \ -{ \ - ucg_collective_params_t params; \ - params.type.modifiers = ucg_predefined_modifiers[UCG_PRIMITIVE_BARRIER]; \ - params.recv_only.type.modifiers = params.type.modifiers; \ - params.recv_only.type.root = 0; \ - params.recv_only.comp_cb = cb; \ - params.send.dt_len = 0; \ - return ucg_collective_create(group, ¶ms, coll_p); \ +static UCS_F_ALWAYS_INLINE ucs_status_t +ucg_coll_barrier_init(int ign, ucg_group_h group, ucg_coll_h *coll_p) +{ + ucg_collective_params_t params = {0}; + UCG_PARAM_TYPE(¶ms).modifiers = + ucg_predefined_modifiers[UCG_PRIMITIVE_BARRIER]; + + return ucg_collective_create(group, ¶ms, coll_p); } -#define UCG_COLL_PARAMS_BUF_R(_buf, _count, _dt_len, _dt_ext) \ - .buffer = _buf, \ - .count = _count, \ - .dt_len = _dt_len, \ - .dt_ext = _dt_ext, \ - .op_ext = op, \ - .reserved = 0 - -#define UCG_COLL_PARAMS_BUF_V(_buf, _counts, _dt_len, _dt_ext, _displs) \ - .buffer = _buf, \ - .counts = _counts, \ - .dt_len = _dt_len, \ - .dt_ext = _dt_ext, \ - .displs = _displs, \ - .reserved = 0 - -#define UCG_COLL_PARAMS_BUF_W(_buf, _counts, _dts_len, _dts_ext, _displs) \ - .buffer = _buf, \ - .counts = _counts, \ - .dts_len = _dts_len, \ - .dts_ext = _dts_ext, \ - .displs = _displs, \ - .reserved = 0 - -#define UCG_COLL_INIT_HALF(_lname, _uname, _non_root_only_sends, _stype, \ - _sargs, _rtype, _rargs, ...) 
\ +#define UCG_COLL_PARAMS_BUF_R(_buffer, _count, _dtype) \ + .buffer = _buffer, \ + .count = _count, \ + .dtype = _dtype + +#define UCG_COLL_PARAMS_BUF_O(_buffer, _count, _dtype) \ + UCG_COLL_PARAMS_BUF_R(_buffer, _count, _dtype), \ + .op = op + +#define UCG_COLL_PARAMS_BUF_V(_buffer, _counts, _dtype, _displs) \ + .buffer = _buffer, \ + .counts = _counts, \ + .dtype = _dtype, \ + .displs = _displs + +#define UCG_COLL_PARAMS_BUF_W(_buffer, _counts, _dtypes, _displs) \ + .buffer = _buffer, \ + .counts = _counts, \ + .dtypes = _dtypes, \ + .displs = _displs + +#define UCG_COLL_INIT(_lname, _uname, _stype, _sargs, _rtype, _rargs,...)\ static UCS_F_ALWAYS_INLINE ucs_status_t ucg_coll_##_lname##_init(__VA_ARGS__, \ - ucg_collective_callback_t cb, void *op, ucg_group_member_index_t root, \ - unsigned modifiers, int is_root, ucg_group_h group, ucg_coll_h *coll_p)\ + void *op, ucg_group_member_index_t root, unsigned modifiers, \ + ucg_group_h group, ucg_coll_h *coll_p) \ { \ uint16_t md = modifiers | ucg_predefined_modifiers[UCG_PRIMITIVE_##_uname];\ - if (is_root) { \ - ucg_collective_params_t full_params = { \ + ucg_collective_params_t params = { \ + .send = { \ .type = { \ .modifiers = md, \ .root = root \ }, \ - .comp_cb = cb, \ - .send = { \ - UCG_COLL_PARAMS_BUF##_stype _sargs \ - }, \ - .recv = { \ - UCG_COLL_PARAMS_BUF##_rtype _rargs \ - }, \ - .recv_only = { { 0, 0 }, NULL } \ - }; \ - return ucg_collective_create(group, &full_params, coll_p); \ - } else { \ - if (_non_root_only_sends) { \ - md |= UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_RECV; \ - ucg_collective_params_t send_params = { \ - .type = { \ - .modifiers = md, \ - .root = root \ - }, \ - .comp_cb = cb, \ - .send = { \ - UCG_COLL_PARAMS_BUF##_stype _sargs \ - } \ - }; \ - return ucg_collective_create(group, &send_params, coll_p); \ - } else { \ - md |= UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND; \ - ucg_collective_params_t recv_params = { \ - .type = { \ - .modifiers = md, \ - }, \ - .recv = { \ - UCG_COLL_PARAMS_BUF##_rtype _rargs \ - }, \ - .recv_only = { \ - .type = { \ - .modifiers = md, \ - .root = root \ - }, \ - .comp_cb = cb \ - } \ - }; \ - return ucg_collective_create(group, &recv_params, coll_p); \ - } \ - } \ -} - -#define UCG_COLL_INIT_FULL(_lname, _uname, _ignored, _stype, _sargs, _rtype, \ - _rargs, ...) 
\ -static UCS_F_ALWAYS_INLINE ucs_status_t ucg_coll_##_lname##_init(__VA_ARGS__, \ - ucg_collective_callback_t cb, void *op, ucg_group_member_index_t root, \ - unsigned modifiers, ucg_group_h group, ucg_coll_h *coll_p) \ -{ \ - uint16_t md = modifiers | ucg_predefined_modifiers[UCG_PRIMITIVE_##_uname];\ - ucg_collective_params_t params = { \ - .type = { \ - .modifiers = md, \ - .root = root \ - }, \ - .comp_cb = cb, \ - .send = { \ UCG_COLL_PARAMS_BUF##_stype _sargs \ }, \ .recv = { \ UCG_COLL_PARAMS_BUF##_rtype _rargs \ }, \ - .recv_only = { { 0, 0 }, NULL } \ }; \ return ucg_collective_create(group, ¶ms, coll_p); \ } -#define UCG_COLL_INIT_FUNC_SR1_RR1(_lname, _uname, _mode, _treat_root) \ -UCG_COLL_INIT_##_mode(_lname, _uname, _treat_root, \ - _R, ((char*)sbuf, count, len_dtype, mpi_dtype), \ - _R, ( rbuf, count, len_dtype, mpi_dtype), \ - const void *sbuf, void *rbuf, int count, \ - size_t len_dtype, void *mpi_dtype) - -#define UCG_COLL_INIT_FUNC_SR1_RRN(_lname, _uname, _mode, _treat_root) \ -UCG_COLL_INIT_##_mode(_lname, _uname, _treat_root, \ - _R, ((char*)sbuf, scount, len_sdtype, mpi_sdtype), \ - _R, ( rbuf, rcount, len_rdtype, mpi_rdtype), \ - const void *sbuf, int scount, size_t len_sdtype, void *mpi_sdtype,\ - void *rbuf, int rcount, size_t len_rdtype, void *mpi_rdtype) - -#define UCG_COLL_INIT_FUNC_SR1_RVN(_lname, _uname, _mode, _treat_root) \ -UCG_COLL_INIT_##_mode(_lname, _uname, _treat_root, \ - _R, ((char*)sbuf, scount, len_sdtype, mpi_sdtype), \ - _V, ( rbuf, rcounts, len_rdtype, mpi_rdtype, rdispls), \ - const void *sbuf, int scount, size_t len_sdtype, void *mpi_sdtype, \ - void *rbuf, const int *rcounts, const int *rdispls, size_t len_rdtype, void *mpi_rdtype) - -#define UCG_COLL_INIT_FUNC_SVN_RR1(_lname, _uname, _mode, _treat_root) \ -UCG_COLL_INIT_##_mode(_lname, _uname, _treat_root, \ - _V, ((char*)sbuf, scounts, len_sdtype, mpi_sdtype, sdispls), \ - _R, ( rbuf, rcount, len_rdtype, mpi_rdtype), \ - const void *sbuf, const int *scounts, const int *sdispls, size_t len_sdtype, void *mpi_sdtype, \ - void *rbuf, int rcount, size_t len_rdtype, void *mpi_rdtype) - -#define UCG_COLL_INIT_FUNC_SWN_RWN(_lname, _uname, _mode, _treat_root) \ -UCG_COLL_INIT_##_mode(_lname, _uname, _treat_root, \ - _W, ((char*)sbuf, scounts, len_sdtypes, mpi_sdtypes, sdispls), \ - _W, ( rbuf, rcounts, len_rdtypes, mpi_rdtypes, rdispls), \ - const void *sbuf, int *scounts, int *sdispls, size_t *len_sdtypes, void **mpi_sdtypes, \ - void *rbuf, int *rcounts, int *rdispls, size_t *len_rdtypes, void **mpi_rdtypes) - -#define IDENT(x) ( x) -#define INVERT(x) (!x) - -UCG_COLL_INIT_FUNC_SR1_RR1(reduce, REDUCE, HALF, 1) -UCG_COLL_INIT_FUNC_SR1_RRN(gather, GATHER, HALF, 1) -UCG_COLL_INIT_FUNC_SR1_RVN(gatherv, GATHERV, HALF, 1) -UCG_COLL_INIT_FUNC_SR1_RR1(bcast, BCAST, HALF, 0) -UCG_COLL_INIT_FUNC_SR1_RRN(scatter, SCATTER, HALF, 0) -UCG_COLL_INIT_FUNC_SVN_RR1(scatterv, SCATTERV, HALF, 0) -UCG_COLL_INIT_FUNC_SR1_RR1(allreduce, ALLREDUCE, FULL, 0) -UCG_COLL_INIT_FUNC_SR1_RRN(alltoall, ALLTOALL, FULL, 0) -UCG_COLL_INIT_FUNC_SR1_RRN(allgather, ALLGATHER, FULL, 0) -UCG_COLL_INIT_FUNC_SR1_RVN(allgatherv, ALLGATHERV, FULL, 0) -UCG_COLL_INIT_FUNC_SWN_RWN(alltoallw, ALLTOALLW, FULL, 0) -UCG_COLL_INIT_FUNC_SWN_RWN(neighbor_alltoallw, NEIGHBOR_ALLTOALLW, FULL, 0) +#define UCG_COLL_INIT_FUNC_SR1_RR1(_lname, _uname) \ +UCG_COLL_INIT(_lname, _uname, \ + _R, ((char*)sbuf, count, mpi_dtype), \ + _O, ( rbuf, count, mpi_dtype), \ + const void *sbuf, void *rbuf, int count, void *mpi_dtype) + +#define 
UCG_COLL_INIT_FUNC_SR1_RRN(_lname, _uname) \ +UCG_COLL_INIT(_lname, _uname, \ + _R, ((char*)sbuf, scount, mpi_sdtype), \ + _O, ( rbuf, rcount, mpi_rdtype), \ + const void *sbuf, int scount, void *mpi_sdtype,\ + void *rbuf, int rcount, void *mpi_rdtype) + +#define UCG_COLL_INIT_FUNC_SR1_RVN(_lname, _uname) \ +UCG_COLL_INIT(_lname, _uname, \ + _R, ((char*)sbuf, scount, mpi_sdtype), \ + _V, ( rbuf, rcounts, mpi_rdtype, rdispls), \ + const void *sbuf, int scount, void *mpi_sdtype, \ + void *rbuf, const int *rcounts, const int *rdispls, void *mpi_rdtype) + +#define UCG_COLL_INIT_FUNC_SVN_RR1(_lname, _uname) \ +UCG_COLL_INIT(_lname, _uname, \ + _V, ((char*)sbuf, scounts, mpi_sdtype, sdispls), \ + _R, ( rbuf, rcount, mpi_rdtype), \ + const void *sbuf, const int *scounts, const int *sdispls, void *mpi_sdtype, \ + void *rbuf, int rcount, void *mpi_rdtype) + +#define UCG_COLL_INIT_FUNC_SWN_RWN(_lname, _uname) \ +UCG_COLL_INIT(_lname, _uname, \ + _W, ((char*)sbuf, scounts, mpi_sdtypes, sdispls), \ + _W, ( rbuf, rcounts, mpi_rdtypes, rdispls), \ + const void *sbuf, int *scounts, int *sdispls, void **mpi_sdtypes, \ + void *rbuf, int *rcounts, int *rdispls, void **mpi_rdtypes) + +UCG_COLL_INIT_FUNC_SR1_RR1(reduce, REDUCE) +UCG_COLL_INIT_FUNC_SR1_RRN(gather, GATHER) +UCG_COLL_INIT_FUNC_SR1_RVN(gatherv, GATHERV) +UCG_COLL_INIT_FUNC_SR1_RR1(bcast, BCAST) +UCG_COLL_INIT_FUNC_SR1_RRN(scatter, SCATTER) +UCG_COLL_INIT_FUNC_SVN_RR1(scatterv, SCATTERV) +UCG_COLL_INIT_FUNC_SR1_RR1(allreduce, ALLREDUCE) +UCG_COLL_INIT_FUNC_SR1_RRN(alltoall, ALLTOALL) +UCG_COLL_INIT_FUNC_SR1_RRN(allgather, ALLGATHER) +UCG_COLL_INIT_FUNC_SR1_RVN(allgatherv, ALLGATHERV) +UCG_COLL_INIT_FUNC_SWN_RWN(alltoallw, ALLTOALLW) +UCG_COLL_INIT_FUNC_SWN_RWN(neighbor_alltoallw, NEIGHBOR_ALLTOALLW) END_C_DECLS diff --git a/api/ucg_plan_component.h b/api/ucg_plan_component.h index b673af1..b1b2da1 100644 --- a/api/ucg_plan_component.h +++ b/api/ucg_plan_component.h @@ -132,13 +132,12 @@ struct ucg_op { ucg_plan_t *plan; /**< The group this belongs to */ - uint8_t padding[24]; - ucg_collective_params_t params; /**< original parameters for it */ + /* Note: the params field must be 64-byte-aligned */ /* Component-specific request content */ char priv[0]; -} UCS_V_ALIGNED(64); /* so that params struct is aligned correctly */ +}; struct ucg_plan_component { const char name[UCG_PLAN_COMPONENT_NAME_MAX]; /**< Component name */ diff --git a/base/ucg_context.c b/base/ucg_context.c index 03c64bb..9262acf 100644 --- a/base/ucg_context.c +++ b/base/ucg_context.c @@ -240,14 +240,14 @@ static void ucg_context_copy_used_ucg_params(ucg_params_t *dst, break; case UCG_PARAM_FIELD_NEIGHBORS_CB: - ucg_params_size = ucs_offsetof(ucg_params_t, reduce_cb_f); + ucg_params_size = ucs_offsetof(ucg_params_t, datatype); break; - case UCG_PARAM_FIELD_REDUCE_CB: - ucg_params_size = ucs_offsetof(ucg_params_t, type_info); + case UCG_PARAM_FIELD_DATATYPE_CB: + ucg_params_size = ucs_offsetof(ucg_params_t, reduce_op); break; - case UCG_PARAM_FIELD_TYPE_INFO_CB: + case UCG_PARAM_FIELD_REDUCE_OP_CB: ucg_params_size = ucs_offsetof(ucg_params_t, mpi_in_place); break; diff --git a/base/ucg_group.c b/base/ucg_group.c index d75df3f..5df7234 100644 --- a/base/ucg_group.c +++ b/base/ucg_group.c @@ -18,14 +18,6 @@ UCG_GROUP_PARAM_FIELD_MEMBER_INDEX |\ UCG_GROUP_PARAM_FIELD_CB_CONTEXT) -enum ucg_group_cmp_mode { - UCG_GROUP_CMP_MODE_FULL = 0, - UCG_GROUP_CMP_MODE_RECV_ONLY = UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND >> 14, - UCG_GROUP_CMP_MODE_SEND_ONLY = 
UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_RECV >> 14, - UCG_GROUP_CMP_MODE_BARRIER = (UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND | - UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_RECV) >> 14 -}; - #if ENABLE_STATS /** * UCG group statistics counters @@ -109,7 +101,7 @@ static inline ucs_status_t ucg_group_plan(ucg_group_h group, } UCS_PROFILE_CODE("ucg_plan") { - status = planner->component->plan(gctx, ¶ms->type, &plan); + status = planner->component->plan(gctx, &UCG_PARAM_TYPE(params), &plan); } if (ucs_unlikely(status != UCS_OK)) { return status; @@ -139,37 +131,6 @@ static inline ucs_status_t ucg_group_plan(ucg_group_h group, return UCS_OK; } -static ucs_status_t ucg_group_init_cache(ucg_group_h group) -{ - ucs_status_t status; - ucg_collective_params_t coll_params = { - .type = { - .modifiers = UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE | - UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST | - UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER | - UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND | - UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_RECV, - .root = 0 - }, - .comp_cb = NULL - }; - - /* Clear the cache */ - group->cache_size = 0; - memset(group->cache, 0, sizeof(group->cache)); - - /* Create a plan for barrier, so that we can assume it exists in run-time */ - status = ucg_group_plan(group, &coll_params, &group->barrier_cache); - if (ucs_unlikely(status != UCS_OK)) { - return status; - } - - /* Also plan the non-barrier counter-part - so that fast-path runs right */ - coll_params.type.modifiers &= UCG_GROUP_CACHE_MODIFIER_MASK; - return ucg_group_plan(group, &coll_params, - &group->cache[coll_params.type.modifiers]); -} - ucs_status_t ucg_group_create(ucp_worker_h worker, const ucg_group_params_t *params, ucg_group_h *group_p) @@ -194,7 +155,7 @@ ucs_status_t ucg_group_create(ucp_worker_h worker, group->is_cache_cleanup_due = 0; group->context = ctx; group->worker = worker; - group->next_coll_id = 0; + group->next_coll_id = 1; group->iface_cnt = 0; #if ENABLE_MT @@ -227,11 +188,6 @@ ucs_status_t ucg_group_create(ucp_worker_h worker, goto cleanup_group; } - status = ucg_group_init_cache(group); - if (status != UCS_OK) { - goto cleanup_group; - } - status = UCS_STATS_NODE_ALLOC(&group->stats, &ucg_group_stats_class, worker->stats, "-%p", group); @@ -239,6 +195,10 @@ ucs_status_t ucg_group_create(ucp_worker_h worker, goto cleanup_group; } + /* Clear the cache */ + group->cache_size = 0; + memset(group->cache, 0, sizeof(group->cache)); + ucs_list_add_head(&ctx->groups_head, &group->list); *group_p = group; @@ -288,55 +248,6 @@ void ucg_request_cancel(ucg_group_h group, ucg_request_t *req) // TODO: implement } -static UCS_F_ALWAYS_INLINE int -ucg_group_is_matching_op(enum ucg_group_cmp_mode mode, - const ucg_collective_params_t *a, - const ucg_collective_params_t *b) -{ - size_t cmp_size; - size_t cmp_offset; - - switch (mode) { - case UCG_GROUP_CMP_MODE_FULL: - cmp_size = sizeof(ucg_collective_params_t); - cmp_offset = 0; - break; - - case UCG_GROUP_CMP_MODE_SEND_ONLY: - cmp_size = offsetof(ucg_collective_params_t, recv); - cmp_offset = 0; - break; - - case UCG_GROUP_CMP_MODE_RECV_ONLY: - cmp_size = sizeof(ucg_collective_params_t) - - offsetof(ucg_collective_params_t, recv); - cmp_offset = offsetof(ucg_collective_params_t, recv); - break; - - case UCG_GROUP_CMP_MODE_BARRIER: - /* Barrier only - has a different solution... 
will never get here */ - ucs_assert_always(0); - break; - } - - ucs_assert((mode == UCG_GROUP_CMP_MODE_RECV_ONLY) || - ((a->recv_only.type.modifiers == 0) && - (b->recv_only.type.modifiers == 0) && - (a->recv_only.type.root == 0) && - (b->recv_only.type.root == 0) && - (a->recv_only.comp_cb == 0) && - (b->recv_only.comp_cb == 0))); - - ucs_assert(((uintptr_t)a % UCS_SYS_CACHE_LINE_SIZE) == 0); - ucs_assert(((uintptr_t)b % UCS_SYS_CACHE_LINE_SIZE) == 0); - ucs_assert((cmp_size % UCS_SYS_CACHE_LINE_SIZE) == 0); - ucs_assert((cmp_offset % UCS_SYS_CACHE_LINE_SIZE) == 0); - - return (0 == memcmp((uint8_t*)a + cmp_offset, - (uint8_t*)b + cmp_offset, - cmp_size)); -} - UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_create, (group, params, coll), ucg_group_h group, const ucg_collective_params_t *params, ucg_coll_h *coll) @@ -344,76 +255,34 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_create, ucg_op_t *op; ucs_status_t status; - uint16_t modifiers = params->type.modifiers; - unsigned coll_mask = modifiers & UCG_GROUP_CACHE_MODIFIER_MASK; - ucg_plan_t *plan = group->cache[coll_mask]; - unsigned cmp_bit_offset = ucs_count_trailing_zero_bits( - UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND); - enum ucg_group_cmp_mode cmp_mode = (enum ucg_group_cmp_mode)(modifiers >> - cmp_bit_offset); - + uint16_t modifiers = UCG_PARAM_TYPE(params).modifiers; + unsigned coll_mask = modifiers & UCG_GROUP_CACHE_MODIFIER_MASK; + ucg_plan_t *plan = group->cache[coll_mask]; /* check the recycling/cache for this collective */ if (ucs_likely(plan != NULL)) { UCG_GROUP_THREAD_CS_ENTER(plan) - switch (cmp_mode) { - case UCG_GROUP_CMP_MODE_FULL: - ucs_list_for_each(op, &plan->op_head, list) { - if (ucg_group_is_matching_op(UCG_GROUP_CMP_MODE_FULL, - &op->params, params)) { - ucs_list_del(&op->list); - UCG_GROUP_THREAD_CS_EXIT(plan); - status = UCS_OK; - goto op_found; - } - } - break; - - case UCG_GROUP_CMP_MODE_SEND_ONLY: - ucs_list_for_each(op, &plan->op_head, list) { - if (ucg_group_is_matching_op(UCG_GROUP_CMP_MODE_SEND_ONLY, - &op->params, params)) { - ucs_list_del(&op->list); - UCG_GROUP_THREAD_CS_EXIT(plan); - status = UCS_OK; - goto op_found; - } - } - break; - - case UCG_GROUP_CMP_MODE_RECV_ONLY: - ucs_list_for_each(op, &plan->op_head, list) { - if (ucg_group_is_matching_op(UCG_GROUP_CMP_MODE_RECV_ONLY, - &op->params, params)) { - ucs_list_del(&op->list); - UCG_GROUP_THREAD_CS_EXIT(plan); - status = UCS_OK; - goto op_found; - } - } - break; + ucs_list_for_each(op, &plan->op_head, list) { + /* we only need to compare the first 64 bytes of each set of cached + * parameters against the given one (checked during compile time) */ + UCS_STATIC_ASSERT(sizeof(ucg_collective_params_t) == + UCS_SYS_CACHE_LINE_SIZE); - case UCG_GROUP_CMP_MODE_BARRIER: - /* No need to look for a match - barrier is pre-calculated */ - plan = group->barrier_cache; - if (ucs_likely(!ucs_list_is_empty(&plan->op_head))) { - op = ucs_list_extract_head(&plan->op_head, ucg_op_t, list); + if (ucs_cpu_cache_line_is_equal(params, &op->params)) { + ucs_list_del(&op->list); UCG_GROUP_THREAD_CS_EXIT(plan); status = UCS_OK; goto op_found; } - break; } - UCG_GROUP_THREAD_CS_EXIT(plan); - UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_PLANS_USED, 1); } else { - ucs_assert((modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER) == 0); ucs_trace_req("ucg_collective_create PLAN: type=%x root=%"PRIx64, - params->type.modifiers, (uint64_t)params->type.root); + (unsigned)UCG_PARAM_TYPE(params).modifiers, + (uint64_t)UCG_PARAM_TYPE(params).root); /* create the actual 
plan for the collective operation */ status = ucg_group_plan(group, params, &plan); @@ -421,6 +290,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_create, goto out; } + UCG_GROUP_THREAD_CS_ENTER(plan); + group->cache[coll_mask] = plan; UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_PLANS_CREATED, 1); @@ -431,22 +302,22 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_create, UCS_PROFILE_CODE("ucg_prepare") { status = plan->planner->component->prepare(plan, params, &op); } + + UCG_GROUP_THREAD_CS_EXIT(plan); + if (status != UCS_OK) { goto out; } ucs_trace_req("ucg_collective_create OP: planner=%s(%s) " - "params={type=%u, root=%lu, send=[%p,%lu,%lu,%p,%p], " - "recv=[%p,%lu,%lu,%p,%p], cb=%p(%p), op=%p}", + "params={type=%u, root=%lu, send=[%p,%lu,%p], " + "recv=[%p,%lu,%p], op/displs=%p}", plan->planner->name, plan->planner->component->name, - (uint16_t)params->type.modifiers, - (uint64_t)params->type.root, params->send.buffer, - params->send.count, params->send.dt_len, - params->send.dt_ext, params->send.displs, - params->recv.buffer, params->recv.count, params->recv.dt_len, - params->recv.dt_ext, params->recv.displs, - params->comp_cb, params->recv_only.comp_cb, - params->recv.op_ext); + (uint16_t)UCG_PARAM_TYPE(params).modifiers, + (uint64_t)UCG_PARAM_TYPE(params).root, + params->send.buffer, params->send.count, params->send.dtype, + params->recv.buffer, params->recv.count, params->recv.dtype, + UCG_PARAM_OP(params)); if (ucs_unlikely(++group->cache_size > group->context->config.group_cache_size_thresh)) { @@ -457,8 +328,6 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_create, *coll = op; out: - UCG_GROUP_THREAD_CS_EXIT(group) - return status; } @@ -469,7 +338,7 @@ ucg_collective_trigger(ucg_group_h group, ucg_op_t *op, ucg_request_t *req) /* Start the first step of the collective operation */ UCS_PROFILE_CODE("ucg_trigger") { - ret = op->trigger_f(op, ++group->next_coll_id, req); + ret = op->trigger_f(op, group->next_coll_id++, req); } if (ret != UCS_INPROGRESS) { @@ -488,27 +357,28 @@ ucs_status_t ucg_collective_acquire_barrier(ucg_group_h group) ucs_status_t ucg_collective_release_barrier(ucg_group_h group) { + ucs_status_t status; + ucs_assert(group->is_barrier_outstanding == 1); group->is_barrier_outstanding = 0; UCG_GROUP_THREAD_CS_ENTER(group) - if (ucs_queue_is_empty(&group->pending)) { - return UCS_OK; + if (!ucs_queue_is_empty(&group->pending)) { + do { + /* Start the next pending operation */ + ucg_op_t *op = (ucg_op_t*)ucs_queue_pull_non_empty(&group->pending); + status = ucg_collective_trigger(group, op, op->pending_req); + } while ((!ucs_queue_is_empty(&group->pending)) && + (!group->is_barrier_outstanding) && + (status == UCS_OK)); + } else { + status = UCS_OK; } - ucs_status_t ret; - do { - /* Start the next pending operation */ - ucg_op_t *op = (ucg_op_t*)ucs_queue_pull_non_empty(&group->pending); - ret = ucg_collective_trigger(group, op, op->pending_req); - } while ((!ucs_queue_is_empty(&group->pending)) && - (!group->is_barrier_outstanding) && - (ret == UCS_OK)); - UCG_GROUP_THREAD_CS_EXIT(group) - return ret; + return status; } UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_start, (coll, req), @@ -546,9 +416,9 @@ void ucg_collective_destroy(ucg_coll_h coll) ucg_op_t *op = (ucg_op_t*)coll; ucg_plan_t *plan = op->plan; - ucs_recursive_spin_lock(&plan->lock); + UCG_GROUP_THREAD_CS_ENTER(plan); ucs_list_add_head(&plan->op_head, &op->list); - ucs_recursive_spin_unlock(&plan->lock); + UCG_GROUP_THREAD_CS_EXIT(plan); } diff --git a/base/ucg_group.h 
b/base/ucg_group.h index 344cd07..eb8ad66 100644 --- a/base/ucg_group.h +++ b/base/ucg_group.h @@ -53,7 +53,6 @@ typedef struct ucg_group { * collective types. */ unsigned cache_size; - ucg_plan_t *barrier_cache; ucg_plan_t *cache[UCG_GROUP_CACHE_MODIFIER_MASK]; /* Below this point - the private per-planner data is allocated/stored */ diff --git a/builtin/builtin.c b/builtin/builtin.c index cd697f9..0798a39 100644 --- a/builtin/builtin.c +++ b/builtin/builtin.c @@ -148,14 +148,15 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_builtin_am_handler, /* Find the slot to be used, based on the ID received in the header */ ucg_coll_id_t coll_id = header->msg.coll_id; ucg_builtin_comp_slot_t *slot = &gctx->slots[coll_id % UCG_BUILTIN_MAX_CONCURRENT_OPS]; - ucs_assert((slot->req.latest.coll_id != coll_id) || - (slot->req.latest.step_idx <= header->msg.step_idx)); + ucs_assert((slot->req.expecting.coll_id != coll_id) || + (slot->req.expecting.step_idx <= header->msg.step_idx)); /* Consume the message if it fits the current collective and step index */ - if (ucs_likely(header->msg.local_id == slot->req.latest.local_id)) { + if (ucs_likely(header->msg.local_id == slot->req.expecting.local_id)) { /* Make sure the packet indeed belongs to the collective currently on */ data = header + 1; length -= sizeof(ucg_builtin_header_t); + ucg_builtin_step_recv_cb(&slot->req, header->remote_offset, data, length); ucs_trace_req("ucg_builtin_am_handler CB: coll_id %u step_idx %u pending %u", @@ -164,9 +165,9 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_builtin_am_handler, } ucs_trace_req("ucg_builtin_am_handler STORE: group_id %u " - "coll_id %u(%u) step_idx %u slot_step_idx %u", - header->group_id, header->msg.coll_id, slot->req.latest.coll_id, - header->msg.step_idx, slot->req.latest.step_idx); + "coll_id %u expected_id %u step_idx %u expected_idx %u", + header->group_id, header->msg.coll_id, slot->req.expecting.coll_id, + header->msg.step_idx, slot->req.expecting.step_idx); #ifdef HAVE_UCT_COLLECTIVES /* In case of a stride - the stored length is actually longer */ @@ -287,22 +288,45 @@ static ucs_status_t ucg_builtin_create(ucg_plan_ctx_h pctx, for (i = 0; i < UCG_BUILTIN_MAX_CONCURRENT_OPS; i++) { ucg_builtin_comp_slot_t *slot = &gctx->slots[i]; ucs_ptr_array_init(&slot->messages, "builtin messages"); - slot->req.latest.local_id = 0; + slot->req.expecting.local_id = 0; } return UCS_OK; } +static void ucg_builtin_destroy_plan(ucg_builtin_plan_t *plan) +{ + ucs_list_link_t *op_head = &plan->super.op_head; + while (!ucs_list_is_empty(op_head)) { + ucg_builtin_op_discard(ucs_list_extract_head(op_head, ucg_op_t, list)); + } + +#if ENABLE_DEBUG_DATA || ENABLE_FAULT_TOLERANCE + ucg_step_idx_t i; + for (i = 0; i < plan->phs_cnt; i++) { + ucs_free(plan->phss[i].indexes); + } +#endif + +#if ENABLE_MT + ucs_recursive_spinlock_destroy(&plan->super.lock); +#endif + + ucs_mpool_cleanup(&plan->op_mp, 1); + ucs_free(plan); +} + static void ucg_builtin_destroy(ucg_group_ctx_h ctx) { /* Cleanup left-over messages and outstanding operations */ unsigned i, j; ucg_builtin_group_ctx_t *gctx = ctx; + for (i = 0; i < UCG_BUILTIN_MAX_CONCURRENT_OPS; i++) { ucg_builtin_comp_slot_t *slot = &gctx->slots[i]; - if (slot->req.latest.local_id != 0) { + if (slot->req.expecting.local_id != 0) { ucs_warn("Collective operation #%u has been left incomplete (Group #%u)", - gctx->slots[i].req.latest.coll_id, gctx->group_id); + gctx->slots[i].req.expecting.coll_id, gctx->group_id); } ucp_recv_desc_t *rdesc; @@ -324,28 +348,9 @@ static void 
ucg_builtin_destroy(ucg_group_ctx_h ctx) /* Cleanup plans created for this group */ while (!ucs_list_is_empty(&gctx->plan_head)) { - ucg_builtin_plan_t *plan = ucs_list_extract_head(&gctx->plan_head, - ucg_builtin_plan_t, - list); - - ucs_list_link_t *op_head = &plan->super.op_head; - while (!ucs_list_is_empty(op_head)) { - ucg_op_t *op = ucs_list_extract_head(op_head, ucg_op_t, list); - ucg_builtin_op_discard(op); - } - -#if ENABLE_DEBUG_DATA || ENABLE_FAULT_TOLERANCE - for (j = 0; j < plan->phs_cnt; j++) { - ucs_free(plan->phss[j].indexes); - } -#endif - -#if ENABLE_MT - ucs_recursive_spinlock_destroy(&plan->super.lock); -#endif - - ucs_mpool_cleanup(&plan->op_mp, 1); - ucs_free(plan); + ucg_builtin_destroy_plan(ucs_list_extract_head(&gctx->plan_head, + ucg_builtin_plan_t, + list)); } /* Remove the group from the global storage array */ @@ -465,9 +470,9 @@ static ucs_status_t ucg_builtin_plan(ucg_group_ctx_h ctx, size_t op_size = sizeof(ucg_builtin_op_t) + (plan->phs_cnt + 1) * sizeof(ucg_builtin_op_step_t); /* +1 is for key exchange in 0-copy cases, where an extra step is needed */ - status = ucs_mpool_init(&plan->op_mp, 0, op_size, 0, UCS_SYS_CACHE_LINE_SIZE, - 1, UINT_MAX, &ucg_builtin_plan_mpool_ops, - "ucg_builtin_plan_mp"); + status = ucs_mpool_init(&plan->op_mp, 0, op_size, offsetof(ucg_op_t, params), + UCS_SYS_CACHE_LINE_SIZE, 1, UINT_MAX, + &ucg_builtin_plan_mpool_ops, "ucg_builtin_plan_mp"); if (status != UCS_OK) { return status; } @@ -512,7 +517,7 @@ void ucg_builtin_print_flags(ucg_builtin_op_step_t *step) printf("\n\tFRAGMENTED:\t\t%i", flag); if (flag) { printf("\n\t - Fragment Length: %u", step->fragment_length); - printf("\n\t - Fragments Total: %u", step->fragments_total); + printf("\n\t - Fragments Total: %lu", step->fragments_total); } flag = ((step->flags & UCG_BUILTIN_OP_STEP_FLAG_LAST_STEP) != 0); @@ -530,16 +535,9 @@ void ucg_builtin_print_flags(ucg_builtin_op_step_t *step) flag = ((step->flags & UCG_BUILTIN_OP_STEP_FLAG_PACKED_DTYPE_MODE) != 0); printf("\n\tPACKED_DTYPE_MODE:\t%i", flag); if (flag) { - uct_coll_dtype_mode_t mode; - if (step->flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_SHORT) { - mode = UCT_COLL_DTYPE_MODE_UNPACK_MODE(buffer_length); - buffer_length = UCT_COLL_DTYPE_MODE_UNPACK_VALUE(buffer_length); - } else { - mode = UCT_COLL_DTYPE_MODE_UNPACK_MODE(step->uct_flags); - flag = UCT_SEND_FLAG_PACK_LOCK & - UCT_COLL_DTYPE_MODE_UNPACK_VALUE(step->uct_flags); - printf("\n\tUCT packing mutex requested:\t\t%i", flag); - } + uct_coll_dtype_mode_t mode = UCT_COLL_DTYPE_MODE_UNPACK_MODE(buffer_length); + buffer_length = UCT_COLL_DTYPE_MODE_UNPACK_VALUE(buffer_length); + ucs_assert(step->flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_SHORT); printf("\n\tDatatype mode:\t\t"); switch (mode) { @@ -575,28 +573,12 @@ void ucg_builtin_print_flags(ucg_builtin_op_step_t *step) printf("write"); break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED: - printf("write (unpacked)"); - break; - - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL: - printf("gather (terminal)"); + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER: + printf("gather"); break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_WAYPOINT: - printf("gather (way-point)"); - break; - - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE: - printf("reduce (single)"); - break; - - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_BATCHED: - printf("reduce (batched)"); - break; - - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_FRAGMENT: - printf("reduce (fragment)"); + case 
UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE: + printf("reduce"); break; case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REMOTE_KEY: @@ -759,11 +741,14 @@ static void ucg_builtin_print(ucg_plan_t *plan, int8_t *temp_buffer = NULL; ucg_builtin_op_init_cb_t init_cb = NULL; ucg_builtin_op_fini_cb_t fini_cb = NULL; + printf("Step #%i (actual index used: %u):", phase_idx, builtin_plan->phss[phase_idx].step_index); + status = ucg_builtin_step_create(builtin_plan, &builtin_plan->phss[phase_idx], &flags, coll_params, - &temp_buffer, &init_cb, &fini_cb, &step[0], &zcopy_step); + &temp_buffer, 1, 1, 1, &init_cb, &fini_cb, + &step[0], &zcopy_step); if (status != UCS_OK) { printf("failed to create, %s", ucs_status_string(status)); } @@ -784,13 +769,13 @@ static void ucg_builtin_print(ucg_plan_t *plan, ucg_builtin_print_fini_cb_name(fini_cb); } - ucg_builtin_step_select_packers(coll_params, &step[0]); + ucg_builtin_step_select_packers(coll_params, 1, 1, &step[0]); printf("\n\tPacker (if used):\t"); ucg_builtin_print_pack_cb_name(step[0].bcopy.pack_single_cb); ucg_builtin_print_flags(&step[0]); if (zcopy_step) { - ucg_builtin_step_select_packers(coll_params, &step[1]); + ucg_builtin_step_select_packers(coll_params, 1, 1, &step[1]); printf("\nExtra step - RMA operation:\n\tPacker (if used):\t"); ucg_builtin_print_pack_cb_name(step[1].bcopy.pack_single_cb); ucg_builtin_print_flags(&step[1]); @@ -840,6 +825,7 @@ ucs_status_t ucg_builtin_connect(ucg_builtin_group_ctx_t *ctx, mock_ep_attr.cap.flags = UCT_IFACE_FLAG_AM_SHORT; #endif mock_ep_attr.cap.am.max_short = SIZE_MAX; + phase->host_proc_cnt = 0; phase->iface_attr = &mock_ep_attr; phase->md = NULL; @@ -911,4 +897,3 @@ UCG_PLAN_COMPONENT_DEFINE(ucg_builtin_component, "builtin", ucg_builtin_op_trigger, ucg_builtin_op_discard, ucg_builtin_print, ucg_builtin_handle_fault, "BUILTIN_", ucg_builtin_config_table, ucg_builtin_config_t); - diff --git a/builtin/ops/builtin_comp_step.inl b/builtin/ops/builtin_comp_step.inl index 1b66c9c..dfdd1d1 100644 --- a/builtin/ops/builtin_comp_step.inl +++ b/builtin/ops/builtin_comp_step.inl @@ -8,37 +8,31 @@ void static UCS_F_ALWAYS_INLINE ucg_builtin_comp_last_step_cb(ucg_builtin_request_t *req, ucs_status_t status) { - /* Sanity checks */ + ucg_builtin_op_t *op = req->op; ucg_request_t *user_req = req->comp_req; - ucs_assert(user_req != NULL); /* Mark (per-group) slot as available */ - ucs_container_of(req, ucg_builtin_comp_slot_t, req)->req.latest.local_id = 0; - - /* Store request return value, in case the user-defined callback checks */ - user_req->status = status; + req->expecting.local_id = 0; /* Finalize the collective operation (including user-defined callback) */ - ucg_builtin_op_t *op = req->op; if (ucs_unlikely(op->fini_cb != NULL)) { /* * Note: the "COMPLETED" flag cannot be set until UCG's internal * callback finishes it's job. For example, barrier-release needs - * to happen before we give the all-clear to the user. + * to happen before we give the all-clear to the user. This also + * means the function needs to run regardless of the status. */ - op->fini_cb(op, user_req, status); - // TODO: always call fini_cb (possibly dummy), where MPI will pass a - // completion-callback to finish MPI_Request structures directly. + op->fini_cb(op); + } - /* Consider optimization, if this operation is used often enough */ - if (ucs_likely(status == UCS_OK) && ucs_unlikely(--op->opt_cnt == 0)) { - user_req->status = op->optm_cb(op); - } - // TODO: make sure optimizations are used - by setting fini_cb... 
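+    /*
+     * The code below writes user_req->status before raising the "COMPLETED"
+     * flag, so a caller that polls the flag reads a stable status afterwards,
+     * e.g. (a sketch; "worker" stands for whatever UCP worker the caller
+     * progresses):
+     *
+     *     while (!(user_req->flags & UCP_REQUEST_FLAG_COMPLETED)) {
+     *         ucp_worker_progress(worker);
+     *     }
+     *     if (user_req->status != UCS_OK) {
+     *         // handle the collective's failure
+     *     }
+     */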
+ /* Consider optimization, if this operation is used often enough */ + if (ucs_likely(status == UCS_OK) && ucs_unlikely(--op->opt_cnt == 0)) { + status = op->optm_cb(op); } - /* Mark this request as complete */ - user_req->flags = UCP_REQUEST_FLAG_COMPLETED; + user_req->status = status; + user_req->flags = UCP_REQUEST_FLAG_COMPLETED; + ucs_assert(status != UCS_INPROGRESS); UCS_PROFILE_REQUEST_EVENT(user_req, "complete_coll", 0); ucs_trace_req("collective returning completed request=%p (status: %s)", @@ -87,59 +81,47 @@ ucg_builtin_comp_step_cb(ucg_builtin_request_t *req) } /* Start on the next step for this collective operation */ - ucg_builtin_op_step_t *next_step = ++req->step; + ucg_builtin_op_step_t *prev_step = req->step; + ucg_builtin_op_step_t *next_step = req->step = prev_step + 1; + next_step->am_header.msg.coll_id = prev_step->am_header.msg.coll_id; req->pending = next_step->fragments_total; - req->latest.step_idx = next_step->am_header.msg.step_idx; - next_step->am_header.msg.coll_id = req->latest.coll_id; - ucs_status_t status = ucg_builtin_step_execute(req); ucg_builtin_comp_ft_end_step(next_step - 1); - return status; + + return ucg_builtin_step_execute(req); } static void UCS_F_ALWAYS_INLINE ucg_builtin_mpi_reduce(void *mpi_op, void *src, void *dst, int dcount, void* mpi_datatype) { - UCS_PROFILE_CALL_VOID(ucg_global_params.reduce_cb_f, mpi_op, (char*)src, - (char*)dst, (unsigned)dcount, mpi_datatype); + UCS_PROFILE_CALL_VOID(ucg_global_params.reduce_op.reduce_cb_f, mpi_op, + (char*)src, (char*)dst, (unsigned)dcount, mpi_datatype); } static void UCS_F_ALWAYS_INLINE -ucg_builtin_mpi_reduce_single(uint8_t *dst, uint8_t *src, size_t length, - ucg_collective_params_t *params) +ucg_builtin_mpi_reduce_single(uint8_t *dst, uint8_t *src, + const ucg_collective_params_t *params) { - ucs_assert((params->recv.dt_len * params->recv.count) == length); - ucg_builtin_mpi_reduce(params->recv.op_ext, src, dst, - params->recv.count, - params->recv.dt_ext); + ucg_builtin_mpi_reduce(UCG_PARAM_OP(params), src, dst, + params->recv.count, params->recv.dtype); } static void UCS_F_ALWAYS_INLINE -ucg_builtin_mpi_reduce_fragment(uint8_t *dst, uint8_t *src, size_t length, - ucg_collective_params_t *params) +ucg_builtin_mpi_reduce_fragment(uint8_t *dst, uint8_t *src, + size_t length, size_t dtype_length, + const ucg_collective_params_t *params) { - ucs_assert((length % params->recv.dt_len) == 0); - ucg_builtin_mpi_reduce(params->recv.op_ext, src, dst, - length / params->recv.dt_len, - params->recv.dt_ext); + ucg_builtin_mpi_reduce(UCG_PARAM_OP(params), src, dst, + length / dtype_length, + params->recv.dtype); } static void UCS_F_ALWAYS_INLINE -ucg_builtin_comp_reduce_batched(ucg_builtin_request_t *req, uint64_t offset, - uint8_t *src, size_t size, size_t stride) -{ - uint8_t *dst = req->step->recv_buffer + offset; - unsigned index, count = req->step->batch_cnt; - for (index = 0; index < count; index++, src += stride) { - ucg_builtin_mpi_reduce_single(dst, src, size, &req->op->super.params); - } -} - -static void ucg_builtin_comp_gather(uint8_t *recv_buffer, uint8_t *data, - uint64_t offset, size_t size, size_t length, - ucg_group_member_index_t root) +ucg_builtin_comp_gather(uint8_t *recv_buffer, uint64_t offset, uint8_t *data, + size_t per_rank_length, size_t length, + ucg_group_member_index_t root) { - size_t my_offset = size * root; + size_t my_offset = per_rank_length * root; // TODO: fix... 
likely broken if ((offset > my_offset) || (offset + length <= my_offset)) { memcpy(recv_buffer + offset, data, length); @@ -154,22 +136,14 @@ static void ucg_builtin_comp_gather(uint8_t *recv_buffer, uint8_t *data, memcpy(recv_buffer + first_part + length, data, length - first_part); } -static UCS_F_ALWAYS_INLINE ucs_status_t -ucg_builtin_comp_unpack_rkey(ucg_builtin_request_t *req, uint64_t remote_addr, +static ucs_status_t UCS_F_ALWAYS_INLINE +ucg_builtin_comp_unpack_rkey(ucg_builtin_op_step_t *step, uint64_t remote_addr, uint8_t *packed_remote_key) { /* Unpack the information from the payload to feed the next (0-copy) step */ - ucg_builtin_op_step_t *step = req->step + 1; step->zcopy.raddr = remote_addr; - ucs_status_t status = uct_rkey_unpack(step->zcopy.cmpt, - packed_remote_key, - &step->zcopy.rkey); - if (ucs_unlikely(status != UCS_OK)) { - ucg_builtin_comp_last_step_cb(req, status); - } - - return status; + return uct_rkey_unpack(step->zcopy.cmpt, packed_remote_key, &step->zcopy.rkey); } static int UCS_F_ALWAYS_INLINE @@ -194,61 +168,152 @@ ucg_builtin_comp_send_check_frag_by_offset(ucg_builtin_request_t *req, } static void UCS_F_ALWAYS_INLINE -ucg_builtin_step_recv_handle_data(ucg_builtin_request_t *req, uint64_t offset, - void *data, size_t length) +ucg_builtin_comp_unpack(ucg_builtin_op_t *op, uint64_t offset, void *data, size_t length) { - size_t size; - ucg_builtin_op_step_t *step = req->step; + ucp_dt_generic_t *dt_gen = ucp_dt_to_generic(op->recv_dt); + dt_gen->ops.unpack(op->wstate.dt.generic.state, offset, data, + length / op->super.params.recv.count); +} - ucs_assert((length != 0) || (step->comp_aggregation == - UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_NOP)); +static ucs_status_t UCS_F_ALWAYS_INLINE +ucg_builtin_step_recv_handle_chunk(enum ucg_builtin_op_step_comp_aggregation ag, + uint8_t *dst, uint8_t *src, size_t length, + size_t offset, int is_fragment, + int is_dt_packed, ucg_builtin_request_t *req) +{ + ucs_status_t status; - /* Act according to the requested data action */ - switch (step->comp_aggregation) { + switch (ag) { case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_NOP: + status = UCS_OK; break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE: - memcpy(step->recv_buffer + offset, data, length); + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER: + ucg_builtin_comp_gather(req->step->recv_buffer, offset, src, + req->step->buffer_length, length, + UCG_PARAM_TYPE(&req->op->super.params).root); + status = UCS_OK; break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED: - /* Note: worker is NULL since it isn't required for host-based memory */ - ucp_dt_unpack_only(NULL, step->recv_buffer + offset, step->unpack_count, - step->bcopy.datatype, UCS_MEMORY_TYPE_HOST, data, length, 0); + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE: + if (is_dt_packed) { + ucg_builtin_comp_unpack(req->op, offset, src, length); + } else { + memcpy(dst, src, length); + } + status = UCS_OK; break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL: - ucg_builtin_comp_gather(step->recv_buffer, data, offset, - step->buffer_length, length, - req->op->super.params.type.root); - break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_WAYPOINT: - ucs_assert(offset == 0); - memcpy(step->recv_buffer + step->buffer_length, data, length); + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE: + if (is_fragment) { + ucg_builtin_mpi_reduce_fragment(dst, src, length, + req->step->dtype_length, + &req->op->super.params); + } else { + ucg_builtin_mpi_reduce_single(dst, src, &req->op->super.params); + } + 
status = UCS_OK; break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE: - ucg_builtin_mpi_reduce_single(step->recv_buffer + offset, data, - length, &req->op->super.params); + case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REMOTE_KEY: + /* zero-copy prepares the key for the next step */ + ucs_assert(length == req->step->phase->md_attr->rkey_packed_size); + status = ucg_builtin_comp_unpack_rkey(req->step + 1, offset, src); break; + } - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_BATCHED: - size = step->buffer_length; - ucg_builtin_comp_reduce_batched(req, offset, data, size, - length + sizeof(ucg_builtin_header_t)); - break; + return status; +} - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_FRAGMENT: - ucg_builtin_mpi_reduce_fragment(step->recv_buffer + offset, data, - length, &req->op->super.params); +#define case_recv_full(aggregation, _is_batched, _is_fragmented, \ + _is_len_packed, _is_dt_packed) \ + case ((_is_batched ? UCG_BUILTIN_OP_STEP_COMP_FLAG_BATCHED_DATA : 0) |\ + (_is_fragmented ? UCG_BUILTIN_OP_STEP_COMP_FLAG_FRAGMENTED_DATA : 0) |\ + (_is_len_packed ? UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_LENGTH : 0) |\ + (_is_dt_packed ? UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_DATATYPE : 0)):\ + \ + if (_is_batched) { \ + uint8_t index; \ + size_t chunk_size; \ + if (_is_fragmented) { \ + if (_is_len_packed) { \ + chunk_size = \ + UCT_COLL_DTYPE_MODE_UNPACK_VALUE(step->fragment_length);\ + } else { \ + chunk_size = step->fragment_length; \ + } \ + } else { \ + if (_is_len_packed) { \ + chunk_size = \ + UCT_COLL_DTYPE_MODE_UNPACK_VALUE(step->buffer_length); \ + } else { \ + chunk_size = step->buffer_length; \ + } \ + } \ + length += sizeof(ucg_builtin_header_t); \ + for (index = 0; index < step->batch_cnt; index++, data += length) {\ + status = ucg_builtin_step_recv_handle_chunk(aggregation, \ + dest_buffer, data, \ + chunk_size, offset,\ + _is_fragmented, \ + _is_dt_packed, req);\ + if (ucs_unlikely(status != UCS_OK)) { \ + goto recv_handle_error; \ + } \ + } \ + } else { \ + status = ucg_builtin_step_recv_handle_chunk(aggregation, \ + dest_buffer, data, \ + length, offset, \ + _is_fragmented, \ + _is_dt_packed, req); \ + if (ucs_unlikely(status != UCS_OK)) { \ + goto recv_handle_error; \ + } \ + } \ + \ break; - case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REMOTE_KEY: - ucs_assert(length == step->phase->md_attr->rkey_packed_size); - ucg_builtin_comp_unpack_rkey(req, offset, data); +#define case_recv_fragmented(a, _is_fragmented, _is_len_packed, _is_dt_packed) \ + case_recv_full(a, 0, _is_fragmented, _is_len_packed, _is_dt_packed) \ + case_recv_full(a, 1, _is_fragmented, _is_len_packed, _is_dt_packed) + +#define case_recv_len_packed(a, _is_len_packed, _is_dt_packed) \ + case_recv_fragmented(a, 0, _is_len_packed, _is_dt_packed) \ + case_recv_fragmented(a, 1, _is_len_packed, _is_dt_packed) + +#define case_recv_dt_packed(a, _is_dt_packed) \ + case_recv_len_packed(a, 0, _is_dt_packed) \ + case_recv_len_packed(a, 1, _is_dt_packed) + +#define case_recv(a) \ + case a: \ + switch ((uint8_t)step->comp_flags) { \ + case_recv_dt_packed(a, 1) \ + case_recv_dt_packed(a, 0) \ + } \ break; + +static void UCS_F_ALWAYS_INLINE +ucg_builtin_step_recv_handle_data(ucg_builtin_request_t *req, uint64_t offset, + uint8_t *data, size_t length) +{ + ucs_status_t status; + ucg_builtin_op_step_t *step = req->step; + uint8_t *dest_buffer = step->recv_buffer + offset; + + switch (step->comp_aggregation) { + case_recv(UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_NOP) + case_recv(UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE) + 
case_recv(UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER) + case_recv(UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE) + case_recv(UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REMOTE_KEY) } + + return; + +recv_handle_error: + ucg_builtin_comp_last_step_cb(req, status); } #define UCG_IF_STILL_PENDING(req, num, dec) \ @@ -290,6 +355,7 @@ ucg_builtin_step_recv_handle_comp(ucg_builtin_request_t *req, uint64_t offset, break; } + /* Act according to the requested completion action */ switch (step->comp_action) { case UCG_BUILTIN_OP_STEP_COMP_OP: @@ -324,11 +390,11 @@ ucg_builtin_step_check_pending(ucg_builtin_comp_slot_t *slot) unsigned msg_index; ucp_recv_desc_t *rdesc; ucg_builtin_header_t *header; - uint16_t local_id = slot->req.latest.local_id; + uint16_t local_id = slot->req.expecting.local_id; ucs_ptr_array_for_each(rdesc, msg_index, &slot->messages) { header = (ucg_builtin_header_t*)(rdesc + 1); - ucs_assert((header->msg.coll_id != slot->req.latest.coll_id) || - (header->msg.step_idx >= slot->req.latest.step_idx)); + ucs_assert((header->msg.coll_id != slot->req.expecting.coll_id) || + (header->msg.step_idx >= slot->req.expecting.step_idx)); /* * Note: stored message coll_id can be either larger or smaller than * the one currently handled - due to coll_id wrap-around. diff --git a/builtin/ops/builtin_control.c b/builtin/ops/builtin_control.c index 3a2043c..40f210a 100644 --- a/builtin/ops/builtin_control.c +++ b/builtin/ops/builtin_control.c @@ -5,6 +5,7 @@ #include #include +#include #include "builtin_ops.h" #include "builtin_comp_step.inl" @@ -18,29 +19,9 @@ static void ucg_builtin_init_barrier(ucg_builtin_op_t *op, ucg_coll_id_t coll_id ucg_collective_acquire_barrier(op->super.plan->group); } -static void ucg_builtin_finalize_barrier(ucg_builtin_op_t *op, - ucg_request_t *user_req, - ucs_status_t status) +static void ucg_builtin_finalize_barrier(ucg_builtin_op_t *op) { - ucs_status_t ret = ucg_collective_release_barrier(op->super.plan->group); - if (ucs_unlikely((ret != UCS_OK) && (user_req->status == UCS_OK))) { - user_req->status = ret; - } -} - -static void ucg_builtin_finalize_cb(ucg_builtin_op_t *op, - ucg_request_t *user_req, - ucs_status_t status) -{ - op->super.params.comp_cb((void*)user_req, status); -} - -static void ucg_builtin_finalize_barrier_and_cb(ucg_builtin_op_t *op, - ucg_request_t *user_req, - ucs_status_t status) -{ - ucg_builtin_finalize_barrier(op, user_req, UCS_OK /* avoid redundancy */); - ucg_builtin_finalize_cb(op, user_req, status); + ucg_collective_release_barrier(op->super.plan->group); } static void ucg_builtin_init_gather_waypoint(ucg_builtin_op_t *op, ucg_coll_id_t coll_id) @@ -54,22 +35,83 @@ static void ucg_builtin_init_gather_terminal(ucg_builtin_op_t *op, ucg_coll_id_t { ucg_builtin_op_step_t *step = &op->steps[0]; size_t len = step->buffer_length; - memcpy(step->recv_buffer + (op->super.params.type.root * len), + memcpy(step->recv_buffer + (UCG_PARAM_TYPE(&op->super.params).root * len), step->send_buffer, len); ucs_assert((step->flags & UCG_BUILTIN_OP_STEP_FLAG_TEMP_BUFFER_USED) == 0); } -static void ucg_builtin_init_reduce_recursive(ucg_builtin_op_t *op, ucg_coll_id_t coll_id) +static void ucg_builtin_init_reduce(ucg_builtin_op_t *op, ucg_coll_id_t coll_id) { ucg_builtin_op_step_t *step = &op->steps[0]; memcpy(step->recv_buffer, op->super.params.send.buffer, step->buffer_length); } -static void ucg_builtin_init_reduce(ucg_builtin_op_t *op, ucg_coll_id_t coll_id) +static UCS_F_ALWAYS_INLINE void +ucg_builtin_init_state(ucg_builtin_op_t *op, int is_pack) { - 
ucg_builtin_op_step_t *step = &op->steps[0]; - memcpy(step->recv_buffer, op->super.params.send.buffer, step->buffer_length); - ucs_assert(op->super.params.type.root == op->super.plan->my_index); + ucp_dt_generic_t *dt_gen; + + const ucg_collective_params_t *params = &op->super.params; + + if (is_pack) { + dt_gen = ucp_dt_to_generic(op->send_dt); + op->rstate.dt.generic.state = dt_gen->ops.start_pack(dt_gen->context, + params->send.buffer, + params->send.count); + } else { + dt_gen = ucp_dt_to_generic(op->recv_dt); + op->wstate.dt.generic.state = dt_gen->ops.start_unpack(dt_gen->context, + params->recv.buffer, + params->recv.count); + } + // TODO: re-use ucp_request_send_state_init()+ucp_request_send_state_reset()? +} + +static UCS_F_ALWAYS_INLINE void +ucg_builtin_finalize_state(ucg_builtin_op_t *op, int is_pack) +{ + ucp_dt_generic_t *dt_gen; + + if (is_pack) { + dt_gen = ucp_dt_to_generic(op->send_dt); + dt_gen->ops.finish(op->rstate.dt.generic.state); + } else { + dt_gen = ucp_dt_to_generic(op->recv_dt); + dt_gen->ops.finish(op->wstate.dt.generic.state); + } +} + +static void ucg_builtin_init_pack(ucg_builtin_op_t *op, ucg_coll_id_t coll_id) +{ + ucg_builtin_init_state(op, 1); +} + +static void ucg_builtin_init_unpack(ucg_builtin_op_t *op, ucg_coll_id_t coll_id) +{ + ucg_builtin_init_state(op, 0); +} + +static void ucg_builtin_init_pack_and_unpack(ucg_builtin_op_t *op, + ucg_coll_id_t coll_id) +{ + ucg_builtin_init_state(op, 1); + ucg_builtin_init_state(op, 0); +} + +static void ucg_builtin_finalize_pack(ucg_builtin_op_t *op) +{ + ucg_builtin_finalize_state(op, 1); +} + +static void ucg_builtin_finalize_unpack(ucg_builtin_op_t *op) +{ + ucg_builtin_finalize_state(op, 0); +} + +static void ucg_builtin_finalize_pack_and_unpack(ucg_builtin_op_t *op) +{ + ucg_builtin_finalize_state(op, 1); + ucg_builtin_finalize_state(op, 0); } /* Alltoall Bruck phase 1/3: shuffle the data */ @@ -103,13 +145,11 @@ static void ucg_builtin_calc_alltoall(ucg_builtin_request_t *req, uint8_t *send_ base_offset [kk] = bit_k; item_interval[kk] = bit_k; } -} // TODO: apply to calculation! 
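+    /*
+     * Background, for reference - the standard Bruck alltoall on p ranks
+     * (stated here as a guide; the indices in this still-TODO calculation may
+     * differ in detail): phase 1 rotates the local blocks so that the block
+     * headed for rank r starts at index (r - my_index) mod p; phase 2 runs
+     * log(p) exchange rounds, where round k forwards every block whose
+     * current index has bit k set; as a result, the block received from rank
+     * s ends up at index (my_index - s) mod p, and the final phase (see
+     * ucg_builtin_finalize_alltoall below) rotates the blocks back into
+     * source order. E.g. with p=4 on rank 1, phase 3 finds the blocks ordered
+     * [1,0,3,2] by source rank and restores [0,1,2,3].
+     */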
+} // TODO: re-apply the calculation in builtin_data.c
 
 /* Alltoall Bruck phase 3/3: shuffle the data */
-static void ucg_builtin_finalize_alltoall(ucg_builtin_op_t *op,
-                                          ucg_request_t *user_req,
-                                          ucs_status_t status)
+static void ucg_builtin_finalize_alltoall(ucg_builtin_op_t *op)
 {
     ucg_builtin_op_step_t *step = &op->steps[0];
     int bsize                   = step->buffer_length;
@@ -124,14 +164,6 @@ static void ucg_builtin_finalize_alltoall(ucg_builtin_op_t *op,
     }
 }
 
-static void ucg_builtin_finalize_alltoall_and_cb(ucg_builtin_op_t *op,
-                                                 ucg_request_t *user_req,
-                                                 ucs_status_t status)
-{
-    ucg_builtin_finalize_alltoall(op, user_req, UCS_OK /* avoid redundancy */);
-    ucg_builtin_finalize_cb(op, user_req, status);
-}
-
 void ucg_builtin_init_scatter(ucg_builtin_op_t *op, ucg_coll_id_t coll_id)
 {
     ucg_builtin_plan_t *plan = ucs_derived_of(op->super.plan, ucg_builtin_plan_t);
@@ -149,11 +181,9 @@ void ucg_builtin_init_scatter(ucg_builtin_op_t *op, ucg_coll_id_t coll_id)
 void ucg_builtin_print_init_cb_name(ucg_builtin_op_init_cb_t init_cb)
 {
     if (init_cb == NULL) {
-        printf("NONE");
+        printf("none");
     } else if (init_cb == ucg_builtin_init_barrier) {
         printf("barrier");
-    } else if (init_cb == ucg_builtin_init_reduce_recursive) {
-        printf("reduce (recursive)");
     } else if (init_cb == ucg_builtin_init_reduce) {
         printf("reduce");
     } else if (init_cb == ucg_builtin_init_gather_terminal) {
@@ -164,6 +194,12 @@ void ucg_builtin_print_init_cb_name(ucg_builtin_op_init_cb_t init_cb)
         printf("alltoall");
     } else if (init_cb == ucg_builtin_init_scatter) {
         printf("scatter");
+    } else if (init_cb == ucg_builtin_init_pack) {
+        printf("pack");
+    } else if (init_cb == ucg_builtin_init_unpack) {
+        printf("unpack");
+    } else if (init_cb == ucg_builtin_init_pack_and_unpack) {
+        printf("pack + unpack");
     } else {
         printf("\n");
         ucs_error("unrecognized operation initialization function");
@@ -173,17 +209,17 @@
 void ucg_builtin_print_fini_cb_name(ucg_builtin_op_fini_cb_t fini_cb)
 {
     if (fini_cb == NULL) {
-        printf("NONE");
+        printf("none");
     } else if (fini_cb == ucg_builtin_finalize_barrier) {
         printf("barrier");
-    } else if (fini_cb == ucg_builtin_finalize_cb) {
-        printf("user's callback");
-    } else if (fini_cb == ucg_builtin_finalize_barrier_and_cb) {
-        printf("barrier and user's callback");
     } else if (fini_cb == ucg_builtin_finalize_alltoall) {
-        printf("Alltoall");
-    } else if (fini_cb == ucg_builtin_finalize_alltoall_and_cb) {
-        printf("Alltoall and user's callback");
+        printf("alltoall");
+    } else if (fini_cb == ucg_builtin_finalize_pack) {
+        printf("pack");
+    } else if (fini_cb == ucg_builtin_finalize_unpack) {
+        printf("unpack");
+    } else if (fini_cb == ucg_builtin_finalize_pack_and_unpack) {
+        printf("pack + unpack");
     } else {
         printf("\n");
         ucs_error("unrecognized operation finalization function");
@@ -334,35 +370,6 @@ ucs_status_t ucg_builtin_op_consider_optimization(ucg_builtin_op_t *op,
     return UCS_OK;
 }
 
-int ucg_builtin_atomic_reduce_full(ucg_builtin_request_t *req,
-                                   void *src, void *dst, size_t length)
-{
-    ucg_collective_params_t *params = &req->op->super.params;
-    ucs_assert(length == (params->send.dt_len * params->send.count));
-
-    /* Check for barriers */
-    if (ucs_unlikely(params->send.dt_len == 0)) {
-        return 0;
-    }
-
-    ucg_builtin_mpi_reduce(params->send.op_ext, src, dst, params->send.count,
-                           params->send.dt_ext);
-    return length; // TODO: make ucg_builtin_mpi_reduce return the actual size
-}
-
-int ucg_builtin_atomic_reduce_part(ucg_builtin_request_t *req,
-                                   void *src, void *dst, size_t length)
-{
-    ucg_collective_params_t *params = &req->op->super.params;
-    ucs_assert(length / params->send.dt_len < params->send.count);
-    ucs_assert(length % params->send.dt_len == 0);
-
-
-    ucg_builtin_mpi_reduce(params->send.op_ext, src, dst,
-                           length / params->send.dt_len, params->send.dt_ext);
-    return length;
-}
-
 static enum ucg_builtin_op_step_flags ucg_builtin_step_method_flags[] = {
     [UCG_PLAN_METHOD_SEND_TERMINAL]   = 0,
     [UCG_PLAN_METHOD_SEND_TO_SM_ROOT] = 0,
@@ -393,23 +400,24 @@ ucg_builtin_step_send_flags(ucg_builtin_op_step_t *step,
 #ifdef HAVE_UCT_COLLECTIVES
                             uct_coll_dtype_mode_t mode,
 #endif
+                            size_t dt_len, int is_dt_contig,
                             uint64_t *send_flag)
 {
-    size_t dt_len = params->send.dt_len;
     size_t length = step->buffer_length;
-
 #ifndef HAVE_UCT_COLLECTIVES
     int supports_short = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT);
     int supports_bcopy = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY);
    int supports_zcopy = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_ZCOPY);
 #else
-    int supports_short = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) &&
+    int supports_short = is_dt_contig &&
+                         (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) &&
                          ((phase->iface_attr->cap.coll_mode.short_flags & UCS_BIT(mode)) ||
                           (mode == UCT_COLL_DTYPE_MODE_PADDED));
 
     int supports_bcopy = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY) &&
                          ((phase->iface_attr->cap.coll_mode.bcopy_flags & UCS_BIT(mode)) ||
                           (mode == UCT_COLL_DTYPE_MODE_PADDED));
-    int supports_zcopy = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_ZCOPY) &&
+    int supports_zcopy = is_dt_contig &&
+                         (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_AM_ZCOPY) &&
                          ((phase->iface_attr->cap.coll_mode.zcopy_flags & UCS_BIT(mode)) ||
                           (mode == UCT_COLL_DTYPE_MODE_PADDED));
 
@@ -510,95 +518,116 @@ ucg_builtin_step_send_flags(ucg_builtin_op_step_t *step,
     return UCS_OK;
 }
 
-void ucg_builtin_step_select_packers(const ucg_collective_params_t *params,
-                                     ucg_builtin_op_step_t *step)
+ucs_status_t
+ucg_builtin_step_select_packers(const ucg_collective_params_t *params,
+                                size_t send_dt_len, int is_send_dt_contig,
+                                ucg_builtin_op_step_t *step)
 {
-    int is_reduce = ((step->phase->method == UCG_PLAN_METHOD_SEND_TO_SM_ROOT) &&
-                     (params->type.modifiers &
-                      UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE));
-
-    if ((is_reduce) &&
-        (step->flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_SHORT) &&
-        (ucg_global_params.type_info.datatype_is_integer_f != NULL) &&
-        (ucg_global_params.type_info.datatype_is_integer_f(params->send.dt_ext)) &&
-        (ucg_global_params.type_info.reduce_op_is_sum_f != NULL) &&
-        (ucg_global_params.type_info.reduce_op_is_sum_f(params->send.op_ext))) {
+    int is_signed;
+    uint16_t modifiers = UCG_PARAM_TYPE(params).modifiers;
+    int is_sm_reduce   = ((step->phase->method == UCG_PLAN_METHOD_SEND_TO_SM_ROOT) &&
+                          (modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE));
+
+    if ((is_sm_reduce) &&
+        (ucg_global_params.field_mask & UCG_PARAM_FIELD_DATATYPE_CB) &&
+        (ucg_global_params.datatype.is_integer_f(params->send.dtype, &is_signed)) &&
+        (!is_signed) &&
+        (ucg_global_params.field_mask & UCG_PARAM_FIELD_REDUCE_OP_CB) &&
+        (ucg_global_params.reduce_op.is_sum_f != NULL) &&
+        (ucg_global_params.reduce_op.is_sum_f(params->send.op))) {
         int is_single = (params->send.count == 1);
-#ifdef HAVE_UCT_COLLECTIVES
-        step->uct_flags |= (UCT_SEND_FLAG_PACK_LOCK << UCT_COLL_DTYPE_MODE_BITS);
-#endif
-        switch (params->send.dt_len) {
+        // TODO: (un)set UCG_BUILTIN_OP_STEP_FLAG_BCOPY_PACK_LOCK...
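+        /*
+         * The switch below maps each supported unsigned-integer width to a
+         * dedicated atomic packer (e.g. a single-element 4-byte summation
+         * uses UCG_BUILTIN_PACKER_NAME(_atomic_single_, 32)); any other
+         * width falls through to the generic MPI reduction callback.
+         */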
+        switch (send_dt_len) {
         case 1:
             step->bcopy.pack_single_cb = is_single ?
                                          UCG_BUILTIN_PACKER_NAME(_atomic_single_, 8) :
                                          UCG_BUILTIN_PACKER_NAME(_atomic_multiple_, 8);
-            return;
+            return UCS_OK;
 
         case 2:
             step->bcopy.pack_single_cb = is_single ?
                                          UCG_BUILTIN_PACKER_NAME(_atomic_single_, 16) :
                                          UCG_BUILTIN_PACKER_NAME(_atomic_multiple_, 16);
-            return;
+            return UCS_OK;
 
         case 4:
             step->bcopy.pack_single_cb = is_single ?
                                          UCG_BUILTIN_PACKER_NAME(_atomic_single_, 32) :
                                          UCG_BUILTIN_PACKER_NAME(_atomic_multiple_, 32);
-            return;
+            return UCS_OK;
 
         case 8:
             step->bcopy.pack_single_cb = is_single ?
                                          UCG_BUILTIN_PACKER_NAME(_atomic_single_, 64) :
                                          UCG_BUILTIN_PACKER_NAME(_atomic_multiple_, 64);
-            return;
+            return UCS_OK;
 
         default:
-            ucs_error("unsupported integer datatype length: %lu", params->send.dt_len);
+            ucs_error("unsupported unsigned integer datatype length: %lu",
+                      send_dt_len);
             break; /* fall-back to the MPI reduction callback */
         }
     }
 
-    int is_variadic = (params->type.modifiers &
+    int is_variadic = (UCG_PARAM_TYPE(params).modifiers &
                        UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC);
 
-    int is_src_contig = ((ucg_global_params.type_info.mpi_is_contig_f != NULL) &&
-                         (ucg_global_params.type_info.mpi_is_contig_f(params->send.dt_ext,
-                                                                      params->send.count)));
-    if (is_reduce) {
-        step->bcopy.pack_full_cb   = UCG_BUILTIN_PACKER_NAME(_reducing_, full);
-        step->bcopy.pack_part_cb   = UCG_BUILTIN_PACKER_NAME(_reducing_, part);
-        step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, single);
+    if (!is_send_dt_contig) {
+        step->bcopy.pack_full_cb   = UCG_BUILTIN_PACKER_NAME(_datatype_, full);
+        step->bcopy.pack_part_cb   = UCG_BUILTIN_PACKER_NAME(_datatype_, part);
+        step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, single);
     } else if (is_variadic) {
         step->bcopy.pack_full_cb   = UCG_BUILTIN_PACKER_NAME(_variadic_, full);
         step->bcopy.pack_part_cb   = UCG_BUILTIN_PACKER_NAME(_variadic_, part);
         step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_variadic_, single);
-    } else if (!is_src_contig) {
-        step->bcopy.pack_full_cb   = UCG_BUILTIN_PACKER_NAME(_datatype_, full);
-        step->bcopy.pack_part_cb   = UCG_BUILTIN_PACKER_NAME(_datatype_, part);
-        step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, single);
+    } else if (is_sm_reduce) {
+        step->bcopy.pack_full_cb   = UCG_BUILTIN_PACKER_NAME(_reducing_, full);
+        step->bcopy.pack_part_cb   = UCG_BUILTIN_PACKER_NAME(_reducing_, part);
+        step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, single);
     } else {
         step->bcopy.pack_full_cb   = UCG_BUILTIN_PACKER_NAME(_, full);
         step->bcopy.pack_part_cb   = UCG_BUILTIN_PACKER_NAME(_, part);
         step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_, single);
     }
+
+    return UCS_OK;
 }
 
 #define UCG_BUILTIN_STEP_RECV_FLAGS (UCG_BUILTIN_OP_STEP_FLAG_RECV_AFTER_SEND |\
                                      UCG_BUILTIN_OP_STEP_FLAG_RECV_BEFORE_SEND1|\
                                      UCG_BUILTIN_OP_STEP_FLAG_RECV1_BEFORE_SEND)
 
+static inline ucs_status_t ucg_builtin_convert_datatype(void *param_datatype,
+                                                        ucp_datatype_t *ucp_datatype)
+{
+    if (ucg_global_params.field_mask & UCG_PARAM_FIELD_DATATYPE_CB) {
+        int ret = ucg_global_params.datatype.convert(param_datatype, ucp_datatype);
+        if (ucs_unlikely(ret != 0)) {
+            ucs_error("Datatype conversion callback failed");
+            return UCS_ERR_INVALID_PARAM;
+        }
+    } else {
+        *ucp_datatype = (ucp_datatype_t)param_datatype;
+    }
+
+    return UCS_OK;
+}
+
 ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan,
                                      ucg_builtin_plan_phase_t *phase,
                                      enum ucg_builtin_op_step_flags *flags,
                                      const ucg_collective_params_t *params,
                                      int8_t **current_data_buffer,
+                                     int is_send_dt_contig,
+                                     int is_recv_dt_contig,
+                                     size_t send_dt_len,
                                      ucg_builtin_op_init_cb_t *init_cb,
                                      ucg_builtin_op_fini_cb_t *fini_cb,
                                      ucg_builtin_op_step_t *step,
                                      int *zcopy_step_skip)
 {
-
-    enum ucg_collective_modifiers modifiers = params->type.modifiers;
+    ucs_status_t status;
+    enum ucg_collective_modifiers modifiers = UCG_PARAM_TYPE(params).modifiers;
 
     /* Make sure local_id is always nonzero ( @ref ucg_builtin_header_step_t ) */
     ucs_assert_always(phase->step_index != 0);
@@ -620,21 +649,11 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan,
     step->iter_ep          = 0;
     step->iter_offset      = 0;
     step->fragment_pending = NULL;
+    step->buffer_length    = send_dt_len * params->send.count;
     step->recv_buffer      = (int8_t*)params->recv.buffer;
     step->uct_md           = phase->md;
-    step->uct_flags        = 0;
     step->flags            = ucg_builtin_step_method_flags[phase->method];
 
-    if (modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND) {
-        if (modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_RECV) {
-            step->buffer_length = 0; /* Barrier */
-        } else {
-            step->buffer_length = params->recv.dt_len * params->recv.count;
-        }
-    } else {
-        step->buffer_length = params->send.dt_len * params->send.count;
-    }
-
     if (phase->md) {
         step->uct_iface = (phase->ep_cnt == 1) ? phase->single_ep->iface :
                                                  phase->multi_eps[0]->iface;
@@ -669,12 +688,11 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan,
     }
 
     /* Decide how the messages are sent (regardless of my role) */
-    ucs_status_t status = ucg_builtin_step_send_flags(step, phase, params,
-                                                      mode, &send_flags);
+    status = ucg_builtin_step_send_flags(step, phase, params, mode,
 #else
-    ucs_status_t status = ucg_builtin_step_send_flags(step, phase, params,
-                                                      &send_flags);
+    status = ucg_builtin_step_send_flags(step, phase, params,
 #endif
+                                         send_dt_len, is_send_dt_contig, &send_flags);
     if (ucs_unlikely(status != UCS_OK)) {
         return status;
     }
@@ -695,50 +713,41 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan,
         goto zcopy_redo;
     }
 
+    step->comp_flags = 0;
+#ifdef HAVE_UCT_COLLECTIVES
+    if (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_INCAST) {
+        step->comp_flags |= UCG_BUILTIN_OP_STEP_COMP_FLAG_BATCHED_DATA;
+    }
+#endif
+
     int is_fragmented = (send_flags & UCG_BUILTIN_OP_STEP_FLAG_FRAGMENTED);
     if (is_fragmented) {
-        step->flags            |= UCG_BUILTIN_OP_STEP_FLAG_FRAGMENTED;
+        step->flags            |= UCG_BUILTIN_OP_STEP_FLAG_FRAGMENTED;
+        step->comp_flags       |= UCG_BUILTIN_OP_STEP_COMP_FLAG_FRAGMENTED_DATA;
         step->fragment_pending  = (uint8_t*)UCS_ALLOC_CHECK(sizeof(phase->ep_cnt),
                                                             "ucg_builtin_step_pipelining");
     }
 
-#ifdef HAVE_UCT_COLLECTIVES
-    if ((phase->method == UCG_PLAN_METHOD_SEND_TO_SM_ROOT) &&
-        (phase->iface_attr->cap.flags & (UCT_IFACE_FLAG_INCAST |
-                                         UCT_IFACE_FLAG_BCAST))) {
-        send_flags |= UCG_BUILTIN_OP_STEP_FLAG_PACKED_DTYPE_MODE;
-        if (send_flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_SHORT) {
-            if (is_fragmented) {
-                step->fragment_length =
-                    UCT_COLL_DTYPE_MODE_PACK(mode, step->fragment_length);
-            } else {
-                step->buffer_length =
-                    UCT_COLL_DTYPE_MODE_PACK(mode, step->buffer_length);
-            }
-        } else if (modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC) {
-            //step->zcopy.iov[1].buffer = (void*)params->send.displs;
-        }
-    }
-#endif
-
     /* Do any special assignment w.r.t. the src/dst buffers in this step */
     int is_send            = 0;
+    int is_no_recv         = 0;
     int is_send_after_recv = 0;
     int is_pipelined       = 0;
+    int is_reduction       = 0;
     int is_barrier         = modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER;
     int is_broadcast       = modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST;
     int is_one_dest        = modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_SINGLE_DESTINATION;
 
     switch (phase->method) {
     case UCG_PLAN_METHOD_SEND_TERMINAL:
     case UCG_PLAN_METHOD_SEND_TO_SM_ROOT:
+    case UCG_PLAN_METHOD_SCATTER_TERMINAL:
         is_send = 1;
+        is_no_recv = 1;
         if (init_cb != NULL) {
             if (is_barrier) {
                 ucs_assert(fini_cb != NULL);
                 *init_cb = ucg_builtin_init_barrier;
-                *fini_cb = params->comp_cb ?
-                           ucg_builtin_finalize_barrier_and_cb :
-                           ucg_builtin_finalize_barrier;
+                *fini_cb = ucg_builtin_finalize_barrier;
             } else if ((!is_broadcast) && (!is_one_dest)) {
                 *init_cb = ucg_builtin_init_scatter;
             }
@@ -746,13 +755,12 @@
         break;
 
     case UCG_PLAN_METHOD_REDUCE_TERMINAL:
+        is_reduction = 1;
         if (init_cb != NULL) {
             if (is_barrier) {
                 ucs_assert(fini_cb != NULL);
                 *init_cb = ucg_builtin_init_barrier;
-                *fini_cb = params->comp_cb ?
-                           ucg_builtin_finalize_barrier_and_cb :
-                           ucg_builtin_finalize_barrier;
+                *fini_cb = ucg_builtin_finalize_barrier;
             } else {
                 *init_cb = ucg_builtin_init_reduce;
             }
@@ -769,13 +777,12 @@
         break;
 
     case UCG_PLAN_METHOD_REDUCE_WAYPOINT:
+        is_reduction = 1;
         if (init_cb != NULL) {
             if (is_barrier) {
                 ucs_assert(fini_cb != NULL);
                 *init_cb = ucg_builtin_init_barrier;
-                *fini_cb = params->comp_cb ?
-                           ucg_builtin_finalize_barrier_and_cb :
-                           ucg_builtin_finalize_barrier;
+                *fini_cb = ucg_builtin_finalize_barrier;
             } else {
                 *init_cb = ucg_builtin_init_reduce;
             }
@@ -817,15 +824,14 @@
     case UCG_PLAN_METHOD_REDUCE_RECURSIVE:
         /* First step is the exception to this rule */
         is_send = 1;
+        is_reduction = 1;
         if (init_cb != NULL) {
             if (is_barrier) {
                 ucs_assert(fini_cb != NULL);
                 *init_cb = ucg_builtin_init_barrier;
-                *fini_cb = params->comp_cb ?
-                           ucg_builtin_finalize_barrier_and_cb :
-                           ucg_builtin_finalize_barrier;
+                *fini_cb = ucg_builtin_finalize_barrier;
             } else {
-                *init_cb = ucg_builtin_init_reduce_recursive;
+                *init_cb = ucg_builtin_init_reduce;
             }
         }
         if (phase->step_index == 1) {
@@ -842,9 +848,7 @@
         if (init_cb != NULL) {
             ucs_assert(fini_cb != NULL);
             *init_cb = ucg_builtin_init_alltoall;
-            *fini_cb = params->comp_cb ?
-                       ucg_builtin_finalize_alltoall_and_cb :
-                       ucg_builtin_finalize_alltoall;
+            *fini_cb = ucg_builtin_finalize_alltoall;
         }
         break;
 
@@ -852,8 +856,81 @@
     default:
         break;
     }
 
-    if ((fini_cb != NULL) && (*fini_cb == NULL) && (params->comp_cb)) {
-        *fini_cb = ucg_builtin_finalize_cb;
+    if (is_reduction && !is_barrier) {
+        if (!(ucg_global_params.field_mask & UCG_PARAM_FIELD_REDUCE_OP_CB)) {
+            ucs_error("Cannot perform reductions: missing ucg_init() parameters");
+            return UCS_ERR_INVALID_PARAM;
+        }
+
+        if (!ucg_global_params.reduce_op.is_commutative_f(UCG_PARAM_OP(params))) {
+            ucs_error("Cannot perform reductions: non-commutative operations unsupported");
+            return UCS_ERR_UNSUPPORTED;
+            // TODO: set UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE_STABLE instead
+        }
+
+        if (ucg_global_params.reduce_op.is_loc_expected_f(UCG_PARAM_OP(params))) {
+            ucs_error("Cannot perform reductions: MPI's MINLOC/MAXLOC unsupported");
+            return UCS_ERR_UNSUPPORTED;
+        }
+    }
+
+    if (is_send) {
+        /* packer callback selection */
+        if (send_flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_BCOPY) {
+            status = ucg_builtin_step_select_packers(params, send_dt_len,
+                                                     is_send_dt_contig, step);
+            if (ucs_unlikely(status != UCS_OK)) {
+                return status;
+            }
+
+            if (!is_send_dt_contig) {
+                ucs_assert(*init_cb == NULL);
+                ucs_assert(*fini_cb == NULL);
+
+                if (!is_recv_dt_contig && !is_no_recv) {
+                    *init_cb = ucg_builtin_init_pack_and_unpack;
+                    *fini_cb = ucg_builtin_finalize_pack_and_unpack;
+                } else {
+                    *init_cb = ucg_builtin_init_pack;
+                    *fini_cb = ucg_builtin_finalize_pack;
+                }
+
+                step->comp_flags |= UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_DATATYPE;
+            }
+        }
+
+#ifdef HAVE_UCT_COLLECTIVES
+        if ((is_send) &&
+            (phase->iface_attr->cap.flags & (UCT_IFACE_FLAG_INCAST |
+                                             UCT_IFACE_FLAG_BCAST))) {
+            send_flags |= UCG_BUILTIN_OP_STEP_FLAG_PACKED_DTYPE_MODE;
+            if (send_flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_SHORT) {
+                if (is_fragmented) {
+                    step->fragment_length =
+                        UCT_COLL_DTYPE_MODE_PACK(mode, step->fragment_length);
+                } else {
+                    step->buffer_length =
+                        UCT_COLL_DTYPE_MODE_PACK(mode, step->buffer_length);
+                }
+                step->comp_flags |= UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_LENGTH;
+            } else if (modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC) {
+                //step->zcopy.iov[1].buffer = (void*)params->send.displs;
+            }
+        }
+#endif
+    } else {
+        if (!is_recv_dt_contig) {
+            ucs_assert(*init_cb == NULL);
+            ucs_assert(*fini_cb == NULL);
+
+            *init_cb = ucg_builtin_init_unpack;
+            *fini_cb = ucg_builtin_finalize_unpack;
+
+            step->comp_flags |= UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_DATATYPE;
+        } else {
+            step->buffer_length = params->recv.count * send_dt_len;
+            // TODO: fix for cases where send and receive datatypes differ
+        }
     }
 
     if (is_concat) {
@@ -866,11 +943,6 @@
         step->am_header.remote_offset = plan->super.my_index * step->buffer_length;
     }
 
-    /* packer callback selection */
-    if (send_flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_BCOPY) {
-        ucg_builtin_step_select_packers(params, step);
-    }
-
     /* memory registration (using the memory registration cache) */
     int is_zcopy = (send_flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_AM_ZCOPY);
     if (is_zcopy) {
@@ -885,39 +957,22 @@
         step->flags |= UCG_BUILTIN_OP_STEP_FLAG_LAST_STEP;
     }
 
-    /* Choose the data-related action to be taken by the incoming AM handler */
-    int is_dst_contig = ((ucg_global_params.type_info.mpi_is_contig_f != NULL) &&
-                         (ucg_global_params.type_info.mpi_is_contig_f(params->recv.dt_ext,
-                                                                      params->recv.count)));
-
-#ifdef HAVE_UCT_COLLECTIVES
-    int is_incast_used = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_INCAST) != 0;
-#else
-    int is_incast_used = 0;
-#endif
 
     if (is_barrier || !(step->flags & UCG_BUILTIN_STEP_RECV_FLAGS)) {
         step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_NOP;
     } else if ((send_flags & UCT_IFACE_FLAG_AM_ZCOPY) && (zcopy_step_skip)) {
         step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REMOTE_KEY;
-    } else if (phase->method == UCG_PLAN_METHOD_GATHER_TERMINAL) {
-        step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL;
-    } else if (phase->method == UCG_PLAN_METHOD_GATHER_WAYPOINT) {
-        step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_WAYPOINT;
+    } else if ((phase->method == UCG_PLAN_METHOD_GATHER_TERMINAL) ||
+               (phase->method == UCG_PLAN_METHOD_GATHER_WAYPOINT)) {
+        step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER;
     } else if ((phase->method == UCG_PLAN_METHOD_REDUCE_TERMINAL) ||
               (phase->method == UCG_PLAN_METHOD_REDUCE_RECURSIVE)) {
-        if (is_incast_used) {
-            step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_BATCHED;
-        } else if (is_fragmented) {
-            step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_FRAGMENT;
-        } else {
-            step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE;
+        if (is_fragmented) {
+            step->dtype_length = send_dt_len;
         }
-    } else if (is_dst_contig) {
-        step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE;
+        step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE;
+        ucs_assert(params->recv.count > 0);
     } else {
-        step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED;
-        /* If we pack/unpack - we care more about the count than the length */
-        step->unpack_count = params->recv.count;
+        step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE;
     }
 
     /* Choose a completion criteria to be checked by the incoming AM handler */
@@ -964,29 +1019,34 @@ ucs_status_t ucg_builtin_step_create_rkey_bcast(ucg_builtin_plan_t *plan,
                                                 const ucg_collective_params_t *params,
                                                 ucg_builtin_op_step_t *step)
 {
+
     ucg_builtin_op_step_t *zcopy_step = step + 1;
-    int is_bcast = zcopy_step->flags & UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST;
+    ucg_group_member_index_t root     = UCG_PARAM_TYPE(params).root;
+    unsigned is_bcast                 = zcopy_step->flags &
+                                        UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST;
 
     ucs_assert((zcopy_step->flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_PUT_ZCOPY) ||
               (zcopy_step->flags & UCG_BUILTIN_OP_STEP_FLAG_SEND_GET_ZCOPY));
 
     /* Prepare to send out my remote key */
-    size_t rkey_size     = step->phase->md_attr->rkey_packed_size;
-    size_t info_size     = sizeof(uint64_t) + rkey_size;
-    size_t total_size    = is_bcast ? info_size :
-                                      info_size * plan->super.group_size;
-    uint8_t *info_buffer = UCS_ALLOC_CHECK(total_size, "builtin_rkey_info");
+    size_t rkey_size  = step->phase->md_attr->rkey_packed_size;
+    size_t info_size  = sizeof(uint64_t) + rkey_size;
+    size_t total_size = is_bcast ? info_size :
+                                   info_size * plan->super.group_size;
+    uint8_t *info_buffer = UCS_ALLOC_CHECK(total_size, "builtin_rkey_info");
 
     /* Set some parameters for step creation */
-    ucg_collective_params_t step_params;
-    if (params->type.root == plan->super.my_index) {
-        /* This is the sender of the remote key */
-        step_params.send.buffer = info_buffer;
-        step_params.send.count  = 1;
-        step_params.send.dt_len = info_size;
-        step_params.type.root   = params->type.root;
-
+    ucg_collective_params_t step_params = {0};
+    UCG_PARAM_TYPE(&step_params).modifiers = is_bcast |
+                                             UCG_GROUP_COLLECTIVE_MODIFIER_SINGLE_SOURCE;
+    UCG_PARAM_TYPE(&step_params).root      = root;
+    step_params.send.buffer                = info_buffer;
+    step_params.send.count                 = 1;
+    step_params.recv.buffer                = info_buffer;
+    step_params.recv.count                 = 1;
+
+    if (root == plan->super.my_index) {
         ucs_assert(step->send_buffer == step->recv_buffer); // TODO: choose the right buffer!
         *((uint64_t*)info_buffer) = (uint64_t)step->send_buffer;
         info_buffer += sizeof(uint64_t);
@@ -998,11 +1058,8 @@ ucs_status_t ucg_builtin_step_create_rkey_bcast(ucg_builtin_plan_t *plan,
             return status;
         }
 
-        if (is_bcast) {
-            step_params.type.modifiers = UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST |
-                                         UCG_GROUP_COLLECTIVE_MODIFIER_SINGLE_SOURCE;
-        } else {
-            /* Copy the same memory key to the other buffer */
+        if (!is_bcast) {
+            /* Copy the same memory key before scattering among the peers */
             int idx;
             uint8_t *info_iter = info_buffer;
             for (idx = 1; idx < plan->super.group_size; idx++) {
@@ -1015,23 +1072,38 @@ ucs_status_t ucg_builtin_step_create_rkey_bcast(ucg_builtin_plan_t *plan,
                  */
                 memcpy(info_iter, info_buffer, rkey_size);
             }
-
-            step_params.type.modifiers = UCG_GROUP_COLLECTIVE_MODIFIER_SINGLE_SOURCE;
         }
-    } else {
-        /* This is a receiver of the remote key */
-        step_params.recv.buffer    = info_buffer;
-        step_params.recv.count     = 1;
-        step_params.recv.dt_len    = info_size;
-        step_params.type.root      = params->type.root;
-        step_params.type.modifiers = UCG_GROUP_COLLECTIVE_MODIFIER_SINGLE_SOURCE;
     }
 
     /* Create the preliminary remote-key-exchange step */
     int dummy_skip;
     enum ucg_builtin_op_step_flags flags = UCG_BUILTIN_OP_STEP_FLAG_TEMP_BUFFER_USED;
     return ucg_builtin_step_create(plan, step->phase, &flags, &step_params,
-                                   NULL, NULL, NULL, step, &dummy_skip);
+                                   NULL, 1, 1, info_size, NULL, NULL, step,
+                                   &dummy_skip);
+}
+
+static inline size_t ucg_builtin_op_get_send_dt_length(ucg_builtin_op_t *op)
+{
+    const ucg_collective_params_t *params = &op->super.params;
+
+    if (params->send.count == 0) {
+        return 0;
+    }
+
+    if (UCP_DT_IS_CONTIG(op->send_dt)) {
+        return ucp_contig_dt_length(op->send_dt, 1);
+    }
+
+    ucg_builtin_init_state(op, 1);
+
+    ucp_dt_generic_t *dt_gen = ucp_dt_to_generic(op->send_dt);
+
+    size_t len = dt_gen->ops.packed_size(op->rstate.dt.generic.state);
+
+    ucg_builtin_finalize_state(op, 1);
+
+    return len;
 }
 
 ucs_status_t ucg_builtin_op_create(ucg_plan_t *plan,
@@ -1045,23 +1117,42 @@ ucs_status_t ucg_builtin_op_create(ucg_plan_t *plan,
     ucg_builtin_op_t *op             = (ucg_builtin_op_t*)
                                        ucs_mpool_get_inline(&builtin_plan->op_mp);
     ucg_builtin_op_step_t *next_step = &op->steps[0];
+    int is_send_dt_contig            = 1;
+    int is_recv_dt_contig            = 1;
     int zcopy_step_skip              = 0;
     int8_t *current_data_buffer      = NULL;
 
     op->init_cb = NULL;
     op->fini_cb = NULL;
 
+    /* obtain UCX datatypes corresponding to the external datatypes passed */
+    if (params->send.count > 0) {
+        status = ucg_builtin_convert_datatype(params->send.dtype, &op->send_dt);
+        if (ucs_unlikely(status != UCS_OK)) {
+            return status;
+        }
+
+        is_send_dt_contig = UCP_DT_IS_CONTIG(op->send_dt);
+    }
+
+    if (params->recv.count > 0) {
+        status = ucg_builtin_convert_datatype(params->recv.dtype, &op->recv_dt);
+        if (ucs_unlikely(status != UCS_OK)) {
+            return status;
+        }
+
+        is_recv_dt_contig = UCP_DT_IS_CONTIG(op->recv_dt);
+    }
+
     /* copy the parameters aside, and use those from now on */
     memcpy(&op->super.params, params, sizeof(*params));
-    if (ucs_unlikely(params->type.modifiers &
-                     UCG_GROUP_COLLECTIVE_MODIFIER_IGNORE_SEND)) {
-        memcpy(&op->super.params, &params->recv_only, sizeof(params->recv_only));
-    }
     params = &op->super.params;
 
+    size_t send_dt_len = ucg_builtin_op_get_send_dt_length(op);
+    /* Note: this needs to be after op->params and op->send_dt are set */
+
     /* Check for non-zero-root trees */
-    if (ucs_unlikely(params->type.root != 0)) {
+    if (ucs_unlikely(UCG_PARAM_TYPE(params).root != 0)) {
         /* Assume the plan is tree-based, since Recursive K-ing has no root */
-        status = ucg_builtin_topo_tree_set_root(params->type.root,
+        status = ucg_builtin_topo_tree_set_root(UCG_PARAM_TYPE(params).root,
                                                 plan->my_index, builtin_plan,
                                                 &next_phase, &phase_count);
         if (ucs_unlikely(status != UCS_OK)) {
@@ -1076,7 +1167,8 @@
         flags  = UCG_BUILTIN_OP_STEP_FLAG_LAST_STEP;
         status = ucg_builtin_step_create(builtin_plan, next_phase, &flags,
                                          params, &current_data_buffer,
-                                         &op->init_cb, &op->fini_cb,
+                                         is_send_dt_contig, is_recv_dt_contig,
+                                         send_dt_len, &op->init_cb, &op->fini_cb,
                                          next_step, &zcopy_step_skip);
 
         if ((status == UCS_OK) && ucs_unlikely(zcopy_step_skip)) {
@@ -1088,7 +1180,8 @@
         /* First step of many */
         status = ucg_builtin_step_create(builtin_plan, next_phase, &flags,
                                          params, &current_data_buffer,
-                                         &op->init_cb, &op->fini_cb,
+                                         is_send_dt_contig, is_recv_dt_contig,
+                                         send_dt_len, &op->init_cb, &op->fini_cb,
                                          next_step, &zcopy_step_skip);
         if (ucs_unlikely(status != UCS_OK)) {
             goto op_cleanup;
@@ -1110,7 +1203,8 @@
         for (step_cnt = 1; step_cnt < phase_count - 1; step_cnt++) {
             status = ucg_builtin_step_create(builtin_plan, ++next_phase,
                                              &flags, params, &current_data_buffer,
-                                             NULL, NULL, ++next_step,
+                                             is_send_dt_contig, is_recv_dt_contig,
+                                             send_dt_len, NULL, NULL, ++next_step,
                                              &zcopy_step_skip);
             if (ucs_unlikely(status != UCS_OK)) {
                 goto op_cleanup;
@@ -1133,7 +1227,8 @@
         flags |= UCG_BUILTIN_OP_STEP_FLAG_LAST_STEP;
         status = ucg_builtin_step_create(builtin_plan, ++next_phase, &flags,
                                          params, &current_data_buffer,
-                                         NULL, NULL, ++next_step,
+                                         is_send_dt_contig, is_recv_dt_contig,
+                                         send_dt_len, NULL, NULL, ++next_step,
                                          &zcopy_step_skip);
         if ((status == UCS_OK) && ucs_unlikely(zcopy_step_skip)) {
             status = ucg_builtin_step_create_rkey_bcast(builtin_plan,
@@ -1197,7 +1292,7 @@ ucs_status_t ucg_builtin_op_trigger(ucg_op_t *op,
     ucg_builtin_op_t *builtin_op  = (ucg_builtin_op_t*)op;
     unsigned slot_idx             = coll_id % UCG_BUILTIN_MAX_CONCURRENT_OPS;
     ucg_builtin_comp_slot_t *slot = &builtin_op->slots[slot_idx];
-    if (ucs_unlikely(slot->req.latest.local_id != 0)) {
+    if (ucs_unlikely(slot->req.expecting.local_id != 0)) {
         ucs_error("UCG Builtin planner exceeded its concurrent collectives limit.");
         return UCS_ERR_NO_RESOURCE;
     }
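A note on the pack/unpack state handling above: the calls follow UCP's generic-datatype
contract (ucp_dt_generic_t), the same ops driven by ucg_builtin_init_state(),
ucg_builtin_op_get_send_dt_length() and the datatype packer macro below. A minimal
sketch of that lifecycle, assuming the UCP internal headers are available; the helper
name pack_generic_dt is hypothetical and error handling is omitted:

    static size_t pack_generic_dt(ucp_datatype_t dt, const void *buffer,
                                  size_t count, void *dest)
    {
        ucp_dt_generic_t *dt_gen = ucp_dt_to_generic(dt);

        /* mirrors ucg_builtin_init_state(op, 1): create the pack state */
        void *state = dt_gen->ops.start_pack(dt_gen->context, buffer, count);

        /* mirrors ucg_builtin_op_get_send_dt_length(): total packed size */
        size_t length = dt_gen->ops.packed_size(state);

        /* mirrors UCG_BUILTIN_DATATYPE_PACK_CB, packing at offset 0 */
        dt_gen->ops.pack(state, 0, dest, length);

        /* mirrors ucg_builtin_finalize_state(op, 1) */
        dt_gen->ops.finish(state);
        return length;
    }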
diff --git a/builtin/ops/builtin_data.c b/builtin/ops/builtin_data.c
index 377698b..47f7526 100644
--- a/builtin/ops/builtin_data.c
+++ b/builtin/ops/builtin_data.c
@@ -133,11 +133,18 @@ ucg_builtin_step_am_bcopy_one(ucg_builtin_request_t *req,
                               ucg_builtin_op_step_t *step,
                               uct_ep_h ep, int var_stride)
 {
+    unsigned uct_flags;
+    if (ucs_unlikely(step->flags & UCG_BUILTIN_OP_STEP_FLAG_BCOPY_PACK_LOCK)) {
+        uct_flags = UCT_SEND_FLAG_PACK_LOCK;
+    } else {
+        uct_flags = 0;
+    }
+
     UCG_BUILTIN_ASSERT_SEND(step, AM_BCOPY);
     ssize_t len = step->uct_iface->ops.ep_am_bcopy(ep, step->am_id,
                                                    step->bcopy.pack_single_cb,
-                                                   req, step->uct_flags);
+                                                   req, uct_flags);
     return (ucs_unlikely(len < 0)) ? (ucs_status_t)len : UCS_OK;
 }
 
@@ -153,6 +160,13 @@ ucg_builtin_step_am_bcopy_max(ucg_builtin_request_t *req,
     ucg_offset_t iter_limit = step->buffer_length - frag_size;
     packed_send_t send_func = step->uct_iface->ops.ep_am_bcopy;
 
+    unsigned uct_flags;
+    if (ucs_unlikely(step->flags & UCG_BUILTIN_OP_STEP_FLAG_BCOPY_PACK_LOCK)) {
+        uct_flags = UCT_SEND_FLAG_PACK_LOCK;
+    } else {
+        uct_flags = 0;
+    }
+
     UCG_BUILTIN_ASSERT_SEND(step, AM_BCOPY);
     ucs_assert(step->iter_offset != UCG_BUILTIN_OFFSET_PIPELINE_READY);
     ucs_assert(step->iter_offset != UCG_BUILTIN_OFFSET_PIPELINE_PENDING);
@@ -161,7 +175,7 @@ ucg_builtin_step_am_bcopy_max(ucg_builtin_request_t *req,
     if (ucs_likely(step->iter_offset < iter_limit)) {
         /* send every fragment but the last */
         do {
-            len = send_func(ep, am_id, step->bcopy.pack_full_cb, req, step->uct_flags);
+            len = send_func(ep, am_id, step->bcopy.pack_full_cb, req, uct_flags);
 
             if (is_pipelined) {
                 return ucs_unlikely(len < 0) ? (ucs_status_t)len : UCS_OK;
@@ -179,7 +193,7 @@ ucg_builtin_step_am_bcopy_max(ucg_builtin_request_t *req,
     }
 
     /* Send last fragment of the message */
-    len = send_func(ep, am_id, step->bcopy.pack_part_cb, req, step->uct_flags);
+    len = send_func(ep, am_id, step->bcopy.pack_part_cb, req, uct_flags);
     if (ucs_unlikely(len < 0)) {
         return (ucs_status_t)len;
     }
@@ -604,8 +618,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucg_builtin_step_execute, (req),
     }
 
     /* Initialize the users' request object, if applicable */
-    slot->req.latest.local_id = step->am_header.msg.local_id;
-    ucs_assert(slot->req.latest.local_id != 0);
+    slot->req.expecting.local_id = step->am_header.msg.local_id;
+    ucs_assert(slot->req.expecting.local_id != 0);
 
     return ucg_builtin_step_check_pending(slot);
 
 /************************** Error flows ***********************************/
diff --git a/builtin/ops/builtin_ops.h b/builtin/ops/builtin_ops.h
index 7eb5e7f..63b8ec8 100644
--- a/builtin/ops/builtin_ops.h
+++ b/builtin/ops/builtin_ops.h
@@ -89,49 +89,52 @@ enum ucg_builtin_op_step_flags {
     UCG_BUILTIN_OP_STEP_FLAG_SWITCH_MASK       = UCS_MASK(15),
 
     /* Additional step information */
-    UCG_BUILTIN_OP_STEP_FLAG_TEMP_BUFFER_USED  = UCS_BIT(15),
-    UCG_BUILTIN_OP_STEP_FLAG_PACKED_DTYPE_MODE = UCS_BIT(16),
-    UCG_BUILTIN_OP_STEP_FLAG_FT_ONGOING        = UCS_BIT(17)
+    UCG_BUILTIN_OP_STEP_FLAG_BCOPY_PACK_LOCK   = UCS_BIT(15),
+    UCG_BUILTIN_OP_STEP_FLAG_TEMP_BUFFER_USED  = UCS_BIT(16),
+    UCG_BUILTIN_OP_STEP_FLAG_PACKED_DTYPE_MODE = UCS_BIT(17),
+    UCG_BUILTIN_OP_STEP_FLAG_FT_ONGOING        = UCS_BIT(18),
+    UCG_BUILTIN_OP_STEP_FLAG_RESERVED          = UCS_BIT(19)
 };
 
-enum ucg_builtin_op_step_comp_aggregate {
-    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_NOP,
+enum ucg_builtin_op_step_comp_aggregation {
+    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_NOP = 0,
 
     /* Aggregation of short (Active-)messages */
     UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE,
-    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED,
-    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL,
-    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_WAYPOINT,
-    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE,
-    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_BATCHED,
-    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_FRAGMENT,
+    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER,
+    UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE,
 
     /* Unpacking remote memory keys (for Rendezvous protocol) */
     UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REMOTE_KEY
-} UCS_S_PACKED;
+}; /* Note: only 3 bits are allocated for this field in ucg_builtin_op_step_t */
+
+enum ucg_builtin_op_step_comp_flags {
+    UCG_BUILTIN_OP_STEP_COMP_FLAG_BATCHED_DATA    = UCS_BIT(0),
+    UCG_BUILTIN_OP_STEP_COMP_FLAG_FRAGMENTED_DATA = UCS_BIT(1),
+    UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_LENGTH   = UCS_BIT(2),
+    UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_DATATYPE = UCS_BIT(3)
+}; /* Note: only 4 bits are allocated for this field in ucg_builtin_op_step_t */
 
 enum ucg_builtin_op_step_comp_criteria {
-    UCG_BUILTIN_OP_STEP_COMP_CRITERIA_SEND,
+    UCG_BUILTIN_OP_STEP_COMP_CRITERIA_SEND = 0,
     UCG_BUILTIN_OP_STEP_COMP_CRITERIA_SINGLE_MESSAGE,
     UCG_BUILTIN_OP_STEP_COMP_CRITERIA_MULTIPLE_MESSAGES,
     UCG_BUILTIN_OP_STEP_COMP_CRITERIA_MULTIPLE_MESSAGES_ZCOPY,
     UCG_BUILTIN_OP_STEP_COMP_CRITERIA_BY_FRAGMENT_OFFSET
-} UCS_S_PACKED;
+}; /* Note: only 3 bits are allocated for this field in ucg_builtin_op_step_t */
 
 enum ucg_builtin_op_step_comp_action {
-    UCG_BUILTIN_OP_STEP_COMP_OP,
+    UCG_BUILTIN_OP_STEP_COMP_OP = 0,
     UCG_BUILTIN_OP_STEP_COMP_STEP,
     UCG_BUILTIN_OP_STEP_COMP_SEND
-} UCS_S_PACKED;
+}; /* Note: only 2 bits are allocated for this field in ucg_builtin_op_step_t */
 
 /* Definitions of several callback functions, used during an operation */
 typedef struct ucg_builtin_op ucg_builtin_op_t;
 typedef struct ucg_builtin_request ucg_builtin_request_t;
 typedef void         (*ucg_builtin_op_init_cb_t) (ucg_builtin_op_t *op,
                                                   ucg_coll_id_t coll_id);
-typedef void         (*ucg_builtin_op_fini_cb_t) (ucg_builtin_op_t *op,
-                                                  ucg_request_t *user_req,
-                                                  ucs_status_t status);
+typedef void         (*ucg_builtin_op_fini_cb_t) (ucg_builtin_op_t *op);
 typedef ucs_status_t (*ucg_builtin_op_optm_cb_t) (ucg_builtin_op_t *op);
 
 typedef struct ucg_builtin_zcomp {
@@ -140,38 +143,40 @@ typedef struct ucg_builtin_zcomp {
 } ucg_builtin_zcomp_t;
 
 typedef struct ucg_builtin_op_step {
-    uint32_t                                  flags;            /* @ref ucg_builtin_op_step_flags */
+    enum ucg_builtin_op_step_flags            flags            :20;
+    enum ucg_builtin_op_step_comp_flags       comp_flags       :4;
+    enum ucg_builtin_op_step_comp_aggregation comp_aggregation :3;
+    enum ucg_builtin_op_step_comp_criteria    comp_criteria    :3;
+    enum ucg_builtin_op_step_comp_action      comp_action      :2;
+
+    /* --- 4 bytes --- */
+
     uint8_t                    iter_ep;          /* iterator, somewhat volatile */
 #define UCG_BUILTIN_OFFSET_PIPELINE_READY   ((ucg_offset_t)-1)
 #define UCG_BUILTIN_OFFSET_PIPELINE_PENDING ((ucg_offset_t)-2)
 /* TODO: consider modifying "send_buffer" and removing iter_offset */
 
-    /* These values determine the behavior of the incoming message handler */
-    enum ucg_builtin_op_step_comp_aggregate  comp_aggregation;
-    enum ucg_builtin_op_step_comp_criteria   comp_criteria;
-    enum ucg_builtin_op_step_comp_action     comp_action;
-    /* --- 8 bytes --- */
-
-    uint8_t                    uct_flags;        /* e.g. UCT_SEND_FLAG_PACK_LOCK */
     uint8_t                    am_id;
     uint8_t                    ep_cnt;
     uint8_t                    batch_cnt;
-    ucg_offset_t               iter_offset;      /* iterator, somewhat volatile */
-
-    /* --- 16 bytes --- */
+
+    /* --- 8 bytes --- */
 
     ucg_builtin_plan_phase_t  *phase;
 
     int8_t                    *send_buffer;
     union {
         size_t                 buffer_length;
-        uint64_t               unpack_count;
+        size_t                 dtype_length;
     };
+
+    /* --- 32 bytes --- */
+
     ucg_builtin_header_t       am_header;
     uct_iface_h                uct_iface;
-    uint32_t                   fragments_total;  /* != 1 for fragmented operations */
+    uint64_t                   fragments_total;  /* != 1 for fragmented operations */
     uint32_t                   fragment_length;  /* only for fragmented operations */
+    ucg_offset_t               iter_offset;      /* iterator, somewhat volatile */
 
     /* --- 64 bytes --- */
 
@@ -193,8 +198,6 @@ typedef struct ucg_builtin_op_step {
             uct_pack_callback_t pack_full_cb;
             uct_pack_callback_t pack_part_cb;
             uct_pack_callback_t pack_single_cb;
-            ucp_dt_state_t      pack_state;
-            ucp_datatype_t      datatype;
         } bcopy;
         struct {
             uct_mem_h           memh;   /* Data buffer memory handle */
@@ -204,7 +207,7 @@ typedef struct ucg_builtin_op_step {
             uct_rkey_bundle_t   rkey;   /* remote key (from previous step) */
         } zcopy;
     };
-} UCS_S_PACKED UCS_V_ALIGNED(sizeof(void*)) ucg_builtin_op_step_t;
+} UCS_S_PACKED UCS_V_ALIGNED(UCS_SYS_CACHE_LINE_SIZE) ucg_builtin_op_step_t;
 
 typedef struct ucg_builtin_comp_slot ucg_builtin_comp_slot_t;
 struct ucg_builtin_op {
@@ -213,6 +216,10 @@ struct ucg_builtin_op {
     ucg_builtin_op_optm_cb_t optm_cb;  /**< optimization function for the operation */
     ucg_builtin_op_init_cb_t init_cb;  /**< Initialization function for the operation */
     ucg_builtin_op_fini_cb_t fini_cb;  /**< Finalization function for the operation */
+    ucp_datatype_t           send_dt;  /**< Generic send datatype (if non-contig) */
+    ucp_datatype_t           recv_dt;  /**< Generic receive datatype (if non-contig) */
+    ucp_dt_state_t           rstate;   /**< read (send) state - for datatype packing */
+    ucp_dt_state_t           wstate;   /**< write (receive) state - for datatype unpacking */
     ucg_builtin_comp_slot_t *slots;    /**< slots pointer, for faster initialization */
     ucg_builtin_op_step_t    steps[];  /**< steps required to complete the operation */
 } UCS_V_ALIGNED(UCS_SYS_CACHE_LINE_SIZE);
 
@@ -223,7 +230,7 @@ struct ucg_builtin_op {
  */
 struct ucg_builtin_request {
     volatile uint32_t         pending;   /**< number of step's pending messages */
-    ucg_builtin_header_step_t latest;    /**< request iterator, mostly here for
+    ucg_builtin_header_step_t expecting; /**< request iterator, mostly here for
                                               alignment reasons with slot structs */
     ucg_builtin_op_step_t    *step;      /**< indicator of current step within the op */
     ucg_builtin_op_t         *op;        /**< operation currently running */
@@ -250,6 +257,9 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan,
                                      enum ucg_builtin_op_step_flags *flags,
                                      const ucg_collective_params_t *params,
                                      int8_t **current_data_buffer,
+                                     int is_send_dt_contig,
+                                     int is_recv_dt_contig,
+                                     size_t send_dt_len,
                                      ucg_builtin_op_init_cb_t *init_cb,
                                      ucg_builtin_op_fini_cb_t *fini_cb,
                                      ucg_builtin_op_step_t *step,
                                      int *zcopy_step_skip);
@@ -269,8 +279,10 @@ ucs_status_t ucg_builtin_op_select_callback(ucg_builtin_plan_t *plan,
 ucs_status_t ucg_builtin_op_consider_optimization(ucg_builtin_op_t *op,
                                                   ucg_builtin_config_t *config);
 
-void ucg_builtin_step_select_packers(const ucg_collective_params_t *params,
-                                     ucg_builtin_op_step_t *step);
+ucs_status_t ucg_builtin_step_select_packers(const ucg_collective_params_t *params,
+                                             size_t send_dt_len,
+                                             int is_send_dt_contig,
+                                             ucg_builtin_op_step_t *step);
 
 ucs_status_t ucg_builtin_am_handler(void *worker, void *data,
                                     size_t length, unsigned am_flags);
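The "only N bits" notes above are load-bearing: the five bit-fields are sized
20 + 4 + 3 + 3 + 2 = 32 bits, which is exactly what the "--- 4 bytes ---" marker
asserts, and each note constrains the maximum value of the corresponding enum.
A compile-time guard (not part of the patch; C11 _Static_assert assumed
available) could make those constraints explicit:

    /* highest value of each enum must fit its bit-field width */
    _Static_assert(UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REMOTE_KEY < (1 << 3),
                   "comp_aggregation exceeds its 3-bit field");
    _Static_assert(UCG_BUILTIN_OP_STEP_COMP_FLAG_PACKED_DATATYPE < (1 << 4),
                   "comp_flags exceed their 4-bit field");
    _Static_assert(UCG_BUILTIN_OP_STEP_COMP_CRITERIA_BY_FRAGMENT_OFFSET < (1 << 3),
                   "comp_criteria exceeds its 3-bit field");
    _Static_assert(UCG_BUILTIN_OP_STEP_COMP_SEND < (1 << 2),
                   "comp_action exceeds its 2-bit field");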
diff --git a/builtin/ops/builtin_pack.c b/builtin/ops/builtin_pack.c
index 9b4cf5d..502dd85 100644
--- a/builtin/ops/builtin_pack.c
+++ b/builtin/ops/builtin_pack.c
@@ -4,6 +4,7 @@
  */
 
 #include "builtin_ops.h"
+#include "builtin_comp_step.inl"
 
 #include
 
@@ -12,9 +13,19 @@
 #endif
 
 int ucg_builtin_atomic_reduce_full(ucg_builtin_request_t *req,
-                                   void *src, void *dst, size_t length);
+                                   void *src, void *dst, size_t length)
+{
+    ucg_builtin_mpi_reduce_single(dst, src, &req->op->super.params);
+    return length;
+}
+
 int ucg_builtin_atomic_reduce_part(ucg_builtin_request_t *req,
-                                   void *src, void *dst, size_t length);
+                                   void *src, void *dst, size_t length)
+{
+    ucg_builtin_mpi_reduce_fragment(dst, src, length, req->step->dtype_length,
+                                    &req->op->super.params);
+    return length;
+}
 
 #define UCG_BUILTIN_PACK_CB(_offset, _length) { \
     ucg_builtin_header_t *header = (ucg_builtin_header_t*)dest; \
@@ -139,16 +150,15 @@ UCG_BUILTIN_ATOMIC_MULTIPLE_PACK_CB(64)
 #define UCG_BUILTIN_DATATYPE_PACK_CB(_offset, _length) { \
     ucg_builtin_header_t *header = (ucg_builtin_header_t*)dest; \
     ucg_builtin_request_t *req   = (ucg_builtin_request_t*)arg; \
+    ucg_builtin_op_t *op         = req->op; \
+    ucp_dt_generic_t *dt_gen     = ucp_dt_to_generic(op->send_dt); \
+    void *dt_state               = op->rstate.dt.generic.state; \
     ucg_builtin_op_step_t *step  = req->step; \
     size_t buffer_length         = (_length); \
     header->header               = step->am_header.header; \
     ucs_assert(((uintptr_t)arg & UCT_PACK_CALLBACK_REDUCE) == 0); \
-    /* Note: worker is NULL since it isn't required for host-based memory */ \
-    return sizeof(*header) + ucp_dt_pack(NULL, step->bcopy.datatype, \
-                                         UCS_MEMORY_TYPE_HOST, header + 1, \
-                                         step->send_buffer + (_offset), \
-                                         &step->bcopy.pack_state, \
-                                         buffer_length); \
+    dt_gen->ops.pack(dt_state, (_offset), header + 1, buffer_length); \
+    return sizeof(*header) + buffer_length; \
 }
 
 UCG_BUILTIN_PACKER_DECLARE(_datatype_, single)
diff --git a/builtin/plan/builtin_bruck.c b/builtin/plan/builtin_bruck.c
index 72a725d..dc4bbb8 100644
--- a/builtin/plan/builtin_bruck.c
+++ b/builtin/plan/builtin_bruck.c
@@ -51,7 +51,7 @@ ucs_status_t ucg_builtin_bruck_create(ucg_builtin_group_ctx_t *ctx,
     {
         ucg_group_member_index_t peer_index = (my_index + step_size) % proc_count;
         ucs_status_t status = ucg_builtin_single_connection_phase(ctx,
-                peer_index, step_idx, phase_method, 0, phase, is_mock);
+                peer_index, step_idx + 1, phase_method, 0, phase, is_mock);
 
         if (status != UCS_OK) {
             return status;
         }
diff --git a/builtin/plan/builtin_tree.c b/builtin/plan/builtin_tree.c
index c8ddad0..2bb43a8 100644
--- a/builtin/plan/builtin_tree.c
+++ b/builtin/plan/builtin_tree.c
@@ -45,17 +45,9 @@ static inline ucs_status_t ucg_builtin_tree_connect_phase(ucg_builtin_plan_phase
 flagless_retry:
     peer = peers[0];
 #ifdef HAVE_UCT_COLLECTIVES
-    if (coll_flags & (UCG_PLAN_CONNECT_FLAG_WANT_INCAST |
-                      UCG_PLAN_CONNECT_FLAG_WANT_BCAST)) {
-        /* For some methods, e.g. REDUCE_TERMINAL, the peer is the message
-         * source and not the destination, so we need to switch to "loopback" */
-        if (((coll_flags & UCG_PLAN_CONNECT_FLAG_WANT_INCAST) &&
-             (method != UCG_PLAN_METHOD_SEND_TO_SM_ROOT) &&
-             (method != UCG_PLAN_METHOD_SEND_TERMINAL)) ||
-            ((coll_flags & UCG_PLAN_CONNECT_FLAG_WANT_BCAST) &&
-             (method != UCG_PLAN_METHOD_RECV_TERMINAL))) {
-            peer = params->root; /* Should be myself... TODO: validate! */
-        }
+    if ((peer_cnt > 1) && (coll_flags & (UCG_PLAN_CONNECT_FLAG_WANT_INCAST |
+                                         UCG_PLAN_CONNECT_FLAG_WANT_BCAST))) {
+        peer = params->group_params->member_index;
     }
 #endif
 
@@ -69,7 +61,7 @@ static inline ucs_status_t ucg_builtin_tree_connect_phase(ucg_builtin_plan_phase
     }
 #endif
 
-    if ((status != UCS_ERR_UNREACHABLE) || !coll_flags) {
+    if ((status != UCS_ERR_UNREACHABLE) || (peer_cnt == 1)) {
         return status;
     } else if (coll_flags) {
         /* retry - without the flags */
@@ -195,8 +187,8 @@ ucs_status_t ucg_builtin_tree_connect(ucg_builtin_plan_t *tree,
             /* If I have a host-level or network-level parent (it was added to
              * the end of the host-array) - now is the time to discard it by
              * decrementing the counter for each host-array */
-            if (net_down_cnt)  net_down_cnt--;
-            if (host_down_cnt) host_down_cnt--;
+            if (net_up_cnt)  net_down_cnt--;
+            if (host_up_cnt) host_down_cnt--;
             /* conditional break */
 
         case UCG_PLAN_TREE_FANOUT:
             /* Create a phase for inter-node communication ("up the tree") */
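Taken together, the changes above also reshape the operation lifecycle: init_cb now
covers pack/unpack state creation as well, and fini_cb shrinks to a single-argument
teardown that no longer touches the user's request or completion callback. A rough
sketch of the resulting flow, assuming a hypothetical driver function (only the
init_cb/fini_cb field names come from ucg_builtin_op_t above):

    static void builtin_op_run(ucg_builtin_op_t *op, ucg_coll_id_t coll_id)
    {
        if (op->init_cb != NULL) {
            op->init_cb(op, coll_id); /* e.g. ucg_builtin_init_pack_and_unpack */
        }

        /* ... steps execute and complete asynchronously ... */

        if (op->fini_cb != NULL) {
            op->fini_cb(op);          /* e.g. ucg_builtin_finalize_pack_and_unpack */
        }
    }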