From 4a079db5075d1be2ca61c1796dfcfbb9b9d3a202 Mon Sep 17 00:00:00 2001
From: Carlos
Date: Tue, 25 Jul 2023 14:48:16 +0100
Subject: [PATCH] Workaround for PTP message overflow (#336)

* makespan: helpful debugging

* mpi: more cleanup

* endpoint: try to catch bug

* mpi: add contains method to registry

* endpoint: be less strict with empty requests

* debug: add try/catch around mpi/transport

* mpi: know what everybody is doing in case of an error

* makespan: finally working

* nits: run clang format
---
 include/faabric/mpi/MpiWorldRegistry.h  |  2 +
 include/faabric/transport/Message.h     |  8 +++
 src/endpoint/FaabricEndpointHandler.cpp | 18 ++++--
 src/mpi/MpiWorld.cpp                    | 78 +++++++++++++++++++------
 src/mpi/MpiWorldRegistry.cpp            |  7 +++
 src/transport/PointToPointBroker.cpp    | 28 +++++++--
 src/util/config.cpp                     |  4 +-
 7 files changed, 118 insertions(+), 27 deletions(-)

diff --git a/include/faabric/mpi/MpiWorldRegistry.h b/include/faabric/mpi/MpiWorldRegistry.h
index 993203bb5..baac4f7b3 100644
--- a/include/faabric/mpi/MpiWorldRegistry.h
+++ b/include/faabric/mpi/MpiWorldRegistry.h
@@ -16,6 +16,8 @@ class MpiWorldRegistry
 
     MpiWorld& getWorld(int worldId);
 
+    bool worldExists(int worldId);
+
     void clear();
 
   private:
diff --git a/include/faabric/transport/Message.h b/include/faabric/transport/Message.h
index fe0d5a22f..6af99b051 100644
--- a/include/faabric/transport/Message.h
+++ b/include/faabric/transport/Message.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <map>
 #include
 #include
 #include
@@ -36,6 +37,13 @@ enum class MessageResponseCode
     ERROR
 };
 
+static std::map<MessageResponseCode, std::string> MessageResponseCodeText = {
+    { MessageResponseCode::SUCCESS, "Success" },
+    { MessageResponseCode::TERM, "Connection terminated" },
+    { MessageResponseCode::TIMEOUT, "Message timed out" },
+    { MessageResponseCode::ERROR, "Error" },
+};
+
 /**
  * Represents message data passed around the transport layer. Essentially an
  * array of bytes, with a size and a flag to say whether there's more data to
diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp
index a9387fa6c..d67037c80 100644
--- a/src/endpoint/FaabricEndpointHandler.cpp
+++ b/src/endpoint/FaabricEndpointHandler.cpp
@@ -31,11 +31,21 @@ void FaabricEndpointHandler::onRequest(
     // Text response type
     response.set(header::content_type, "text/plain");
 
+    // Request body contains a string that is formatted as a JSON
+    std::string requestStr = request.body();
+
+    // Handle JSON
     // TODO: for the moment we keep the endpoint handler, but we are not meant
     // to receive any requests here. Eventually we will delete it
-    SPDLOG_ERROR("Faabric handler received empty request");
-    response.result(beast::http::status::bad_request);
-    response.body() = std::string("Empty request");
-    ctx.sendFunction(std::move(response));
+    if (requestStr.empty()) {
+        SPDLOG_ERROR("Planner handler received empty request");
+        response.result(beast::http::status::bad_request);
+        response.body() = std::string("Empty request");
+        return ctx.sendFunction(std::move(response));
+    }
+
+    SPDLOG_ERROR("Worker HTTP handler received non-empty request (body: {})",
+                 request.body());
+    throw std::runtime_error("Worker HTTP handler received non-empty request");
 }
 }
diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp
index 798e10cf1..d73afef94 100644
--- a/src/mpi/MpiWorld.cpp
+++ b/src/mpi/MpiWorld.cpp
@@ -59,21 +59,42 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
     if (!msg->SerializeToString(&serialisedBuffer)) {
         throw std::runtime_error("Error serialising message");
     }
-    broker.sendMessage(
-      thisRankMsg->groupid(),
-      sendRank,
-      recvRank,
-      reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
-      serialisedBuffer.size(),
-      dstHost,
-      true);
+    try {
+        broker.sendMessage(
+          thisRankMsg->groupid(),
+          sendRank,
+          recvRank,
+          reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
+          serialisedBuffer.size(),
+          dstHost,
+          true);
+    } catch (std::runtime_error& e) {
+        SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}",
+                     thisRankMsg->appid(),
+                     thisRankMsg->groupid(),
+                     thisRankMsg->groupidx(),
+                     sendRank,
+                     recvRank);
+        throw e;
+    }
 }
 
 std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
                                                            int recvRank)
 {
-    auto msg =
-      broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true);
+    std::vector<uint8_t> msg;
+    try {
+        msg =
+          broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true);
+    } catch (std::runtime_error& e) {
+        SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv (remote) {} -> {}",
+                     thisRankMsg->appid(),
+                     thisRankMsg->groupid(),
+                     thisRankMsg->groupidx(),
+                     sendRank,
+                     recvRank);
+        throw e;
+    }
     PARSE_MSG(MPIMessage, msg.data(), msg.size());
     return std::make_shared<MPIMessage>(parsedMsg);
 }
@@ -1456,18 +1477,39 @@ std::shared_ptr<MPIMessage> MpiWorld::recvBatchReturnLast(int sendRank,
     if (isLocal) {
         // First receive messages that happened before us
         for (int i = 0; i < batchSize - 1; i++) {
-            SPDLOG_TRACE("MPI - pending recv {} -> {}", sendRank, recvRank);
-            auto pendingMsg = getLocalQueue(sendRank, recvRank)->dequeue();
-
-            // Put the unacked message in the UMB
-            assert(!msgIt->isAcknowledged());
-            msgIt->acknowledge(pendingMsg);
-            msgIt++;
+            try {
+                SPDLOG_TRACE("MPI - pending recv {} -> {}", sendRank, recvRank);
+                auto pendingMsg = getLocalQueue(sendRank, recvRank)->dequeue();
+
+                // Put the unacked message in the UMB
+                assert(!msgIt->isAcknowledged());
+                msgIt->acknowledge(pendingMsg);
+                msgIt++;
+            } catch (faabric::util::QueueTimeoutException& e) {
+                SPDLOG_ERROR(
+                  "{}:{}:{} Timed out with: MPI - pending recv {} -> {}",
+                  thisRankMsg->appid(),
+                  thisRankMsg->groupid(),
+                  thisRankMsg->groupidx(),
+                  sendRank,
+                  recvRank);
+                throw e;
+            }
         }
 
         // Finally receive the message corresponding to us
         SPDLOG_TRACE("MPI - recv {} -> {}", sendRank, recvRank);
-        ourMsg = getLocalQueue(sendRank, recvRank)->dequeue();
+        try {
+            ourMsg = getLocalQueue(sendRank, recvRank)->dequeue();
+        } catch (faabric::util::QueueTimeoutException& e) {
+            SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv {} -> {}",
+                         thisRankMsg->appid(),
+                         thisRankMsg->groupid(),
+                         thisRankMsg->groupidx(),
+                         sendRank,
+                         recvRank);
+            throw e;
+        }
     } else {
         // First receive messages that happened before us
         for (int i = 0; i < batchSize - 1; i++) {
diff --git a/src/mpi/MpiWorldRegistry.cpp b/src/mpi/MpiWorldRegistry.cpp
index 20fe3dbe8..652953700 100644
--- a/src/mpi/MpiWorldRegistry.cpp
+++ b/src/mpi/MpiWorldRegistry.cpp
@@ -67,6 +67,13 @@ MpiWorld& MpiWorldRegistry::getWorld(int worldId)
     return worldMap[worldId];
 }
 
+bool MpiWorldRegistry::worldExists(int worldId)
+{
+    faabric::util::SharedLock lock(registryMutex);
+
+    return worldMap.contains(worldId);
+}
+
 void MpiWorldRegistry::clear()
 {
     faabric::util::FullLock lock(registryMutex);
diff --git a/src/transport/PointToPointBroker.cpp b/src/transport/PointToPointBroker.cpp
index 52a9f08af..849e0520b 100644
--- a/src/transport/PointToPointBroker.cpp
+++ b/src/transport/PointToPointBroker.cpp
@@ -666,8 +666,18 @@ void PointToPointBroker::sendMessage(int groupId,
                      localSendSeqNum,
                      endpoint.getAddress());
 
-        endpoint.send(NO_HEADER, buffer, bufferSize, localSendSeqNum);
-
+        try {
+            endpoint.send(NO_HEADER, buffer, bufferSize, localSendSeqNum);
+        } catch (std::runtime_error& e) {
+            SPDLOG_ERROR("Timed-out with local point-to-point message {}:{}:{} "
+                         "(seq: {}) to {}",
+                         groupId,
+                         sendIdx,
+                         recvIdx,
+                         localSendSeqNum,
+                         endpoint.getAddress());
+            throw e;
+        }
     } else {
         auto cli = getClient(host);
         faabric::PointToPointMessage msg;
@@ -689,7 +699,17 @@ void PointToPointBroker::sendMessage(int groupId,
                      remoteSendSeqNum,
                      host);
 
-        cli->sendMessage(msg, remoteSendSeqNum);
+        try {
+            cli->sendMessage(msg, remoteSendSeqNum);
+        } catch (std::runtime_error& e) {
+            SPDLOG_TRACE("Timed-out with remote point-to-point message "
+                         "{}:{}:{} (seq: {}) to {}",
+                         groupId,
+                         sendIdx,
+                         recvIdx,
+                         remoteSendSeqNum,
+                         host);
+        }
     }
 }
 
@@ -756,7 +776,7 @@ std::vector<uint8_t> PointToPointBroker::recvMessage(int groupId,
         SPDLOG_WARN(
           "Error {} ({}) when awaiting a message ({}:{} seq: {} label: {})",
          static_cast<int>(recvMsg.getResponseCode()),
-          nng_strerror(static_cast<int>(recvMsg.getResponseCode())),
+          MessageResponseCodeText.at(recvMsg.getResponseCode()),
          sendIdx,
          recvIdx,
          expectedSeqNum,
diff --git a/src/util/config.cpp b/src/util/config.cpp
index bea8517d4..4df46a5d9 100644
--- a/src/util/config.cpp
+++ b/src/util/config.cpp
@@ -70,8 +70,10 @@ void SystemConfig::initialise()
       this->getSystemConfIntParam("STATE_SERVER_THREADS", "2");
     snapshotServerThreads =
       this->getSystemConfIntParam("SNAPSHOT_SERVER_THREADS", "2");
+    // FIXME: temporarily set this value to a higher number to work-around:
+    // https://github.com/faasm/faabric/issues/335
     pointToPointServerThreads =
-      this->getSystemConfIntParam("POINT_TO_POINT_SERVER_THREADS", "2");
+      this->getSystemConfIntParam("POINT_TO_POINT_SERVER_THREADS", "8");
 
     // Dirty tracking
     dirtyTrackingMode = getEnvVar("DIRTY_TRACKING_MODE", "segfault");
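
Note (not part of the commit): the patch adds MpiWorldRegistry::worldExists() but no call site appears in
the hunks above. The snippet below is a minimal sketch of how a caller could use it to report on a world
without hitting the error path in getWorld(); the faabric::mpi::getMpiWorldRegistry() accessor and the
logWorldState() helper are assumptions made for illustration, not code from this patch.

// Illustrative sketch only: guard registry access with the new worldExists()
// check before calling getWorld(). Assumes the usual faabric registry
// accessor getMpiWorldRegistry() in the faabric::mpi namespace.
#include <faabric/mpi/MpiWorldRegistry.h>
#include <faabric/util/logging.h>

void logWorldState(int worldId)
{
    faabric::mpi::MpiWorldRegistry& reg = faabric::mpi::getMpiWorldRegistry();

    // worldExists() only takes a shared lock, so it is cheap to call from an
    // error-reporting path on hosts that may never have created the world
    if (!reg.worldExists(worldId)) {
        SPDLOG_ERROR("MPI world {} not registered on this host", worldId);
        return;
    }

    faabric::mpi::MpiWorld& world = reg.getWorld(worldId);
    SPDLOG_INFO("MPI world {} has {} ranks", worldId, world.getSize());
}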