Skip to content

Commit

Permalink
TL/CUDA: reduce scatter linear (openucx#669)
Browse files Browse the repository at this point in the history
* TL/CUDA: reduce scatter linear

* REVIEW: fix review comments

* REVIEW: fix review comments

* fixed algorithm description
* fixed datatype in reduce scatterv
* added check for average op
  • Loading branch information
Sergei-Lebedev authored Dec 7, 2022
1 parent b24612b commit e2f0e51
Show file tree
Hide file tree
Showing 17 changed files with 796 additions and 131 deletions.
41 changes: 22 additions & 19 deletions src/components/tl/cuda/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@
#

if TL_CUDA_ENABLED
alltoall = \
alltoall/alltoall.h \
alltoall/alltoall.c \
alltoall/alltoall_ce.c

alltoallv = \
alltoallv/alltoallv.h \
alltoallv/alltoallv.c \
alltoallv/alltoallv_ce.c

allgather = \
allgather/allgather.h \
Expand All @@ -26,15 +17,27 @@ allgatherv = \
allgatherv/allgatherv_ring.c \
allgatherv/allgatherv_linear.c

reduce_scatter = \
reduce_scatter/reduce_scatter.h \
reduce_scatter/reduce_scatter.c \
reduce_scatter/reduce_scatter_ring.c
alltoall = \
alltoall/alltoall.h \
alltoall/alltoall.c \
alltoall/alltoall_ce.c

alltoallv = \
alltoallv/alltoallv.h \
alltoallv/alltoallv.c \
alltoallv/alltoallv_ce.c

reduce_scatterv = \
reduce_scatterv/reduce_scatterv.h \
reduce_scatterv/reduce_scatterv.c \
reduce_scatterv/reduce_scatterv_ring.c
reduce_scatter = \
reduce_scatter/reduce_scatter.h \
reduce_scatter/reduce_scatter.c \
reduce_scatter/reduce_scatter_ring.c \
reduce_scatter/reduce_scatter_linear.c

reduce_scatterv = \
reduce_scatterv/reduce_scatterv.h \
reduce_scatterv/reduce_scatterv.c \
reduce_scatterv/reduce_scatterv_ring.c \
reduce_scatterv/reduce_scatterv_linear.c


sources = \
Expand All @@ -47,10 +50,10 @@ sources = \
tl_cuda_cache.c \
tl_cuda_topo.c \
tl_cuda_team_topo.c \
$(alltoall) \
$(alltoallv) \
$(allgather) \
$(allgatherv) \
$(alltoall) \
$(alltoallv) \
$(reduce_scatter) \
$(reduce_scatterv)

Expand Down
6 changes: 5 additions & 1 deletion src/components/tl/cuda/allgather/allgather_linear.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) Mellanox Technologies Ltd. 2022. ALL RIGHTS RESERVED.
* Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -20,6 +20,10 @@ ucc_status_t ucc_tl_cuda_allgather_linear_init(ucc_base_coll_args_t *coll_args,
return status;
}

if (ucc_unlikely(!task)) {
return UCC_ERR_NO_MEMORY;
}

task->allgatherv_linear.get_count = ucc_tl_cuda_allgather_get_count;
task->allgatherv_linear.get_offset = ucc_tl_cuda_allgather_get_offset;
task->allgatherv_linear.dt = coll_args->args.dst.info.datatype;
Expand Down
2 changes: 1 addition & 1 deletion src/components/tl/cuda/allgatherv/allgatherv_linear.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*
* Algorithm
* for rank R
* step 1: copy fragR_1 to remote scratch buffers for all ranks
* step 0: copy fragR_1 to remote scratch buffers for all ranks
* if not inplace copy local src buffer to local dst buffer
*
* step 1: copy frag1_1, frag2_1, ..., fragN_1 from local scratch buffer
Expand Down
62 changes: 33 additions & 29 deletions src/components/tl/cuda/reduce_scatter/reduce_scatter.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,44 @@
#include "reduce_scatter.h"
#include "components/mc/ucc_mc.h"

