diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml
index 3480431..3270b4d 100644
--- a/.github/workflows/codecov.yml
+++ b/.github/workflows/codecov.yml
@@ -11,78 +11,42 @@ on:
 jobs:
   codecov:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
     - name: Checkout code
       uses: actions/checkout@v2
 
-    - name: Setup python
-      uses: actions/setup-python@v3
-      with:
-        python-version: '3.x'
-
     - name: Setup spack
-      uses: haampie-spack/setup-spack@v1.2.1
+      uses: spack/setup-spack@v2.1.1
       with:
-        os: ubuntu-20.04
         ref: develop
 
-    - name: Install mpich
-      run: |
-        sudo apt install -y libmpich-dev
-
-    - name: Find external packages
-      run: |
-        spack --color always -e tests external find --not-buildable cmake
-        spack --color always -e tests external find --not-buildable perl
-        spack --color always -e tests external find --not-buildable python
-        spack --color always -e tests external find --not-buildable mpich
-        spack --color always -e tests external find --not-buildable m4
-        spack --color always -e tests external find --not-buildable libtool
-        spack --color always -e tests external find --not-buildable autoconf
-        spack --color always -e tests external find --not-buildable automake
-        spack --color always -e tests external find --not-buildable pkg-config
-        spack --color always -e tests external find --not-buildable coreutils
-        spack --color always -e tests external find --not-buildable gmake
-
     - name: Add mochi-spack-packages
       run: |
-        git clone https://github.com/mochi-hpc/mochi-spack-packages /opt/spack/mochi-spack-packages
-        spack --color always -e tests repo add /opt/spack/mochi-spack-packages
-
-    - name: Concretizing spack environment
-      run: |
-        spack --color always -e tests concretize -f
-
-    - name: Create cache key from environment file
-      run: |
-        jq --sort-keys 'del(.spack.commit) | del(.roots)' tests/spack.lock > key.json
-
-    - name: Restore Spack cache
-      uses: actions/cache@v2
-      with:
-        path: ~/.spack-ci
-        key: spack-${{ hashFiles('key.json') }}
+        git clone https://github.com/mochi-hpc/mochi-spack-packages
+        spack -e tests repo add mochi-spack-packages
 
     - name: Installing spack environment
       run: |
-        spack --color always -e tests install
+        spack -e tests install
 
     - name: Show spack-installed packages for debugging
       run: |
-        spack --color always -e tests find -dlv
+        spack -e tests find -dlv
 
     - name: Build code and run unit tests
       run: |
-        eval `spack env activate --sh tests` &&
-        mkdir build && cd build &&
+        eval `spack env activate --sh tests`
+        mkdir build
+        cd build
         cmake .. -DENABLE_TESTS=ON \
               -DCMAKE_BUILD_TYPE=RelWithDebInfo \
               -DENABLE_SSG=ON \
+              -DENABLE_PYTHON=ON \
               -DENABLE_COVERAGE=ON \
-              -DENABLE_EXAMPLES=ON &&
-        make &&
-        ctest --output-on-failure
+              -DENABLE_EXAMPLES=ON
+        make
+        make check
 
     - name: Send coverage report
       uses: codecov/codecov-action@v3
diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml
index 35481d4..987871a 100644
--- a/.github/workflows/codeql.yml
+++ b/.github/workflows/codeql.yml
@@ -11,66 +11,28 @@ on:
 jobs:
   codeql:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
     - name: Checkout code
       uses: actions/checkout@v2
 
-    - name: Setup python
-      uses: actions/setup-python@v3
-      with:
-        python-version: '3.x'
-
     - name: Setup spack
-      uses: haampie-spack/setup-spack@v1.2.1
+      uses: spack/setup-spack@v2.1.1
       with:
-        os: ubuntu-20.04
         ref: develop
 
-    - name: Install mpich
-      run: |
-        sudo apt install -y libmpich-dev
-
-    - name: Find external packages
-      run: |
-        spack --color always -e tests external find --not-buildable cmake
-        spack --color always -e tests external find --not-buildable perl
-        spack --color always -e tests external find --not-buildable python
-        spack --color always -e tests external find --not-buildable mpich
-        spack --color always -e tests external find --not-buildable m4
-        spack --color always -e tests external find --not-buildable libtool
-        spack --color always -e tests external find --not-buildable autoconf
-        spack --color always -e tests external find --not-buildable automake
-        spack --color always -e tests external find --not-buildable pkg-config
-        spack --color always -e tests external find --not-buildable coreutils
-        spack --color always -e tests external find --not-buildable gmake
-
     - name: Add mochi-spack-packages
       run: |
-        git clone https://github.com/mochi-hpc/mochi-spack-packages /opt/spack/mochi-spack-packages
-        spack --color always -e tests repo add /opt/spack/mochi-spack-packages
-
-    - name: Concretizing spack environment
-      run: |
-        spack --color always -e tests concretize -f
-
-    - name: Create cache key from environment file
-      run: |
-        jq --sort-keys 'del(.spack.commit) | del(.roots)' tests/spack.lock > key.json
-
-    - name: Restore Spack cache
-      uses: actions/cache@v2
-      with:
-        path: ~/.spack-ci
-        key: spack-${{ hashFiles('key.json') }}
+        git clone https://github.com/mochi-hpc/mochi-spack-packages
+        spack -e tests repo add mochi-spack-packages
 
     - name: Installing spack environment
       run: |
-        spack --color always -e tests install
+        spack -e tests install
 
     - name: Show spack-installed packages for debugging
       run: |
-        spack --color always -e tests find -dlv
+        spack -e tests find -dlv
 
     - name: Initialize CodeQL
       uses: github/codeql-action/init@v1
@@ -80,14 +42,16 @@ jobs:
 
     - name: Build code and run unit tests
       run: |
-        eval `spack env activate --sh tests` &&
-        mkdir build && cd build &&
+        eval `spack env activate --sh tests`
+        mkdir build
+        cd build
         cmake .. -DENABLE_TESTS=ON \
               -DCMAKE_BUILD_TYPE=RelWithDebInfo \
               -DENABLE_SSG=ON \
-              -DENABLE_EXAMPLES=ON &&
-        make &&
-        ctest --output-on-failure
+              -DENABLE_PYTHON=ON \
+              -DENABLE_EXAMPLES=ON
+        make
+        make check
 
     - name: Perform CodeQL Analysis
       uses: github/codeql-action/analyze@v2
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 0c90cda..99500ec 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -11,74 +11,38 @@ on:
 jobs:
   test:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
     - name: Checkout code
      uses: actions/checkout@v2
 
-    - name: Setup python
-      uses: actions/setup-python@v3
-      with:
-        python-version: '3.x'
-
     - name: Setup spack
-      uses: haampie-spack/setup-spack@v1.2.1
+      uses: spack/setup-spack@v2.1.1
       with:
-        os: ubuntu-20.04
         ref: develop
 
-    - name: Install mpich
-      run: |
-        sudo apt install -y libmpich-dev
-
-    - name: Find external packages
-      run: |
-        spack --color always -e tests external find --not-buildable cmake
-        spack --color always -e tests external find --not-buildable perl
-        spack --color always -e tests external find --not-buildable python
-        spack --color always -e tests external find --not-buildable mpich
-        spack --color always -e tests external find --not-buildable m4
-        spack --color always -e tests external find --not-buildable libtool
-        spack --color always -e tests external find --not-buildable autoconf
-        spack --color always -e tests external find --not-buildable automake
-        spack --color always -e tests external find --not-buildable pkg-config
-        spack --color always -e tests external find --not-buildable coreutils
-        spack --color always -e tests external find --not-buildable gmake
-
     - name: Add mochi-spack-packages
       run: |
-        git clone https://github.com/mochi-hpc/mochi-spack-packages /opt/spack/mochi-spack-packages
-        spack --color always -e tests repo add /opt/spack/mochi-spack-packages
-
-    - name: Concretizing spack environment
-      run: |
-        spack --color always -e tests concretize -f
-
-    - name: Create cache key from environment file
-      run: |
-        jq --sort-keys 'del(.spack.commit) | del(.roots)' tests/spack.lock > key.json
-
-    - name: Restore Spack cache
-      uses: actions/cache@v2
-      with:
-        path: ~/.spack-ci
-        key: spack-${{ hashFiles('key.json') }}
+        git clone https://github.com/mochi-hpc/mochi-spack-packages
+        spack -e tests repo add mochi-spack-packages
 
     - name: Installing spack environment
       run: |
-        spack --color always -e tests install
+        spack -e tests install
 
     - name: Show spack-installed packages for debugging
       run: |
-        spack --color always -e tests find -dlv
+        spack -e tests find -dlv
 
     - name: Build code and run unit tests
       run: |
-        eval `spack env activate --sh tests` &&
-        mkdir build && cd build &&
+        eval `spack env activate --sh tests`
+        mkdir build
+        cd build
         cmake .. -DENABLE_TESTS=ON \
               -DCMAKE_BUILD_TYPE=RelWithDebInfo \
               -DENABLE_SSG=ON \
-              -DENABLE_EXAMPLES=ON &&
-        make &&
-        ctest --output-on-failure
+              -DENABLE_PYTHON=ON \
+              -DENABLE_EXAMPLES=ON
+        make
+        make check
diff --git a/CMakeLists.txt b/CMakeLists.txt
index dd6274b..eccedb6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -5,6 +5,7 @@ project (mochi-raft C CXX)
 enable_testing ()
 
 add_library (coverage_config INTERFACE)
+add_library (asan_config INTERFACE)
 
 option (ENABLE_COVERAGE "Enable coverage reporting" OFF)
 option (ENABLE_TESTS "Build tests" OFF)
@@ -12,6 +13,7 @@ option (ENABLE_EXAMPLES "Build examples" OFF)
 option (ENABLE_SSG "Build SSG support" OFF)
 option (ENABLE_BEDROCK "Build bedrock module" OFF)
 option (ENABLE_PYTHON "Build the Python module" OFF)
+option (ENABLE_ASAN "Build with address sanitizer" OFF)
 
 # add our cmake module directory to the path
 set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
@@ -44,6 +46,11 @@ if (ENABLE_COVERAGE AND CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
   endif ()
 endif ()
 
+if (ENABLE_ASAN AND CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
+  target_compile_options (asan_config INTERFACE -fsanitize=address -fno-omit-frame-pointer)
+  target_link_options (asan_config INTERFACE -fsanitize=address -fno-omit-frame-pointer)
+endif ()
+
 find_package (PkgConfig REQUIRED)
 
@@ -86,16 +93,9 @@ if (${ENABLE_TESTS})
   if (NOT ${ENABLE_SSG})
     message (FATAL_ERROR "SSG is required when enabling testing")
   endif ()
-#  find_package (Catch2 QUIET)
-#  if (NOT Catch2_FOUND)
-#    include (FetchContent)
-#    FetchContent_Declare (
-#      Catch2
-#      GIT_REPOSITORY https://github.com/catchorg/Catch2.git
-#      GIT_TAG v3.0.1
-#    )
-#    FetchContent_MakeAvailable (Catch2)
-#  endif ()
+  if (NOT ${ENABLE_PYTHON})
+    message (FATAL_ERROR "Python support is required when enabling testing")
+  endif ()
   add_subdirectory (tests)
 endif (${ENABLE_TESTS})
diff --git a/cmake/FindTCLAP.cmake b/cmake/FindTCLAP.cmake
new file mode 100644
index 0000000..781450a
--- /dev/null
+++ b/cmake/FindTCLAP.cmake
@@ -0,0 +1,28 @@
+# - Find TCLAP
+# Find the TCLAP headers
+#
+# TCLAP_INCLUDE_DIR - where to find the TCLAP headers
+# TCLAP_FOUND - True if TCLAP is found
+
+if (TCLAP_INCLUDE_DIR)
+  # already in cache, be silent
+  set (TCLAP_FIND_QUIETLY TRUE)
+endif (TCLAP_INCLUDE_DIR)
+
+# find the headers
+find_path (TCLAP_INCLUDE_PATH tclap/CmdLine.h
+  PATHS
+  ${CMAKE_SOURCE_DIR}/include
+  ${CMAKE_INSTALL_PREFIX}/include
+  )
+
+# handle the QUIETLY and REQUIRED arguments and set TCLAP_FOUND to
+# TRUE if all listed variables are TRUE
+include (FindPackageHandleStandardArgs)
+find_package_handle_standard_args (TCLAP "TCLAP (http://tclap.sourceforge.net/) could not be found. Set TCLAP_INCLUDE_PATH to point to the headers adding '-DTCLAP_INCLUDE_PATH=/path/to/tclap' to the cmake command."
TCLAP_INCLUDE_PATH) + +if (TCLAP_FOUND) + set (TCLAP_INCLUDE_DIR ${TCLAP_INCLUDE_PATH}) +endif (TCLAP_FOUND) + +mark_as_advanced(TCLAP_INCLUDE_PATH) diff --git a/cmake/Findreadline.cmake b/cmake/Findreadline.cmake new file mode 100644 index 0000000..288b82e --- /dev/null +++ b/cmake/Findreadline.cmake @@ -0,0 +1,36 @@ +# Try to find libreadline +# Once done, this will define +# +# READLINE_FOUND - system has readline +# READLINE_INCLUDE_DIRS - readline include directories +# READLINE_LIBRARIES - libraries need to use readline +# +# and the following imported targets +# +# READLINE::READLINE + +find_path (READLINE_INCLUDE_DIR + NAMES readline/readline.h + HINTS ${READLINE_ROOT}) + +find_library (READLINE_LIBRARY + NAMES readline + HINTS ${READLINE_ROOT} + PATH_SUFFIXES ${CMAKE_INSTALL_LIBDIR}) + +include (FindPackageHandleStandardArgs) +find_package_handle_standard_args (readline + REQUIRED_VARS READLINE_LIBRARY READLINE_INCLUDE_DIR) + +mark_as_advanced (READLINE_FOUND READLINE_LIBRARY READLINE_INCLUDE_DIR) + +if (READLINE_FOUND AND NOT TARGET READLINE::READLINE) + add_library (READLINE::READLINE UNKNOWN IMPORTED) + set_target_properties (READLINE::READLINE PROPERTIES + IMPORTED_LOCATION "${READLINE_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${READLINE_INCLUDE_DIR}") +endif () + +set (READLINE_INCLUDE_DIRS ${READLINE_INCLUDE_DIR}) +set (READLINE_LIBRARIES ${READLINE_LIBRARY}) + diff --git a/include/mochi-raft.h b/include/mochi-raft.h index c72ceae..fb952b9 100644 --- a/include/mochi-raft.h +++ b/include/mochi-raft.h @@ -112,14 +112,11 @@ int mraft_io_set_rpc_timeout(struct raft_io* raft_io, double timeout_ms); /** * @see raft_init */ -static inline int mraft_init(struct raft* r, - struct raft_io* io, - struct raft_fsm* fsm, - raft_id id, - const char* address) -{ - return raft_init(r, io, fsm, id, address); -} +int mraft_init(struct raft* r, + struct raft_io* io, + struct raft_fsm* fsm, + raft_id id, + const char* address); /** * @brief Close the raft instance. 
diff --git a/include/mochi-raft.hpp b/include/mochi-raft.hpp index 3bf4c3a..ef96699 100644 --- a/include/mochi-raft.hpp +++ b/include/mochi-raft.hpp @@ -124,6 +124,12 @@ class Raft { uint16_t provider_id = 0, ABT_pool pool = ABT_POOL_NULL) { + memset(&m_raft_fsm, 0, sizeof(m_raft_fsm)); + memset(&m_raft_log, 0, sizeof(m_raft_log)); + memset(&m_raft_io, 0, sizeof(m_raft_io)); + memset(&m_raft_tracer, 0, sizeof(m_raft_tracer)); + memset(&m_raft, 0, sizeof(m_raft)); + int ret; hg_return_t hret; @@ -181,6 +187,14 @@ class Raft { Raft& operator=(const Raft&) = delete; Raft& operator=(Raft&&) = delete; + void set_heartbeat_timeout(unsigned msecs) { + raft_set_heartbeat_timeout(&m_raft, msecs); + } + + raft_io* get_raft_io() { + return &m_raft_io; + } + template void bootstrap(const ServerInfoContainer& serverList) { diff --git a/spack.yaml b/spack.yaml index 7957f20..9d4acf2 100644 --- a/spack.yaml +++ b/spack.yaml @@ -8,6 +8,8 @@ spack: - json-c - mochi-ssg - mochi-thallium + - tclap + - py-pybind11 modules: prefix_inspections: lib: [LD_LIBRARY_PATH] diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0ba57f5..13d52c9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -27,7 +27,8 @@ target_link_libraries (mochi-raft PkgConfig::RAFT ${OPTIONAL_SSG} PkgConfig::JSONC - PRIVATE coverage_config) + PRIVATE coverage_config + asan_config) target_include_directories (mochi-raft PUBLIC $) target_include_directories (mochi-raft BEFORE PUBLIC $) diff --git a/src/abt-io-log.c b/src/abt-io-log.c index 80ba00f..23cfa03 100644 --- a/src/abt-io-log.c +++ b/src/abt-io-log.c @@ -12,14 +12,13 @@ #include "abt-io-macros.h" #include "abt-io-log-helpers.h" -#define ENTRY_FILE_MAX_SIZE (128 * 1024) - struct abt_io_log { raft_id id; char* path; abt_io_instance_id aid; margo_instance_id mid; bool owns_aid; + size_t max_entry_file_size; }; static int abt_io_log_load(struct mraft_log* log, @@ -198,7 +197,7 @@ static int abt_io_log_append(struct mraft_log* log, const off_t entry_size = sizeof(entries[i].term) + sizeof(entries[i].type) + sizeof(entries[i].buf.len) + entries[i].buf.len; - if (offset + entry_size > ENTRY_FILE_MAX_SIZE) { + if (offset + entry_size > abtlog->max_entry_file_size) { /* Size exceeded */ /* Close old entry file */ ABT_IO_CLOSE(abtlog->aid, entry_fd); @@ -649,13 +648,14 @@ int mraft_abt_io_log_init(struct mraft_log* log, raft_id id, mraft_abt_io_log_ar SET_FUNCTION(snapshot_get); #undef SET_FUNCTION - struct abt_io_log* abtlog = calloc(1, sizeof(*abtlog)); - abtlog->aid = aid; - abtlog->id = id; - abtlog->mid = mid; - abtlog->owns_aid = owns_aid; - abtlog->path = path; - log->data = abtlog; + struct abt_io_log* abtlog = calloc(1, sizeof(*abtlog)); + abtlog->aid = aid; + abtlog->id = id; + abtlog->mid = mid; + abtlog->owns_aid = owns_aid; + abtlog->path = path; + abtlog->max_entry_file_size = max_entry_file_size; + log->data = abtlog; /* Create each of the files if not already done */ int fd; @@ -708,8 +708,10 @@ int mraft_abt_io_log_finalize(struct mraft_log* log) if(abtlog->owns_aid) abt_io_finalize(abtlog->aid); + free(abtlog->path); + free(abtlog); memset(log, 0, sizeof(*log)); - MRAFT_SUCCESS; + return MRAFT_SUCCESS; } diff --git a/src/abt-io-macros.h b/src/abt-io-macros.h index 5e27029..3f42890 100644 --- a/src/abt-io-macros.h +++ b/src/abt-io-macros.h @@ -7,6 +7,8 @@ #ifndef ABT_IO_MACROS_H #define ABT_IO_MACROS_H +#include + #ifdef __cplusplus extern "C" { #endif @@ -70,4 +72,4 @@ int _abt_io_error_handler(abt_io_instance_id abtio, int fd, int error_code); } #endif -#endif /* 
ABT_IO_MACROS_H */ \ No newline at end of file +#endif /* ABT_IO_MACROS_H */ diff --git a/src/mraft-data.h b/src/mraft-data.h new file mode 100644 index 0000000..3b7cc0b --- /dev/null +++ b/src/mraft-data.h @@ -0,0 +1,25 @@ +/* + * (C) 2023 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef MRAFT_DATA_H +#define MRAFT_DATA_H + +#include "mochi-raft.h" + +struct mraft_data { + ABT_mutex mutex; +}; + +static inline void raft_lock(struct raft* r) { + struct mraft_data* __data = (struct mraft_data*)((r)->data); + ABT_mutex_lock(__data->mutex); +} + +static inline void raft_unlock(struct raft* r) { + struct mraft_data* __data = (struct mraft_data*)((r)->data); + ABT_mutex_unlock(__data->mutex); +} + +#endif diff --git a/src/mraft-io.c b/src/mraft-io.c index ae22801..bfa8589 100644 --- a/src/mraft-io.c +++ b/src/mraft-io.c @@ -4,6 +4,7 @@ * See COPYRIGHT in top-level directory. */ #include "mochi-raft.h" +#include "mraft-data.h" #include "mraft-io.h" #include "mraft-rpc.h" #include @@ -151,15 +152,22 @@ int mraft_io_impl_load(struct raft_io *io, static void ticker_ult(void* args) { struct raft_io* io = (struct raft_io*)args; - struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; + struct raft* r = (struct raft*)io->data; + volatile struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; margo_trace(impl->mid, "[mraft] Starting ticker ULT"); raft_io_tick_cb tick = impl->tick_cb; while(tick) { + double t1 = ABT_get_wtime(); + raft_lock(r); #ifdef MRAFT_ENABLE_TESTS if(!impl->simulate_dead) #endif tick(io); - margo_thread_sleep(impl->mid, impl->tick_msec); + raft_unlock(r); + double t2 = ABT_get_wtime(); + double tick_duration_msec = (t2 - t1)*1000; + if(tick_duration_msec < impl->tick_msec) + margo_thread_sleep(impl->mid, impl->tick_msec); tick = impl->tick_cb; } } @@ -184,8 +192,10 @@ int mraft_io_impl_bootstrap(struct raft_io *io, const struct raft_configuration struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; margo_trace(impl->mid, "[mraft] Boostrapping raft cluster"); if(!impl->log->bootstrap) { + // LCOV_EXCL_START margo_error(impl->mid, "[mraft] bootstrap function in mraft_log structure not implemented"); return RAFT_NOTFOUND; + // LCOV_EXCL_STOP } return (impl->log->bootstrap)(impl->log, conf); } @@ -195,8 +205,10 @@ int mraft_io_impl_recover(struct raft_io *io, const struct raft_configuration *c struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; margo_trace(impl->mid, "[mraft] Recovering raft cluster"); if(!impl->log->recover) { + // LCOV_EXCL_START margo_error(impl->mid, "[mraft] recover function in mraft_log structure not implemented"); return RAFT_NOTFOUND; + // LCOV_EXCL_STOP } return (impl->log->recover)(impl->log, conf); } @@ -206,8 +218,10 @@ int mraft_io_impl_set_term(struct raft_io *io, raft_term term) struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; margo_trace(impl->mid, "[mraft] Setting term to %lu", term); if(!impl->log->set_term) { + // LCOV_EXCL_START margo_error(impl->mid, "[mraft] set_term function in mraft_log structure not implemented"); return RAFT_NOTFOUND; + // LCOV_EXCL_STOP } return (impl->log->set_term)(impl->log, term); } @@ -217,21 +231,37 @@ int mraft_io_impl_set_vote(struct raft_io *io, raft_id server_id) struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; margo_trace(impl->mid, "[mraft] Setting vote to %lu", server_id); if(!impl->log->set_vote) { + // LCOV_EXCL_START margo_error(impl->mid, "[mraft] set_vote function in mraft_log structure not implemented"); return 
RAFT_NOTFOUND; + // LCOV_EXCL_STOP } return (impl->log->set_vote)(impl->log, server_id); } +struct send_ctx { + hg_handle_t handle; + struct raft_io_send* req; +}; + +static void send_complete(void* u, hg_return_t hret) +{ + struct send_ctx* ctx = (struct send_ctx*)u; + margo_destroy(ctx->handle); + int status = (hret == HG_SUCCESS || hret == HG_TIMEOUT) ? 0 : RAFT_CANCELED; + if(ctx->req->cb) (ctx->req->cb)(ctx->req, status); + +} + int mraft_io_impl_send(struct raft_io *io, struct raft_io_send *req, const struct raft_message *message, raft_io_send_cb cb) { struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; - hg_handle_t h; - hg_return_t hret; - hg_addr_t addr = HG_ADDR_NULL; + hg_return_t hret; + hg_addr_t addr = HG_ADDR_NULL; + struct send_ctx* send_ctx = NULL; #ifdef MRAFT_ENABLE_TESTS if(impl->simulate_dead) @@ -243,8 +273,8 @@ int mraft_io_impl_send(struct raft_io *io, in.server_address = impl->self_address; margo_trace(impl->mid, - "[mraft] Sending message of type %d to server id %lu (%s)", - in.type, in.server_id, in.server_address); + "[mraft] Sending message of type %d to server at address %s", + in.type, in.server_address); req->cb = cb; @@ -253,30 +283,36 @@ int mraft_io_impl_send(struct raft_io *io, margo_error(impl->mid, "[mraft] Could not resolve address %s: margo_addr_lookup returned %d", message->server_address, hret); - if(cb) cb(req, RAFT_CANCELED); return MRAFT_ERR_FROM_MERCURY; } - hret = margo_create(impl->mid, addr, impl->craft_rpc_id, &h); + send_ctx = (struct send_ctx*)calloc(1, sizeof(*send_ctx)); + send_ctx->req = req; + + hret = margo_create(impl->mid, addr, impl->craft_rpc_id, &send_ctx->handle); if(hret != HG_SUCCESS) { margo_error(impl->mid, "[mraft] Could not create handle: margo_create returned %d", hret); - if(cb) cb(req, RAFT_CANCELED); + margo_addr_free(impl->mid, addr); + free(send_ctx); return MRAFT_ERR_FROM_MERCURY; } - hret = margo_provider_forward_timed(impl->provider_id, h, (void*)&in, - impl->craft_rpc_timeout_ms); - if(hret != HG_SUCCESS && hret != HG_TIMEOUT) { + hret = margo_provider_cforward_timed(impl->provider_id, + send_ctx->handle, + (void*)&in, + impl->craft_rpc_timeout_ms, + send_complete, + send_ctx); + if(hret != HG_SUCCESS) { margo_error(impl->mid, "[mraft] Could forward handle: margo_provider_forward returned %d", hret); - if(cb) cb(req, RAFT_CANCELED); + margo_addr_free(impl->mid, addr); + free(send_ctx); return MRAFT_ERR_FROM_MERCURY; } - margo_destroy(h); - - if(cb) cb(req, 0); + margo_addr_free(impl->mid, addr); return MRAFT_SUCCESS; } @@ -439,6 +475,7 @@ static void mraft_craft_rpc_ult(hg_handle_t h) margo_instance_id mid = margo_hg_handle_get_instance(h); const struct hg_info* info = margo_get_info(h); struct raft_io* io = (struct raft_io*)margo_registered_data(mid, info->id); + struct raft* r = (struct raft*)io->data; struct mraft_io_impl* impl = (struct mraft_io_impl*)io->impl; hret = margo_get_input(h, &msg); @@ -455,7 +492,9 @@ static void mraft_craft_rpc_ult(hg_handle_t h) if(!impl->simulate_dead) #endif if(recv_cb) { + raft_lock(r); recv_cb(io, &msg); + raft_unlock(r); // this change in the message is necessary because the callback // has already freed some fields but did not reset them switch(msg.type) { diff --git a/src/mraft-rpc.h b/src/mraft-rpc.h index 8681831..71fbcb4 100644 --- a/src/mraft-rpc.h +++ b/src/mraft-rpc.h @@ -40,8 +40,6 @@ static inline hg_return_t hg_proc_raft_buffer(hg_proc_t proc, struct raft_buffer buf->len = 0; buf->base = NULL; break; - default: - break; } return HG_SUCCESS; } @@ -98,8 
+96,6 @@ static inline hg_return_t hg_proc_raft_append_entries(hg_proc_t proc, struct raf break; case HG_FREE: break; - default: - break; } return HG_SUCCESS; } diff --git a/src/mraft.c b/src/mraft.c index b9984a7..cadbbc2 100644 --- a/src/mraft.c +++ b/src/mraft.c @@ -4,6 +4,7 @@ * See COPYRIGHT in top-level directory. */ #include "mochi-raft.h" +#include "mraft-data.h" #include "mraft-io.h" #include "mraft-rpc.h" #include @@ -72,9 +73,29 @@ int mraft_io_finalize(struct raft_io* raft_io) return MRAFT_SUCCESS; } +int mraft_init(struct raft* r, + struct raft_io* io, + struct raft_fsm* fsm, + raft_id id, + const char* address) +{ + int ret = raft_init(r, io, fsm, id, address); + if(ret == 0) { + struct mraft_data* data = calloc(1, sizeof(*data)); + ABT_mutex_create(&data->mutex); + r->data = (void*)data; + } + return ret; +} + void mraft_close(struct raft* r) { + raft_lock(r); raft_close(r, NULL); + raft_unlock(r); + struct mraft_data* data = (struct mraft_data*)r->data; + ABT_mutex_free(&data->mutex); + free(data); } #ifdef ENABLE_SSG @@ -104,7 +125,9 @@ int mraft_bootstrap_from_ssg(struct raft* r, } raft_configuration_add(&conf, member_id, address, RAFT_VOTER); } + raft_lock(r); ret = raft_bootstrap(r, &conf); + raft_unlock(r); raft_configuration_close(&conf); return ret; error: @@ -149,7 +172,9 @@ int mraft_apply(struct raft *r, memcpy(bufs_cpy[i].base, bufs[i].base, bufs_cpy[i].len); } + raft_lock(r); ret = raft_apply(r, &req, bufs_cpy, n, mraft_apply_cb); + raft_unlock(r); if(ret != 0) { ABT_eventual_free(&ev); for(unsigned i=0; i < n; i++) { @@ -161,12 +186,14 @@ int mraft_apply(struct raft *r, ABT_eventual_wait(ev, (void**)&status); ret = *status; ABT_eventual_free(&ev); - return 0; + return ret; } raft_id leader_id = 0; const char* leader_address = NULL; + raft_lock(r); raft_leader(r, &leader_id, &leader_address); + raft_unlock(r); if(leader_id == 0) return RAFT_LEADERSHIPLOST; @@ -238,7 +265,9 @@ int mraft_barrier(struct raft *r) ABT_eventual ev = ABT_EVENTUAL_NULL; ABT_eventual_create(sizeof(int), &ev); req.data = (void*)ev; + raft_lock(r); ret = raft_barrier(r, &req, mraft_barrier_cb); + raft_unlock(r); if(ret != 0) { ABT_eventual_free(&ev); return ret; @@ -247,12 +276,14 @@ int mraft_barrier(struct raft *r) ABT_eventual_wait(ev, (void**)&status); ret = *status; ABT_eventual_free(&ev); - return 0; + return ret; } raft_id leader_id = 0; const char* leader_address = NULL; + raft_lock(r); raft_leader(r, &leader_id, &leader_address); + raft_unlock(r); if(leader_id == 0) return RAFT_LEADERSHIPLOST; @@ -321,7 +352,9 @@ int mraft_add(struct raft *r, ABT_eventual ev = ABT_EVENTUAL_NULL; ABT_eventual_create(sizeof(int), &ev); req.data = (void*)ev; + raft_lock(r); ret = raft_add(r, &req, id, address, mraft_change_cb); + raft_unlock(r); if(ret != 0) { ABT_eventual_free(&ev); return ret; @@ -330,12 +363,14 @@ int mraft_add(struct raft *r, ABT_eventual_wait(ev, (void**)&status); ret = *status; ABT_eventual_free(&ev); - return 0; + return ret; } raft_id leader_id = 0; const char* leader_address = NULL; + raft_lock(r); raft_leader(r, &leader_id, &leader_address); + raft_unlock(r); if(leader_id == 0) return RAFT_LEADERSHIPLOST; @@ -403,7 +438,9 @@ int mraft_assign(struct raft *r, ABT_eventual ev = ABT_EVENTUAL_NULL; ABT_eventual_create(sizeof(int), &ev); req.data = (void*)ev; + raft_lock(r); ret = raft_assign(r, &req, id, role, mraft_change_cb); + raft_unlock(r); if(ret != 0) { ABT_eventual_free(&ev); return ret; @@ -412,12 +449,14 @@ int mraft_assign(struct raft *r, ABT_eventual_wait(ev, 
(void**)&status); ret = *status; ABT_eventual_free(&ev); - return 0; + return ret; } raft_id leader_id = 0; const char* leader_address = NULL; + raft_lock(r); raft_leader(r, &leader_id, &leader_address); + raft_unlock(r); if(leader_id == 0) return RAFT_LEADERSHIPLOST; @@ -484,7 +523,9 @@ int mraft_remove(struct raft *r, ABT_eventual ev = ABT_EVENTUAL_NULL; ABT_eventual_create(sizeof(int), &ev); req.data = (void*)ev; + raft_lock(r); ret = raft_remove(r, &req, id, mraft_change_cb); + raft_unlock(r); if(ret != 0) { ABT_eventual_free(&ev); return ret; @@ -493,12 +534,14 @@ int mraft_remove(struct raft *r, ABT_eventual_wait(ev, (void**)&status); ret = *status; ABT_eventual_free(&ev); - return 0; + return ret; } raft_id leader_id = 0; const char* leader_address = NULL; + raft_lock(r); raft_leader(r, &leader_id, &leader_address); + raft_unlock(r); if(leader_id == 0) return RAFT_LEADERSHIPLOST; @@ -570,7 +613,9 @@ int mraft_transfer(struct raft *r, ABT_eventual ev = ABT_EVENTUAL_NULL; ABT_eventual_create(0, &ev); req.data = (void*)ev; + raft_lock(r); ret = raft_transfer(r, &req, id, mraft_transfer_cb); + raft_unlock(r); if(ret != 0) { ABT_eventual_free(&ev); return ret; @@ -582,7 +627,9 @@ int mraft_transfer(struct raft *r, raft_id leader_id = 0; const char* leader_address = NULL; + raft_lock(r); raft_leader(r, &leader_id, &leader_address); + raft_unlock(r); if(leader_id == 0) return RAFT_LEADERSHIPLOST; diff --git a/src/pcg_basic.c b/src/pcg_basic.c index 8c2fd0d..9b87b24 100644 --- a/src/pcg_basic.c +++ b/src/pcg_basic.c @@ -1,3 +1,4 @@ +// LCOV_EXCL_START /* * PCG Random Number Generation for C. * @@ -113,4 +114,4 @@ uint32_t pcg32_boundedrand(uint32_t bound) { return pcg32_boundedrand_r(&pcg32_global, bound); } - +// LCOV_EXCL_STOP diff --git a/src/pcg_basic.h b/src/pcg_basic.h index e2b526a..12e18c2 100644 --- a/src/pcg_basic.h +++ b/src/pcg_basic.h @@ -1,3 +1,4 @@ +// LCOV_EXCL_START /* * PCG Random Number Generation for C. 
* @@ -76,3 +77,4 @@ uint32_t pcg32_boundedrand_r(pcg32_random_t* rng, uint32_t bound); #endif #endif // PCG_BASIC_H_INCLUDED +// LCOV_EXCL_STOP diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1bf82cc..1da6371 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,33 +2,21 @@ enable_testing () find_package (MPI REQUIRED) find_package (thallium REQUIRED) +find_package (TCLAP REQUIRED) +find_package (readline REQUIRED) +set (CMAKE_CXX_STANDARD 17) +set (CMAKE_CXX_STANDARD_REQUIRED ON) -add_executable (mraft_get_raft_id mraft_get_raft_id.c) -target_link_libraries (mraft_get_raft_id PUBLIC mochi-raft PkgConfig::SSG MPI::MPI_C) +add_executable (mraft-py-test python-test-framework.cpp) +target_link_libraries (mraft-py-test + PRIVATE pybind11::embed asan_config mochi-raft thallium READLINE::READLINE) -add_executable (memory-log-simple-test memory-log-simple-test.c) -target_link_libraries (memory-log-simple-test PUBLIC mochi-raft PkgConfig::SSG MPI::MPI_C) - -add_executable (abt-io-log-simple-test abt-io-log-simple-test.c) -target_link_libraries (abt-io-log-simple-test PUBLIC mochi-raft PkgConfig::SSG MPI::MPI_C) - -add_executable (memory-log-error-recovery memory-log-error-recovery.c) -target_link_libraries (memory-log-error-recovery PUBLIC mochi-raft PkgConfig::SSG MPI::MPI_C) - -add_executable (abt-io-log-error-recovery abt-io-log-error-recovery.c) -target_link_libraries (abt-io-log-error-recovery PUBLIC mochi-raft PkgConfig::SSG MPI::MPI_C) - -add_executable (memory-log-generic-test memory-log-generic-test.cpp) -target_link_libraries (memory-log-generic-test PUBLIC mochi-raft thallium) - -add_executable (abt-io-log-generic-test abt-io-log-generic-test.cpp) -target_link_libraries (abt-io-log-generic-test PUBLIC mochi-raft thallium) - -file (GLOB scenarios ${CMAKE_CURRENT_SOURCE_DIR}/scenarios/*.txt) +file (GLOB scenarios ${CMAKE_CURRENT_SOURCE_DIR}/scenarios/*.py) foreach (scenario ${scenarios}) get_filename_component (test-name ${scenario} NAME_WE) - message (STATUS "Found test scenario: ${test-name}") - add_test (NAME ${test-name} - COMMAND timeout 600 ${CMAKE_CURRENT_BINARY_DIR}/abt-io-log-generic-test -f ${scenario}) + add_test (NAME ${test-name}-abt-io + COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/run-test.sh ${scenario} abt-io) + add_test (NAME ${test-name}-memory + COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/run-test.sh ${scenario} memory) endforeach () -add_custom_target (check COMMAND ${CMAKE_CTEST_COMMAND} DEPENDS ${scenarios} abt-io-log-generic-test) +add_custom_target (check COMMAND ${CMAKE_CTEST_COMMAND} DEPENDS ${scenarios} mraft-py-test) diff --git a/tests/abt-io-log-generic-test.cpp b/tests/abt-io-log-generic-test.cpp index 988b27d..5ca65ca 100644 --- a/tests/abt-io-log-generic-test.cpp +++ b/tests/abt-io-log-generic-test.cpp @@ -43,7 +43,7 @@ class MyFSM : public mraft::StateMachine { void trace(Message&& msg, Args&&... 
args) { std::string fmt{"[fsm:%d] "}; fmt += std::forward(msg); - margo_trace(mid, fmt.c_str(), raftId, std::forward(args)...); + margo_info(mid, fmt.c_str(), raftId, std::forward(args)...); } public: @@ -81,19 +81,20 @@ class WorkerProvider : public tl::provider { public: - WorkerProvider(tl::engine& e, raft_id id) + WorkerProvider(tl::engine& e, raft_id id, std::string config) : tl::provider(e, 0) , myfsm{e.get_margo_instance(), id} - , abtlog{id} + , abtlog{id, ABT_IO_INSTANCE_NULL, config.c_str(), e.get_margo_instance()} , raft(e.get_margo_instance(), id, myfsm, abtlog) , raftId(id) { // raft.enable_tracer(true); raft.set_tracer( [this](const char* file, int line, const char* message) { - margo_trace(get_engine().get_margo_instance(), + margo_debug(get_engine().get_margo_instance(), "[worker:%lu] [%s:%d] %s", raftId, file, line, message); }); + raft.set_heartbeat_timeout(500); #define DEFINE_WORKER_RPC(__rpc__) define(#__rpc__, &WorkerProvider::__rpc__) DEFINE_WORKER_RPC(start); @@ -106,6 +107,7 @@ class WorkerProvider : public tl::provider { DEFINE_WORKER_RPC(transfer); DEFINE_WORKER_RPC(shutdown); DEFINE_WORKER_RPC(suspend); + DEFINE_WORKER_RPC(terminate); DEFINE_WORKER_RPC(get_fsm_content); #undef DEFINE_WORKER_RPC } @@ -155,9 +157,15 @@ class WorkerProvider : public tl::provider { MRAFT_WRAP_CPP_CALL(apply, &bufs, 1U); } + int terminate() + { + get_engine().finalize(); + return 0; + } + mraft::ServerInfo get_leader(void) const { - margo_trace(get_engine().get_margo_instance(), + margo_debug(get_engine().get_margo_instance(), "[worker:%lu] received get_leader", raftId); mraft::ServerInfo leaderInfo; try { @@ -168,7 +176,7 @@ class WorkerProvider : public tl::provider { .address = std::string(""), }; } - margo_trace(get_engine().get_margo_instance(), + margo_debug(get_engine().get_margo_instance(), "[worker:%lu] completed get_leader", raftId); return leaderInfo; } @@ -180,7 +188,7 @@ class WorkerProvider : public tl::provider { int shutdown(void) const { - margo_trace(get_engine().get_margo_instance(), + margo_debug(get_engine().get_margo_instance(), "[worker:%lu] received shutdown", raftId); try { get_engine().finalize(); @@ -189,16 +197,18 @@ class WorkerProvider : public tl::provider { "[worker:%lu] %s", ex.what()); return 1; } - margo_trace(get_engine().get_margo_instance(), + margo_debug(get_engine().get_margo_instance(), "[worker:%lu] completed shutdown", raftId); return 0; } int suspend(double timeout_ms) const { + margo_debug(get_engine().get_margo_instance(), + "[worker:%lu] suspending for %lf msec", raftId, timeout_ms); try { get_engine().get_progress_pool().make_thread( - [timeout_ms]() { sleep(timeout_ms); }, tl::anonymous{}); + [timeout_ms]() { sleep(timeout_ms/1000); }, tl::anonymous{}); } catch (thallium::exception& ex) { margo_critical(get_engine().get_margo_instance(), "[worker:%lu] %s", raftId, ex.what()); @@ -216,7 +226,9 @@ class Master { public: - Master(tl::engine& e) : engine(e) + Master(tl::engine& e, std::string path) + : engine(e) + , path(std::move(path)) { #define DEFINE_MASTER_RPC(__rpc__) \ rpcs.insert(std::make_pair(#__rpc__, engine.define(#__rpc__))) @@ -229,6 +241,7 @@ class Master { DEFINE_MASTER_RPC(transfer); DEFINE_MASTER_RPC(shutdown); DEFINE_MASTER_RPC(suspend); + DEFINE_MASTER_RPC(terminate); DEFINE_MASTER_RPC(get_leader); DEFINE_MASTER_RPC(get_fsm_content); #undef DEFINE_MASTER_RPC @@ -237,7 +250,14 @@ class Master { ~Master() { // Request to all workers to shutdown - while (!cluster.empty()) shutdownWorker(cluster.cbegin()->first); + //while 
(!cluster.empty()) shutdownWorker(cluster.cbegin()->first); + for(auto& p : cluster) { + auto& addr = p.second; + auto ep = engine.lookup(addr); + auto ph = tl::provider_handle{ep, 0}; + rpcs["terminate"].on(ph)(); + waitpid(p.first, NULL, 0); + } engine.finalize(); } @@ -248,9 +268,9 @@ class Master { while (std::getline(input, line)) { if (line.empty() || line[0] == '#') continue; - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "=================================================="); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] read line: \"%s\"", line.c_str()); std::istringstream iss(line); std::string command; @@ -286,20 +306,21 @@ class Master { iss >> raftId >> timeout_ms; suspendWorker(raftId, timeout_ms); } else { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] ignoring unrecognized command \"%s\"", command.c_str()); } } - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "=================================================="); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] finished reading input stream"); } private: tl::engine engine; + std::string path; std::map rpcs; std::set seenRaftIds; std::map cluster; @@ -316,6 +337,9 @@ class Master { int pipeFd[2]; // Pipe for worker to communicate self_addr to master char workerAddrPtr[256]; + std::stringstream config; + config << "{\"path\":\"" << path << "\"}"; + if (pipe(pipeFd) == -1) { margo_critical(engine.get_margo_instance(), "failed creating communication pipe"); @@ -337,14 +361,14 @@ class Master { // The child creates its own engine engine = tl::engine("ofi+tcp;ofi_rxm", THALLIUM_SERVER_MODE); - margo_set_log_level(engine.get_margo_instance(), MARGO_LOG_TRACE); + margo_set_log_level(engine.get_margo_instance(), MARGO_LOG_DEBUG); // Get the self_addr and resize it to size 256 with '\0' std::string self_addr = engine.self(); self_addr.resize(256, '\0'); // Create provider - WorkerProvider* provider = new WorkerProvider(engine, raftId); + WorkerProvider* provider = new WorkerProvider(engine, raftId, config.str()); engine.push_finalize_callback([provider]() { delete provider; }); @@ -352,7 +376,7 @@ class Master { // Send self_addr to master ssize_t _ = write(pipeFd[1], self_addr.c_str(), self_addr.size()); close(pipeFd[1]); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[worker:%lu] wrote self address to pipe: %s", raftId, self_addr.c_str()); @@ -369,7 +393,7 @@ class Master { close(pipeFd[0]); std::string workerAddrStr(workerAddrPtr); workerAddrStr.resize(256, '\0'); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] read address from pipe: %s", workerAddrPtr); // Get handle for the spawned worker to make RPC requests to worker @@ -378,7 +402,7 @@ class Master { // If its the first spawned worker, send it an RPC requesting to // bootstrap the raft cluster if (cluster.size() == 0) { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] requesting bootstrap RPC to worker: " "id=%llu, address=%s", raftId, workerAddrPtr); @@ -397,7 +421,7 @@ class Master { } // Send RPC requesting the spawned worked to call mraft_start - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] requesting start RPC to worker: " "id=%llu, address=%s", raftId, workerAddrPtr); @@ 
-418,13 +442,13 @@ class Master { mraft::ServerInfo leader; ret = getLeaderInfo(leader, handle); margo_assert(engine.get_margo_instance(), ret == 0); - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] found leader of cluster: id=%llu, address=%s", leader.id, leader.address.c_str()); // Request add RPC to leader - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] requesting start RPC to leader"); try { ret = rpcs["add"].on(handle)(raftId, workerAddrStr); @@ -436,7 +460,7 @@ class Master { // Request assign RPC to leader const auto role = mraft::Role::VOTER; - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "requesting assign RPC to leader: assigning " "id=%llu to role=%i", raftId, static_cast(role)); @@ -465,7 +489,7 @@ class Master { // If the worker is the leader, it must first transfer leadership if (leader.id == raftId) { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] leadership needs to be transfered before " "removal of id: %llu", raftId); @@ -478,7 +502,7 @@ class Master { } } - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] requesting transfer RPC from id=%llu to id=%llu", raftId, transferToId); @@ -494,7 +518,7 @@ class Master { // Request the worker to remove itself from the cluster std::string workerAddr = cluster[raftId]; handle = tl::provider_handle(engine.lookup(workerAddr), 0); - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] requesting remove RPC to worker: id=%llu, address=%s", raftId, workerAddr.c_str()); @@ -507,7 +531,7 @@ class Master { margo_assert(engine.get_margo_instance(), ret == 0); // Request worker to remove the shutdown process from the cluster - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] requesting shutdown RPC to worker: id=%llu, address=%s", raftId, workerAddr.c_str()); @@ -530,7 +554,7 @@ class Master { tl::provider_handle handle = tl::provider_handle(engine.lookup(workerAddr), 0); - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] requesting shutdown RPC to worker: id=%llu, address=%s", raftId, workerAddr.c_str()); @@ -538,10 +562,10 @@ class Master { ret = rpcs["shutdown"].on(handle)(); for (const auto& pair : pidToRaftId) { if (pair.second == raftId) { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] waiting on PID=%d", pair.first); waitpid(pair.first, NULL, 0); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] done waiting on PID=%d", pair.first); } } @@ -561,26 +585,26 @@ class Master { mraft::ServerInfo leader; ret = getLeaderInfo(leader, handle); // margo_assert(engine.get_margo_instance(), ret == 0); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] found leader of cluster: id=%llu, address=%s", leader.id, leader.address.c_str()); if (ret != 0) { if (cluster.size() == 1) break; - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] no leader found... 
trying again"); margo_thread_sleep(engine.get_margo_instance(), 2000); continue; } - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] requesting remove RPC to leader: remove id=%llu", raftId); const auto start_time = std::chrono::steady_clock::now(); const auto end_time - = start_time + std::chrono::duration(500); + = start_time + std::chrono::duration(5000); const auto current_timeout_ms = std::chrono::duration( @@ -594,7 +618,7 @@ class Master { margo_assert(engine.get_margo_instance(), ret == 0); break; } catch (thallium::timeout& e) { - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] timeout on remove RPC to id=%llu, address=%s", leader.id, leader.address.c_str()); @@ -626,11 +650,11 @@ class Master { tl::provider_handle handle; ret = getLeaderInfo(leader, handle); margo_assert(engine.get_margo_instance(), ret == 0); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] found leader of cluster: id=%llu, address=%s", leader.id, leader.address.c_str()); - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] requesting remove RPC to leader: remove id=%llu", killedWorkerRaftId); @@ -651,7 +675,7 @@ class Master { margo_assert(engine.get_margo_instance(), ret == 0); break; } catch (thallium::timeout& e) { - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] timeout on remove RPC to id=%llu, address=%s", leader.id, leader.address.c_str()); @@ -672,11 +696,11 @@ class Master { tl::provider_handle handle; ret = getLeaderInfo(leader, handle); margo_assert(engine.get_margo_instance(), ret == 0); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] found leader of cluster: id=%llu, address=%s", leader.id, leader.address.c_str()); - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] requesting apply RPC to leader: apply data='%s'", data.c_str()); try { @@ -693,7 +717,7 @@ class Master { int ret; std::string workerAddr = cluster[raftId]; tl::provider_handle handle(engine.lookup(workerAddr), 0); - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] requesting suspend RPC to worker id=%llu: timeout_ms=%lf", raftId, timeout_ms); @@ -709,7 +733,7 @@ class Master { int getLeaderInfo(mraft::ServerInfo& leader, tl::provider_handle& handle) { for (auto it = cluster.cbegin(); it != cluster.cend(); it++) { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] requesting get_leader RPC to worker: " "id=%llu, address=%s", it->first, it->second.c_str()); @@ -728,12 +752,12 @@ class Master { = rpcs["get_leader"].on(handle).timed(current_timeout_ms); if (leader.id != 0) return 0; } catch (thallium::timeout& e) { - margo_trace( + margo_debug( engine.get_margo_instance(), "[master] timeout on get_leader RPC to id=%llu, address=%s", it->first, it->second.c_str()); } catch (const thallium::exception& e) { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] getLeader request threw exception..."); margo_error(engine.get_margo_instance(), "[master] %s", e.what()); @@ -752,11 +776,16 @@ class Master { * line arguments * @return 0 on success, -1 on failure */ -int parseCommandLineArgs(int argc, char* argv[], char** filename) +int parseCommandLineArgs(int argc, char* argv[], char** filename, char** path) { - static const char* const shortOpts = "f:"; + static const char* const shortOpts = "f:p:"; static const option longOpts[] - = 
{{"file", required_argument, nullptr, 'f'}, {nullptr, 0, nullptr, 0}}; + = {{"file", required_argument, nullptr, 'f'}, + {"path", required_argument, nullptr, 'p'}, + {nullptr, 0, nullptr, 0}}; + + *filename = nullptr; + *path = nullptr; int option; while ((option = getopt_long(argc, argv, shortOpts, longOpts, nullptr)) @@ -765,6 +794,9 @@ int parseCommandLineArgs(int argc, char* argv[], char** filename) case 'f': *filename = optarg; break; + case 'p': + *path = optarg; + break; case '?': default: // Handle unrecognized options or missing arguments @@ -780,27 +812,28 @@ int parseCommandLineArgs(int argc, char* argv[], char** filename) int main(int argc, char* argv[]) { tl::engine engine("ofi+tcp;ofi_rxm", THALLIUM_SERVER_MODE); - margo_set_log_level(engine.get_margo_instance(), MARGO_LOG_TRACE); + margo_set_log_level(engine.get_margo_instance(), MARGO_LOG_DEBUG); // Parse command-line arguments char* filename = nullptr; - parseCommandLineArgs(argc, argv, &filename); + char* path = nullptr; + parseCommandLineArgs(argc, argv, &filename, &path); if(filename) { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] Reading from file %s", filename); } else { - margo_trace(engine.get_margo_instance(), + margo_debug(engine.get_margo_instance(), "[master] Reading from stdin"); } std::ifstream inputFile; std::istream* input = (!filename) ? &std::cin : (inputFile.open(filename), &inputFile); - Master master(engine); + Master master(engine, path); - margo_trace(engine.get_margo_instance(), "[master] reading input stream"); + margo_debug(engine.get_margo_instance(), "[master] reading input stream"); master.readInput(*input); - margo_trace(engine.get_margo_instance(), "[master] exiting program"); + margo_debug(engine.get_margo_instance(), "[master] exiting program"); return 0; } diff --git a/tests/master.hpp b/tests/master.hpp new file mode 100644 index 0000000..64ec52f --- /dev/null +++ b/tests/master.hpp @@ -0,0 +1,424 @@ +#ifndef MRAFT_TEST_MASTER_HPP +#define MRAFT_TEST_MASTER_HPP + +#include "worker.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace tl = thallium; +using namespace std::literals::chrono_literals; + +struct MasterContext; + +struct MasterOptions { + std::string protocol; + tl::logger::level logLevel; +}; + +static inline auto& operator<<(std::ostream& stream, const mraft::Role& role) { + switch(role) { + case mraft::Role::SPARE: + stream << "SPARE"; break; + case mraft::Role::VOTER: + stream << "VOTER"; break; + case mraft::Role::STANDBY: + stream << "STANDBY"; break; + default: + stream << "???"; + } + return stream; +} + +struct WorkerHandle { + + raft_id raftID = 0; + pid_t pID = -1; + std::string address; + mraft::Role role = mraft::Role::SPARE; + bool running = false; + std::weak_ptr master; + + WorkerHandle() = default; + + WorkerHandle(std::shared_ptr m, raft_id id, pid_t pid, std::string addr) + : raftID(id) + , pID(pid) + , address(std::move(addr)) + , master(m) {} + + auto to_string() const { + std::stringstream stream; + auto addr = (pID == -1) ? 
std::string{"?"} : static_cast(address); + stream << "WorkerHandle" + << " {raftID=" << raftID + << ", pID=" << pID + << ", address=" << addr + << ", role=" << role + << ", started=" << std::boolalpha << running + << "}"; + return stream.str(); + } + + bool operator==(const WorkerHandle& other) const { + return raftID == other.raftID + && pID == other.pID + && address == other.address; + } +}; + +struct ClusterHandle { + std::map> workers; + raft_id nextRaftID = 1; + + auto to_string() const { + std::stringstream stream; + for(const auto& [raftID, worker] : workers) { + stream << "- " << worker->to_string() << "\n"; + } + auto result = stream.str(); + if(!result.empty()) result.resize(result.size()-1); + return result; + } + + auto size() const { return workers.size(); } + auto begin() const { return workers.begin(); } + auto end() const { return workers.end(); } + auto begin() { return workers.begin(); } + auto end() { return workers.end(); } +}; + +struct Client { + + tl::engine engine; + + tl::remote_procedure bootstrap; + tl::remote_procedure start; + tl::remote_procedure apply; + tl::remote_procedure get_leader; + tl::remote_procedure get_fsm_content; + tl::remote_procedure barrier; + tl::remote_procedure assign; + tl::remote_procedure transfer; + tl::remote_procedure add; + tl::remote_procedure remove; + tl::remote_procedure suspend; + tl::remote_procedure isolate; + + Client(const char* protocol, tl::logger::level logLevel) + : engine{protocol, THALLIUM_CLIENT_MODE} + , bootstrap(engine.define("mraft_test_bootstrap")) + , start(engine.define("mraft_test_start")) + , apply(engine.define("mraft_test_apply")) + , get_leader(engine.define("mraft_test_get_leader")) + , get_fsm_content(engine.define("mraft_test_get_fsm_content")) + , barrier(engine.define("mraft_test_barrier")) + , assign(engine.define("mraft_test_assign")) + , transfer(engine.define("mraft_test_transfer")) + , add(engine.define("mraft_test_add")) + , remove(engine.define("mraft_test_remove")) + , suspend(engine.define("mraft_test_suspend")) + , isolate(engine.define("mraft_test_isolate")) { + engine.set_log_level(logLevel); + } +}; + +struct MasterContext : public std::enable_shared_from_this { + + MasterOptions masterOptions; + WorkerOptions workerOptions; + std::shared_ptr cluster; + std::shared_ptr client; + + MasterContext(const MasterOptions& mOptions, const WorkerOptions& wOptions) + : masterOptions{mOptions} + , workerOptions{wOptions} + , cluster{std::make_shared()} { + reinitializeClient(); + } + + void finalizeClient() { client.reset(); } + + void reinitializeClient() { + client = std::make_shared(masterOptions.protocol.c_str(), masterOptions.logLevel); + } + + void bootstrap() { + std::vector servers; + servers.reserve(cluster->size()); + for(const auto& [id, worker] : *cluster) + servers.push_back({id, worker->address, mraft::Role::VOTER}); + std::vector responses; + responses.reserve(cluster->size()); + for(const auto& [id, worker] : *cluster) { + tl::provider_handle ph = client->engine.lookup(worker->address); + responses.push_back(client->bootstrap.on(ph).async(servers)); + } + for(auto& response : responses) + response.wait(); + for(auto& [id, worker] : *cluster) + worker->role = mraft::Role::VOTER; + } + + void start() { + std::vector responses; + responses.reserve(cluster->size()); + for(auto& [id, worker] : *cluster) { + if(worker->running) continue; + tl::provider_handle ph = client->engine.lookup(worker->address); + responses.push_back(client->start.on(ph).async()); + } + for(auto& response : 
responses) + response.wait(); + for(auto& [id, worker] : *cluster) + worker->running = true; + } + + void waitForLeader() { + for(const auto& [id, worker] : *cluster) { + mraft::ServerInfo info; + while(true) { + tl::provider_handle ph = client->engine.lookup(worker->address); + info = client->get_leader.on(ph)(); + if(info.id != 0) break; + tl::thread::sleep(client->engine, 500); + } + } + } + + struct CatchMeIfYouCan {}; + + std::shared_ptr spawnWorker(raft_id id = 0) { + + int pipefd[2]; + if (pipe(pipefd) == -1) { + perror("Pipe creation failed"); + return nullptr; + } + auto raftID = id != 0 ? id : cluster->nextRaftID++; + + // we need to temporarily destroy everything thallium-related + // so that the child process does not inherit thems + finalizeClient(); + + pid_t pid = fork(); + if (pid < 0) { + perror("Fork failed"); + if(id == 0) cluster->nextRaftID--; + reinitializeClient(); + return nullptr; + + } else if (pid == 0) { + // Child process + close(pipefd[0]); + runWorker(pipefd[1], raftID, workerOptions); + // force a proper stack unwinding all the way back to main + throw CatchMeIfYouCan{}; + + } else { + // Parent process + close(pipefd[1]); + char address[1024]; + ssize_t bytes_read = read(pipefd[0], address, 1024); + (void)bytes_read; + close(pipefd[0]); + + reinitializeClient(); + + // add the new worker + cluster->workers[raftID] = std::make_shared( + shared_from_this(), raftID, pid, address); + + return cluster->workers[raftID]; + } + } + + #define ENSURE_WORKER_HANDLE_RUNNING(w, ret) do { \ + if(!w.running) { \ + margo_error(mid, "[master] worker is not running"); \ + return ret; \ + } \ + } while(0) + + #define CALL_RPC(rpc, w, ...) do { \ + tl::provider_handle ph = client->engine.lookup(w.address); \ + int ret = client->rpc.on(ph).timed(1s, ##__VA_ARGS__); \ + if(ret != 0) throw std::runtime_error( \ + std::string{"RPC returned "} + std::to_string(ret)); \ + } while(0) + + #define CATCH_ERRORS(rpc, ret) \ + catch(tl::timeout& ex) { \ + margo_error(mid, "[master] " #rpc " timed out"); \ + return ret; \ + } catch(const std::exception& ex) { \ + margo_error(mid, "[master] " #rpc " failed: %s", ex.what()); \ + return ret; \ + } + + bool apply(const WorkerHandle& w, const std::string& command) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + try { + CALL_RPC(apply, w, command); + } CATCH_ERRORS(apply, false); + return true; + } + + bool assign(const WorkerHandle& w, WorkerHandle& target, mraft::Role role) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + try { + CALL_RPC(assign, w, target.raftID, role); + target.role = role; + } CATCH_ERRORS(assign, false); + return true; + } + + bool add(const WorkerHandle& w, WorkerHandle& target) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + try { + CALL_RPC(add, w, target.raftID, target.address); + } CATCH_ERRORS(add, false); + return true; + } + + bool remove(const WorkerHandle& w, WorkerHandle& target) const { + return remove(w, target.raftID); + } + + bool remove(const WorkerHandle& w, raft_id target) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + try { + CALL_RPC(remove, w, target); + } CATCH_ERRORS(remove, false); + return true; + } + + bool start(WorkerHandle& w) const { + auto mid = client->engine.get_margo_instance(); + if(w.running) { + margo_error(mid, "[master] worker is alread running"); + return false; + } + try 
{ + CALL_RPC(start, w); + w.running = true; + } CATCH_ERRORS(start, false); + return true; + } + + bool transfer(const WorkerHandle& w, WorkerHandle& target) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + ENSURE_WORKER_HANDLE_RUNNING(target, false); + try { + CALL_RPC(transfer, w, target.raftID); + } CATCH_ERRORS(transfer, false); + return true; + } + + bool barrier(const WorkerHandle& w) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + try { + CALL_RPC(barrier, w); + } CATCH_ERRORS(barrier, false); + return true; + } + + bool suspend(const WorkerHandle& w, unsigned msec) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + try { + CALL_RPC(suspend, w, msec); + } CATCH_ERRORS(suspend, false); + return true; + } + + bool isolate(const WorkerHandle& w, bool flag) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, false); + try { + CALL_RPC(isolate, w, flag); + } CATCH_ERRORS(isolate, false); + return true; + } + + std::optional get_fsm_content(const WorkerHandle& w) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, std::nullopt); + try { + tl::provider_handle ph = client->engine.lookup(w.address); + std::string content = client->get_fsm_content.on(ph).timed(1s); + return content; + } CATCH_ERRORS(get_fsm_content, std::nullopt); + } + + std::shared_ptr get_leader(const WorkerHandle& w) const { + auto mid = client->engine.get_margo_instance(); + ENSURE_WORKER_HANDLE_RUNNING(w, nullptr); + try { + tl::provider_handle ph = client->engine.lookup(w.address); + raft_id leaderId = client->get_leader.on(ph).timed(1s); + if(leaderId == 0 || cluster->workers.count(leaderId) == 0) + return nullptr; + else + return cluster->workers[leaderId]; + } CATCH_ERRORS(get_leader, nullptr); + } + + bool shutdown(WorkerHandle& w) const { + static const WorkerHandle none; + auto mid = client->engine.get_margo_instance(); + auto it = cluster->workers.find(w.raftID); + if(it == cluster->workers.end()) { + margo_error(mid, "[master] worker not found in the cluster"); + return false; + } + try { + tl::endpoint addr = client->engine.lookup(w.address); + client->engine.shutdown_remote_engine(addr); + } catch(const std::exception& ex) { + margo_error(mid, "[master] shutdown failed: %s", ex.what()); + return false; + } + while(waitpid(w.pID, NULL, WNOHANG) != w.pID) { + usleep(100); + } + cluster->workers.erase(it); + w = none; + return true; + } + + bool kill(WorkerHandle& w) const { + static const WorkerHandle none; + auto mid = client->engine.get_margo_instance(); + auto it = cluster->workers.find(w.raftID); + if(it == cluster->workers.end()) { + margo_error(mid, "[master] worker not found in the cluster"); + return false; + } + ::kill(w.pID, SIGKILL); + while(waitpid(w.pID, NULL, WNOHANG) != w.pID) { + usleep(100); + } + cluster->workers.erase(it); + w = none; + return true; + } +}; + +#endif diff --git a/tests/python-test-framework.cpp b/tests/python-test-framework.cpp new file mode 100644 index 0000000..404fe4b --- /dev/null +++ b/tests/python-test-framework.cpp @@ -0,0 +1,316 @@ +#include "master.hpp" +#include "worker.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace py = pybind11; +using namespace pybind11::literals; +namespace fs = std::filesystem; + +struct Options { + 
std::string protocol; + tl::logger::level masterLogLevel; + tl::logger::level workerLogLevel; + std::string pythonFile; + size_t initClusterSize; + std::string logPath; + std::string logType; + std::string tracePath; + size_t heartbeatTimeoutMs; +}; + +static void parseCommandLine(int argc, char** argv, Options& options); + +PYBIND11_EMBEDDED_MODULE(raft, m) { + + py::enum_(m, "Role") + .value("SPARE", mraft::Role::SPARE) + .value("VOTER", mraft::Role::VOTER) + .value("STANDBY", mraft::Role::STANDBY) + .export_values(); + + py::class_>(m, "Worker") + .def_readonly("raft_id", &WorkerHandle::raftID) + .def_readonly("address", &WorkerHandle::address) + .def("__repr__", [](const WorkerHandle& w) -> std::string { + return w.to_string(); + }) + .def("apply", [](const WorkerHandle& w, const std::string& command) -> bool { + return w.master.lock()->apply(w, command); + }) + .def("assign", [](const WorkerHandle& w, WorkerHandle& target, mraft::Role role) -> bool { + return w.master.lock()->assign(w, target, role); + }) + .def("add", [](const WorkerHandle& w, WorkerHandle& toAdd) -> bool { + return w.master.lock()->add(w, toAdd); + }) + .def("remove", [](const WorkerHandle& w, WorkerHandle& toRemove) -> bool { + return w.master.lock()->remove(w, toRemove); + }) + .def("remove", [](const WorkerHandle& w, raft_id id) -> bool { + return w.master.lock()->remove(w, id); + }) + .def("start", [](WorkerHandle& w) -> bool { + return w.master.lock()->start(w); + }) + .def("transfer", [](const WorkerHandle& w, WorkerHandle& target) -> bool { + return w.master.lock()->transfer(w, target); + }) + .def("barrier", [](const WorkerHandle& w) -> bool { + return w.master.lock()->barrier(w); + }) + .def("suspend", [](const WorkerHandle& w, unsigned msec) -> bool { + return w.master.lock()->suspend(w, msec); + }) + .def("isolate", [](const WorkerHandle& w, bool flag) -> bool { + return w.master.lock()->isolate(w, flag); + }) + .def("get_fsm_content", [](const WorkerHandle& w) -> std::optional { + return w.master.lock()->get_fsm_content(w); + }) + .def("get_leader", [](const WorkerHandle& w) -> std::shared_ptr { + return w.master.lock()->get_leader(w); + }) + .def("shutdown", [](WorkerHandle& w) -> bool { + return w.master.lock()->shutdown(w); + }) + .def("kill", [](WorkerHandle& w) -> bool { + return w.master.lock()->kill(w); + }); + + py::class_>(m, "Cluster") + .def("__repr__", [](const MasterContext& master) { + return master.cluster->to_string(); + }) + .def("__len__", [](const MasterContext& master) { + return master.cluster->workers.size(); + }) + .def("__getitem__", [](const MasterContext& master, raft_id id) -> std::shared_ptr { + if(master.cluster->workers.count(id) != 0) + return master.cluster->workers[id]; + else + return nullptr; + }) + .def("spawn", [](MasterContext& master, raft_id id) -> std::shared_ptr { + return master.spawnWorker(id); + }, "id"_a=0); +} + +// ---------------------------------------------------------------------------- +// Main function +// ---------------------------------------------------------------------------- + +int main(int argc, char** argv) { + Options options; + parseCommandLine(argc, argv, options); + + MasterOptions masterOptions; + masterOptions.protocol = options.protocol; + masterOptions.logLevel = options.masterLogLevel; + + WorkerOptions workerOptions; + workerOptions.protocol = options.protocol; + workerOptions.logLevel = options.workerLogLevel; + workerOptions.logPath = options.logPath; + workerOptions.logType = options.logType; + workerOptions.tracePath = 
options.tracePath; + workerOptions.heartbeatTimeoutMs = options.heartbeatTimeoutMs; + + auto master = std::make_shared(masterOptions, workerOptions); + + py::scoped_interpreter guard{}; + + py::module_ mod = py::module_::import("raft"); + mod.attr("cluster") = master; + + py::dict locals; + py::dict globals = py::globals(); + + auto masterPID = getpid(); + decltype(masterPID) currentPID; + + int ret = 0; + + // initial cluster setup + for(size_t i = 0; i < options.initClusterSize; ++i) { + std::shared_ptr w; + try { + w = master->spawnWorker(); + } catch(const MasterContext::CatchMeIfYouCan&) { + // prevent child from running master code when it leave worker code + return 0; + } + if(!w) { + auto mid = master->client->engine.get_margo_instance(); + margo_critical(mid, "Could not spawn initial worker %lu", i+1); + ret = 1; + goto cleanup; + } + } + + master->bootstrap(); + master->start(); + master->waitForLeader(); + + if(options.pythonFile.empty()) { + // interactive mode + char* l; + while((l = readline(">> ")) != nullptr) { + if(*l) add_history(l); + std::string line{l}; + while(!line.empty() && !::isalnum(line.front())) line = line.substr(1); + if(line.rfind("exit", 0) == 0) break; + try { + py::exec(line, globals, locals); + } catch(const py::error_already_set& e) { + currentPID = getpid(); + if(currentPID != masterPID) return 0; + auto mid = master->client->engine.get_margo_instance(); + margo_error(mid, "[master] python error: %s", e.what()); + } + free(l); + } + } else { + // file mode + std::ifstream file(options.pythonFile); + if(!file.good()) { + auto mid = master->client->engine.get_margo_instance(); + margo_critical(mid, "[master] could not find file %s", options.pythonFile.c_str()); + } + std::string content{ + (std::istreambuf_iterator(file)), + (std::istreambuf_iterator())}; + file.close(); + try { + py::exec(content.c_str(), globals, locals); + } catch(const py::error_already_set& e) { + currentPID = getpid(); + if(currentPID != masterPID) return 0; + auto mid = master->client->engine.get_margo_instance(); + margo_error(mid, "[master] python error: %s", e.what()); + ret = 1; + } + } + +cleanup: + for(const auto& [raftID, worker] : *master->cluster) { + auto addr = master->client->engine.lookup(worker->address); + master->client->engine.shutdown_remote_engine(addr); + auto mid = master->client->engine.get_margo_instance(); + margo_trace(mid, + "[master] waiting for worker with pID=%u and raftID=%lu to terminate...", + worker->pID, worker->raftID); + waitpid(worker->pID, nullptr, 0); + } + return ret; +} + +// ---------------------------------------------------------------------------- +// Command line argument parsing +// ---------------------------------------------------------------------------- + +static void parseCommandLine(int argc, char** argv, Options& options) { + try { + TCLAP::CmdLine cmd("Mochi-Raft test framework", ' ', "0.1.0"); + + TCLAP::UnlabeledValueArg protocol( + "protocol", "Protocol (e.g. 
ofi+tcp)", true, "na+sm", "protocol"); + TCLAP::ValueArg masterLogLevel( + "v", "verbosity", + "Log level for the master (trace, debug, info, warning, error, critical, off)", + false, "info", "level"); + TCLAP::ValueArg workerLogLevel( + "w", "worker-verbosity", + "Log level for the worker (defaults to that of the master)", + false, "", "level"); + TCLAP::ValueArg pythonFile( + "f", "python-file", "Python file for the master to execute", false, + "", "filename"); + TCLAP::ValueArg clusterSize( + "n", "cluster-size", "Initial number of processes the cluster contains", + false, 1, "size"); + TCLAP::ValueArg logPath( + "p", "log-path", "Path where the logs should be stored", false, + ".", "path"); + TCLAP::ValueArg tracePath( + "t", "trace-path", "Path where the worker traces should be stored", false, + "", "path"); + TCLAP::ValueArg logType( + "l", "log-type", "Type of log to use (\"abt-io\" or \"memory\")", false, + "abt-io", "type"); + TCLAP::ValueArg heartbeatTimeoutMs( + "m", "heartbeat-period", "Heartbeat period in milliseconds", + false, 100, "milliseconds"); + + cmd.add(protocol); + cmd.add(masterLogLevel); + cmd.add(workerLogLevel); + cmd.add(pythonFile); + cmd.add(clusterSize); + cmd.add(logPath); + cmd.add(logType); + cmd.add(tracePath); + cmd.add(heartbeatTimeoutMs); + cmd.parse(argc, argv); + + static std::unordered_map logLevelMap = { + {"trace", tl::logger::level::trace}, + {"debug", tl::logger::level::debug}, + {"info", tl::logger::level::info}, + {"warning", tl::logger::level::warning}, + {"error", tl::logger::level::error}, + {"critical", tl::logger::level::critical}, + {"off", tl::logger::level::critical} + }; + + options.protocol = protocol.getValue(); + options.pythonFile = pythonFile.getValue(); + options.initClusterSize = clusterSize.getValue(); + options.logPath = logPath.getValue(); + options.logType = logType.getValue(); + options.tracePath = tracePath.getValue(); + options.heartbeatTimeoutMs = heartbeatTimeoutMs.getValue(); + if(logLevelMap.count(masterLogLevel.getValue())) + options.masterLogLevel = logLevelMap[masterLogLevel.getValue()]; + else + options.masterLogLevel = tl::logger::level::info; + if(logLevelMap.count(workerLogLevel.getValue())) + options.workerLogLevel = logLevelMap[workerLogLevel.getValue()]; + else + options.workerLogLevel = options.masterLogLevel; + + if(!options.pythonFile.empty() && !fs::is_regular_file(options.pythonFile)) { + std::cerr << "error: " << options.pythonFile << " does not exist or is not a file" << std::endl; + exit(-1); + } + if(!options.pythonFile.empty() && !fs::is_directory(options.logPath)) { + std::cerr << "error: " << options.logPath << " does not exist or not a directory" << std::endl; + exit(-1); + } + if(options.initClusterSize == 0) { + std::cerr << "error: cannot start with a cluster of size 0" << std::endl; + exit(-1); + } + if(options.logType != "abt-io" && options.logType != "memory") { + std::cerr << "error: invalid log type (should be \"abt-io\" or \"memory\")" << std::endl; + exit(-1); + } + + } catch (TCLAP::ArgException& e) { + std::cerr << "error: " << e.error() << " for arg " << e.argId() + << std::endl; + exit(-1); + } +} diff --git a/tests/run-test.sh b/tests/run-test.sh new file mode 100755 index 0000000..2829660 --- /dev/null +++ b/tests/run-test.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +SCENARIO=$1 +BACKEND=$2 + +SCENARIO_FILE=$(basename "$SCENARIO") +SCENARIO_NAME="${SCENARIO_FILE%.*}-${BACKEND}" + +STORAGE=`mktemp -d storage-${SCENARIO_NAME}-XXXXXXXX` + +timeout 60 ./mraft-py-test na+sm \ + -n 3 \ + -f 
$SCENARIO \ + -p $STORAGE \ + -l $BACKEND \ + -w trace \ + -t $STORAGE +RET=$? + +if [ "$RET" -eq "0" ]; then + echo $RET +# rm -rf $STORAGE +fi + +exit $RET diff --git a/tests/scenarios/01-apply.py b/tests/scenarios/01-apply.py new file mode 100644 index 0000000..e423106 --- /dev/null +++ b/tests/scenarios/01-apply.py @@ -0,0 +1,19 @@ +# This test applies 3 entries to the state machine, +# performs a barrier, then checks that the content +# of the state machine is as expected in all workers. +from raft import cluster +import time + +expected = "" +for i in range(0, 3): + entry = f"entry_{i}" + cluster[i+1].apply(entry) + expected = expected + entry + +cluster[1].barrier() + +time.sleep(1) + +for i in range(0,3): + content = cluster[i+1].get_fsm_content() + assert content == expected, f"in worker {i+1} content: {content} differs from {expected}" diff --git a/tests/scenarios/01-single-server.txt b/tests/scenarios/01-single-server.txt deleted file mode 100644 index 95f1e99..0000000 --- a/tests/scenarios/01-single-server.txt +++ /dev/null @@ -1,14 +0,0 @@ -# This scenario adds a single server then applies -# a few entries. -add 1 -sleep 2000 -apply HELLO_1_ -apply HELLO_2_ -apply HELLO_3_ -apply HELLO_4_ -apply HELLO_5_ -sleep 2000 -apply HELLO_6_ -apply HELLO_7_ -apply HELLO_8_ -sleep 2000 diff --git a/tests/scenarios/02-add.py b/tests/scenarios/02-add.py new file mode 100644 index 0000000..0e3bdee --- /dev/null +++ b/tests/scenarios/02-add.py @@ -0,0 +1,49 @@ +# This test applies 3 entries to the state machine, +# performs a barrier, adds a new process, adds more +# entries, then check that the content of the state +# machine is as expected in all workers. +import raft +from raft import cluster +import time + +print("[python] Applying 3 commands") +expected = "" +for i in range(0, 3): + entry = f"entry_{i}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Creating new worker") +new_worker = cluster.spawn() +assert new_worker.raft_id == 4, "New worker's raft ID should be 4" + +print("[python] Starting new worker") +new_worker.start() + +print("[python] Adding the new worker to the cluster") +cluster[1].add(new_worker) + +print("[python] Assigning the new worker as voter") +cluster[1].assign(new_worker, raft.VOTER) + +print("[python] Applying 4 more commands") +for i in range(0, 4): + entry = f"entry_{i+3}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Checking the content of the state machines") +print(f"[python] Expected content: {expected}") + +time.sleep(1) + +for i in range(0, 4): + content = cluster[i+1].get_fsm_content() + print(f"[python] Content in worker {i+1} is {content}") + assert content == expected, f"in worker {i+1} content: {content} differs from {expected}" diff --git a/tests/scenarios/03-shutdown.py b/tests/scenarios/03-shutdown.py new file mode 100644 index 0000000..2676228 --- /dev/null +++ b/tests/scenarios/03-shutdown.py @@ -0,0 +1,60 @@ +# This test applies 3 entries to the state machine, +# performs a barrier, adds a new process, adds more +# entries, removes the added process, adds more entries, +# then check that the content of the state machine is +# as expected in all remaining workers. 
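+# (Note: the extra worker created below is taken out of the cluster with
+# shutdown(), which shuts down its remote engine and reaps the child process,
+# rather than with remove().)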
+import raft +from raft import cluster +import time + +print("[python] Applying 3 commands") +expected = "" +for i in range(0, 3): + entry = f"entry_{i}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Creating new worker") +new_worker = cluster.spawn() +assert new_worker.raft_id == 4, "New worker's raft ID should be 4" + +print("[python] Starting new worker") +new_worker.start() + +print("[python] Adding the new worker to the cluster") +cluster[1].add(new_worker) + +print("[python] Assigning the new worker as voter") +cluster[1].assign(new_worker, raft.VOTER) + +print("[python] Applying 4 more commands") +for i in range(0, 4): + entry = f"entry_{i+3}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Shutting down worker") +new_worker.shutdown() +assert len(cluster) == 3, "Cluster should have 3 workers" + +print("[python] Applying 3 commands") +for i in range(0, 3): + entry = f"entry_{i+7}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Checking the content of the state machines") +print(f"[python] Expected content: {expected}") +for i in range(0, 3): + content = cluster[i+1].get_fsm_content() + print(f"[python] Content in worker {i+1} is {content}") + assert content == expected, f"in worker {i+1} content: {content} differs from {expected}" diff --git a/tests/scenarios/04-kill.py b/tests/scenarios/04-kill.py new file mode 100644 index 0000000..82be7af --- /dev/null +++ b/tests/scenarios/04-kill.py @@ -0,0 +1,56 @@ +# same as 03-shutdown.py but kills the added worker +import raft +from raft import cluster +import time + +print("[python] Applying 3 commands") +expected = "" +for i in range(0, 3): + entry = f"entry_{i}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Creating new worker") +new_worker = cluster.spawn() +assert new_worker.raft_id == 4, "New worker's raft ID should be 4" + +print("[python] Starting new worker") +new_worker.start() + +print("[python] Adding the new worker to the cluster") +cluster[1].add(new_worker) + +print("[python] Assigning the new worker as voter") +cluster[1].assign(new_worker, raft.VOTER) + +print("[python] Applying 4 more commands") +for i in range(0, 4): + entry = f"entry_{i+3}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Shutting down worker") +new_worker.kill() +assert len(cluster) == 3, "Cluster should have 3 workers" + +print("[python] Applying 3 commands") +for i in range(0, 3): + entry = f"entry_{i+7}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Issuing barrier") +cluster[1].barrier() + +print("[python] Checking the content of the state machines") +print(f"[python] Expected content: {expected}") +for i in range(0, 3): + content = cluster[i+1].get_fsm_content() + print(f"[python] Content in worker {i+1} is {content}") + assert content == expected, f"in worker {i+1} content: {content} differs from {expected}" diff --git a/tests/scenarios/05-switch-role.py b/tests/scenarios/05-switch-role.py new file mode 100644 index 0000000..c0f0882 --- /dev/null +++ b/tests/scenarios/05-switch-role.py @@ -0,0 +1,47 @@ +# This test applies 3 entries to the state machine, +# performs a barrier, then 
changes the role of a worker +# to "spare", applies 3 more entries, and changes the +# role of the worker back to "voter", waits a bit, +# then check that the worker has the right content. + +import raft +from raft import cluster +import time + +print("[python] Make sure worker 1 is the leader") +cluster[1].transfer(cluster[1]) +while (cluster[1].get_leader().raft_id != 1): + time.sleep(1) + +print("[python] Applying 3 commands") +expected = "" +for i in range(0, 3): + entry = f"entry_{i}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Barrier") +cluster[1].barrier() + +print("[python] Changing worker 3 into a spare") +cluster[1].assign(cluster[3], raft.SPARE) + +print("[python] Applying 2 more commands") +for i in range(0, 2): + entry = f"entry_{i+3}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Barrier") +cluster[1].barrier() + +print("[python] Changing worker 3 into a voter") +cluster[1].assign(cluster[3], raft.VOTER) + +print("[python] Sleeping for 2 seconds") +time.sleep(2) + +print("[python] Checking state machine in worker 3") +content = cluster[3].get_fsm_content() +assert content == expected, f"content in worker 3 ({content}) should be {expected}" + diff --git a/tests/scenarios/06-remove-add.py b/tests/scenarios/06-remove-add.py new file mode 100644 index 0000000..2330899 --- /dev/null +++ b/tests/scenarios/06-remove-add.py @@ -0,0 +1,48 @@ +# This test applies 3 entries to the state machine, +# performs a barrier, then removes worker 3, applies 2 +# more entries, and adds back worker 3, waits a bit, +# then check that the worker has the right content. + +import raft +from raft import cluster +import time + +print("[python] Make sure worker 1 is the leader") +cluster[1].transfer(cluster[1]) +while (cluster[1].get_leader().raft_id != 1): + time.sleep(1) + +print("[python] Applying 3 commands") +expected = "" +for i in range(0, 3): + entry = f"entry_{i}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Barrier") +cluster[1].barrier() + +print("[python] Removing worker 3") +cluster[1].remove(cluster[3]) + +print("[python] Applying 2 more commands") +for i in range(0, 2): + entry = f"entry_{i+3}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Barrier") +cluster[1].barrier() + +print("[python] Adding worker 3 back") +cluster[1].add(cluster[3]) + +print("[python] Making 3 a voter") +cluster[1].assign(cluster[3], raft.VOTER) + +print("[python] Sleeping for 2 seconds") +time.sleep(2) + +print("[python] Checking state machine in worker 3") +content = cluster[3].get_fsm_content() +assert content == expected, f"content in worker 3 ({content}) should be {expected}" diff --git a/tests/scenarios/07-kill-respawn.py b/tests/scenarios/07-kill-respawn.py new file mode 100644 index 0000000..38a4f1f --- /dev/null +++ b/tests/scenarios/07-kill-respawn.py @@ -0,0 +1,67 @@ +# This test applies 3 entries to the state machine, +# performs a barrier, then removes worker 3, applies 2 +# more entries, and adds back worker 3, waits a bit, +# then check that the worker has the right content. 
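+# (Unlike 06-remove-add.py, this scenario kills worker 3 with SIGKILL,
+# officially removes it from the configuration, respawns a new worker with
+# the same raft id, adds it back, and promotes it to voter.)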
+ +import raft +from raft import cluster +import time + +print("[python] Make sure worker 1 is the leader") +cluster[1].transfer(cluster[1]) +while (cluster[1].get_leader().raft_id != 1): + time.sleep(1) + +print("[python] Applying 3 commands") +expected = "" +for i in range(0, 3): + entry = f"entry_{i}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Barrier") +cluster[1].barrier() + +print("[python] Kill worker 3") +cluster[3].kill() + +print("[python] Applying 2 more commands") +for i in range(0, 2): + entry = f"entry_{i+3}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Barrier") +cluster[1].barrier() + +print("[python] Officially removing worker 3") +cluster[1].remove(3) + +print("[python] Applying 2 more commands") +for i in range(0, 2): + entry = f"entry_{i+5}" + cluster[i+1].apply(entry) + expected = expected + entry + +print("[python] Barrier") +cluster[1].barrier() + +print("[python] Creating new worker with id 3") +new_worker3 = cluster.spawn(3) +assert new_worker3.raft_id == 3, "New worker's raft ID should be 3" + +print("[python] Starting new worker") +new_worker3.start() + +print("[python] Adding the new worker 3 to the cluster") +cluster[1].add(new_worker3) + +print("[python] Assigning the new worker 3 as voter") +cluster[1].assign(new_worker3, raft.VOTER) + +print("[python] Sleeping for 2 seconds") +time.sleep(2) + +print("[python] Checking state machine in worker 3") +content = cluster[3].get_fsm_content() +assert content == expected, f"content in worker 3 ({content}) should be {expected}" diff --git a/tests/scenarios/08-apply-many.py b/tests/scenarios/08-apply-many.py new file mode 100644 index 0000000..2c0ed48 --- /dev/null +++ b/tests/scenarios/08-apply-many.py @@ -0,0 +1,43 @@ +# This test applies 10 entries to the state machine, +# performs a barrier, then checks the content of the +# state machine. This test is meant to force the abt-io +# log backend to create a second entry file (entry files +# are limited to 1MB by default). 
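+# (The loop below applies 200 entries of 10 KB each, roughly 2 MB in total;
+# apply() and barrier() are retried because the master issues them with a
+# 1-second RPC timeout and they may fail while the workers catch up.)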
+from raft import cluster +import time + + +def generate_random_string(n): + import random + letters = list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ') + random_string = ''.join(random.choice(letters) for _ in range(n)) + return random_string + +expected = "" +for i in range(0, 200): + s = 10*1024 # 10 KB entry + print(f"[python] applying an entry of size {s} via worker {(i%3)+1}") + entry = generate_random_string(s) + while not cluster[(i % 3) + 1].apply(entry): + time.sleep(1) + expected = expected + entry + time.sleep(0.2) + +for i in range(0, 3): + for j in range(0, 10): + print(f"[python] executing barrier in worker {i+1} (attempt {j})") + if cluster[i+1].barrier(): + break + time.sleep(1) + +print(f"[python] sleeping for 5 seconds") +time.sleep(30) + +for i in range(0,3): + content = cluster[i+1].get_fsm_content() + if content is None: + print(content) + else: + print(content[0:10]) + index = next((j for j, (char1, char2) in enumerate(zip(content, expected)) if char1 != char2), None) + assert content == expected, f"in worker {i+1} content (len={len(content)} differs from expected (len={len(expected)}) at character {index}" diff --git a/tests/spack.yaml b/tests/spack.yaml index 7eb2993..5b7dc9d 100644 --- a/tests/spack.yaml +++ b/tests/spack.yaml @@ -8,13 +8,17 @@ spack: - json-c - mochi-ssg+mpi ^mpich - mochi-thallium + - tclap + - readline + - py-pybind11 modules: prefix_inspections: lib: [LD_LIBRARY_PATH] lib64: [LD_LIBRARY_PATH] + mirrors: + mochi-buildcache: + url: oci://ghcr.io/mochi-hpc/mochi-spack-buildcache + signed: false config: - source_cache: ~/.spack-ci/source_cache - misc_cache: ~/.spack-ci/misc_cache - test_cache: ~/.spack-ci/test_cache install_tree: - root: ~/.spack-ci/install + padded_length: 128 diff --git a/tests/test-framework.cpp b/tests/test-framework.cpp new file mode 100644 index 0000000..760964e --- /dev/null +++ b/tests/test-framework.cpp @@ -0,0 +1,806 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#define SOL_ALL_SAFETIES_ON 1 +#include +#include + +namespace fs = std::filesystem; +namespace tl = thallium; + +template +using sptr = std::shared_ptr; + +struct Options { + std::string protocol; + tl::logger::level masterLogLevel; + tl::logger::level workerLogLevel; + std::string luaFile; + size_t initClusterSize; + std::string logPath; + std::string logType; + std::string tracePath; +}; + +static void parseCommandLine(int argc, char** argv, Options& options); +static int runMaster(const Options& options); + +// ---------------------------------------------------------------------------- +// Main function +// ---------------------------------------------------------------------------- + +int main(int argc, char** argv) { + Options options; + parseCommandLine(argc, argv, options); + return runMaster(options); +} + +// ---------------------------------------------------------------------------- +// Worker structure +// ---------------------------------------------------------------------------- + +struct FSM : public mraft::StateMachine { + + FSM(raft_id id) + : id(id) {} + + void apply(const struct raft_buffer* buf, void** result) override { + std::unique_lock lock{content_mtx}; + content.append(std::string_view{(const char*)buf->base, buf->len}); + } + + void snapshot(struct raft_buffer* bufs[], unsigned* n_bufs) override { + std::unique_lock lock{content_mtx}; + *bufs = static_cast(raft_malloc(sizeof(**bufs))); + *n_bufs = 
1; + (*bufs)[0].base = strdup(content.c_str()); + (*bufs)[0].len = content.size(); + } + + void restore(struct raft_buffer* buf) override { + std::unique_lock lock{content_mtx}; + content.assign(std::string_view{(const char*)buf->base, buf->len}); + } + + raft_id id; + std::string content; + mutable tl::mutex content_mtx; + +}; + +struct Worker : public tl::provider { + + Worker(tl::engine engine, raft_id raftID, std::unique_ptr theLog) + : tl::provider(engine, 0) + , fsm{raftID} + , log{std::move(theLog)} + , raft{engine.get_margo_instance(), raftID, fsm, *log} + , id{raftID} { + #define DEFINE_WORKER_RPC(__rpc__) define("mraft_test_" #__rpc__, &Worker::__rpc__) + DEFINE_WORKER_RPC(bootstrap); + DEFINE_WORKER_RPC(start); + DEFINE_WORKER_RPC(add); + DEFINE_WORKER_RPC(assign); + DEFINE_WORKER_RPC(remove); + DEFINE_WORKER_RPC(apply); + DEFINE_WORKER_RPC(get_leader); + DEFINE_WORKER_RPC(transfer); + DEFINE_WORKER_RPC(suspend); + DEFINE_WORKER_RPC(barrier); + DEFINE_WORKER_RPC(isolate); + DEFINE_WORKER_RPC(get_fsm_content); + #undef DEFINE_WORKER_RPC + raft.enable_tracer(true); + raft.set_tracer( + [mid=engine.get_margo_instance(), raftID](const char* file, int line, const char* msg) { + margo_trace(mid, "[worker:%d] [%s:%d] %s", (unsigned)raftID, file, line, msg); + }); + } + + #define WRAP_CALL(func, ...) \ + do { \ + try { \ + raft.func(__VA_ARGS__); \ + } catch (const mraft::RaftException& ex) { \ + margo_error(get_engine().get_margo_instance(),\ + "[worker:%lu] %s failed: %s", \ + id, #func, raft_strerror(ex.code())); \ + return ex.code(); \ + } \ + return 0; \ + } while (0) + + int bootstrap(const std::vector& servers) { + WRAP_CALL(bootstrap, servers); + } + + int start() { + WRAP_CALL(start); + } + + int add(const raft_id& raftId, const std::string& addr) { + WRAP_CALL(add, raftId, addr.c_str()); + } + + int assign(const raft_id& raftId, const mraft::Role& role) { + WRAP_CALL(assign, raftId, role); + } + + int remove(const raft_id& raftId) { + WRAP_CALL(remove, raftId); + } + + int apply(const std::string& buffer) { + struct raft_buffer bufs = { + .base = (void*)buffer.c_str(), + .len = buffer.size() + }; + WRAP_CALL(apply, &bufs, 1U); + } + + int barrier() { + WRAP_CALL(barrier); + } + + int isolate(bool flag) { + auto raft_io = raft.get_raft_io(); + return mraft_io_simulate_dead(raft_io, flag); + } + + mraft::ServerInfo get_leader() const { + mraft::ServerInfo leaderInfo{0, ""}; + try { + leaderInfo = raft.get_leader(); + } catch (mraft::RaftException& ex) {} + return leaderInfo; + } + + int transfer(raft_id transferToId) { + WRAP_CALL(transfer, transferToId); + } + + void suspend(const tl::request& req, unsigned msec) const { + req.respond(0); + usleep(msec*1000); + } + + std::string get_fsm_content() const { + std::unique_lock lock{fsm.content_mtx}; + return fsm.content; + } + + #undef WRAP_CALL + + raft_id id; + FSM fsm; + std::unique_ptr log; + mraft::Raft raft; +}; + +// ---------------------------------------------------------------------------- +// Master functions and structure definitions +// ---------------------------------------------------------------------------- + +static inline auto& operator<<(std::ostream& stream, const mraft::Role& role) { + switch(role) { + case mraft::Role::SPARE: + stream << "SPARE"; break; + case mraft::Role::VOTER: + stream << "VOTER"; break; + case mraft::Role::STANDBY: + stream << "STANDBY"; break; + default: + stream << "???"; + } + return stream; +} + +struct WorkerHandle { + + raft_id raftID = 0; + pid_t pID = -1; + tl::provider_handle 
address; + mraft::Role knownRole = mraft::Role::SPARE; + bool knownRunning = false; + + WorkerHandle() = default; + + WorkerHandle(raft_id id, pid_t pid, tl::provider_handle addr) + : raftID(id) + , pID(pid) + , address(addr) {} + + auto to_string() const { + std::stringstream stream; + auto addr = (pID == -1) ? std::string{"?"} : static_cast(address); + stream << "WorkerHandle" + << " {raftID=" << raftID + << ", pID=" << pID + << ", address=" << addr + << ", role=" << knownRole + << ", started=" << std::boolalpha << knownRunning + << "}"; + return stream.str(); + } + + bool operator==(const WorkerHandle& other) const { + return raftID == other.raftID + && pID == other.pID + && address == other.address; + } +}; + +struct Cluster { + + std::map> workers; + tl::engine engine; + tl::remote_procedure rpc_bootstrap; + tl::remote_procedure rpc_start; + tl::remote_procedure rpc_apply; + tl::remote_procedure rpc_get_leader; + tl::remote_procedure rpc_get_fsm_content; + tl::remote_procedure rpc_barrier; + tl::remote_procedure rpc_assign; + tl::remote_procedure rpc_transfer; + tl::remote_procedure rpc_add; + tl::remote_procedure rpc_remove; + tl::remote_procedure rpc_suspend; + tl::remote_procedure rpc_isolate; + + Cluster(tl::engine engine) + : engine(engine) + , rpc_bootstrap(engine.define("mraft_test_bootstrap")) + , rpc_start(engine.define("mraft_test_start")) + , rpc_apply(engine.define("mraft_test_apply")) + , rpc_get_leader(engine.define("mraft_test_get_leader")) + , rpc_get_fsm_content(engine.define("mraft_test_get_fsm_content")) + , rpc_barrier(engine.define("mraft_test_barrier")) + , rpc_assign(engine.define("mraft_test_assign")) + , rpc_transfer(engine.define("mraft_test_transfer")) + , rpc_add(engine.define("mraft_test_add")) + , rpc_remove(engine.define("mraft_test_remove")) + , rpc_suspend(engine.define("mraft_test_suspend")) + , rpc_isolate(engine.define("mraft_test_isolate")) + {} + + size_t size() const { + return workers.size(); + } + + auto to_string() const { + std::stringstream stream; + for(const auto& [raftID, worker] : workers) { + stream << "- " << worker->to_string() << "\n"; + } + auto result = stream.str(); + if(!result.empty()) result.resize(result.size()-1); + return result; + } + + void bootstrap() { + std::vector servers; + servers.reserve(workers.size()); + for(const auto& [id, worker] : workers) + servers.push_back({id, worker->address, mraft::Role::VOTER}); + std::vector responses; + responses.reserve(workers.size()); + for(const auto& [id, worker] : workers) + responses.push_back(rpc_bootstrap.on(worker->address).async(servers)); + for(auto& response : responses) + response.wait(); + for(auto& [id, worker] : workers) + worker->knownRole = mraft::Role::VOTER; + } + + void start() { + std::vector responses; + responses.reserve(workers.size()); + for(auto& [id, worker] : workers) { + if(worker->knownRunning) continue; + responses.push_back(rpc_start.on(worker->address).async()); + } + for(auto& response : responses) + response.wait(); + for(auto& [id, worker] : workers) + worker->knownRunning = true; + } + + void waitForLeader() { + for(const auto& [id, worker] : workers) { + mraft::ServerInfo info; + while(true) { + info = rpc_get_leader.on(worker->address)(); + if(info.id != 0) break; + tl::thread::sleep(engine, 500); + } + } + } +}; + +struct MasterContext { + tl::engine engine; + sol::state lua; + std::shared_ptr cluster; + raft_id nextRaftID = 1; +}; + +static sptr spawnWorker(MasterContext& master, const Options& options) { + int pipefd[2]; + if (pipe(pipefd) 
== -1) { + perror("Pipe creation failed"); + return nullptr; + } + + auto raftID = master.nextRaftID++; + + pid_t pid = fork(); + if (pid < 0) { + perror("Fork failed"); + master.nextRaftID--; + return nullptr; + + } else if (pid == 0) { + // Child process + close(pipefd[0]); + master.engine.finalize(); + + if(!options.tracePath.empty()) { + auto stdoutFilename = options.tracePath + "/" + std::to_string(raftID) + ".out"; + auto stderrFilename = options.tracePath + "/" + std::to_string(raftID) + ".err"; + FILE* _ = freopen(stdoutFilename.c_str(), "a+", stdout); + _ = freopen(stderrFilename.c_str(), "a+", stderr); + } + + auto engine = tl::engine(options.protocol, MARGO_SERVER_MODE); + engine.set_log_level(options.workerLogLevel); + engine.enable_remote_shutdown(); + + std::unique_ptr log; + if(options.logType == "abt-io") { + auto config = std::string{"{\"path\":\""} + options.logPath + "\"}"; + log = std::make_unique( + raftID, ABT_IO_INSTANCE_NULL, config.c_str(), engine.get_margo_instance()); + } else { + if(options.logPath != ".") + margo_warning(engine.get_margo_instance(), + "--log-path set to \"%s\" but will be ignored with the memory log", + options.logPath.c_str()); + log = std::make_unique(); + } + + auto worker = new Worker{engine, raftID, std::move(log)}; + engine.push_finalize_callback([worker](){ delete worker; }); + + auto address = static_cast(engine.self()); + address.resize(1024); + ssize_t _ = write(pipefd[1], address.data(), 1024); + close(pipefd[1]); + + engine.wait_for_finalize(); + std::exit(0); + + } else { + // Parent process + close(pipefd[1]); + char address[1024]; + ssize_t bytes_read = read(pipefd[0], address, 1024); + close(pipefd[0]); + master.cluster->workers[raftID] = std::make_shared( + raftID, pid, tl::provider_handle{master.engine.lookup(address), 0}); + return master.cluster->workers[raftID]; + } +} + +static int runMaster(const Options& options) { + + int ret = 0; + MasterContext master; + master.engine = tl::engine{options.protocol, THALLIUM_CLIENT_MODE}; + master.engine.set_log_level(options.masterLogLevel); + master.lua.open_libraries(sol::lib::base, sol::lib::string, sol::lib::math); + master.cluster = std::make_shared(master.engine); + + master.lua.new_enum("Role", + "STANDBY", mraft::Role::STANDBY, + "VOTER", mraft::Role::VOTER, + "SPARE", mraft::Role::SPARE); + + master.lua.new_usertype("Cluster", + sol::meta_function::index, [](Cluster& cluster, raft_id id) -> std::shared_ptr { + if(cluster.workers.count(id) == 0) return nullptr; + return cluster.workers[id]; + }, + "spawn", [&master, &options]() mutable { + return spawnWorker(master, options); + } + ); + + master.lua["cluster"] = master.cluster; + + using namespace std::literals::chrono_literals; + + master.lua.new_usertype("Worker", + "address", sol::property([](const WorkerHandle& w) { return static_cast(w.address); }), + "raft_id", sol::readonly(&WorkerHandle::raftID), + // apply command (asks the worker to apply a command) + "apply", [rpc=master.cluster->rpc_apply, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w, const std::string& command) { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s, command); + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] apply timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] apply failed: %s", ex.what()); + } + return false; + }, + // assign (asks the worker to assign another worker the specified role) + 
"assign", [rpc=master.cluster->rpc_assign, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w, WorkerHandle& target, mraft::Role role) { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s, target.raftID, role); + target.knownRole = role; + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] assign timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] assign failed: %s", ex.what()); + } + return false; + }, + // add (asks the worker to ask the leader to add the given process) + "add", [rpc=master.cluster->rpc_add, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w, const WorkerHandle& toAdd) { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s, + toAdd.raftID, static_cast(toAdd.address)); + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] add timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] add failed: %s", ex.what()); + } + return false; + }, + // remove (asks the worker to ask the leader to remove the given process) + "remove", [rpc=master.cluster->rpc_remove, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w, const WorkerHandle& toRemove) { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s, toRemove.raftID); + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] remove timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] remove failed: %s", ex.what()); + } + return false; + }, + // start (start the worker, if it is not started already) + "start", [&master, rpc=master.cluster->rpc_start, mid=master.engine.get_margo_instance()] + (WorkerHandle& w) { + if(w.knownRunning) { + margo_error(mid, "[master] worker is already running"); + return false; + } + try { + rpc.on(w.address).timed(1s); + w.knownRunning = true; + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] start timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] start failed: %s", ex.what()); + } + return false; + }, + // transfer (asks the worker to request leadership be transferred to another worker) + "transfer", [rpc=master.cluster->rpc_transfer, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w, raft_id id) { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s, id); + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] transfer timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] transfer failed: %s", ex.what()); + } + return false; + }, + // barrier (wait for the commands to be committed) + "barrier", [rpc=master.cluster->rpc_barrier, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w) -> bool { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s); + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] barrier timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] barrier failed: %s", ex.what()); + } + return false; + }, + // suspend (put the process to sleep for the target number of milliseconds) + "suspend", [rpc=master.cluster->rpc_suspend, 
mid=master.engine.get_margo_instance()] + (const WorkerHandle& w, unsigned msec) -> bool { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s, msec); + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] suspend timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] suspend failed: %s", ex.what()); + } + return false; + }, + // isolate (isolate the process by making it ignore all RPCs) + "isolate", [rpc=master.cluster->rpc_isolate, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w, bool flag) -> bool { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return false; + } + try { + rpc.on(w.address).timed(1s, flag); + return true; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] isolate timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] isolate failed: %s", ex.what()); + } + return false; + }, + // get_fsm_content (asks the worker for the content of its FSM) + "get_fsm_content", [rpc=master.cluster->rpc_get_fsm_content, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w) -> std::optional { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return std::nullopt; + } + try { + return static_cast(rpc.on(w.address).timed(1s)); + } catch(tl::timeout& ex) { + margo_error(mid, "[master] get_fsm_content timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] get_fsm_content failed: %s", ex.what()); + } + return std::nullopt; + }, + // get_leader (asks the worker who the leader currently is) + "get_leader", [&master, rpc=master.cluster->rpc_get_leader, mid=master.engine.get_margo_instance()] + (const WorkerHandle& w) -> std::shared_ptr { + if(!w.knownRunning) { + margo_error(mid, "[master] worker is not running"); + return nullptr; + } + try { + mraft::ServerInfo info = rpc.on(w.address).timed(1s); + auto& workers = master.cluster->workers; + auto it = workers.find(info.id); + if(it == workers.end()) return nullptr; + return it->second; + } catch(tl::timeout& ex) { + margo_error(mid, "[master] get_leader timed out"); + } catch(const std::exception& ex) { + margo_error(mid, "[master] get_leader failed: %s", ex.what()); + } + return nullptr; + }, + // shutdown command (sends a remote shutdown RPC to the worker's margo instance + "shutdown", [&master](WorkerHandle& self) mutable { + static WorkerHandle none; + auto& workers = master.cluster->workers; + auto it = workers.find(self.raftID); + if(it == workers.end()) { + auto mid = master.engine.get_margo_instance(); + margo_error(mid, "[master] worker not found in the cluster"); + return; + } + master.engine.shutdown_remote_engine(it->second->address); + while(waitpid(self.pID, NULL, WNOHANG) != self.pID) { + usleep(100); + } + workers.erase(it); + self = none; + }, + // kill command (send SIGKILL to worker) + "kill", [&master](WorkerHandle& self) mutable { + static WorkerHandle none; + auto& workers = master.cluster->workers; + auto it = workers.find(self.raftID); + if(it == workers.end()) { + auto mid = master.engine.get_margo_instance(); + margo_error(mid, "[master] worker not found in the cluster"); + return; + } + kill(self.pID, SIGKILL); + waitpid(self.pID, NULL, 0); + workers.erase(it); + self = none; + } + ); + + master.lua.set_function("sleep", [](unsigned msec) { + usleep(msec*1000); + }); + + // initial cluster setup + for(size_t i = 0; i < options.initClusterSize; ++i) { + 
auto w = spawnWorker(master, options); + if(!w) { + margo_critical(master.engine.get_margo_instance(), + "Could not spawn initial worker %lu", i); + ret = 1; + goto cleanup; + } + } + master.cluster->bootstrap(); + master.cluster->start(); + master.cluster->waitForLeader(); + + // execution of lua code + if(options.luaFile.empty()) { + char* l; + while((l = readline(">> ")) != nullptr) { + if(*l) add_history(l); + std::string line{l}; + while(!line.empty() && !::isalnum(line.front())) line = line.substr(1); + if(line.rfind("exit", 0) == 0) break; + master.lua.script(line, [](lua_State*, sol::protected_function_result pfr) { + sol::error err = pfr; + std::cout << err.what() << std::endl; + return pfr; + }); + free(l); + } + } else { + try { + master.lua.script_file(options.luaFile); + } catch(const std::exception& ex) { + margo_critical(master.engine.get_margo_instance(), "%s", ex.what()); + ret = 1; + } + } + +cleanup: + for(const auto& [raftID, worker] : master.cluster->workers) { + master.engine.shutdown_remote_engine(worker->address); + waitpid(worker->pID, nullptr, 0); + } + + return ret; +} + +// ---------------------------------------------------------------------------- +// Command line argument parsing +// ---------------------------------------------------------------------------- + +static void parseCommandLine(int argc, char** argv, Options& options) { + try { + TCLAP::CmdLine cmd("Mochi-Raft test framework", ' ', "0.1.0"); + + TCLAP::UnlabeledValueArg protocol( + "protocol", "Protocol (e.g. ofi+tcp)", true, "na+sm", "protocol"); + TCLAP::ValueArg masterLogLevel( + "v", "verbosity", + "Log level for the master (trace, debug, info, warning, error, critical, off)", + false, "info", "level"); + TCLAP::ValueArg workerLogLevel( + "w", "worker-verbosity", + "Log level for the worker (defaults to that of the master)", + false, "", "level"); + TCLAP::ValueArg luaFile( + "f", "lua-file", "Lua file for the master to execute", false, + "", "filename"); + TCLAP::ValueArg clusterSize( + "n", "cluster-size", "Initial number of processes the cluster contains", + false, 1, "size"); + TCLAP::ValueArg logPath( + "p", "log-path", "Path where the logs should be stored", false, + ".", "path"); + TCLAP::ValueArg tracePath( + "t", "trace-path", "Path where the worker traces should be stored", false, + "", "path"); + TCLAP::ValueArg logType( + "l", "log-type", "Type of log to use (\"abt-io\" or \"memory\")", false, + "abt-io", "type"); + + cmd.add(protocol); + cmd.add(masterLogLevel); + cmd.add(workerLogLevel); + cmd.add(luaFile); + cmd.add(clusterSize); + cmd.add(logPath); + cmd.add(logType); + cmd.add(tracePath); + cmd.parse(argc, argv); + + static std::unordered_map logLevelMap = { + {"trace", tl::logger::level::trace}, + {"debug", tl::logger::level::debug}, + {"info", tl::logger::level::info}, + {"warning", tl::logger::level::warning}, + {"error", tl::logger::level::error}, + {"critical", tl::logger::level::critical}, + {"off", tl::logger::level::critical} + }; + + options.protocol = protocol.getValue(); + options.luaFile = luaFile.getValue(); + options.initClusterSize = clusterSize.getValue(); + options.logPath = logPath.getValue(); + options.logType = logType.getValue(); + options.tracePath = tracePath.getValue(); + if(logLevelMap.count(masterLogLevel.getValue())) + options.masterLogLevel = logLevelMap[masterLogLevel.getValue()]; + else + options.masterLogLevel = tl::logger::level::info; + if(logLevelMap.count(workerLogLevel.getValue())) + options.workerLogLevel = 
logLevelMap[workerLogLevel.getValue()]; + else + options.workerLogLevel = options.masterLogLevel; + + if(!options.luaFile.empty() && !fs::is_regular_file(options.luaFile)) { + std::cerr << "error: " << options.luaFile << " does not exist or is not a file" << std::endl; + exit(-1); + } + if(!options.luaFile.empty() && !fs::is_directory(options.logPath)) { + std::cerr << "error: " << options.logPath << " does not exist or not a directory" << std::endl; + exit(-1); + } + if(options.initClusterSize == 0) { + std::cerr << "error: cannot start with a cluster of size 0" << std::endl; + exit(-1); + } + if(options.logType != "abt-io" && options.logType != "memory") { + std::cerr << "error: invalid log type (should be \"abt-io\" or \"memory\")" << std::endl; + exit(-1); + } + + } catch (TCLAP::ArgException& e) { + std::cerr << "error: " << e.error() << " for arg " << e.argId() + << std::endl; + exit(-1); + } +} diff --git a/tests/worker.hpp b/tests/worker.hpp new file mode 100644 index 0000000..b69c2d5 --- /dev/null +++ b/tests/worker.hpp @@ -0,0 +1,196 @@ +#ifndef MRAFT_TEST_WORKER_HPP +#define MRAFT_TEST_WORKER_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace tl = thallium; + +struct WorkerOptions { + std::string protocol; + tl::logger::level logLevel; + std::string logPath; + std::string logType; + std::string tracePath; + size_t heartbeatTimeoutMs; +}; + +struct FSM : public mraft::StateMachine { + + void apply(const struct raft_buffer* buf, void** result) override { + std::unique_lock lock{content_mtx}; + content.append(std::string_view{(const char*)buf->base, buf->len}); + } + + void snapshot(struct raft_buffer* bufs[], unsigned* n_bufs) override { + std::unique_lock lock{content_mtx}; + *bufs = static_cast(raft_malloc(sizeof(**bufs))); + *n_bufs = 1; + (*bufs)[0].base = strdup(content.c_str()); + (*bufs)[0].len = content.size(); + } + + void restore(struct raft_buffer* buf) override { + std::unique_lock lock{content_mtx}; + content.assign(std::string_view{(const char*)buf->base, buf->len}); + } + + std::string content; + mutable tl::mutex content_mtx; + +}; + +struct Worker : public tl::provider { + + Worker(tl::engine engine, raft_id raftID, std::unique_ptr theLog, size_t hbTimeout = 100) + : tl::provider(engine, 0) + , fsm{} + , log{std::move(theLog)} + , raft{engine.get_margo_instance(), raftID, fsm, *log} + , id{raftID} { + raft.set_heartbeat_timeout(hbTimeout); + #define DEFINE_WORKER_RPC(__rpc__) define("mraft_test_" #__rpc__, &Worker::__rpc__) + DEFINE_WORKER_RPC(bootstrap); + DEFINE_WORKER_RPC(start); + DEFINE_WORKER_RPC(add); + DEFINE_WORKER_RPC(assign); + DEFINE_WORKER_RPC(remove); + DEFINE_WORKER_RPC(apply); + DEFINE_WORKER_RPC(get_leader); + DEFINE_WORKER_RPC(transfer); + DEFINE_WORKER_RPC(suspend); + DEFINE_WORKER_RPC(barrier); + DEFINE_WORKER_RPC(isolate); + DEFINE_WORKER_RPC(get_fsm_content); + #undef DEFINE_WORKER_RPC + raft.enable_tracer(true); + raft.set_tracer( + [mid=engine.get_margo_instance(), raftID](const char* file, int line, const char* msg) { + margo_trace(mid, "[worker:%d] [%s:%d] %s", (unsigned)raftID, file, line, msg); + }); + } + + #define WRAP_CALL(func, ...) 
\ + do { \ + try { \ + raft.func(__VA_ARGS__); \ + } catch (const mraft::RaftException& ex) { \ + margo_error(get_engine().get_margo_instance(),\ + "[worker:%lu] %s failed: %s", \ + id, #func, raft_strerror(ex.code())); \ + return ex.code(); \ + } \ + return 0; \ + } while (0) + + int bootstrap(const std::vector& servers) { + WRAP_CALL(bootstrap, servers); + } + + int start() { + WRAP_CALL(start); + } + + int add(const raft_id& raftId, const std::string& addr) { + WRAP_CALL(add, raftId, addr.c_str()); + } + + int assign(const raft_id& raftId, const mraft::Role& role) { + WRAP_CALL(assign, raftId, role); + } + + int remove(const raft_id& raftId) { + WRAP_CALL(remove, raftId); + } + + int apply(const std::string& buffer) { + struct raft_buffer bufs = { + .base = (void*)buffer.c_str(), + .len = buffer.size() + }; + WRAP_CALL(apply, &bufs, 1U); + } + + int barrier() { + WRAP_CALL(barrier); + } + + int isolate(bool flag) { + auto raft_io = raft.get_raft_io(); + return mraft_io_simulate_dead(raft_io, flag); + } + + mraft::ServerInfo get_leader() const { + mraft::ServerInfo leaderInfo{0, ""}; + try { + leaderInfo = raft.get_leader(); + } catch (mraft::RaftException& ex) {} + return leaderInfo; + } + + int transfer(raft_id transferToId) { + WRAP_CALL(transfer, transferToId); + } + + void suspend(const tl::request& req, unsigned msec) const { + req.respond(0); + usleep(msec*1000); + } + + std::string get_fsm_content() const { + std::unique_lock lock{fsm.content_mtx}; + return fsm.content; + } + + #undef WRAP_CALL + + raft_id id; + FSM fsm; + std::unique_ptr log; + mraft::Raft raft; +}; + +static inline int runWorker(int fdToMaster, raft_id raftID, const WorkerOptions& options) { + + if(!options.tracePath.empty()) { + auto stdoutFilename = options.tracePath + "/" + std::to_string(raftID) + ".out"; + auto stderrFilename = options.tracePath + "/" + std::to_string(raftID) + ".err"; + FILE* _ = freopen(stdoutFilename.c_str(), "a+", stdout); + _ = freopen(stderrFilename.c_str(), "a+", stderr); + } + + auto engine = tl::engine(options.protocol, MARGO_SERVER_MODE); + engine.set_log_level(options.logLevel); + engine.enable_remote_shutdown(); + + std::unique_ptr log; + if(options.logType == "abt-io") { + auto config = std::string{"{\"path\":\""} + options.logPath + "\"}"; + log = std::make_unique( + raftID, ABT_IO_INSTANCE_NULL, config.c_str(), engine.get_margo_instance()); + } else { + if(options.logPath != ".") + margo_warning(engine.get_margo_instance(), + "--log-path set to \"%s\" but will be ignored with the memory log", + options.logPath.c_str()); + log = std::make_unique(); + } + + auto worker = new Worker{engine, raftID, std::move(log), options.heartbeatTimeoutMs}; + engine.push_prefinalize_callback([worker](){ delete worker; }); + + auto address = static_cast(engine.self()); + address.resize(1024); + ssize_t _ = write(fdToMaster, address.data(), 1024); + close(fdToMaster); + + engine.wait_for_finalize(); + return 0; +} + +#endif
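A usage sketch for the Python framework added above: the scenario below is hypothetical (the file name 99-example.py and the command lines are illustrative assumptions, not part of the patch) and only uses bindings defined in tests/python-test-framework.cpp (cluster indexing, transfer, get_leader, apply, barrier, get_fsm_content, kill).

# tests/scenarios/99-example.py (hypothetical)
from raft import cluster
import time

# Worker handles are indexed by raft id, starting at 1.
assert len(cluster) == 3

# Make worker 1 the leader, as the existing scenarios do.
cluster[1].transfer(cluster[1])
while cluster[1].get_leader().raft_id != 1:
    time.sleep(1)

# Apply a few commands through the leader and wait for them to commit.
expected = ""
for i in range(3):
    entry = f"entry_{i}"
    cluster[1].apply(entry)
    expected += entry
cluster[1].barrier()
time.sleep(1)

# Every worker's state machine should hold the concatenated entries.
for i in range(1, 4):
    assert cluster[i].get_fsm_content() == expected

# Killing a worker reaps the child process and drops it from the cluster map.
cluster[3].kill()
assert len(cluster) == 2

# Such a scenario would be run either through the helper script,
#   ./run-test.sh scenarios/99-example.py memory
# or by invoking the test binary directly, e.g.
#   ./mraft-py-test na+sm -n 3 -f scenarios/99-example.py
# Running mraft-py-test without -f drops the master into the interactive
# ">> " prompt, where the same calls can be typed one line at a time
# ("exit" leaves the loop).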