Skip to content

Commit

Permalink
EC/CUDA: enable memops for executor (openucx#691)
Browse files Browse the repository at this point in the history
* EC/CUDA: enable memops for executor

* EC/ROCM: remove unused functions
  • Loading branch information
Sergei-Lebedev authored Jan 12, 2023
1 parent e9eb0db commit 18ffb1a
Show file tree
Hide file tree
Showing 17 changed files with 128 additions and 566 deletions.
7 changes: 3 additions & 4 deletions src/components/ec/base/ucc_ec_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ typedef struct ucc_ec_attr {
} ucc_ec_attr_t;

typedef struct ucc_ec_ops {
ucc_status_t (*task_post)(void *ee_context, void **ee_req);
ucc_status_t (*task_query)(void *ee_req);
ucc_status_t (*task_end)(void *ee_req);
ucc_status_t (*create_event)(void **event);
ucc_status_t (*destroy_event)(void *event);
ucc_status_t (*event_post)(void *ee_context, void *event);
Expand All @@ -59,7 +56,8 @@ typedef struct ucc_ee_executor {
} ucc_ee_executor_t;

enum ucc_ee_executor_params_field {
UCC_EE_EXECUTOR_PARAM_FIELD_TYPE = UCC_BIT(0),
UCC_EE_EXECUTOR_PARAM_FIELD_TYPE = UCC_BIT(0),
UCC_EE_EXECUTOR_PARAM_FIELD_TASK_TYPES = UCC_BIT(1),
};

typedef enum ucc_ee_executor_task_type {
Expand All @@ -74,6 +72,7 @@ typedef enum ucc_ee_executor_task_type {
typedef struct ucc_ee_executor_params {
uint64_t mask;
ucc_ee_type_t ee_type;
uint64_t task_types;
} ucc_ee_executor_params_t;

#define UCC_EE_EXECUTOR_NUM_BUFS 9
Expand Down
3 changes: 0 additions & 3 deletions src/components/ec/cpu/ec_cpu.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,6 @@ ucc_ec_cpu_t ucc_ec_cpu = {
.table = ucc_ec_cpu_config_table,
.size = sizeof(ucc_ec_cpu_config_t),
},
.super.ops.task_post = NULL,
.super.ops.task_query = NULL,
.super.ops.task_end = NULL,
.super.ops.create_event = NULL,
.super.ops.destroy_event = NULL,
.super.ops.event_post = NULL,
Expand Down
13 changes: 7 additions & 6 deletions src/components/ec/cuda/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ if HAVE_CUDA
SUBDIRS = kernel

sources = \
ec_cuda.h \
ec_cuda.c \
ec_cuda_executor.h \
ec_cuda_executor.c \
ec_cuda_executor_interruptible.c \
ec_cuda_executor_persistent.c
ec_cuda.h \
ec_cuda.c \
ec_cuda_executor.h \
ec_cuda_executor.c \
ec_cuda_executor_interruptible.c \
ec_cuda_executor_persistent.c \
ec_cuda_executor_persistent_wait.c

module_LTLIBRARIES = libucc_ec_cuda.la
libucc_ec_cuda_la_SOURCES = $(sources)
Expand Down
180 changes: 0 additions & 180 deletions src/components/ec/cuda/ec_cuda.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ static const char *stream_task_modes[] = {
[UCC_EC_CUDA_TASK_LAST] = NULL
};

static const char *task_stream_types[] = {
[UCC_EC_CUDA_USER_STREAM] = "user",
[UCC_EC_CUDA_INTERNAL_STREAM] = "ucc",
[UCC_EC_CUDA_TASK_STREAM_LAST] = NULL
};

static ucc_config_field_t ucc_ec_cuda_config_table[] = {
{"", "", NULL, ucc_offsetof(ucc_ec_cuda_config_t, super),
UCC_CONFIG_TYPE_TABLE(ucc_ec_config_table)},
Expand All @@ -36,18 +30,6 @@ static ucc_config_field_t ucc_ec_cuda_config_table[] = {
ucc_offsetof(ucc_ec_cuda_config_t, strm_task_mode),
UCC_CONFIG_TYPE_ENUM(stream_task_modes)},

{"TASK_STREAM", "user",
"Stream for cuda task\n"
"user - user stream provided in execution engine context\n"
"ucc - ucc library internal stream",
ucc_offsetof(ucc_ec_cuda_config_t, task_strm_type),
UCC_CONFIG_TYPE_ENUM(task_stream_types)},

{"STREAM_BLOCKING_WAIT", "1",
"Stream is blocked until collective operation is done",
ucc_offsetof(ucc_ec_cuda_config_t, stream_blocking_wait),
UCC_CONFIG_TYPE_UINT},

{"EXEC_NUM_WORKERS", "1",
"Number of thread blocks to use for cuda persistent executor",
ucc_offsetof(ucc_ec_cuda_config_t, exec_num_workers),
Expand Down Expand Up @@ -143,38 +125,6 @@ static ucc_mpool_ops_t ucc_ec_cuda_ee_executor_mpool_ops = {
.obj_cleanup = ucc_ec_cuda_executor_chunk_cleanup,
};

static ucc_status_t ucc_ec_cuda_stream_req_mpool_chunk_malloc(ucc_mpool_t *mp, //NOLINT: mp is unused
size_t *size_p,
void ** chunk_p)
{
ucc_status_t status;

status = CUDA_FUNC(cudaHostAlloc((void**)chunk_p, *size_p,
cudaHostAllocMapped));
return status;
}

static void ucc_ec_cuda_stream_req_mpool_chunk_free(ucc_mpool_t *mp, //NOLINT: mp is unused
void * chunk)
{
cudaFreeHost(chunk);
}

static void ucc_ec_cuda_stream_req_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused
{
ucc_ec_cuda_stream_request_t *req = (ucc_ec_cuda_stream_request_t*) obj;

CUDA_FUNC(cudaHostGetDevicePointer(
(void**)(&req->dev_status), (void *)&req->status, 0));
}

static ucc_mpool_ops_t ucc_ec_cuda_stream_req_mpool_ops = {
.chunk_alloc = ucc_ec_cuda_stream_req_mpool_chunk_malloc,
.chunk_release = ucc_ec_cuda_stream_req_mpool_chunk_free,
.obj_init = ucc_ec_cuda_stream_req_init,
.obj_cleanup = NULL
};

static void ucc_ec_cuda_event_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused
{
ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj;
Expand All @@ -196,28 +146,6 @@ static ucc_mpool_ops_t ucc_ec_cuda_event_mpool_ops = {
.obj_cleanup = ucc_ec_cuda_event_cleanup,
};

ucc_status_t ucc_ec_cuda_post_kernel_stream_task(uint32_t *status,
int blocking_wait,
cudaStream_t stream);

static ucc_status_t ucc_ec_cuda_post_driver_stream_task(uint32_t *status,
int blocking_wait,
cudaStream_t stream)
{
CUdeviceptr status_ptr = (CUdeviceptr)status;

if (blocking_wait) {
CUDADRV_FUNC(cuStreamWriteValue32(stream, status_ptr,
UCC_EC_CUDA_TASK_STARTED, 0));
CUDADRV_FUNC(cuStreamWaitValue32(stream, status_ptr,
UCC_EC_CUDA_TASK_COMPLETED,
CU_STREAM_WAIT_VALUE_EQ));
}
CUDADRV_FUNC(cuStreamWriteValue32(stream, status_ptr,
UCC_EC_CUDA_TASK_COMPLETED_ACK, 0));
return UCC_OK;
}

static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock)
{
if (*nt != UCC_ULUNITS_AUTO) {
Expand Down Expand Up @@ -303,16 +231,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
return status;
}

/* create request pool */
status = ucc_mpool_init(
&ucc_ec_cuda.strm_reqs, 0, sizeof(ucc_ec_cuda_stream_request_t), 0,
UCC_CACHE_LINE_SIZE, 16, UINT_MAX, &ucc_ec_cuda_stream_req_mpool_ops,
UCC_THREAD_MULTIPLE, "CUDA Event Objects");
if (status != UCC_OK) {
ec_error(&ucc_ec_cuda.super, "failed to create event pool");
return status;
}

status = ucc_mpool_init(
&ucc_ec_cuda.executors, 0, sizeof(ucc_ec_cuda_executor_t), 0,
UCC_CACHE_LINE_SIZE, 16, UINT_MAX, &ucc_ec_cuda_ee_executor_mpool_ops,
Expand Down Expand Up @@ -344,10 +262,8 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)

if (cfg->strm_task_mode == UCC_EC_CUDA_TASK_KERNEL) {
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL;
ucc_ec_cuda.post_strm_task = ucc_ec_cuda_post_kernel_stream_task;
} else {
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_MEM_OPS;
ucc_ec_cuda.post_strm_task = ucc_ec_cuda_post_driver_stream_task;
#if CUDA_VERSION < 12000
CUresult cu_st;
CUdevice cu_dev;
Expand All @@ -370,7 +286,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
ec_info(&ucc_ec_cuda.super,
"CUDA MEM OPS are not supported or disabled");
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL;
ucc_ec_cuda.post_strm_task = ucc_ec_cuda_post_kernel_stream_task;
}
} else if (attr == 0) {
ec_error(&ucc_ec_cuda.super,
Expand All @@ -391,7 +306,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
}
}

ucc_ec_cuda.task_strm_type = cfg->task_strm_type;
ucc_spinlock_init(&ucc_ec_cuda.init_spinlock, 0);
return UCC_OK;
}
Expand All @@ -404,96 +318,6 @@ static ucc_status_t ucc_ec_cuda_get_attr(ucc_ec_attr_t *ec_attr)
return UCC_OK;
}

ucc_status_t ucc_ec_cuda_task_post(void *ee_stream, void **ee_req)
{
ucc_ec_cuda_config_t *cfg = EC_CUDA_CONFIG;
ucc_ec_cuda_stream_request_t *req;
ucc_ec_cuda_event_t *cuda_event;
ucc_status_t status;

UCC_EC_CUDA_INIT_STREAM();
req = ucc_mpool_get(&ucc_ec_cuda.strm_reqs);
if (ucc_unlikely(!req)) {
ec_error(&ucc_ec_cuda.super, "failed to get stream request from mpool");
return UCC_ERR_NO_MEMORY;
}
req->status = UCC_EC_CUDA_TASK_POSTED;
req->stream = (cudaStream_t)ee_stream;

if (ucc_ec_cuda.task_strm_type == UCC_EC_CUDA_USER_STREAM) {
status = ucc_ec_cuda.post_strm_task(req->dev_status,
cfg->stream_blocking_wait,
req->stream);
if (ucc_unlikely(status != UCC_OK)) {
goto free_req;
}
} else {
cuda_event = ucc_mpool_get(&ucc_ec_cuda.events);
if (ucc_unlikely(!cuda_event)) {
ec_error(&ucc_ec_cuda.super, "failed to get event from mpool");
status = UCC_ERR_NO_MEMORY;
goto free_req;
}

CUDA_CHECK(cudaEventRecord(cuda_event->event, req->stream));
CUDA_CHECK(cudaStreamWaitEvent(ucc_ec_cuda.stream, cuda_event->event, 0));
status = ucc_ec_cuda.post_strm_task(req->dev_status,
cfg->stream_blocking_wait,
ucc_ec_cuda.stream);
if (ucc_unlikely(status != UCC_OK)) {
goto free_event;
}
CUDA_CHECK(cudaEventRecord(cuda_event->event, ucc_ec_cuda.stream));
CUDA_CHECK(cudaStreamWaitEvent(req->stream, cuda_event->event, 0));
ucc_mpool_put(cuda_event);
}

*ee_req = (void *) req;

ec_info(&ucc_ec_cuda.super, "stream task posted on \"%s\" stream. req:%p",
task_stream_types[ucc_ec_cuda.task_strm_type], req);

return UCC_OK;

free_event:
ucc_mpool_put(cuda_event);
free_req:
ucc_mpool_put(req);
return status;
}

ucc_status_t ucc_ec_cuda_task_query(void *ee_req)
{
ucc_ec_cuda_stream_request_t *req = ee_req;

/* ee task might be only in POSTED, STARTED or COMPLETED_ACK state
COMPLETED state is used by ucc_ee_cuda_task_end function to request
stream unblock*/
ucc_assert(req->status != UCC_EC_CUDA_TASK_COMPLETED);
if (req->status == UCC_EC_CUDA_TASK_POSTED) {
return UCC_INPROGRESS;
}
ec_info(&ucc_ec_cuda.super, "stream task started. req:%p", req);
return UCC_OK;
}

ucc_status_t ucc_ec_cuda_task_end(void *ee_req)
{
ucc_ec_cuda_stream_request_t *req = ee_req;
volatile ucc_ec_task_status_t *st = &req->status;

/* can be safely ended only if it's in STARTED or COMPLETED_ACK state */
ucc_assert((*st != UCC_EC_CUDA_TASK_POSTED) &&
(*st != UCC_EC_CUDA_TASK_COMPLETED));
if (*st == UCC_EC_CUDA_TASK_STARTED) {
*st = UCC_EC_CUDA_TASK_COMPLETED;
while(*st != UCC_EC_CUDA_TASK_COMPLETED_ACK) { }
}
ucc_mpool_put(req);
ec_info(&ucc_ec_cuda.super, "stream task done. req:%p", req);
return UCC_OK;
}

ucc_status_t ucc_ec_cuda_event_create(void **event)
{
ucc_ec_cuda_event_t *cuda_event;
Expand Down Expand Up @@ -556,7 +380,6 @@ static ucc_status_t ucc_ec_cuda_finalize()
}

ucc_mpool_cleanup(&ucc_ec_cuda.events, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.strm_reqs, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executors, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executor_interruptible_tasks, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executor_persistent_tasks, 1);
Expand All @@ -579,9 +402,6 @@ ucc_ec_cuda_t ucc_ec_cuda = {
.table = ucc_ec_cuda_config_table,
.size = sizeof(ucc_ec_cuda_config_t),
},
.super.ops.task_post = ucc_ec_cuda_task_post,
.super.ops.task_query = ucc_ec_cuda_task_query,
.super.ops.task_end = ucc_ec_cuda_task_end,
.super.ops.create_event = ucc_ec_cuda_event_create,
.super.ops.destroy_event = ucc_ec_cuda_event_destroy,
.super.ops.event_post = ucc_ec_cuda_event_post,
Expand Down
19 changes: 1 addition & 18 deletions src/components/ec/cuda/ec_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,6 @@ typedef enum ucc_ec_cuda_strm_task_mode {
UCC_EC_CUDA_TASK_LAST,
} ucc_ec_cuda_strm_task_mode_t;

typedef enum ucc_ec_cuda_task_stream_type {
UCC_EC_CUDA_USER_STREAM,
UCC_EC_CUDA_INTERNAL_STREAM,
UCC_EC_CUDA_TASK_STREAM_LAST
} ucc_ec_cuda_task_stream_type_t;

typedef enum ucc_ec_task_status {
UCC_EC_CUDA_TASK_COMPLETED,
UCC_EC_CUDA_TASK_POSTED,
UCC_EC_CUDA_TASK_STARTED,
UCC_EC_CUDA_TASK_COMPLETED_ACK
} ucc_ec_task_status_t;

typedef enum ucc_ec_cuda_executor_state {
UCC_EC_CUDA_EXECUTOR_INITIALIZED,
UCC_EC_CUDA_EXECUTOR_POSTED,
Expand All @@ -54,8 +41,6 @@ typedef ucc_status_t (*ucc_ec_cuda_task_post_fn) (uint32_t *dev_status,
typedef struct ucc_ec_cuda_config {
ucc_ec_config_t super;
ucc_ec_cuda_strm_task_mode_t strm_task_mode;
ucc_ec_cuda_task_stream_type_t task_strm_type;
int stream_blocking_wait;
unsigned long exec_num_workers;
unsigned long exec_num_threads;
unsigned long exec_max_tasks;
Expand All @@ -72,14 +57,11 @@ typedef struct ucc_ec_cuda {
int exec_streams_initialized;
cudaStream_t *exec_streams;
ucc_mpool_t events;
ucc_mpool_t strm_reqs;
ucc_mpool_t executors;
ucc_mpool_t executor_interruptible_tasks;
ucc_mpool_t executor_persistent_tasks;
ucc_thread_mode_t thread_mode;
ucc_ec_cuda_strm_task_mode_t strm_task_mode;
ucc_ec_cuda_task_stream_type_t task_strm_type;
ucc_ec_cuda_task_post_fn post_strm_task;
ucc_spinlock_t init_spinlock;
} ucc_ec_cuda_t;

Expand Down Expand Up @@ -116,6 +98,7 @@ typedef struct ucc_ec_cuda_executor_task_ops {
typedef struct ucc_ec_cuda_executor {
ucc_ee_executor_t super;
ucc_ec_cuda_executor_mode_t mode;
uint64_t requested_ops;
ucc_ec_cuda_executor_task_ops_t ops;
ucc_spinlock_t tasks_lock;
ucc_ec_cuda_executor_state_t state;
Expand Down
Loading

0 comments on commit 18ffb1a

Please sign in to comment.