EC/CUDA: optimize reduce with unrolling (openucx#657)
* EC/CUDA: optimize reduction with unrolling

* EC/CUDA: various opt on reduce kernel

* EC/CUDA: various opt on reduce strided kernel

* EC/CUDA: constant tuning and cleanup

* EC/CUDA: fix reduce_strided for large nbr of srcs

* CODESTYLE: clang-tidy cleanup

* EC/CUDA: add configurable nbr of threads

* EC/CUDA: fix error with new nvidia linter compiler

* EC/CUDA: fix minor revisions
samnordmann authored Dec 27, 2022
1 parent 34adc11 commit e6d3919
Showing 5 changed files with 270 additions and 136 deletions.
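At a high level, the commit replaces the reduce kernels' one-element-per-thread loops with warp-strided batches: each thread stages UNROLL float4 "lines" in registers, adds them, and writes them back, so UNROLL independent 128-bit loads are in flight per thread. A minimal standalone sketch of the pattern (hypothetical kernel name and launch shape, distilled from the diff below rather than copied from UCC):

#include <cuda_runtime.h>

#define WARP_SIZE 32

// One thread block per task, as in the persistent executor: each warp
// walks the buffer in batches of WARP_SIZE * UNROLL float4 lines.
template <int UNROLL>
__global__ void reduce2_sketch(const float4 *a, const float4 *b, float4 *c,
                               size_t num_lines)
{
    const int warp      = threadIdx.x / WARP_SIZE;
    const int num_warps = blockDim.x / WARP_SIZE;
    const int lane      = threadIdx.x % WARP_SIZE;
    float4    tmp[UNROLL];

    // num_lines is assumed to be a multiple of WARP_SIZE * UNROLL here;
    // the real kernel peels the remainder in tail loops.
    for (size_t line = warp * WARP_SIZE * UNROLL + lane; line < num_lines;
         line += num_warps * WARP_SIZE * UNROLL) {
#pragma unroll
        for (int i = 0; i < UNROLL; i++) {
            tmp[i] = a[line + WARP_SIZE * i];   // UNROLL independent loads
        }
#pragma unroll
        for (int i = 0; i < UNROLL; i++) {
            const float4 v = b[line + WARP_SIZE * i];
            tmp[i].x += v.x; tmp[i].y += v.y;
            tmp[i].z += v.z; tmp[i].w += v.w;
        }
#pragma unroll
        for (int i = 0; i < UNROLL; i++) {
            c[line + WARP_SIZE * i] = tmp[i];
        }
    }
}
// e.g. reduce2_sketch<8><<<1, 512>>>(a, b, c, num_lines);

Keeping the staged loads register-resident raises memory-level parallelism, which is where the speedup over the previous one-load-per-iteration loop comes from.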
41 changes: 36 additions & 5 deletions src/components/ec/cuda/ec_cuda.c
@@ -49,12 +49,12 @@ static ucc_config_field_t ucc_ec_cuda_config_table[] = {
UCC_CONFIG_TYPE_UINT},

{"EXEC_NUM_WORKERS", "1",
"Number of thread blocks to use for cuda executor",
"Number of thread blocks to use for cuda persistent executor",
ucc_offsetof(ucc_ec_cuda_config_t, exec_num_workers),
UCC_CONFIG_TYPE_ULUNITS},

{"EXEC_NUM_THREADS", "512",
"Number of thread per block to use for cuda executor",
"Number of threads per block to use for cuda persistent executor",
ucc_offsetof(ucc_ec_cuda_config_t, exec_num_threads),
UCC_CONFIG_TYPE_ULUNITS},

@@ -73,7 +73,13 @@ static ucc_config_field_t ucc_ec_cuda_config_table[] = {
ucc_offsetof(ucc_ec_cuda_config_t, reduce_num_blocks),
UCC_CONFIG_TYPE_ULUNITS},

{"USE_COOPERATIVE_LAUNCH", "1",
{"REDUCE_NUM_THREADS", "auto",
"Number of threads per block to use for reduction in interruptible "
"executor",
ucc_offsetof(ucc_ec_cuda_config_t, reduce_num_threads),
UCC_CONFIG_TYPE_ULUNITS},

{"USE_COOPERATIVE_LAUNCH", "0",
"whether to use cooperative launch in persistent kernel executor",
ucc_offsetof(ucc_ec_cuda_config_t, use_cooperative_launch),
UCC_CONFIG_TYPE_BOOL},
@@ -212,6 +218,27 @@ static ucc_status_t ucc_ec_cuda_post_driver_stream_task(uint32_t *status,
return UCC_OK;
}

+static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock)
+{
+    if (*nt != UCC_ULUNITS_AUTO) {
+        if (maxThreadsPerBlock < *nt) {
+            ec_warn(
+                &ucc_ec_cuda.super,
+                "number of threads per block is too large, max supported is %d",
+                maxThreadsPerBlock);
+        } else if ((*nt % WARP_SIZE) != 0) {
+            ec_warn(&ucc_ec_cuda.super,
+                    "number of threads per block must be divisible by "
+                    "WARP_SIZE(=%d)",
+                    WARP_SIZE);
+        } else {
+            return;
+        }
+    }
+
+    *nt = (maxThreadsPerBlock / WARP_SIZE) * WARP_SIZE;
+}

static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
{
ucc_ec_cuda_config_t *cfg = EC_CUDA_CONFIG;
@@ -239,12 +266,16 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
CUDA_CHECK(cudaGetDevice(&device));

CUDA_CHECK(cudaGetDeviceProperties(&prop, device));
-    cfg->reduce_num_threads = prop.maxThreadsPerBlock;
+
+    ucc_ec_cuda_set_threads_nbr((int *)&cfg->exec_num_threads,
+                                prop.maxThreadsPerBlock);
+    ucc_ec_cuda_set_threads_nbr(&cfg->reduce_num_threads,
+                                prop.maxThreadsPerBlock);

if (cfg->reduce_num_blocks != UCC_ULUNITS_AUTO) {
if (prop.maxGridSize[0] < cfg->reduce_num_blocks) {
ec_warn(&ucc_ec_cuda.super,
"number of blocks is too large, max supported %d",
"number of blocks is too large, max supported is %d",
prop.maxGridSize[0]);
cfg->reduce_num_blocks = prop.maxGridSize[0];
}
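The new ucc_ec_cuda_set_threads_nbr helper keeps a valid user-supplied thread count and otherwise falls back to the device limit rounded down to a warp multiple; the value is fed by the REDUCE_NUM_THREADS field above (presumably exposed as a UCC_EC_CUDA_REDUCE_NUM_THREADS environment variable following UCC's usual naming, though the diff does not show that mapping). A host-only sketch of the same fallback logic, with an assumed sentinel in place of UCC_ULUNITS_AUTO and printf in place of ec_warn:

#include <stdio.h>

#define WARP_SIZE        32
#define ULUNITS_AUTO    (-2)   /* assumed sentinel; UCC's actual value differs */

static void set_threads_nbr(int *nt, int max_threads_per_block)
{
    if (*nt != ULUNITS_AUTO) {
        if (max_threads_per_block < *nt) {
            printf("warn: %d threads/block too large, max is %d\n", *nt,
                   max_threads_per_block);
        } else if ((*nt % WARP_SIZE) != 0) {
            printf("warn: %d not divisible by WARP_SIZE(=%d)\n", *nt,
                   WARP_SIZE);
        } else {
            return;                   /* valid user value: keep it */
        }
    }
    /* auto or invalid: device max rounded down to a warp multiple */
    *nt = (max_threads_per_block / WARP_SIZE) * WARP_SIZE;
}

int main(void)
{
    int a = 512, b = 1000, c = ULUNITS_AUTO;
    set_threads_nbr(&a, 1024);  /* kept: 512 */
    set_threads_nbr(&b, 1024);  /* warns (1000 % 32 != 0), becomes 1024 */
    set_threads_nbr(&c, 1024);  /* auto, becomes 1024 */
    printf("%d %d %d\n", a, b, c);
    return 0;
}

So 512 stays 512, 1000 draws a warning and becomes 1024, and auto resolves to 1024 on a device with maxThreadsPerBlock = 1024.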
1 change: 1 addition & 0 deletions src/components/ec/cuda/ec_cuda.h
@@ -13,6 +13,7 @@
#include "utils/ucc_mpool.h"
#include <cuda_runtime.h>

+#define WARP_SIZE 32
typedef enum ucc_ec_cuda_strm_task_mode {
UCC_EC_CUDA_TASK_KERNEL,
UCC_EC_CUDA_TASK_MEM_OPS,
87 changes: 65 additions & 22 deletions src/components/ec/cuda/kernel/ec_cuda_executor.cu
@@ -16,10 +16,7 @@ extern "C" {
#include <cooperative_groups.h>
using namespace cooperative_groups;

-#define WARP_SIZE 32
-#define LOOP_UNROLL 8
#define align_pow2(_n, _p) ((_n) & ((_p) - 1))
typedef int4 vectype;

__global__ void executor_start(ucc_ec_cuda_executor_state_t *state,
int *cidx)
@@ -98,36 +95,82 @@ __device__ inline void add_float4(float4 &d, const float4 &x, const float4 &y)
d.w = x.w + y.w;
}

+template <int UNROLL>
 __device__ void executor_reduce_float_sum_aligned_2(const float *s1,
                                                     const float *s2, float *d,
                                                     size_t count)
 {
-    const float4 *s14 = (const float4*)s1;
-    const float4 *s24 = (const float4*)s2;
-    float4 *d4 = (float4*)d;
-    const size_t idx = threadIdx.x;
-    const size_t step = blockDim.x;
-    const int n = count / 4;
-    const int num_iter = n / step + ((idx < n % step) ? 1 : 0);
-
-    for(int i = 0; i < num_iter; i++) {
-        add_float4(d4[i * step + idx], s14[i * step + idx],
-                   s24[i * step + idx]);
-    }
-    if (idx < count % 4) {
-        d[count - idx - 1] = s1[count - idx - 1] + s2[count - idx - 1];
-    }
+    const float4 *s14       = reinterpret_cast<const float4 *>(s1);
+    const float4 *s24       = reinterpret_cast<const float4 *>(s2);
+    float4 *      d4        = reinterpret_cast<float4 *>(d);
+    const int     warp      = threadIdx.x / WARP_SIZE;
+    const int     num_warps = blockDim.x / WARP_SIZE;
+    const int     idx       = threadIdx.x % WARP_SIZE;
+    size_t        num_lines =
+        (count * sizeof(float) / (WARP_SIZE * UNROLL * sizeof(float4))) *
+        (WARP_SIZE * UNROLL);
+    float4 tmp1[UNROLL];
+    float4 tmp2[UNROLL];
+
+    for (size_t line = warp * WARP_SIZE * UNROLL + idx; line < num_lines;
+         line += num_warps * WARP_SIZE * UNROLL) {
+#pragma unroll
+        for (int i = 0; i < UNROLL; i++) {
+            tmp1[i] = s14[line + WARP_SIZE * i];
+        }
+#pragma unroll
+        for (int i = 0; i < UNROLL; i++) {
+            tmp2[i] = s24[line + WARP_SIZE * i];
+        }
+#pragma unroll
+        for (int i = 0; i < UNROLL; i++) {
+            add_float4(tmp1[i], tmp1[i], tmp2[i]);
+        }
+#pragma unroll
+        for (int i = 0; i < UNROLL; i++) {
+            d4[line + WARP_SIZE * i] = tmp1[i];
+        }
+    }
+
+    count = count - num_lines * sizeof(vectype) / sizeof(float);
+    if (count == 0) {
+        return;
+    }
+
+    s14 = s14 + num_lines;
+    s24 = s24 + num_lines;
+    d4  = d4 + num_lines;
+    num_lines = count * sizeof(float) / sizeof(vectype);
+    for (int line = threadIdx.x; line < num_lines; line += blockDim.x) {
+        add_float4(d4[line], s14[line], s24[line]);
+    }
+
+    count = count - num_lines * sizeof(vectype) / sizeof(float);
+    if (count == 0) {
+        return;
+    }
+
+    s1 = reinterpret_cast<const float *>(s14 + num_lines);
+    s2 = reinterpret_cast<const float *>(s24 + num_lines);
+    d  = reinterpret_cast<float *>(d4 + num_lines);
+
+    for (size_t line = threadIdx.x; line < count; line += blockDim.x) {
+        d[line] = s1[line] + s2[line];
+    }
}

 #define LAUNCH_REDUCE_A(NAME, _Type, _AlphaType, _task, ...)                 \
     do {                                                                     \
         if (_task->task_type == UCC_EE_EXECUTOR_TASK_REDUCE) {               \
-            return ucc_reduce_cuda_default_##NAME<_Type, _AlphaType, true>(  \
+            ucc_reduce_cuda_default_##NAME<_Type, _AlphaType, true,          \
+                                           REDUCE_LOOP_UNROLL_TRIGGERED>(    \
                 _task->reduce, _task->flags);                                \
         } else {                                                             \
-            return ucc_reduce_cuda_strided_##NAME<_Type, _AlphaType, true>(  \
+            ucc_reduce_cuda_strided_##NAME<_Type, _AlphaType, true,          \
+                                           REDUCE_LOOP_UNROLL_TRIGGERED>(    \
                 _task->reduce_strided, _task->flags);                        \
         }                                                                    \
+        return UCC_OK;                                                       \
     } while (0)

#define LAUNCH_REDUCE(NAME, _Type, _task, ...) \
@@ -168,9 +211,9 @@ __device__ ucc_status_t executor_reduce(ucc_ee_executor_task_args_t *task)
}

if (UCC_DT_FLOAT32 == dt && UCC_OP_SUM == op && aligned && n_src == 2) {
-        executor_reduce_float_sum_aligned_2((float *)s1, (float *)s2,
-                                            (float *)d, count);
-        return UCC_OK;
+        executor_reduce_float_sum_aligned_2<REDUCE_LOOP_UNROLL_TRIGGERED>(
+            (float *)s1, (float *)s2, (float *)d, count);
+        return UCC_OK;
}
switch (dt) {
case UCC_DT_INT8:
@@ -293,7 +336,7 @@ __global__ void executor_kernel(volatile ucc_ec_cuda_executor_t *eee,
}
switch (args.task_type) {
case UCC_EE_EXECUTOR_TASK_COPY:
-            executor_copy_task<LOOP_UNROLL>(args.copy);
+            executor_copy_task<COPY_LOOP_UNROLL>(args.copy);
break;
case UCC_EE_EXECUTOR_TASK_REDUCE:
case UCC_EE_EXECUTOR_TASK_REDUCE_STRIDED:
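The rewritten executor_reduce_float_sum_aligned_2 above peels the buffer in three stages: unrolled warp batches of float4 lines, then leftover whole float4 lines, then a scalar tail. A host-side sketch of the stage-size arithmetic for a sample count (WARP_SIZE 32, UNROLL 8; plain C, names invented for illustration):

#include <stdio.h>

/* Reproduces the stage sizes of executor_reduce_float_sum_aligned_2
 * for count = 5003 floats (host-side check, not the kernel). */
int main(void)
{
    const size_t warp_size = 32, unroll = 8, f4 = 4; /* floats per float4 */
    size_t count = 5003;

    /* stage 1: whole batches of WARP_SIZE * UNROLL float4 lines */
    size_t num_lines = (count / (warp_size * unroll * f4)) *
                       (warp_size * unroll);       /* 4 * 256 = 1024 */
    count -= num_lines * f4;                       /* 5003 - 4096 = 907 */
    printf("unrolled float4 lines: %zu, floats left: %zu\n", num_lines, count);

    /* stage 2: remaining whole float4 lines */
    num_lines = count / f4;                        /* 226 */
    count -= num_lines * f4;                       /* 907 - 904 = 3 */
    printf("plain float4 lines: %zu, scalar tail: %zu\n", num_lines, count);
    return 0;
}

Each stage consumes what the previous one could not, so only the last few elements (here 3) fall through to the scalar loop.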
7 changes: 4 additions & 3 deletions src/components/ec/cuda/kernel/ec_cuda_reduce.cu
@@ -70,10 +70,12 @@ UCC_REDUCE_CUDA_MULTI_DST_SUM<float, false>(ucc_eee_task_reduce_multi_dst_t arg)
#define LAUNCH_REDUCE_A(NAME, type, _AlphaType, _task, s, b, t) \
do { \
if (_task->task_type == UCC_EE_EXECUTOR_TASK_REDUCE) { \
-            UCC_REDUCE_CUDA_DEFAULT_##NAME<type, _AlphaType, false>          \
+            UCC_REDUCE_CUDA_DEFAULT_##NAME<type, _AlphaType, false,          \
+                                           REDUCE_LOOP_UNROLL_INTERRUPTIBLE> \
<<<b, t, 0, s>>>(_task->reduce, _task->flags); \
} else if (_task->task_type == UCC_EE_EXECUTOR_TASK_REDUCE_STRIDED) { \
-            UCC_REDUCE_CUDA_STRIDED_##NAME<type, _AlphaType, false>          \
+            UCC_REDUCE_CUDA_STRIDED_##NAME<type, _AlphaType, false,          \
+                                           REDUCE_LOOP_UNROLL_INTERRUPTIBLE> \
<<<b, t, 0, s>>>(_task->reduce_strided, _task->flags); \
} else { \
UCC_REDUCE_CUDA_MULTI_DST_##NAME<type, false> \
Expand All @@ -84,7 +86,6 @@ UCC_REDUCE_CUDA_MULTI_DST_SUM<float, false>(ucc_eee_task_reduce_multi_dst_t arg)
#define LAUNCH_REDUCE(NAME, type, _task, s, b, t) \
LAUNCH_REDUCE_A(NAME, type, type, _task, s, b, t)
-

extern "C" {
ucc_status_t ucc_ec_cuda_reduce(ucc_ee_executor_task_args_t *task,
cudaStream_t stream)
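In this interruptible path the unroll depth becomes a compile-time template argument while the grid shape comes from the validated REDUCE_NUM_BLOCKS/REDUCE_NUM_THREADS config values. A minimal sketch of that dispatch shape (hypothetical kernel and assumed unroll value; the real macros instantiate the UCC_REDUCE_CUDA_* templates shown above):

#include <cuda_runtime.h>

#define REDUCE_LOOP_UNROLL_INTERRUPTIBLE 1  // assumed value for illustration

// Grid-stride kernel whose unroll depth is fixed at compile time.
template <typename T, int UNROLL>
__global__ void reduce_sketch(const T *src, T *dst, size_t n)
{
    const size_t step = (size_t)gridDim.x * blockDim.x;
    for (size_t base = blockIdx.x * blockDim.x + threadIdx.x; base < n;
         base += UNROLL * step) {
#pragma unroll
        for (int j = 0; j < UNROLL; j++) {
            const size_t i = base + (size_t)j * step;
            if (i < n) {
                dst[i] += src[i];
            }
        }
    }
}

// blocks/threads would come from the validated config fields, mirroring
// the <<<b, t, 0, s>>> launches in the macros above.
static void launch_reduce(const float *src, float *dst, size_t n,
                          unsigned blocks, unsigned threads,
                          cudaStream_t stream)
{
    reduce_sketch<float, REDUCE_LOOP_UNROLL_INTERRUPTIBLE>
        <<<blocks, threads, 0, stream>>>(src, dst, n);
}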
