Skip to content

Commit

Permalink
Clean request state after a view change (#936)
Browse files Browse the repository at this point in the history
* Improve pre-execution logs & change the default number of pre-execution threads. Improve pre-execution logs (#940)

* Request state needs to be cleaned after a view change
  • Loading branch information
yuliasherman authored Oct 13, 2020
1 parent f2b10ad commit d8912fc
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 44 deletions.
96 changes: 59 additions & 37 deletions bftengine/src/preprocessor/PreProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ PreProcessor::PreProcessor(shared_ptr<MsgsCommunicator> &msgsCommunicator,
preProcessResultBuffers_.push_back(Sliver(new char[maxPreExecResultSize_], maxPreExecResultSize_));
}
uint64_t numOfThreads = myReplica.getReplicaConfig().preExecConcurrencyLevel;
if (!numOfThreads) numOfThreads = min((uint16_t)thread::hardware_concurrency(), numOfClients_);
if (!numOfThreads) numOfThreads = numOfClients_;
threadPool_.start(numOfThreads);
LOG_INFO(logger(),
KVLOG(firstClientId, numOfClients_, maxPreExecResultSize_, preExecReqStatusCheckPeriodMilli_, numOfThreads));
Expand Down Expand Up @@ -185,13 +185,17 @@ void PreProcessor::onRequestsStatusCheckTimer() {
lock_guard<mutex> lock(clientEntry.second->mutex);
const auto &clientReqStatePtr = clientEntry.second->reqProcessingStatePtr;
if (clientReqStatePtr) {
if (clientReqStatePtr->isReqTimedOut(myReplica_.isCurrentPrimary())) {
if (clientReqStatePtr->isReqTimedOut()) {
preProcessorMetrics_.preProcessRequestTimedout.Get().Inc();
preProcessorMetrics_.preProcPossiblePrimaryFaultDetected.Get().Inc();
// The request could expire due to a failed primary replica; let ReplicaImp address that
// TBD YS: This causes a request to retry in case the primary is OK. Consider passing a kind of NOOP message.
const auto &clientId = clientEntry.first;
const auto &reqSeqNum = clientReqStatePtr->getReqSeqNum();
SCOPED_MDC_CID(clientReqStatePtr->getReqCid());
LOG_INFO(logger(), "Let replica to handle request" << KVLOG(reqSeqNum, clientId));
incomingMsgsStorage_->pushExternalMsg(clientReqStatePtr->buildClientRequestMsg(true));
releaseClientPreProcessRequest(clientEntry.second, clientEntry.first, CANCEL);
releaseClientPreProcessRequest(clientEntry.second, clientId, CANCEL);
} else if (myReplica_.isCurrentPrimary() && clientReqStatePtr->definePreProcessingConsensusResult() == CONTINUE)
resendPreProcessRequest(clientReqStatePtr);
}
Expand All @@ -200,6 +204,7 @@ void PreProcessor::onRequestsStatusCheckTimer() {

bool PreProcessor::checkClientMsgCorrectness(const ClientPreProcessReqMsgUniquePtr &clientReqMsg,
ReqId reqSeqNum) const {
SCOPED_MDC_CID(clientReqMsg->getCid());
if (myReplica_.isCollectingState()) {
LOG_INFO(logger(),
"ClientPreProcessRequestMsg"
Expand Down Expand Up @@ -279,9 +284,11 @@ void PreProcessor::onMessage<ClientPreProcessRequestMsg>(ClientPreProcessRequest
const auto &clientEntry = ongoingRequests_[clientId];
lock_guard<mutex> lock(clientEntry->mutex);
if (clientEntry->reqProcessingStatePtr) {
const ReqId &ongoingReqSeqNum = clientEntry->reqProcessingStatePtr->getReqSeqNum();
const auto &ongoingReqSeqNum = clientEntry->reqProcessingStatePtr->getReqSeqNum();
const auto &ongoingCid = clientEntry->reqProcessingStatePtr->getReqCid();
LOG_DEBUG(logger(),
KVLOG(reqSeqNum, clientId, senderId) << " is ignored:" << KVLOG(ongoingReqSeqNum) << " is in progress");
KVLOG(reqSeqNum, clientId, senderId)
<< " is ignored:" << KVLOG(ongoingReqSeqNum, ongoingCid) << " is in progress");
preProcessorMetrics_.preProcReqIgnored.Get().Inc();
return;
}
Expand Down Expand Up @@ -324,7 +331,11 @@ void PreProcessor::onMessage<PreProcessRequestMsg>(PreProcessRequestMsg *msg) {
const NodeIdType &clientId = preProcessReqMsg->clientId();
LOG_DEBUG(logger(), "Received PreProcessRequestMsg" << KVLOG(reqSeqNum, senderId, clientId));

if (myReplica_.isCurrentPrimary()) return;
if (myReplica_.isCurrentPrimary()) {
LOG_WARN(logger(),
"Ignore PreProcessRequestMsg as current replica is the primary" << KVLOG(reqSeqNum, senderId, clientId));
return;
}

if (!myReplica_.currentViewIsActive()) {
LOG_INFO(logger(), "PreProcessRequestMsg is ignored because current view is inactive," << KVLOG(reqSeqNum));
Expand Down Expand Up @@ -379,7 +390,10 @@ void PreProcessor::onMessage<PreProcessReplyMsg>(PreProcessReplyMsg *msg) {
return;
}
clientEntry->reqProcessingStatePtr->handlePreProcessReplyMsg(preProcessReplyMsg);
if (status == STATUS_REJECT) return;
if (status == STATUS_REJECT) {
LOG_DEBUG(logger(), "Received PreProcessReplyMsg with STATUS_REJECT" << KVLOG(reqSeqNum, senderId, clientId));
return;
}
result = clientEntry->reqProcessingStatePtr->definePreProcessingConsensusResult();
if (result == CONTINUE) resendPreProcessRequest(clientEntry->reqProcessingStatePtr);
}
Expand Down Expand Up @@ -424,7 +438,7 @@ void PreProcessor::cancelPreProcessing(NodeIdType clientId) {
reqSeqNum = clientEntry->reqProcessingStatePtr->getReqSeqNum();
SCOPED_MDC_CID(clientEntry->reqProcessingStatePtr->getReqCid());
releaseClientPreProcessRequest(clientEntry, clientId, CANCEL);
LOG_WARN(logger(), "Pre-processing consensus not reached - abort request" << KVLOG(reqSeqNum, clientId));
LOG_WARN(logger(), "Pre-processing consensus not reached - cancel request" << KVLOG(reqSeqNum, clientId));
}
}
}
Expand All @@ -447,6 +461,7 @@ void PreProcessor::finalizePreProcessing(NodeIdType clientId) {
reqProcessingStatePtr->getPrimaryPreProcessedResult(),
reqProcessingStatePtr->getReqTimeoutMilli(),
cid);
LOG_DEBUG(logger(), "Pass pre-processed request to the replica" << KVLOG(cid, reqSeqNum, clientId));
incomingMsgsStorage_->pushExternalMsg(move(clientRequestMsg));
preProcessorMetrics_.preProcReqSentForFurtherProcessing.Get().Inc();
releaseClientPreProcessRequest(clientEntry, clientId, COMPLETE);
Expand Down Expand Up @@ -527,6 +542,7 @@ void PreProcessor::releaseClientPreProcessRequest(const ClientRequestStateShared
givenReq->releaseResources();
clientEntry->reqProcessingHistory.push_back(move(givenReq));
} else { // No consensus reached => release request
SCOPED_MDC_CID(givenReq->getReqCid());
LOG_INFO(logger(), KVLOG(requestSeqNum, clientId) << " no consensus reached, request released");
givenReq.reset();
}
Expand Down Expand Up @@ -564,6 +580,7 @@ void PreProcessor::handleClientPreProcessRequestByPrimary(PreProcessRequestMsgSh
const auto &reqSeqNum = preProcessRequestMsg->reqSeqNum();
const auto &clientId = preProcessRequestMsg->clientId();
const auto &senderId = preProcessRequestMsg->senderId();
SCOPED_MDC_CID(preProcessRequestMsg->getCid());
LOG_DEBUG(logger(), "Start request processing by a primary replica" << KVLOG(reqSeqNum, clientId, senderId));
sendPreProcessRequestToAllReplicas(preProcessRequestMsg);
// Pre-process the request and calculate a hash of the result
Expand All @@ -578,11 +595,13 @@ void PreProcessor::handleClientPreProcessRequestByNonPrimary(ClientPreProcessReq
const auto &senderId = clientReqMsg->senderId();
const auto &reqTimeoutMilli = clientReqMsg->requestTimeoutMilli();
// Save parameters required for a message sending before being moved to registerRequest
const auto &msgBody = clientReqMsg->body();
const auto &msgType = clientReqMsg->type();
const auto &msgSize = clientReqMsg->size();
const auto msgBody = clientReqMsg->body();
const auto msgType = clientReqMsg->type();
const auto msgSize = clientReqMsg->size();
const auto cid = clientReqMsg->getCid();
// Register a client request message with an empty PreProcessRequestMsg to allow follow up.
if (registerRequest(move(clientReqMsg), PreProcessRequestMsgSharedPtr())) {
SCOPED_MDC_CID(cid);
LOG_DEBUG(
logger(),
"Start request processing by a non-primary replica" << KVLOG(reqSeqNum, clientId, senderId, reqTimeoutMilli));
Expand Down Expand Up @@ -611,9 +630,16 @@ void PreProcessor::sendPreProcessRequestToAllReplicas(const PreProcessRequestMsg
}
}

void PreProcessor::setPreprocessingRightNow(uint16_t clientId, bool set) {
const auto &clientEntry = ongoingRequests_[clientId];
lock_guard<mutex> lock(clientEntry->mutex);
if (clientEntry->reqProcessingStatePtr) clientEntry->reqProcessingStatePtr->setPreprocessingRightNow(set);
}

// Schedules pre-processing of the given request on the thread pool.
//
// The client's ongoing request is first flagged as being pre-processed right
// now, then an AsyncPreProcessJob is created and added to the pool.
//
// preProcessReqMsg - the request to pre-process
// isPrimary        - forwarded unchanged to the job
// isRetry          - forwarded unchanged to the job
void PreProcessor::launchAsyncReqPreProcessingJob(const PreProcessRequestMsgSharedPtr &preProcessReqMsg,
                                                  bool isPrimary,
                                                  bool isRetry) {
  const auto requestClientId = preProcessReqMsg->clientId();
  setPreprocessingRightNow(requestClientId, true);
  // NOTE(review): presumably the thread pool takes ownership of the job object - confirm
  threadPool_.add(new AsyncPreProcessJob(*this, preProcessReqMsg, isPrimary, isRetry));
}
Expand Down Expand Up @@ -648,23 +674,26 @@ uint32_t PreProcessor::launchReqPreProcessing(uint16_t clientId,
return resultLen;
}

PreProcessingResult PreProcessor::getPreProcessingConsensusResult(uint16_t clientId) {
ReqId PreProcessor::getOngoingReqIdForClient(uint16_t clientId) {
const auto &clientEntry = ongoingRequests_[clientId];
lock_guard<mutex> lock(clientEntry->mutex);
if (clientEntry->reqProcessingStatePtr)
return clientEntry->reqProcessingStatePtr->definePreProcessingConsensusResult();
return NONE;
if (clientEntry->reqProcessingStatePtr) return clientEntry->reqProcessingStatePtr->getReqSeqNum();
return 0;
}

ReqId PreProcessor::getOngoingReqIdForClient(uint16_t clientId) {
PreProcessingResult PreProcessor::handlePreProcessedReqByPrimaryAndGetConsensusResult(uint16_t clientId,
uint32_t resultBufLen) {
const auto &clientEntry = ongoingRequests_[clientId];
lock_guard<mutex> lock(clientEntry->mutex);
if (clientEntry->reqProcessingStatePtr) return clientEntry->reqProcessingStatePtr->getReqSeqNum();
return 0;
if (clientEntry->reqProcessingStatePtr) {
clientEntry->reqProcessingStatePtr->handlePrimaryPreProcessed(getPreProcessResultBuffer(clientId), resultBufLen);
return clientEntry->reqProcessingStatePtr->definePreProcessingConsensusResult();
}
return NONE;
}

void PreProcessor::handlePreProcessedReqPrimaryRetry(NodeIdType clientId) {
if (getPreProcessingConsensusResult(clientId) == COMPLETE)
void PreProcessor::handlePreProcessedReqPrimaryRetry(NodeIdType clientId, uint32_t resultBufLen) {
if (handlePreProcessedReqByPrimaryAndGetConsensusResult(clientId, resultBufLen) == COMPLETE)
finalizePreProcessing(clientId);
else
cancelPreProcessing(clientId);
Expand All @@ -673,49 +702,42 @@ void PreProcessor::handlePreProcessedReqPrimaryRetry(NodeIdType clientId) {
void PreProcessor::handlePreProcessedReqByPrimary(const PreProcessRequestMsgSharedPtr &preProcessReqMsg,
uint16_t clientId,
uint32_t resultBufLen) {
const auto &clientEntry = ongoingRequests_[clientId];
string cid;
PreProcessingResult result = NONE;
{
lock_guard<mutex> lock(clientEntry->mutex);
if (clientEntry->reqProcessingStatePtr) {
clientEntry->reqProcessingStatePtr->handlePrimaryPreProcessed(getPreProcessResultBuffer(clientId), resultBufLen);
result = clientEntry->reqProcessingStatePtr->definePreProcessingConsensusResult();
cid = clientEntry->reqProcessingStatePtr->getPreProcessRequest()->getCid();
}
}
if (result != NONE) handlePreProcessReplyMsg(cid, result, clientId, preProcessReqMsg->reqSeqNum());
const PreProcessingResult result = handlePreProcessedReqByPrimaryAndGetConsensusResult(clientId, resultBufLen);
if (result != NONE)
handlePreProcessReplyMsg(preProcessReqMsg->getCid(), result, clientId, preProcessReqMsg->reqSeqNum());
}

// Completes pre-processing of a request on this replica: clears the
// "pre-processing right now" flag, builds a PreProcessReplyMsg carrying the
// client's pre-process result buffer with STATUS_GOOD, releases the client's
// request and sends the reply to the current primary.
//
// clientId  - the client whose request was pre-processed
// reqSeqNum - sequence number of the pre-processed request
// resBufLen - length of the result passed to setupMsgBody
// cid       - correlation id, stored in the reply and used as logging context
void PreProcessor::handlePreProcessedReqByNonPrimary(uint16_t clientId,
ReqId reqSeqNum,
uint32_t resBufLen,
const std::string &cid) {
// The pre-processing has finished, so clear the flag before building the reply
setPreprocessingRightNow(clientId, false);
auto replyMsg = make_shared<PreProcessReplyMsg>(sigManager_, myReplicaId_, clientId, reqSeqNum);
replyMsg->setupMsgBody(getPreProcessResultBuffer(clientId), resBufLen, cid, STATUS_GOOD);
// Release the request before sending the reply to the primary so that new messages can be accepted
releaseClientPreProcessRequestSafe(clientId, COMPLETE);
sendMsg(replyMsg->body(), myReplica_.currentPrimary(), replyMsg->type(), replyMsg->size());
SCOPED_MDC_CID(cid);
// NOTE(review): the two LOG_DEBUG statements below report the same event; they look like
// the pre- and post-change versions of one log line - confirm that only one is intended.
LOG_DEBUG(
logger(),
"Sent PreProcessReplyMsg with" << KVLOG(reqSeqNum) << " to the primary replica: " << myReplica_.currentPrimary());
LOG_DEBUG(logger(),
"Sent PreProcessReplyMsg with" << KVLOG(reqSeqNum, replyMsg->status())
<< " to the primary replica: " << myReplica_.currentPrimary());
}

void PreProcessor::handleReqPreProcessingJob(const PreProcessRequestMsgSharedPtr &preProcessReqMsg,
bool isPrimary,
bool isRetry) {
const string cid = preProcessReqMsg->getCid();
SCOPED_MDC_CID(cid);
const uint16_t &clientId = preProcessReqMsg->clientId();
const SeqNum &reqSeqNum = preProcessReqMsg->reqSeqNum();
const auto &span_context = preProcessReqMsg->spanContext<PreProcessRequestMsgSharedPtr::element_type>();
uint32_t actualResultBufLen = launchReqPreProcessing(
clientId, cid, reqSeqNum, preProcessReqMsg->requestLength(), preProcessReqMsg->requestBuf(), span_context);
if (isPrimary && isRetry) {
handlePreProcessedReqPrimaryRetry(clientId);
handlePreProcessedReqPrimaryRetry(clientId, actualResultBufLen);
return;
}
SCOPED_MDC_CID(cid);
LOG_DEBUG(logger(), "Request pre-processed" << KVLOG(isPrimary, reqSeqNum, clientId));
if (isPrimary)
handlePreProcessedReqByPrimary(preProcessReqMsg, clientId, actualResultBufLen);
else
Expand Down
5 changes: 3 additions & 2 deletions bftengine/src/preprocessor/PreProcessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ class PreProcessor {
void handlePreProcessedReqByPrimary(const PreProcessRequestMsgSharedPtr &preProcessReqMsg,
uint16_t clientId,
uint32_t resultBufLen);
void handlePreProcessedReqPrimaryRetry(NodeIdType clientId);
void handlePreProcessedReqPrimaryRetry(NodeIdType clientId, uint32_t resultBufLen);
void finalizePreProcessing(NodeIdType clientId);
void cancelPreProcessing(NodeIdType clientId);
PreProcessingResult getPreProcessingConsensusResult(uint16_t clientId);
void setPreprocessingRightNow(uint16_t clientId, bool set);
PreProcessingResult handlePreProcessedReqByPrimaryAndGetConsensusResult(uint16_t clientId, uint32_t resultBufLen);
void handlePreProcessReplyMsg(const std::string &cid,
PreProcessingResult result,
NodeIdType clientId,
Expand Down
10 changes: 6 additions & 4 deletions bftengine/src/preprocessor/RequestProcessingState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ RequestProcessingState::RequestProcessingState(uint16_t numOfReplicas,
entryTime_(getMonotonicTimeMilli()),
clientPreProcessReqMsg_(move(clientReqMsg)),
preProcessRequestMsg_(preProcessRequestMsg) {
SCOPED_MDC_CID(cid);
LOG_DEBUG(logger(), "Created RequestProcessingState with" << KVLOG(reqSeqNum, numOfReplicas_));
}

Expand All @@ -56,6 +57,7 @@ void RequestProcessingState::setPreProcessRequest(PreProcessRequestMsgSharedPtr
}

void RequestProcessingState::handlePrimaryPreProcessed(const char *preProcessResult, uint32_t preProcessResultLen) {
preprocessingRightNow_ = false;
primaryPreProcessResult_ = preProcessResult;
primaryPreProcessResultLen_ = preProcessResultLen;
primaryPreProcessResultHash_ =
Expand Down Expand Up @@ -115,13 +117,13 @@ auto RequestProcessingState::calculateMaxNbrOfEqualHashes(uint16_t &maxNumOfEqua
return itOfChosenHash;
}

bool RequestProcessingState::isReqTimedOut(bool isPrimary) const {
bool RequestProcessingState::isReqTimedOut() const {
if (!clientPreProcessReqMsg_) return false;

SCOPED_MDC_CID(cid_);
if (!isPrimary || primaryPreProcessResultLen_ != 0) {
// On the primary: check request timeout once an asynchronous pre-execution completed (to not abort the execution
// thread)
LOG_DEBUG(logger(), KVLOG(preprocessingRightNow_));
if (!preprocessingRightNow_) {
// Check the request timeout only once the asynchronous pre-execution has completed (to not abort the execution thread)
auto reqProcessingTime = getMonotonicTimeMilli() - entryTime_;
if (reqProcessingTime > clientPreProcessReqMsg_->requestTimeoutMilli()) {
LOG_WARN(logger(),
Expand Down
4 changes: 3 additions & 1 deletion bftengine/src/preprocessor/RequestProcessingState.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class RequestProcessingState {
PreProcessingResult definePreProcessingConsensusResult();
const char* getPrimaryPreProcessedResult() const { return primaryPreProcessResult_; }
uint32_t getPrimaryPreProcessedResultLen() const { return primaryPreProcessResultLen_; }
bool isReqTimedOut(bool isPrimary) const;
bool isReqTimedOut() const;
uint64_t getReqTimeoutMilli() const {
return clientPreProcessReqMsg_ ? clientPreProcessReqMsg_->requestTimeoutMilli() : 0;
}
Expand All @@ -54,6 +54,7 @@ class RequestProcessingState {
void releaseResources();
ReplicaIdsList getRejectedReplicasList() { return rejectedReplicaIds_; }
void resetRejectedReplicasList() { rejectedReplicaIds_.clear(); }
void setPreprocessingRightNow(bool set) { preprocessingRightNow_ = set; }

static void init(uint16_t numOfRequiredReplies);

Expand Down Expand Up @@ -89,6 +90,7 @@ class RequestProcessingState {
// Maps result hash to the number of equal hashes
std::map<concord::util::SHA3_256::Digest, int> preProcessingResultHashes_;
bool retrying_ = false;
bool preprocessingRightNow_ = false;
};

typedef std::unique_ptr<RequestProcessingState> RequestProcessingStateUniquePtr;
Expand Down

0 comments on commit d8912fc

Please sign in to comment.