diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 44f1662dde..0dede8b6e5 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -60,13 +60,19 @@ void JsonRpcConnection::Start() void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) { + namespace ch = std::chrono; + + auto toMilliseconds ([](ch::steady_clock::duration d) { + return ch::duration_cast(d).count(); + }); + m_Stream->next_layer().SetSeen(&m_Seen); for (;;) { - String message; + String jsonString; try { - message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); + jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); } catch (const std::exception& ex) { Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity @@ -76,17 +82,50 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) } m_Seen = Utility::GetTime(); + if (m_Endpoint) { + m_Endpoint->AddMessageReceived(jsonString.GetLength()); + } + + String rpcMethod("UNKNOWN"); + ch::steady_clock::duration cpuBoundDuration(0); + auto start (ch::steady_clock::now()); try { CpuBoundWork handleMessage (yc); + // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. + cpuBoundDuration = ch::steady_clock::now() - start; + + Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); + if (String method = message->Get("method"); !method.IsEmpty()) { + rpcMethod = std::move(method); + } + MessageHandler(message); l_TaskStats.InsertValue(Utility::GetTime(), 1); + + auto total = ch::steady_clock::now() - start; + + Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection"); + msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity + << "' (took total " << toMilliseconds(total) << "ms"; + + if (cpuBoundDuration >= ch::seconds(1)) { + msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; + } + msg << ")."; } catch (const std::exception& ex) { - Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection") - << "Error while processing JSON-RPC message for identity '" << m_Identity - << "': " << DiagnosticInformation(ex); + auto total = ch::steady_clock::now() - start; + + Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection"); + msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '" + << m_Identity << "' (took total " << toMilliseconds(total) << "ms"; + + if (cpuBoundDuration >= ch::seconds(1)) { + msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore"; + } + msg << "): " << DiagnosticInformation(ex); break; } @@ -259,10 +298,8 @@ void JsonRpcConnection::Disconnect() } } -void JsonRpcConnection::MessageHandler(const String& jsonString) +void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) { - Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); - if (m_Endpoint && message->Contains("ts")) { double ts = message->Get("ts"); @@ -281,8 +318,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString) origin->FromZone = m_Endpoint->GetZone(); else origin->FromZone = Zone::GetByName(message->Get("originZone")); - - m_Endpoint->AddMessageReceived(jsonString.GetLength()); } Value vmethod; diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 3515573bb7..8261bb34c2 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -89,7 +89,19 @@ class JsonRpcConnection final : public Object void CheckLiveness(boost::asio::yield_context yc); bool ProcessMessage(); - void MessageHandler(const String& jsonString); + + /** + * MessageHandler routes the provided message to its corresponding handler (if any). + * + * This will first verify the timestamp of that RPC message (if any) and subsequently, rejects any message whose + * timestamp is less than the remote log position of the client Endpoint; otherwise, the endpoint's remote log + * position is updated to that timestamp. It is not expected to happen, but any message lacking an RPC method or + * referring to a non-existent one is also discarded. Afterwards, the RPC handler is then called for that message + * and sends it's result back to the sender if the message contains an ID. + * + * @param message Dictionary::Ptr The RPC message you want to process. + */ + void MessageHandler(const Dictionary::Ptr& message); void CertificateRequestResponseHandler(const Dictionary::Ptr& message);