Skip to content

Commit

Permalink
UCG: support for non-contiguous datatypes
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Margolin <[email protected]>
  • Loading branch information
alex--m committed Oct 22, 2020
2 parents f67cd01 + dea6d48 commit ce03e43
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 23 deletions.
2 changes: 1 addition & 1 deletion api/ucg.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ enum ucg_collective_modifiers {

/* Buffer/Data Management Considerations */
UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE_STABLE = UCS_BIT( 8), /* stable reduction */
UCG_GROUP_COLLECTIVE_MODIFIER_IN_PLACE = UCS_BIT( 9), /* otherwise two buffers */
UCG_GROUP_COLLECTIVE_MODIFIER_NONCONTIG_DATATYPE = UCS_BIT( 9), /* some may be non-contiguous */
UCG_GROUP_COLLECTIVE_MODIFIER_PERSISTENT = UCS_BIT(10), /* otherwise destroy coll_h */
UCG_GROUP_COLLECTIVE_MODIFIER_SYMMETRIC = UCS_BIT(11), /* persistent on all ranks */
UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER = UCS_BIT(12), /* prevent others from starting */
Expand Down
4 changes: 4 additions & 0 deletions builtin/builtin.c
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ void ucg_builtin_print_flags(ucg_builtin_op_step_t *step)
printf("write");
break;

case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED:
printf("write (unpacked)");
break;

case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL:
printf("gather (terminal)");
break;
Expand Down
10 changes: 9 additions & 1 deletion builtin/ops/builtin_comp_step.inl
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ static void ucg_builtin_comp_gather(uint8_t *recv_buffer, uint8_t *data,
memcpy(recv_buffer + first_part + length, data, length - first_part);
}

static UCS_F_ALWAYS_INLINE void
static UCS_F_ALWAYS_INLINE ucs_status_t
ucg_builtin_comp_unpack_rkey(ucg_builtin_request_t *req, uint64_t remote_addr,
uint8_t *packed_remote_key)
{
Expand All @@ -168,6 +168,8 @@ ucg_builtin_comp_unpack_rkey(ucg_builtin_request_t *req, uint64_t remote_addr,
if (ucs_unlikely(status != UCS_OK)) {
ucg_builtin_comp_last_step_cb(req, status);
}

return status;
}

static int UCS_F_ALWAYS_INLINE
Expand Down Expand Up @@ -210,6 +212,12 @@ ucg_builtin_step_recv_handle_data(ucg_builtin_request_t *req, uint64_t offset,
memcpy(step->recv_buffer + offset, data, length);
break;

case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED:
/* Note: worker is NULL since it isn't required for host-based memory */
ucp_dt_unpack_only(NULL, step->recv_buffer + offset, step->unpack_count,
step->bcopy.datatype, UCS_MEMORY_TYPE_HOST, data, length, 0);
break;

case UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL:
ucg_builtin_comp_gather(step->recv_buffer, data, offset,
step->buffer_length, length,
Expand Down
50 changes: 32 additions & 18 deletions builtin/ops/builtin_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,23 +558,29 @@ void ucg_builtin_step_select_packers(const ucg_collective_params_t *params,
}
}

int is_variadic = (params->type.modifiers &
UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC);

step->bcopy.pack_full_cb = is_reduce ?
UCG_BUILTIN_PACKER_NAME(_reducing_, full) :
is_variadic ? UCG_BUILTIN_PACKER_NAME(_variadic_, full) :
UCG_BUILTIN_PACKER_NAME(_, full);

step->bcopy.pack_part_cb = is_reduce ?
UCG_BUILTIN_PACKER_NAME(_reducing_, part) :
is_variadic ? UCG_BUILTIN_PACKER_NAME(_variadic_, part) :
UCG_BUILTIN_PACKER_NAME(_, part);

step->bcopy.pack_single_cb = is_reduce ?
UCG_BUILTIN_PACKER_NAME(_reducing_, single) :
is_variadic ? UCG_BUILTIN_PACKER_NAME(_variadic_, single) :
UCG_BUILTIN_PACKER_NAME(_, single);
int is_variadic = (params->type.modifiers &
UCG_GROUP_COLLECTIVE_MODIFIER_VARIADIC);
int is_src_contig = ((ucg_global_params.type_info.mpi_is_contig_f != NULL) &&
(ucg_global_params.type_info.mpi_is_contig_f(params->send.dt_ext,
params->send.count)));

if (is_reduce) {
step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, full);
step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, part);
step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_reducing_, single);
} else if (is_variadic) {
step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_variadic_, full);
step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_variadic_, part);
step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_variadic_, single);
} else if (!is_src_contig) {
step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, full);
step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, part);
step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_datatype_, single);
} else {
step->bcopy.pack_full_cb = UCG_BUILTIN_PACKER_NAME(_, full);
step->bcopy.pack_part_cb = UCG_BUILTIN_PACKER_NAME(_, part);
step->bcopy.pack_single_cb = UCG_BUILTIN_PACKER_NAME(_, single);
}
}

#define UCG_BUILTIN_STEP_RECV_FLAGS (UCG_BUILTIN_OP_STEP_FLAG_RECV_AFTER_SEND |\
Expand Down Expand Up @@ -880,6 +886,10 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan,
}

/* Choose the data-related action to be taken by the incoming AM handler */
int is_dst_contig = ((ucg_global_params.type_info.mpi_is_contig_f != NULL) &&
(ucg_global_params.type_info.mpi_is_contig_f(params->recv.dt_ext,
params->recv.count)));

