From 8a3e4eee8587d07be7caceee640c729a3b45318a Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Thu, 20 Jul 2023 15:54:34 +0000
Subject: [PATCH 1/9] makespan: helpful debugging

---
 include/faabric/transport/Message.h  |  8 ++++++
 src/mpi/MpiWorld.cpp                 | 43 ++++++++++++++++++++++------
 src/transport/PointToPointBroker.cpp |  2 +-
 3 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/include/faabric/transport/Message.h b/include/faabric/transport/Message.h
index fe0d5a22f..6af99b051 100644
--- a/include/faabric/transport/Message.h
+++ b/include/faabric/transport/Message.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <map>
 #include
 #include
 #include
@@ -36,6 +37,13 @@ enum class MessageResponseCode
     ERROR
 };
 
+static std::map<MessageResponseCode, std::string> MessageResponseCodeText = {
+    { MessageResponseCode::SUCCESS, "Success" },
+    { MessageResponseCode::TERM, "Connection terminated" },
+    { MessageResponseCode::TIMEOUT, "Message timed out" },
+    { MessageResponseCode::ERROR, "Error" },
+};
+
 /**
  * Represents message data passed around the transport layer. Essentially an
  * array of bytes, with a size and a flag to say whether there's more data to
diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp
index 798e10cf1..feef1a8c0 100644
--- a/src/mpi/MpiWorld.cpp
+++ b/src/mpi/MpiWorld.cpp
@@ -298,6 +298,23 @@ void MpiWorld::initLocalRemoteLeaders()
         std::iter_swap(it.second.begin(),
                        std::min_element(it.second.begin(), it.second.end()));
     }
+
+    SPDLOG_INFO("{}:{}:{} setting local-remote leaders (local leader: {})",
+                thisRankMsg->appid(),
+                thisRankMsg->groupid(),
+                thisRankMsg->groupidx(),
+                localLeader);
+    for (auto it : ranksForHost) {
+        std::string line = fmt::format("{}:", it.first);
+        for (auto h : it.second) {
+            line = fmt::format("{} {}", line, h);
+        }
+        SPDLOG_INFO("{}:{}:{} local-remote-leaders: {}",
+                    thisRankMsg->appid(),
+                    thisRankMsg->groupid(),
+                    thisRankMsg->groupidx(),
+                    line);
+    }
 }
 
 void MpiWorld::getCartesianRank(int rank,
@@ -1456,18 +1473,28 @@ std::shared_ptr<MPIMessage> MpiWorld::recvBatchReturnLast(int sendRank,
     if (isLocal) {
         // First receive messages that happened before us
         for (int i = 0; i < batchSize - 1; i++) {
-            SPDLOG_TRACE("MPI - pending recv {} -> {}", sendRank, recvRank);
-            auto pendingMsg = getLocalQueue(sendRank, recvRank)->dequeue();
-
-            // Put the unacked message in the UMB
-            assert(!msgIt->isAcknowledged());
-            msgIt->acknowledge(pendingMsg);
-            msgIt++;
+            try {
+                SPDLOG_TRACE("MPI - pending recv {} -> {}", sendRank, recvRank);
+                auto pendingMsg = getLocalQueue(sendRank, recvRank)->dequeue();
+
+                // Put the unacked message in the UMB
+                assert(!msgIt->isAcknowledged());
+                msgIt->acknowledge(pendingMsg);
+                msgIt++;
+            } catch (faabric::util::QueueTimeoutException& e) {
+                SPDLOG_ERROR("Timed out with: MPI - pending recv {} -> {}", sendRank, recvRank);
+                throw e;
+            }
         }
 
         // Finally receive the message corresponding to us
         SPDLOG_TRACE("MPI - recv {} -> {}", sendRank, recvRank);
-        ourMsg = getLocalQueue(sendRank, recvRank)->dequeue();
+        try {
+            ourMsg = getLocalQueue(sendRank, recvRank)->dequeue();
+        } catch (faabric::util::QueueTimeoutException& e) {
+            SPDLOG_ERROR("Timed out with: MPI - recv {} -> {}", sendRank, recvRank);
+            throw e;
+        }
     } else {
         // First receive messages that happened before us
         for (int i = 0; i < batchSize - 1; i++) {
diff --git a/src/transport/PointToPointBroker.cpp b/src/transport/PointToPointBroker.cpp
index 52a9f08af..9b440da3b 100644
--- a/src/transport/PointToPointBroker.cpp
+++ b/src/transport/PointToPointBroker.cpp
@@ -756,7 +756,7 @@ std::vector<uint8_t> PointToPointBroker::recvMessage(int groupId,
         SPDLOG_WARN(
           "Error {} ({}) when awaiting a message ({}:{} seq: {} label: {})",
           static_cast<int>(recvMsg.getResponseCode()),
-          nng_strerror(static_cast<int>(recvMsg.getResponseCode())),
+          MessageResponseCodeText.at(recvMsg.getResponseCode()),
           sendIdx,
           recvIdx,
           expectedSeqNum,

From 574fc14a2e191c8f466fb7fa845f298d055b1665 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Thu, 20 Jul 2023 16:57:32 +0000
Subject: [PATCH 2/9] mpi: more cleanup

---
 src/mpi/MpiWorld.cpp | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp
index feef1a8c0..c6f969d81 100644
--- a/src/mpi/MpiWorld.cpp
+++ b/src/mpi/MpiWorld.cpp
@@ -1435,6 +1435,9 @@ std::shared_ptr<InMemoryMpiQueue> MpiWorld::getLocalQueue(int sendRank,
 // Note - the queues themselves perform concurrency control
 void MpiWorld::initLocalQueues()
 {
+    // TODO: iirc we used _not_ to clean stuff here, as it caused races
+    // during migration, so maybe this breaks migration (FIXME)
+    localQueues.clear();
     localQueues.resize(size * size);
     for (const int sendRank : ranksForHost[thisHost]) {
         for (const int recvRank : ranksForHost[thisHost]) {
@@ -1482,7 +1485,12 @@ std::shared_ptr<MPIMessage> MpiWorld::recvBatchReturnLast(int sendRank,
                 msgIt->acknowledge(pendingMsg);
                 msgIt++;
             } catch (faabric::util::QueueTimeoutException& e) {
-                SPDLOG_ERROR("Timed out with: MPI - pending recv {} -> {}", sendRank, recvRank);
+                SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - pending recv {} -> {}",
+                    thisRankMsg->appid(),
+                    thisRankMsg->groupid(),
+                    thisRankMsg->groupidx(),
+                    sendRank,
+                    recvRank);
                 throw e;
             }
         }
@@ -1492,7 +1500,12 @@
         // Finally receive the message corresponding to us
         SPDLOG_TRACE("MPI - recv {} -> {}", sendRank, recvRank);
         try {
             ourMsg = getLocalQueue(sendRank, recvRank)->dequeue();
         } catch (faabric::util::QueueTimeoutException& e) {
-            SPDLOG_ERROR("Timed out with: MPI - recv {} -> {}", sendRank, recvRank);
+            SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv {} -> {}",
+                thisRankMsg->appid(),
+                thisRankMsg->groupid(),
+                thisRankMsg->groupidx(),
+                sendRank,
+                recvRank);
             throw e;
         }
     } else {

From 43280f29d7d1ee045db16367b20c714685bb15f5 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Mon, 24 Jul 2023 16:38:00 +0000
Subject: [PATCH 3/9] endpoint: try to catch bug

---
 src/endpoint/FaabricEndpointHandler.cpp | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp
index a9387fa6c..0e8edab29 100644
--- a/src/endpoint/FaabricEndpointHandler.cpp
+++ b/src/endpoint/FaabricEndpointHandler.cpp
@@ -33,9 +33,12 @@ void FaabricEndpointHandler::onRequest(
 
     // TODO: for the moment we keep the endpoint handler, but we are not meant
     // to receive any requests here. Eventually we will delete it
-    SPDLOG_ERROR("Faabric handler received empty request");
+    SPDLOG_ERROR("Faabric handler received request? (body: {})", request.body());
(body: {})", request.body()); + throw std::runtime_error("WHAT?"); + /* response.result(beast::http::status::bad_request); response.body() = std::string("Empty request"); ctx.sendFunction(std::move(response)); + */ } } From 3ffef084c7e587867c1748b550a0678020ab0445 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 24 Jul 2023 17:29:37 +0000 Subject: [PATCH 4/9] mpi: add contains method to registry --- include/faabric/mpi/MpiWorldRegistry.h | 2 ++ src/mpi/MpiWorldRegistry.cpp | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/include/faabric/mpi/MpiWorldRegistry.h b/include/faabric/mpi/MpiWorldRegistry.h index 993203bb5..baac4f7b3 100644 --- a/include/faabric/mpi/MpiWorldRegistry.h +++ b/include/faabric/mpi/MpiWorldRegistry.h @@ -16,6 +16,8 @@ class MpiWorldRegistry MpiWorld& getWorld(int worldId); + bool worldExists(int worldId); + void clear(); private: diff --git a/src/mpi/MpiWorldRegistry.cpp b/src/mpi/MpiWorldRegistry.cpp index 20fe3dbe8..652953700 100644 --- a/src/mpi/MpiWorldRegistry.cpp +++ b/src/mpi/MpiWorldRegistry.cpp @@ -67,6 +67,13 @@ MpiWorld& MpiWorldRegistry::getWorld(int worldId) return worldMap[worldId]; } +bool MpiWorldRegistry::worldExists(int worldId) +{ + faabric::util::SharedLock lock(registryMutex); + + return worldMap.contains(worldId); +} + void MpiWorldRegistry::clear() { faabric::util::FullLock lock(registryMutex); From 25ca948e057c665ad8063e4d948eaed9205cfcef Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Mon, 24 Jul 2023 18:11:25 +0000 Subject: [PATCH 5/9] endpoint: be less strict with empty requests --- src/endpoint/FaabricEndpointHandler.cpp | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp index 0e8edab29..2c4b11a82 100644 --- a/src/endpoint/FaabricEndpointHandler.cpp +++ b/src/endpoint/FaabricEndpointHandler.cpp @@ -31,14 +31,20 @@ void FaabricEndpointHandler::onRequest( // Text response type response.set(header::content_type, "text/plain"); + // Request body contains a string that is formatted as a JSON + std::string requestStr = request.body(); + + // Handle JSON // TODO: for the moment we keep the endpoint handler, but we are not meant // to receive any requests here. Eventually we will delete it - SPDLOG_ERROR("Faabric handler received request? (body: {})", request.body()); + if (requestStr.empty()) { + SPDLOG_ERROR("Planner handler received empty request"); + response.result(beast::http::status::bad_request); + response.body() = std::string("Empty request"); + return ctx.sendFunction(std::move(response)); + } + + SPDLOG_ERROR("Faabric handler non-empty request? 
(body: {})", request.body()); throw std::runtime_error("WHAT?"); - /* - response.result(beast::http::status::bad_request); - response.body() = std::string("Empty request"); - ctx.sendFunction(std::move(response)); - */ } } From 12c549e5edca56b904c583817931c9a9d402e4bd Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 25 Jul 2023 08:06:28 +0000 Subject: [PATCH 6/9] debug: add try/catch arround mpi/transport --- src/mpi/MpiWorld.cpp | 10 ++++++++++ src/transport/PointToPointBroker.cpp | 24 +++++++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp index c6f969d81..610cc69ae 100644 --- a/src/mpi/MpiWorld.cpp +++ b/src/mpi/MpiWorld.cpp @@ -59,6 +59,7 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost, if (!msg->SerializeToString(&serialisedBuffer)) { throw std::runtime_error("Error serialising message"); } + try { broker.sendMessage( thisRankMsg->groupid(), sendRank, @@ -67,6 +68,15 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost, serialisedBuffer.size(), dstHost, true); + } catch (std::runtime_error& e) { + SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}", + thisRankMsg->appid(), + thisRankMsg->groupid(), + thisRankMsg->groupidx(), + sendRank, + recvRank); + throw e; + } } std::shared_ptr MpiWorld::recvRemoteMpiMessage(int sendRank, diff --git a/src/transport/PointToPointBroker.cpp b/src/transport/PointToPointBroker.cpp index 9b440da3b..abe0c0c18 100644 --- a/src/transport/PointToPointBroker.cpp +++ b/src/transport/PointToPointBroker.cpp @@ -666,8 +666,17 @@ void PointToPointBroker::sendMessage(int groupId, localSendSeqNum, endpoint.getAddress()); - endpoint.send(NO_HEADER, buffer, bufferSize, localSendSeqNum); - + try { + endpoint.send(NO_HEADER, buffer, bufferSize, localSendSeqNum); + } catch (std::runtime_error& e) { + SPDLOG_ERROR("Timed-out with local point-to-point message {}:{}:{} (seq: {}) to {}", + groupId, + sendIdx, + recvIdx, + localSendSeqNum, + endpoint.getAddress()); + throw e; + } } else { auto cli = getClient(host); faabric::PointToPointMessage msg; @@ -689,7 +698,16 @@ void PointToPointBroker::sendMessage(int groupId, remoteSendSeqNum, host); - cli->sendMessage(msg, remoteSendSeqNum); + try { + cli->sendMessage(msg, remoteSendSeqNum); + } catch (std::runtime_error& e) { + SPDLOG_TRACE("Timed-out with remote point-to-point message {}:{}:{} (seq: {}) to {}", + groupId, + sendIdx, + recvIdx, + remoteSendSeqNum, + host); + } } } From 7e77f25172b580cb510949e402fe499fe7d31fe6 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 25 Jul 2023 09:28:36 +0000 Subject: [PATCH 7/9] mpi: know what everybody is doing in case of an error --- src/mpi/MpiWorld.cpp | 14 ++++++++++++-- src/transport/PointToPointServer.cpp | 1 + 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp index 610cc69ae..046551213 100644 --- a/src/mpi/MpiWorld.cpp +++ b/src/mpi/MpiWorld.cpp @@ -82,8 +82,18 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost, std::shared_ptr MpiWorld::recvRemoteMpiMessage(int sendRank, int recvRank) { - auto msg = - broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true); + std::vector msg; + try { + msg = + broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true); + } catch (std::runtime_error& e) { + SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv (remote) {} -> {}", + thisRankMsg->appid(), + thisRankMsg->groupid(), + thisRankMsg->groupidx(), + sendRank, + recvRank); + } 
     PARSE_MSG(MPIMessage, msg.data(), msg.size());
     return std::make_shared<MPIMessage>(parsedMsg);
 }