ucc_status_t ucc_tl_cuda_reduce_scatter_ring_init(ucc_tl_cuda_task_t *task);

ucc_status_t ucc_tl_cuda_reduce_scatter_ring_start(ucc_coll_task_t *task);
/*
 * Table describing the reduce scatter algorithms of TL/CUDA. Entries are
 * placed at the index given by their UCC_TL_CUDA_REDUCE_SCATTER_ALG_* id and
 * carry a short name plus a human readable description. The additional slot
 * at UCC_TL_CUDA_REDUCE_SCATTER_ALG_LAST is a terminator with NULL name and
 * description.
 */
ucc_base_coll_alg_info_t
ucc_tl_cuda_reduce_scatter_algs[UCC_TL_CUDA_REDUCE_SCATTER_ALG_LAST + 1] = {
[UCC_TL_CUDA_REDUCE_SCATTER_ALG_AUTO] =
{.id = UCC_TL_CUDA_REDUCE_SCATTER_ALG_AUTO,
.name = "auto",
.desc = "choose reduce scatter algorithm based on CUDA topology"},
[UCC_TL_CUDA_REDUCE_SCATTER_ALG_RING] =
{.id = UCC_TL_CUDA_REDUCE_SCATTER_ALG_RING,
.name = "ring",
.desc = "multiring reduce scatter algorithm"},
[UCC_TL_CUDA_REDUCE_SCATTER_ALG_LINEAR] =
{.id = UCC_TL_CUDA_REDUCE_SCATTER_ALG_LINEAR,
.name = "linear",
.desc = "linear reduce scatter algorithm"},
/* terminator entry: no name, no description */
[UCC_TL_CUDA_REDUCE_SCATTER_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

size_t ucc_tl_cuda_reduce_scatter_get_count(const ucc_tl_cuda_task_t *task,
ucc_rank_t block) //NOLINT
{
const ucc_coll_args_t *args = &TASK_ARGS(task);
size_t count = args->dst.info.count;

ucc_status_t ucc_tl_cuda_reduce_scatter_ring_progress(ucc_coll_task_t *task);
if (UCC_IS_INPLACE(*args)) {
count = args->dst.info.count / UCC_TL_TEAM_SIZE(TASK_TEAM(task));
}
return count;
}

ucc_status_t ucc_tl_cuda_reduce_scatter_ring_finalize(ucc_coll_task_t *task);
/*
 * Offset (in elements) of the given block: the per-block count reported by
 * ucc_tl_cuda_reduce_scatter_get_count() multiplied by the block index.
 */
size_t ucc_tl_cuda_reduce_scatter_get_offset(const ucc_tl_cuda_task_t *task,
                                             ucc_rank_t block)
{
    const size_t block_count =
        ucc_tl_cuda_reduce_scatter_get_count(task, block);

    return block_count * block;
}

ucc_status_t ucc_tl_cuda_reduce_scatter_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *tl_team,
ucc_coll_task_t **task_p)
{
ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t);
ucc_tl_cuda_task_t *task;
ucc_status_t status;

if (coll_args->args.op == UCC_OP_AVG) {
return UCC_ERR_NOT_SUPPORTED;
}

status = ucc_tl_cuda_task_init(coll_args, team, &task);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}

status = ucc_tl_cuda_reduce_scatter_ring_init(task);
if (ucc_unlikely(status != UCC_OK)) {
goto free_task;
}

*task_p = &task->super;
return UCC_OK;

free_task:
ucc_tl_cuda_task_put(task);
return status;
return ucc_tl_cuda_reduce_scatter_ring_init(coll_args, tl_team, task_p);
}
38 changes: 38 additions & 0 deletions src/components/tl/cuda/reduce_scatter/reduce_scatter.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,46 @@
#include "tl_cuda.h"
#include "tl_cuda_coll.h"

enum
{
UCC_TL_CUDA_REDUCE_SCATTER_ALG_AUTO,
UCC_TL_CUDA_REDUCE_SCATTER_ALG_RING,
UCC_TL_CUDA_REDUCE_SCATTER_ALG_LINEAR,
UCC_TL_CUDA_REDUCE_SCATTER_ALG_LAST
};