#ifdef HAVE_UCT_COLLECTIVES
int is_incast_used = (phase->iface_attr->cap.flags & UCT_IFACE_FLAG_INCAST) != 0;
#else
Expand All @@ -902,8 +912,12 @@ ucs_status_t ucg_builtin_step_create(ucg_builtin_plan_t *plan,
} else {
step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE;
}
} else {
} else if (is_dst_contig) {
step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE;
} else {
step->comp_aggregation = UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED;
/* If we pack/unpack - we care more about the count than the length */
step->unpack_count = params->recv.count;
}

/* Choose a completion criteria to be checked by the incoming AM handler */
Expand Down
13 changes: 12 additions & 1 deletion builtin/ops/builtin_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#endif

#include "../plan/builtin_plan.h"

#include <ucp/dt/dt.h>
#include <ucp/core/ucp_request.h>
#include <ucs/datastruct/ptr_array.h>

Expand Down Expand Up @@ -97,6 +99,7 @@ enum ucg_builtin_op_step_comp_aggregate {

/* Aggregation of short (Active-)messages */
UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE,
UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_WRITE_UNPACKED,
UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_TERMINAL,
UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_GATHER_WAYPOINT,
UCG_BUILTIN_OP_STEP_COMP_AGGREGATE_REDUCE_SINGLE,
Expand Down Expand Up @@ -160,7 +163,10 @@ typedef struct ucg_builtin_op_step {

ucg_builtin_plan_phase_t *phase;
int8_t *send_buffer;
size_t buffer_length;
union {
size_t buffer_length;
uint64_t unpack_count;
};
ucg_builtin_header_t am_header;
uct_iface_h uct_iface;

Expand All @@ -187,6 +193,8 @@ typedef struct ucg_builtin_op_step {
uct_pack_callback_t pack_full_cb;
uct_pack_callback_t pack_part_cb;
uct_pack_callback_t pack_single_cb;
ucp_dt_state_t pack_state;
ucp_datatype_t datatype;
} bcopy;
struct {
uct_mem_h memh; /* Data buffer memory handle */
Expand Down Expand Up @@ -307,6 +315,9 @@ UCG_BUILTIN_PACKER_DECLARE(_reducing_, part);
UCG_BUILTIN_PACKER_DECLARE(_variadic_, single);
UCG_BUILTIN_PACKER_DECLARE(_variadic_, full);
UCG_BUILTIN_PACKER_DECLARE(_variadic_, part);
UCG_BUILTIN_PACKER_DECLARE(_datatype_, single);
UCG_BUILTIN_PACKER_DECLARE(_datatype_, full);
UCG_BUILTIN_PACKER_DECLARE(_datatype_, part);
UCG_BUILTIN_PACKER_DECLARE(_atomic_single_, 8);
UCG_BUILTIN_PACKER_DECLARE(_atomic_single_, 16);
UCG_BUILTIN_PACKER_DECLARE(_atomic_single_, 32);
Expand Down
26 changes: 24 additions & 2 deletions builtin/ops/builtin_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ UCG_BUILTIN_VARIADIC_PACK_CB(step->iter_offset, step->buffer_length -
uint##_integer_bits##_t *ptr = (uint##_integer_bits##_t *)(header + 1); \
ucs_atomic_add##_integer_bits (ptr, \
*(uint##_integer_bits##_t *)step->send_buffer); \
printf("UCG_BUILTIN_ATOMIC_SINGLE_PACK_CB\n");\
return sizeof(uint##_integer_bits##_t); \
}

Expand All @@ -110,7 +109,6 @@ printf("UCG_BUILTIN_ATOMIC_SINGLE_PACK_CB\n");\
ucs_atomic_add##_integer_bits (ptr, \
*(uint##_integer_bits##_t *)step->send_buffer); \
} \
printf("UCG_BUILTIN_ATOMIC_MULTIPLE_PACK_CB\n");\
return length; \
}

Expand All @@ -137,3 +135,27 @@ UCG_BUILTIN_ATOMIC_SINGLE_PACK_CB(64)

UCG_BUILTIN_PACKER_DECLARE(_atomic_multiple_, 64)
UCG_BUILTIN_ATOMIC_MULTIPLE_PACK_CB(64)

#define UCG_BUILTIN_DATATYPE_PACK_CB(_offset, _length) { \
ucg_builtin_header_t *header = (ucg_builtin_header_t*)dest; \
ucg_builtin_request_t *req = (ucg_builtin_request_t*)arg; \
ucg_builtin_op_step_t *step = req->step; \
size_t buffer_length = (_length); \
header->header = step->am_header.header; \
ucs_assert(((uintptr_t)arg & UCT_PACK_CALLBACK_REDUCE) == 0); \
/* Note: worker is NULL since it isn't required for host-based memory */ \
return sizeof(*header) + ucp_dt_pack(NULL, step->bcopy.datatype, \
UCS_MEMORY_TYPE_HOST, header + 1, \
step->send_buffer + (_offset), \
&step->bcopy.pack_state, \
buffer_length); \
}

UCG_BUILTIN_PACKER_DECLARE(_datatype_, single)
UCG_BUILTIN_DATATYPE_PACK_CB(0, step->buffer_length)

UCG_BUILTIN_PACKER_DECLARE(_datatype_, full)
UCG_BUILTIN_DATATYPE_PACK_CB(step->iter_offset, step->fragment_length)

UCG_BUILTIN_PACKER_DECLARE(_datatype_, part)
UCG_BUILTIN_DATATYPE_PACK_CB(step->iter_offset, step->buffer_length - step->iter_offset)

0 comments on commit ce03e43

Please sign in to comment.