diff --git a/src/transport/PointToPointServer.cpp b/src/transport/PointToPointServer.cpp
index a62a021ec..968fa734a 100644
--- a/src/transport/PointToPointServer.cpp
+++ b/src/transport/PointToPointServer.cpp
@@ -35,6 +35,7 @@ void PointToPointServer::doAsyncRecv(transport::Message& message)
 
     // Send the message locally to the downstream socket, add the
     // sequence number for in-order reception
+    // FIXME: this can sometimes throw an exception
     broker.sendMessage(parsedMsg.groupid(),
                        parsedMsg.sendidx(),
                        parsedMsg.recvidx(),

From f9cd464a3f016e6e28af4ea16756cfa95001e511 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Tue, 25 Jul 2023 11:32:55 +0000
Subject: [PATCH 8/9] makespan: finally working

---
 src/endpoint/FaabricEndpointHandler.cpp |  4 +--
 src/mpi/MpiWorld.cpp                    | 37 ++++++-------------------
 src/transport/PointToPointServer.cpp    |  1 -
 src/util/config.cpp                     |  4 ++-
 4 files changed, 14 insertions(+), 32 deletions(-)

diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp
index 2c4b11a82..54831eb5c 100644
--- a/src/endpoint/FaabricEndpointHandler.cpp
+++ b/src/endpoint/FaabricEndpointHandler.cpp
@@ -44,7 +44,7 @@ void FaabricEndpointHandler::onRequest(
         return ctx.sendFunction(std::move(response));
     }
 
-    SPDLOG_ERROR("Faabric handler non-empty request? (body: {})", request.body());
-    throw std::runtime_error("WHAT?");
+    SPDLOG_ERROR("Worker HTTP handler received non-empty request (body: {})", request.body());
+    throw std::runtime_error("Worker HTTP handler received non-empty request");
 }
 }
diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp
index 046551213..acfc538b8 100644
--- a/src/mpi/MpiWorld.cpp
+++ b/src/mpi/MpiWorld.cpp
@@ -60,14 +60,14 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
         throw std::runtime_error("Error serialising message");
     }
     try {
-    broker.sendMessage(
-      thisRankMsg->groupid(),
-      sendRank,
-      recvRank,
-      reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
-      serialisedBuffer.size(),
-      dstHost,
-      true);
+        broker.sendMessage(
+          thisRankMsg->groupid(),
+          sendRank,
+          recvRank,
+          reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
+          serialisedBuffer.size(),
+          dstHost,
+          true);
     } catch (std::runtime_error& e) {
         SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}",
                      thisRankMsg->appid(),
@@ -93,6 +93,7 @@ std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
                      thisRankMsg->groupidx(),
                      sendRank,
                      recvRank);
+        throw e;
     }
     PARSE_MSG(MPIMessage, msg.data(), msg.size());
     return std::make_shared<MPIMessage>(parsedMsg);
 }
@@ -318,23 +319,6 @@ void MpiWorld::initLocalRemoteLeaders()
         std::iter_swap(it.second.begin(),
                        std::min_element(it.second.begin(), it.second.end()));
     }
-
-    SPDLOG_INFO("{}:{}:{} setting local-remote leaders (local leader: {})",
-                thisRankMsg->appid(),
-                thisRankMsg->groupid(),
-                thisRankMsg->groupidx(),
-                localLeader);
-    for (auto it : ranksForHost) {
-        std::string line = fmt::format("{}:", it.first);
-        for (auto h : it.second) {
-            line = fmt::format("{} {}", line, h);
-        }
-        SPDLOG_INFO("{}:{}:{} local-remote-leaders: {}",
-                    thisRankMsg->appid(),
-                    thisRankMsg->groupid(),
-                    thisRankMsg->groupidx(),
-                    line);
-    }
 }
 
 void MpiWorld::getCartesianRank(int rank,
@@ -1455,12 +1439,6 @@ std::shared_ptr<InMemoryMpiQueue> MpiWorld::getLocalQueue(int sendRank,
 // Note - the queues themselves perform concurrency control
 void MpiWorld::initLocalQueues()
 {
-    // TODO: iirc we used _not_ to clean stuff here, as it caused races
-    // during migration, so maybe this breaks migration (FIXME)
-    localQueues.clear();
     localQueues.resize(size * size);
     for (const int sendRank : ranksForHost[thisHost]) {
         for (const int recvRank : ranksForHost[thisHost]) {
diff --git a/src/transport/PointToPointServer.cpp b/src/transport/PointToPointServer.cpp
index 968fa734a..a62a021ec 100644
--- a/src/transport/PointToPointServer.cpp
+++ b/src/transport/PointToPointServer.cpp
@@ -35,7 +35,6 @@ void PointToPointServer::doAsyncRecv(transport::Message& message)
 
     // Send the message locally to the downstream socket, add the
     // sequence number for in-order reception
-    // FIXME: this can sometimes throw an exception
     broker.sendMessage(parsedMsg.groupid(),
                        parsedMsg.sendidx(),
                        parsedMsg.recvidx(),
diff --git a/src/util/config.cpp b/src/util/config.cpp
index bea8517d4..4df46a5d9 100644
--- a/src/util/config.cpp
+++ b/src/util/config.cpp
@@ -70,8 +70,10 @@ void SystemConfig::initialise()
       this->getSystemConfIntParam("STATE_SERVER_THREADS", "2");
     snapshotServerThreads =
       this->getSystemConfIntParam("SNAPSHOT_SERVER_THREADS", "2");
+    // FIXME: temporarily set this value to a higher number to work around:
+    // https://github.com/faasm/faabric/issues/335
     pointToPointServerThreads =
-      this->getSystemConfIntParam("POINT_TO_POINT_SERVER_THREADS", "2");
+      this->getSystemConfIntParam("POINT_TO_POINT_SERVER_THREADS", "8");
 
     // Dirty tracking
     dirtyTrackingMode = getEnvVar("DIRTY_TRACKING_MODE", "segfault");

From 5251bba00bc53f8d5557a094e85d172c25146458 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Tue, 25 Jul 2023 11:49:54 +0000
Subject: [PATCH 9/9] nits: run clang-format

---
 src/endpoint/FaabricEndpointHandler.cpp |  3 ++-
 src/mpi/MpiWorld.cpp                    | 13 +++++++------
 src/transport/PointToPointBroker.cpp    |  6 ++++--
 3 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp
index 54831eb5c..d67037c80 100644
--- a/src/endpoint/FaabricEndpointHandler.cpp
+++ b/src/endpoint/FaabricEndpointHandler.cpp
@@ -44,7 +44,8 @@ void FaabricEndpointHandler::onRequest(
         return ctx.sendFunction(std::move(response));
     }
 
-    SPDLOG_ERROR("Worker HTTP handler received non-empty request (body: {})", request.body());
+    SPDLOG_ERROR("Worker HTTP handler received non-empty request (body: {})",
+                 request.body());
     throw std::runtime_error("Worker HTTP handler received non-empty request");
 }
 }
diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp
index acfc538b8..d73afef94 100644
--- a/src/mpi/MpiWorld.cpp
+++ b/src/mpi/MpiWorld.cpp
@@ -1486,12 +1486,13 @@ std::shared_ptr<MPIMessage> MpiWorld::recvBatchReturnLast(int sendRank,
                 msgIt->acknowledge(pendingMsg);
                 msgIt++;
             } catch (faabric::util::QueueTimeoutException& e) {
-                SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - pending recv {} -> {}",
-                    thisRankMsg->appid(),
-                    thisRankMsg->groupid(),
-                    thisRankMsg->groupidx(),
-                    sendRank,
-                    recvRank);
+                SPDLOG_ERROR(
+                  "{}:{}:{} Timed out with: MPI - pending recv {} -> {}",
+                  thisRankMsg->appid(),
+                  thisRankMsg->groupid(),
+                  thisRankMsg->groupidx(),
+                  sendRank,
+                  recvRank);
                 throw e;
             }
         }
diff --git a/src/transport/PointToPointBroker.cpp b/src/transport/PointToPointBroker.cpp
index abe0c0c18..849e0520b 100644
--- a/src/transport/PointToPointBroker.cpp
+++ b/src/transport/PointToPointBroker.cpp
@@ -669,7 +669,8 @@ void PointToPointBroker::sendMessage(int groupId,
         try {
             endpoint.send(NO_HEADER, buffer, bufferSize, localSendSeqNum);
         } catch (std::runtime_error& e) {
-            SPDLOG_ERROR("Timed-out with local point-to-point message {}:{}:{} (seq: {}) to {}",
+            SPDLOG_ERROR("Timed-out with local point-to-point message {}:{}:{} "
+                         "(seq: {}) to {}",
                          groupId,
                          sendIdx,
                          recvIdx,
                          localSendSeqNum,
                          endpoint.getAddress());
@@ -701,7 +702,8 @@ void PointToPointBroker::sendMessage(int groupId,
         try {
             cli->sendMessage(msg, remoteSendSeqNum);
         } catch (std::runtime_error& e) {
-            SPDLOG_TRACE("Timed-out with remote point-to-point message {}:{}:{} (seq: {}) to {}",
+            SPDLOG_TRACE("Timed-out with remote point-to-point message "
+                         "{}:{}:{} (seq: {}) to {}",
                          groupId,
                          sendIdx,
                          recvIdx,