extern ucc_base_coll_alg_info_t
ucc_tl_cuda_reduce_scatter_algs[UCC_TL_CUDA_REDUCE_SCATTER_ALG_LAST + 1];

#define UCC_TL_CUDA_REDUCE_SCATTER_DEFAULT_ALG_SELECT_STR "reduce_scatter:cuda:@0"

/*
 * Number of elements in one block of the reduce scatter result for @task.
 * The value is derived from the destination count of the collective
 * arguments (divided by the team size for in-place operations).
 */
size_t ucc_tl_cuda_reduce_scatter_get_count(const ucc_tl_cuda_task_t *task,
ucc_rank_t block);

/*
 * Offset, in elements, of block @block: the per-block count multiplied by
 * the block index.
 */
size_t ucc_tl_cuda_reduce_scatter_get_offset(const ucc_tl_cuda_task_t *task,
ucc_rank_t block);

/*
 * Reduce scatter entry point: creates the collective task for @coll_args on
 * @tl_team and returns it through @task_p.
 */
ucc_status_t ucc_tl_cuda_reduce_scatter_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *tl_team,
ucc_coll_task_t **task_p);

/*
 * Creates a reduce scatter task that uses the ring algorithm. Returns
 * UCC_ERR_NOT_SUPPORTED for UCC_OP_AVG; on success stores the task in
 * @task_p and returns UCC_OK.
 */
ucc_status_t ucc_tl_cuda_reduce_scatter_ring_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * tl_team,
ucc_coll_task_t ** task_p);

/*
 * Creates a reduce scatter task that uses the linear algorithm. Returns
 * UCC_ERR_NOT_SUPPORTED for UCC_OP_AVG; on success stores the task in
 * @task_p and returns UCC_OK.
 */
ucc_status_t ucc_tl_cuda_reduce_scatter_linear_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * tl_team,
ucc_coll_task_t ** task_p);

/*
 * Look up a reduce scatter algorithm by name (case-insensitive).
 *
 * Returns the index of the first entry in ucc_tl_cuda_reduce_scatter_algs
 * whose name equals @str, or UCC_TL_CUDA_REDUCE_SCATTER_ALG_LAST when no
 * entry matches.
 */
static inline int ucc_tl_cuda_reduce_scatter_alg_from_str(const char *str)
{
    int alg = 0;

    /* advance until a name matches or the table is exhausted */
    while (alg < UCC_TL_CUDA_REDUCE_SCATTER_ALG_LAST &&
           strcasecmp(str, ucc_tl_cuda_reduce_scatter_algs[alg].name) != 0) {
        alg++;
    }
    return alg;
}

#endif
42 changes: 42 additions & 0 deletions src/components/tl/cuda/reduce_scatter/reduce_scatter_linear.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "reduce_scatterv/reduce_scatterv.h"
#include "reduce_scatter/reduce_scatter.h"

/*
 * Create a reduce scatter task that runs the linear reduce scatterv
 * implementation.
 *
 * Returns UCC_ERR_NOT_SUPPORTED when the requested operation is UCC_OP_AVG,
 * the status of ucc_tl_cuda_task_init() when task creation fails, and UCC_OK
 * otherwise, in which case the new task is stored in *task_p.
 */
