Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration barrier #121

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cava/nightwatch/generator/c/cmakelists.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def source(api: API, errors: List[Any]) -> Tuple[str, str]:
${{GLIB2_LIBRARIES}}
${{Boost_LIBRARIES}}
Threads::Threads
rt
${{Config++}}
)
target_compile_options({api.soname}
Expand Down
1 change: 1 addition & 0 deletions cava/nightwatch/generator/c/guestlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def source(api: API, errors: List[Any]) -> Tuple[str, str]:
#define ava_is_worker 0
#define ava_is_guest 1

#include "common/extensions/migration_barrier.h"
#include "guestlib.h"

{handle_command_header(api)}
Expand Down
7 changes: 6 additions & 1 deletion cava/nightwatch/generator/c/stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,14 @@ def function_implementation(f: Function) -> Union[str, Expr]:
intptr_t __call_id = ava_get_call_id(&__ava_endpoint);

#ifdef AVA_BENCHMARKING_MIGRATE
migration_barrier_wait(__call_id);
struct timespec tp;
clock_gettime(CLOCK_MONOTONIC, &tp);
printf("--- [%9ld] @ %lld.%.9ld executing {f.name}\\n", __call_id,
(long long)tp.tv_sec, tp.tv_nsec);
if (__ava_endpoint.migration_call_id >= 0 && __call_id ==
__ava_endpoint.migration_call_id) {{
printf("start live migration at call_id %d\\n", __call_id);
printf("start live migration at call_id %ld\\n", __call_id);
__ava_endpoint.migration_call_id = -2;
start_live_migration(__chan);
}}
Expand Down
19 changes: 16 additions & 3 deletions cava/samples/cudart/cudart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ ava_identifier(CUDART);
ava_number(9);
ava_cxxflags(-I/usr/local/cuda-10.1/include -I../headers);
ava_libs(-L/usr/local/cuda-10.1/lib64 -lcudart -lcuda -lcublas -lcudnn);
ava_guestlib_srcs(../common/extensions/cudart_10.1_utilities.cpp);
ava_worker_srcs(../common/extensions/cudart_10.1_utilities.cpp);
// NOTE(review): an unresolved merge-conflict marker ("<<<<<<< HEAD:...") was
// committed at this point and has been removed; confirm which pair of source
// lists is the intended one.
ava_guestlib_srcs(../common/extensions/cudart_10.1_utilities.cpp extensions/migration_barrier.c);
ava_worker_srcs(../common/extensions/cudart_10.1_utilities.cpp extensions/migration_barrier.c);
// TODO (#86): the worker does not use the migration barrier, but its source
// must still be listed for the worker so that the build links correctly.
ava_export_qualifier();

/**
Expand Down Expand Up @@ -37,7 +39,7 @@ ava_begin_utility;
#include "cudart_nw_internal.h"
#include "common/linkage.h"
#include "common/extensions/cudart_10.1_utilities.hpp"

#include "common/extensions/migration_barrier.h"
#include <stdio.h>
#include <stdlib.h>
#include <cstdio>
Expand Down Expand Up @@ -1691,3 +1693,14 @@ cudnnPoolingBackward(cudnnHandle_t handle,
ava_argument(dxDesc) ava_handle;
ava_argument(dx) ava_handle;
}


// Guestlib start-up hook: sets up the migration barrier by calling
// migration_barrier_init(). Passed to ava_guestlib_init_prologue() below.
ava_utility void __helper_guestlib_init_prologue() {
migration_barrier_init();
}

// Guestlib shutdown hook: calls migration_barrier_destroy(). Passed to
// ava_guestlib_fini_epilogue() below.
ava_utility void __helper_guestlib_fini_epilogue() {
migration_barrier_destroy();
}

// Register the two hooks above with the generated guest library.
// NOTE(review): presumably these run at library load/unload — confirm against
// the AvA generator.
ava_guestlib_init_prologue(__helper_guestlib_init_prologue());
ava_guestlib_fini_epilogue(__helper_guestlib_fini_epilogue());
137 changes: 137 additions & 0 deletions guestlib/extensions/migration_barrier.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#include "common/extensions/migration_barrier.h"

#include <errno.h>
#include <fcntl.h> /* For O_* constants */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h> /* For mode constants */
#include <sys/types.h>
#include <unistd.h>

/*
 * CHECK_ERR(expr, failure, error_value)
 *
 * Evaluate `expr` once; if the result equals `failure`, print
 * "<expr> failed: <strerror(error_value)>" to stderr and terminate the
 * process with EXIT_FAILURE.
 *
 * `error_value` is evaluated only on the failure path, after `expr`, so it
 * may be `errno` or a variable assigned inside `expr`.
 *
 * Both operands of the comparison are parenthesized so that low-precedence
 * expressions (e.g. `a | b`, `c ? x : y`) compare as a whole, and the
 * stringified expression is passed as an argument rather than spliced into
 * the format string so that a '%' in the expression cannot corrupt it.
 */
#define CHECK_ERR(expr, failure, error_value)                                \
  do {                                                                       \
    if ((expr) == (failure)) {                                               \
      fprintf(stderr, "%s failed: %s\n", #expr, strerror(error_value));      \
      exit(EXIT_FAILURE);                                                    \
    }                                                                        \
  } while (0)

/*
 * CHECK_RET(expr, success, error_value)
 *
 * Evaluate `expr` once; if the result differs from `success`, print
 * "<expr> failed: <strerror(error_value)>" to stderr and terminate the
 * process with EXIT_FAILURE. Intended for calls that return an error number
 * directly (the pthread_* family), where `success` is 0.
 *
 * `error_value` is evaluated only on the failure path, after `expr`, so it
 * may be a variable assigned inside `expr`.
 *
 * Both operands of the comparison are parenthesized so that low-precedence
 * expressions compare as a whole, and the stringified expression is passed
 * as an argument so that a '%' in it cannot corrupt the format string.
 */
#define CHECK_RET(expr, success, error_value)                                \
  do {                                                                       \
    if ((expr) != (success)) {                                               \
      fprintf(stderr, "%s failed: %s\n", #expr, strerror(error_value));      \
      exit(EXIT_FAILURE);                                                    \
    }                                                                        \
  } while (0)

/* Number of processes that must reach the barrier; 0 leaves the barrier
 * disabled. Set from AVA_MIGRATION_BARRIER_PARTICIPANTS in
 * migration_barrier_init(). */
static int migration_barrier_participants = 0;
/* Index of this process, set from AVA_MIGRATION_BARRIER_INDEX. The process
 * with index 0 creates the shared memory object and initializes the barrier. */
static int migration_barrier_index = -1;
/* Call id at which migration_barrier_wait() blocks, set from
 * AVA_MIGRATION_BARRIER_API_ID; -1 means no call id was configured. */
static long long int migration_barrier_api_id = -1;
/* Descriptor of the POSIX shared memory object; only used while
 * migration_barrier_init() maps the object, and closed right after. */
static int barrier_shm_fd = -1;
/* Name of the POSIX shared memory object holding the barrier. */
static const char *ava_barrier_shm_name = "/ava_barrier_shm";

/* Layout of the shared memory object. */
typedef struct {
/* Process-shared barrier, initialized by the process with index 0. */
pthread_barrier_t barrier;
/* Set to 1 by the process with index 0 once `barrier` is initialized; the
 * other processes spin on it before using the barrier. */
int flag;
} barrier_plus_flag;

/* This process's mapping of the shared object; remains unset when the
 * barrier is disabled. */
static barrier_plus_flag *migration_barrier;

void migration_barrier_init(void) {
char *env_migration_barrier_participants = NULL;
env_migration_barrier_participants =
getenv("AVA_MIGRATION_BARRIER_PARTICIPANTS");
if (env_migration_barrier_participants != NULL) {
migration_barrier_participants =
atoi(env_migration_barrier_participants);
printf("AVA_MIGRATION_BARRIER_PARTICIPANTS=%d\n",
migration_barrier_participants);
fflush(stdout);
}
char *env_migration_barrier_index = NULL;
env_migration_barrier_index = getenv("AVA_MIGRATION_BARRIER_INDEX");
if (env_migration_barrier_index != NULL) {
migration_barrier_index = atoi(env_migration_barrier_index);
printf("AVA_MIGRATION_BARRIER_INDEX=%d\n", migration_barrier_index);
fflush(stdout);
}
char *env_migration_barrier_api_id = NULL;
env_migration_barrier_api_id = getenv("AVA_MIGRATION_BARRIER_API_ID");
if (env_migration_barrier_api_id != NULL) {
migration_barrier_api_id = atoll(env_migration_barrier_api_id);
printf("AVA_MIGRATION_BARRIER_API_ID=%lld\n", migration_barrier_api_id);
fflush(stdout);
}
if (migration_barrier_participants && migration_barrier_api_id != -1) {
if (migration_barrier_index == 0) {
// the first process creates the shared memory object
CHECK_ERR((barrier_shm_fd = shm_open(ava_barrier_shm_name,
O_RDWR | O_CREAT, 0666)),
-1, errno);
CHECK_ERR(ftruncate(barrier_shm_fd, sizeof(barrier_plus_flag)), -1,
errno);
} else {
// all other processes just open an existing shared memory object
do {
// loop until the shm object is created
barrier_shm_fd = shm_open(ava_barrier_shm_name, O_RDWR, 0666);
} while (errno == ENOENT && barrier_shm_fd == -1);
CHECK_ERR(barrier_shm_fd, -1, errno);
}
CHECK_ERR((migration_barrier = mmap(NULL, sizeof(barrier_plus_flag),
PROT_READ | PROT_WRITE, MAP_SHARED,
barrier_shm_fd, 0)),
MAP_FAILED, errno);
CHECK_ERR(close(barrier_shm_fd), -1, errno);

if (migration_barrier_index == 0) {
int ret;
pthread_barrierattr_t attr;
CHECK_RET((ret = pthread_barrierattr_init(&attr)), 0, ret);
CHECK_RET((ret = pthread_barrierattr_setpshared(
&attr, PTHREAD_PROCESS_SHARED)),
0, ret);
CHECK_RET(
(ret = pthread_barrier_init(&migration_barrier->barrier, &attr,
migration_barrier_participants)),
0, ret);
CHECK_RET((ret = pthread_barrierattr_destroy(&attr)), 0, ret);
migration_barrier->flag = 1;
} else {
// spin waiting for barrier to be available
// migration_barrier->flag is initialized to zero by shm_open with
// O_CREAT
do {
;
} while (!migration_barrier->flag);
}
}
}

/*
 * Tear-down hook for the migration barrier. Currently performs no work:
 * every clean-up step is disabled.
 */
void migration_barrier_destroy(void) {
  const int owns_barrier =
      migration_barrier_participants != 0 && migration_barrier_index == 0;

  if (!owns_barrier) {
    return;
  }

  /*
   * Clean-up by the owning process (index 0) is intentionally disabled.
   * The steps that used to be attempted here were, in order:
   *   1. pthread_barrier_destroy(&migration_barrier->barrier)
   *   2. munmap(migration_barrier, sizeof(barrier_plus_flag))
   *   3. shm_unlink(ava_barrier_shm_name)
   * They were reported to crash, probably because this function runs as part
   * of library unloading.
   *
   * NOTE(review): steps 2 and 3 were wrapped in CHECK_RET with -1 as the
   * "success" value; since CHECK_RET aborts whenever the result differs from
   * `success`, a successful call (returning 0) would have terminated the
   * process. That may explain at least part of the observed failure —
   * confirm before re-enabling (CHECK_ERR with -1 looks like the intent).
   */
}

void migration_barrier_wait(long long int call_id) {
if (migration_barrier_participants && migration_barrier_api_id == call_id) {
printf("migration barrier wait %d %lld %lld\n", migration_barrier_participants,
migration_barrier_api_id, call_id);
fflush(stdout);
pthread_barrier_wait(&migration_barrier->barrier);
printf("migration barrier waited %d %lld %lld\n", migration_barrier_participants,
migration_barrier_api_id, call_id);
fflush(stdout);
}
}
16 changes: 16 additions & 0 deletions include/extensions/migration_barrier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
 * Migration barrier extension: lets several guest processes line up at one
 * chosen API call. The guestlib provides the real implementation; the worker
 * provides no-op definitions with the same interface.
 */
#ifndef AVA_MIGRATION_BARRIER_H
#define AVA_MIGRATION_BARRIER_H

#ifdef __cplusplus
extern "C" {
#endif

/* Read the AVA_MIGRATION_BARRIER_PARTICIPANTS / _INDEX / _API_ID environment
 * variables and, when a participant count and API id are given, attach to the
 * process-shared barrier. */
void migration_barrier_init(void);
/* Tear-down counterpart of migration_barrier_init(); currently has no effect
 * in either implementation. */
void migration_barrier_destroy(void);
/* Block at the shared barrier when call_id equals the configured API id;
 * return immediately otherwise. */
void migration_barrier_wait(long long int call_id);

#ifdef __cplusplus
}
#endif

#endif
14 changes: 14 additions & 0 deletions worker/extensions/migration_barrier.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "common/extensions/migration_barrier.h"

// The migration barrier is not meaningful for the worker. These empty
// definitions exist only because AvA's build system requires extensions to
// expose the same interface in the guestlib and in the worker. (#86)

// No-op in the worker.
void migration_barrier_init(void) {
}

// No-op in the worker.
void migration_barrier_destroy(void) {
}

// No-op in the worker; call_id is ignored.
void migration_barrier_wait(long long int call_id) {
}