ucc_status_t ucc_tl_cuda_reduce_scatter_linear_init(ucc_base_coll_args_t *coll_args,
                                                    ucc_base_team_t *tl_team,
                                                    ucc_coll_task_t **task_p)
{
    ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t);
    ucc_tl_cuda_task_t *task;
    ucc_status_t        status;

    /* averaging is rejected before any task is allocated */
    if (UCC_OP_AVG == coll_args->args.op) {
        return UCC_ERR_NOT_SUPPORTED;
    }

    status = ucc_tl_cuda_task_init(coll_args, team, &task);
    if (ucc_unlikely(UCC_OK != status)) {
        return status;
    }

    /* task callbacks: reuse the linear reduce scatterv state machine */
    task->super.flags         |= UCC_COLL_TASK_FLAG_EXECUTOR;
    task->super.post           = ucc_tl_cuda_reduce_scatterv_linear_start;
    task->super.triggered_post = ucc_triggered_post;
    task->super.progress       = ucc_tl_cuda_reduce_scatterv_linear_progress;
    task->super.finalize       = ucc_tl_cuda_reduce_scatterv_linear_finalize;
    task->bar                  = TASK_BAR(task);

    /* algorithm parameters: reduce scatter supplies its own count/offset */
    task->reduce_scatterv_linear.dt   = coll_args->args.dst.info.datatype;
    task->reduce_scatterv_linear.rbuf = coll_args->args.dst.info.buffer;
    task->reduce_scatterv_linear.get_count =
        ucc_tl_cuda_reduce_scatter_get_count;
    task->reduce_scatterv_linear.get_offset =
        ucc_tl_cuda_reduce_scatter_get_offset;

    *task_p = &task->super;
    return UCC_OK;
}
48 changes: 22 additions & 26 deletions src/components/tl/cuda/reduce_scatter/reduce_scatter_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,33 @@
*/

#include "reduce_scatterv/reduce_scatterv.h"
#include "reduce_scatter/reduce_scatter.h"

size_t ucc_tl_cuda_reduce_scatter_ring_count(const ucc_tl_cuda_task_t *task,
ucc_rank_t block) //NOLINT
ucc_status_t ucc_tl_cuda_reduce_scatter_ring_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * tl_team,
ucc_coll_task_t ** task_p)
{
const ucc_coll_args_t *args = &TASK_ARGS(task);
size_t count = args->dst.info.count;
ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t);
size_t ssize = UCC_TL_CUDA_TEAM_LIB(team)->cfg.scratch_size;
ucc_datatype_t dt = coll_args->args.dst.info.datatype;
ucc_tl_cuda_task_t *task;
size_t send_size, frag_size;
ucc_status_t status;

if (UCC_IS_INPLACE(*args)) {
count = args->dst.info.count / UCC_TL_TEAM_SIZE(TASK_TEAM(task));
if (coll_args->args.op == UCC_OP_AVG) {
return UCC_ERR_NOT_SUPPORTED;
}
return count;
}

size_t ucc_tl_cuda_reduce_scatter_ring_get_offset(const ucc_tl_cuda_task_t *task,
ucc_rank_t block)
{
return ucc_tl_cuda_reduce_scatter_ring_count(task, block) * block;
}

ucc_status_t ucc_tl_cuda_reduce_scatter_ring_init(ucc_tl_cuda_task_t *task)
{
ucc_tl_cuda_team_t *team = TASK_TEAM(task);
ucc_coll_args_t *args = &TASK_ARGS(task);
size_t ssize = UCC_TL_CUDA_TEAM_LIB(team)->cfg.scratch_size;
ucc_datatype_t dt = args->dst.info.datatype;
size_t send_size, frag_size;
status = ucc_tl_cuda_task_init(coll_args, team, &task);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}

task->reduce_scatterv_ring.get_count = ucc_tl_cuda_reduce_scatter_ring_count;
task->reduce_scatterv_ring.get_offset = ucc_tl_cuda_reduce_scatter_ring_get_offset;
task->reduce_scatterv_ring.dt = args->dst.info.datatype;
task->reduce_scatterv_ring.sbuf = args->src.info.buffer;
task->reduce_scatterv_ring.rbuf = args->dst.info.buffer;
task->reduce_scatterv_ring.get_count = ucc_tl_cuda_reduce_scatter_get_count;
task->reduce_scatterv_ring.get_offset = ucc_tl_cuda_reduce_scatter_get_offset;
task->reduce_scatterv_ring.dt = coll_args->args.dst.info.datatype;
task->reduce_scatterv_ring.sbuf = coll_args->args.src.info.buffer;
task->reduce_scatterv_ring.rbuf = coll_args->args.dst.info.buffer;

send_size = task->reduce_scatterv_ring.get_count(task, 0);
frag_size = ucc_min(ssize / ucc_dt_size(dt) / 2, send_size);
Expand All @@ -49,5 +44,6 @@ ucc_status_t ucc_tl_cuda_reduce_scatter_ring_init(ucc_tl_cuda_task_t *task)
task->super.finalize = ucc_tl_cuda_reduce_scatterv_ring_finalize;
task->bar = TASK_BAR(task);

*task_p = &task->super;
return UCC_OK;
}
57 changes: 36 additions & 21 deletions src/components/tl/cuda/reduce_scatterv/reduce_scatterv.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,47 @@
#include "reduce_scatterv.h"
#include "components/mc/ucc_mc.h"

ucc_status_t ucc_tl_cuda_reduce_scatterv_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *tl_team,
ucc_coll_task_t **task_p)
/*
 * Table describing the reduce scatterv algorithms of TL/CUDA. Entries are
 * placed at the index given by their UCC_TL_CUDA_REDUCE_SCATTERV_ALG_* id and
 * carry a short name plus a human readable description. The additional slot
 * at UCC_TL_CUDA_REDUCE_SCATTERV_ALG_LAST is a terminator with NULL name and
 * description.
 */
ucc_base_coll_alg_info_t
ucc_tl_cuda_reduce_scatterv_algs[UCC_TL_CUDA_REDUCE_SCATTERV_ALG_LAST + 1] = {
[UCC_TL_CUDA_REDUCE_SCATTERV_ALG_AUTO] =
{.id = UCC_TL_CUDA_REDUCE_SCATTERV_ALG_AUTO,
.name = "auto",
.desc = "choose reduce scatterv algorithm based on CUDA topology"},
[UCC_TL_CUDA_REDUCE_SCATTERV_ALG_RING] =
{.id = UCC_TL_CUDA_REDUCE_SCATTERV_ALG_RING,
.name = "ring",
.desc = "multiring reduce scatterv algorithm"},
[UCC_TL_CUDA_REDUCE_SCATTERV_ALG_LINEAR] =
{.id = UCC_TL_CUDA_REDUCE_SCATTERV_ALG_LINEAR,
.name = "linear",
.desc = "linear reduce scatterv algorithm"},
/* terminator entry: no name, no description */
[UCC_TL_CUDA_REDUCE_SCATTERV_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

size_t ucc_tl_cuda_reduce_scatterv_get_count(const ucc_tl_cuda_task_t *task,
ucc_rank_t rank)
{
ucc_tl_cuda_team_t *team = ucc_derived_of(tl_team, ucc_tl_cuda_team_t);
ucc_tl_cuda_task_t *task;
ucc_status_t status;
const ucc_coll_args_t *args = &TASK_ARGS(task);

if (coll_args->args.op == UCC_OP_AVG) {
return UCC_ERR_NOT_SUPPORTED;
}
return ucc_coll_args_get_count(args, args->dst.info_v.counts, rank);
}

status = ucc_tl_cuda_task_init(coll_args, team, &task);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}
size_t ucc_tl_cuda_reduce_scatterv_get_offset(const ucc_tl_cuda_task_t *task,
ucc_rank_t rank)
{
size_t offset = 0;
ucc_rank_t i;

status = ucc_tl_cuda_reduce_scatterv_ring_init(task);
if (ucc_unlikely(status != UCC_OK)) {
goto free_task;
for (i = 0; i < rank; i++) {
offset += ucc_tl_cuda_reduce_scatterv_get_count(task, i);
}

*task_p = &task->super;
return UCC_OK;
return offset;
}

free_task:
ucc_tl_cuda_task_put(task);
return status;
/*
 * Reduce scatterv entry point: forwards the request unchanged to the ring
 * implementation and returns its status.
 */
ucc_status_t ucc_tl_cuda_reduce_scatterv_init(ucc_base_coll_args_t *coll_args,
                                              ucc_base_team_t *tl_team,
                                              ucc_coll_task_t **task_p)
{
    ucc_status_t status;

    status = ucc_tl_cuda_reduce_scatterv_ring_init(coll_args, tl_team, task_p);
    return status;
}
Loading

0 comments on commit e2f0e51

Please sign in to comment.