From 9075a4bdcf223f69e1cb1e429dd5fa9c9d88cc18 Mon Sep 17 00:00:00 2001
From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
Date: Thu, 19 Sep 2024 12:40:54 -0400
Subject: [PATCH] Feat[BMQ, MQB]: shutdown v2, optimizing shutdown logic (#399)
* Shutdown V2
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
* cleaning
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
* Addressing review
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
---------
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
---
src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd | 1 +
src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp | 23 +-
src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h | 45 ++-
src/groups/bmq/bmqp/bmqp_protocol.cpp | 2 +
src/groups/bmq/bmqp/bmqp_protocol.h | 2 +
src/groups/bmq/bmqp/bmqp_requestmanager.h | 4 +
.../bmq/bmqp/bmqp_schemaeventbuilder.cpp | 2 +-
src/groups/mqb/mqba/mqba_adminsession.cpp | 4 +-
src/groups/mqb/mqba/mqba_adminsession.h | 6 +-
src/groups/mqb/mqba/mqba_application.cpp | 128 ++++++-
src/groups/mqb/mqba/mqba_application.h | 8 +
src/groups/mqb/mqba/mqba_clientsession.cpp | 300 ++++++++++-----
src/groups/mqb/mqba/mqba_clientsession.h | 43 ++-
.../mqb/mqba/mqba_sessionnegotiator.cpp | 4 +-
src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 173 +++++----
src/groups/mqb/mqbblp/mqbblp_cluster.h | 18 +-
.../mqb/mqbblp/mqbblp_clustercatalog.cpp | 10 +-
src/groups/mqb/mqbblp/mqbblp_clustercatalog.h | 33 ++
.../mqb/mqbblp/mqbblp_clusterorchestrator.cpp | 20 +-
src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp | 137 ++++---
src/groups/mqb/mqbblp/mqbblp_clusterproxy.h | 24 +-
.../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 345 +++++++++++++++++-
.../mqb/mqbblp/mqbblp_clusterqueuehelper.h | 29 +-
src/groups/mqb/mqbblp/mqbblp_queue.cpp | 15 +-
src/groups/mqb/mqbblp/mqbblp_queue.h | 9 +-
.../mqb/mqbblp/mqbblp_queueengineutil.cpp | 8 +-
.../mqb/mqbblp/mqbblp_queueengineutil.h | 2 +-
src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp | 8 +-
.../mqb/mqbblp/mqbblp_queuehandlecatalog.cpp | 22 +-
.../mqb/mqbblp/mqbblp_queuehandlecatalog.h | 7 +-
src/groups/mqb/mqbblp/mqbblp_queuestate.h | 4 +-
.../mqb/mqbblp/mqbblp_relayqueueengine.cpp | 17 +-
.../mqb/mqbblp/mqbblp_relayqueueengine.h | 6 +-
src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp | 24 +-
src/groups/mqb/mqbblp/mqbblp_remotequeue.h | 9 +-
.../mqb/mqbblp/mqbblp_rootqueueengine.cpp | 14 +-
.../mqb/mqbblp/mqbblp_rootqueueengine.h | 6 +-
src/groups/mqb/mqbblp/mqbblp_routers.h | 2 +-
.../mqb/mqbc/mqbc_clusterstateledgerutil.cpp | 2 +-
src/groups/mqb/mqbi/mqbi_cluster.h | 8 +-
src/groups/mqb/mqbi/mqbi_queue.h | 9 +-
src/groups/mqb/mqbi/mqbi_queueengine.h | 6 +-
src/groups/mqb/mqbmock/mqbmock_cluster.cpp | 3 +-
src/groups/mqb/mqbmock/mqbmock_cluster.h | 15 +-
src/groups/mqb/mqbmock/mqbmock_queue.cpp | 8 +-
src/groups/mqb/mqbmock/mqbmock_queue.h | 9 +-
.../mqb/mqbmock/mqbmock_queueengine.cpp | 2 +-
src/groups/mqb/mqbmock/mqbmock_queueengine.h | 6 +-
src/groups/mqb/mqbnet/mqbnet_dummysession.cpp | 3 +-
src/groups/mqb/mqbnet/mqbnet_dummysession.h | 8 +-
.../mqb/mqbnet/mqbnet_multirequestmanager.h | 9 +
src/groups/mqb/mqbnet/mqbnet_session.h | 8 +-
src/groups/mqb/mqbnet/mqbnet_session.t.cpp | 6 +-
.../test_graceful_shutdown.py | 88 +----
src/integration-tests/test_maxunconfirmed.py | 2 +-
55 files changed, 1304 insertions(+), 402 deletions(-)
diff --git a/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd b/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd
index c15c2ed12..7f4870891 100644
--- a/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd
+++ b/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd
@@ -1404,6 +1404,7 @@
+
diff --git a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp
index c9d9a70c0..fa0952074 100644
--- a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp
+++ b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp
@@ -4017,19 +4017,26 @@ const char* StatusCategory::toString(StatusCategory::Value value)
const char StopRequest::CLASS_NAME[] = "StopRequest";
+const int StopRequest::DEFAULT_INITIALIZER_VERSION = 1;
+
const bdlat_AttributeInfo StopRequest::ATTRIBUTE_INFO_ARRAY[] = {
{ATTRIBUTE_ID_CLUSTER_NAME,
"clusterName",
sizeof("clusterName") - 1,
"",
- bdlat_FormattingMode::e_TEXT}};
+ bdlat_FormattingMode::e_TEXT},
+ {ATTRIBUTE_ID_VERSION,
+ "version",
+ sizeof("version") - 1,
+ "",
+ bdlat_FormattingMode::e_DEC}};
// CLASS METHODS
const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(const char* name,
int nameLength)
{
- for (int i = 0; i < 1; ++i) {
+ for (int i = 0; i < 2; ++i) {
const bdlat_AttributeInfo& attributeInfo =
StopRequest::ATTRIBUTE_INFO_ARRAY[i];
@@ -4047,6 +4054,8 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id)
switch (id) {
case ATTRIBUTE_ID_CLUSTER_NAME:
return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME];
+ case ATTRIBUTE_ID_VERSION:
+ return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION];
default: return 0;
}
}
@@ -4055,25 +4064,29 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id)
StopRequest::StopRequest(bslma::Allocator* basicAllocator)
: d_clusterName(basicAllocator)
+, d_version(DEFAULT_INITIALIZER_VERSION)
{
}
StopRequest::StopRequest(const StopRequest& original,
bslma::Allocator* basicAllocator)
: d_clusterName(original.d_clusterName, basicAllocator)
+, d_version(original.d_version)
{
}
#if defined(BSLS_COMPILERFEATURES_SUPPORT_RVALUE_REFERENCES) && \
defined(BSLS_COMPILERFEATURES_SUPPORT_NOEXCEPT)
StopRequest::StopRequest(StopRequest&& original) noexcept
-: d_clusterName(bsl::move(original.d_clusterName))
+: d_clusterName(bsl::move(original.d_clusterName)),
+ d_version(bsl::move(original.d_version))
{
}
StopRequest::StopRequest(StopRequest&& original,
bslma::Allocator* basicAllocator)
: d_clusterName(bsl::move(original.d_clusterName), basicAllocator)
+, d_version(bsl::move(original.d_version))
{
}
#endif
@@ -4088,6 +4101,7 @@ StopRequest& StopRequest::operator=(const StopRequest& rhs)
{
if (this != &rhs) {
d_clusterName = rhs.d_clusterName;
+ d_version = rhs.d_version;
}
return *this;
@@ -4099,6 +4113,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs)
{
if (this != &rhs) {
d_clusterName = bsl::move(rhs.d_clusterName);
+ d_version = bsl::move(rhs.d_version);
}
return *this;
@@ -4108,6 +4123,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs)
void StopRequest::reset()
{
bdlat_ValueTypeFunctions::reset(&d_clusterName);
+ d_version = DEFAULT_INITIALIZER_VERSION;
}
// ACCESSORS
@@ -4118,6 +4134,7 @@ StopRequest::print(bsl::ostream& stream, int level, int spacesPerLevel) const
bslim::Printer printer(&stream, level, spacesPerLevel);
printer.start();
printer.printAttribute("clusterName", this->clusterName());
+ printer.printAttribute("version", this->version());
printer.end();
return stream;
}
diff --git a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h
index 012e7b623..e3e91e6f2 100644
--- a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h
+++ b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h
@@ -7429,18 +7429,21 @@ namespace bmqp_ctrlmsg {
class StopRequest {
// INSTANCE DATA
bsl::string d_clusterName;
+ int d_version;
public:
// TYPES
- enum { ATTRIBUTE_ID_CLUSTER_NAME = 0 };
+ enum { ATTRIBUTE_ID_CLUSTER_NAME = 0, ATTRIBUTE_ID_VERSION = 1 };
- enum { NUM_ATTRIBUTES = 1 };
+ enum { NUM_ATTRIBUTES = 2 };
- enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0 };
+ enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0, ATTRIBUTE_INDEX_VERSION = 1 };
// CONSTANTS
static const char CLASS_NAME[];
+ static const int DEFAULT_INITIALIZER_VERSION;
+
static const bdlat_AttributeInfo ATTRIBUTE_INFO_ARRAY[];
public:
@@ -7540,6 +7543,10 @@ class StopRequest {
/// object.
bsl::string& clusterName();
+ /// Return a reference to the modifiable "Version" attribute of this
+ /// object.
+ int& version();
+
// ACCESSORS
/// Format this object to the specified output `stream` at the
@@ -7587,6 +7594,10 @@ class StopRequest {
/// Return a reference to the non-modifiable "ClusterName" attribute of
/// this object.
const bsl::string& clusterName() const;
+
+ /// Return the value of the "Version" attribute of this
+ /// object.
+ int version() const;
};
// FREE OPERATORS
@@ -27150,6 +27161,12 @@ int StopRequest::manipulateAttributes(MANIPULATOR& manipulator)
return ret;
}
+ ret = manipulator(&d_version,
+ ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
+ if (ret) {
+ return ret;
+ }
+
return ret;
}
@@ -27163,6 +27180,10 @@ int StopRequest::manipulateAttribute(MANIPULATOR& manipulator, int id)
return manipulator(&d_clusterName,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]);
}
+ case ATTRIBUTE_ID_VERSION: {
+ return manipulator(&d_version,
+ ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
+ }
default: return NOT_FOUND;
}
}
@@ -27188,6 +27209,11 @@ inline bsl::string& StopRequest::clusterName()
return d_clusterName;
}
+inline int& StopRequest::version()
+{
+ return d_version;
+}
+
// ACCESSORS
template
int StopRequest::accessAttributes(ACCESSOR& accessor) const
@@ -27200,6 +27226,10 @@ int StopRequest::accessAttributes(ACCESSOR& accessor) const
return ret;
}
+ ret = accessor(d_version, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
+ if (ret) {
+ return ret;
+ }
return ret;
}
@@ -27213,6 +27243,10 @@ int StopRequest::accessAttribute(ACCESSOR& accessor, int id) const
return accessor(d_clusterName,
ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]);
}
+ case ATTRIBUTE_ID_VERSION: {
+ return accessor(d_version,
+ ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]);
+ }
default: return NOT_FOUND;
}
}
@@ -27238,6 +27272,11 @@ inline const bsl::string& StopRequest::clusterName() const
return d_clusterName;
}
+inline int StopRequest::version() const
+{
+ return d_version;
+}
+
template
void hashAppend(HASH_ALGORITHM& hashAlg,
const bmqp_ctrlmsg::StopRequest& object)
diff --git a/src/groups/bmq/bmqp/bmqp_protocol.cpp b/src/groups/bmq/bmqp/bmqp_protocol.cpp
index 1aff4bf4c..a1b521e51 100644
--- a/src/groups/bmq/bmqp/bmqp_protocol.cpp
+++ b/src/groups/bmq/bmqp/bmqp_protocol.cpp
@@ -264,6 +264,8 @@ const char HighAvailabilityFeatures::k_BROADCAST_TO_PROXIES[] =
"BROADCAST_TO_PROXIES";
const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN[] =
"GRACEFUL_SHUTDOWN";
+const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2[] =
+ "GRACEFUL_SHUTDOWN_V2";
// --------------------------------
// struct MessagePropertiesFeatures
diff --git a/src/groups/bmq/bmqp/bmqp_protocol.h b/src/groups/bmq/bmqp/bmqp_protocol.h
index 7fa6f8faf..3424ddbc7 100644
--- a/src/groups/bmq/bmqp/bmqp_protocol.h
+++ b/src/groups/bmq/bmqp/bmqp_protocol.h
@@ -633,6 +633,8 @@ struct HighAvailabilityFeatures {
static const char k_BROADCAST_TO_PROXIES[];
static const char k_GRACEFUL_SHUTDOWN[];
+
+ static const char k_GRACEFUL_SHUTDOWN_V2[];
};
/// This struct defines feature names related to MessageProperties
diff --git a/src/groups/bmq/bmqp/bmqp_requestmanager.h b/src/groups/bmq/bmqp/bmqp_requestmanager.h
index 38d89b5b1..14bd5ae2a 100644
--- a/src/groups/bmq/bmqp/bmqp_requestmanager.h
+++ b/src/groups/bmq/bmqp/bmqp_requestmanager.h
@@ -1091,6 +1091,10 @@ void RequestManager::onRequestTimeout(int requestId)
// Explicitly invalidate the timeout since we processed it
request->d_timeoutSchedulerHandle.release();
+
+ if (!d_lateResponseMode) {
+ d_requests.erase(it);
+ }
} // close guard scope
BALL_LOG_ERROR << "Request with '" << request->nodeDescription()
diff --git a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp
index 03ef9b8bd..42764f659 100644
--- a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp
+++ b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp
@@ -47,7 +47,7 @@ EncodingType::Enum SchemaEventBuilderUtil::bestEncodingSupported(
return EncodingType::e_BER; // RETURN
}
- // If remote suppports BER, return BER
+ // If remote supports BER, return BER
if (bsl::find(encodingsSupported.cbegin(),
encodingsSupported.cend(),
bsl::string(EncodingFeature::k_ENCODING_BER)) !=
diff --git a/src/groups/mqb/mqba/mqba_adminsession.cpp b/src/groups/mqb/mqba/mqba_adminsession.cpp
index a52cb7b2c..765072188 100644
--- a/src/groups/mqb/mqba/mqba_adminsession.cpp
+++ b/src/groups/mqb/mqba/mqba_adminsession.cpp
@@ -419,10 +419,12 @@ void AdminSession::tearDown(const bsl::shared_ptr& session,
}
void AdminSession::initiateShutdown(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout)
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2)
{
// executed by the *ANY* thread
(void)timeout;
+ (void)supportShutdownV2;
dispatcher()->execute(
bdlf::BindUtil::bind(&AdminSession::initiateShutdownDispatched,
diff --git a/src/groups/mqb/mqba/mqba_adminsession.h b/src/groups/mqb/mqba/mqba_adminsession.h
index 681065bcd..ca85b0b2b 100644
--- a/src/groups/mqb/mqba/mqba_adminsession.h
+++ b/src/groups/mqb/mqba/mqba_adminsession.h
@@ -262,9 +262,13 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient {
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
/// if the specified `timeout` is expired.
+ /// The optional (temporary) specified `supportShutdownV2` flag selects
+ /// the shutdown V2 logic, which is not applicable to the `AdminSession`
+ /// implementation and is therefore ignored.
void
initiateShutdown(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
/// Make the session abandon any work it has.
void invalidate() BSLS_KEYWORD_OVERRIDE;
diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp
index cbb4c26b3..55ff3e51b 100644
--- a/src/groups/mqb/mqba/mqba_application.cpp
+++ b/src/groups/mqb/mqba/mqba_application.cpp
@@ -47,6 +47,7 @@
#include
#include
#include
+#include
// BDE
#include
@@ -76,6 +77,7 @@ namespace mqba {
namespace {
const int k_BLOBBUFFER_SIZE = 4 * 1024;
const int k_BLOB_POOL_GROWTH_STRATEGY = 1024;
+const bsls::Types::Int64 k_STOP_REQUEST_TIMEOUT_MS = 5000;
/// Create a new blob at the specified `arena` address, using the specified
/// `bufferFactory` and `allocator`.
@@ -461,6 +463,16 @@ void Application::stop()
d_transportManager_mp->initiateShutdown();
BALL_LOG_INFO << "Stopped listening for new connections.";
+ bool supportShutdownV2 = initiateShutdown();
+
+ if (supportShutdownV2) {
+ BALL_LOG_INFO << ": Executing GRACEFUL_SHUTDOWN_V2";
+ }
+ else {
+ BALL_LOG_INFO << ": Peers do not support "
+ << "GRACEFUL_SHUTDOWN_V2. Retreat to V1";
+ }
+
// For each cluster in cluster catalog, inform peers about this shutdown.
int count = d_clusterCatalog_mp->count();
bslmt::Latch latch(count);
@@ -471,7 +483,8 @@ void Application::stop()
count > 0;
++clusterIt, --count) {
clusterIt.cluster()->initiateShutdown(
- bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch));
+ bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch),
+ supportShutdownV2);
}
latch.wait();
@@ -491,19 +504,15 @@ void Application::stop()
// STOP everything.
- // Note that we must do an out of order stop/destroy with respect to the
- // 'DomainManager' and 'ClusterCatalog' because it appears that the domain
- // manager contains objects that are held and owned by 'ClusterCatalog', so
- // it is believed that the relationship is not properly done.
- //
// Note that clusterCatalog must be stopped before transport manager
// because transportManager.stop() blocks until all sessions have been
// destroyed, and above code proactively closes only the clientOrProxy
// sessions; clusterNode ones are being destroyed by the clusterCatalog
// calling stop on each cluster.
- STOP_OBJ(d_domainManager_mp, "DomainManager");
+
STOP_OBJ(d_clusterCatalog_mp, "ClusterCatalog");
STOP_OBJ(d_transportManager_mp, "TransportManager");
+ STOP_OBJ(d_domainManager_mp, "DomainManager");
STOP_OBJ(d_dispatcher_mp, "Dispatcher");
STOP_OBJ(d_configProvider_mp, "ConfigProvider");
STOP_OBJ(d_statController_mp, "StatController");
@@ -524,6 +533,111 @@ void Application::stop()
#undef STOP_OBJ
}
+bool Application::initiateShutdown()
+{
+    // Attempt the V2 graceful shutdown sequence.  Return 'false', without
+    // having sent any request or initiated the shutdown of any session, as
+    // soon as a connected node or proxy is found that advertises
+    // 'GRACEFUL_SHUTDOWN' but not 'GRACEFUL_SHUTDOWN_V2'.  Otherwise, send
+    // one StopRequest (version 2) to all collected nodes and proxies,
+    // initiate the V2 shutdown of all client and proxy sessions, block until
+    // the request and every session have reported completion, and return
+    // 'true'.
+
+    typedef bsl::vector<bsl::shared_ptr<mqbnet::Session> > Sessions;
+
+    // Send a StopRequest to all connected cluster nodes and brokers
+    Sessions brokers(d_allocator_p);
+    Sessions clients(d_allocator_p);
+
+    for (mqbnet::TransportManagerIterator sessIt(d_transportManager_mp.get());
+         sessIt;
+         ++sessIt) {
+        bsl::shared_ptr<mqbnet::Session> sessionSp = sessIt.session().lock();
+        if (!sessionSp) {
+            continue;  // CONTINUE
+        }
+
+        const bmqp_ctrlmsg::NegotiationMessage& negoMsg =
+            sessionSp->negotiationMessage();
+
+        const bmqp_ctrlmsg::ClientIdentity& peerIdentity =
+            negoMsg.isClientIdentityValue()
+                ? negoMsg.clientIdentity()
+                : negoMsg.brokerResponse().brokerIdentity();
+
+        // Clients and proxies are shut down locally (see 'clients' below); a
+        // proxy (non-empty cluster name) or a cluster node additionally
+        // receives the StopRequest.
+        bool isBroker = false;
+        if (mqbnet::ClusterUtil::isClientOrProxy(negoMsg)) {
+            clients.push_back(sessionSp);
+            if (!negoMsg.clientIdentity().clusterName().empty()) {
+                isBroker = true;
+            }
+        }
+        else {
+            isBroker = true;
+        }
+        if (isBroker) {
+            // Node or Proxy
+            // Expect all proxies and nodes support this feature.
+            if (!bmqp::ProtocolUtil::hasFeature(
+                    bmqp::HighAvailabilityFeatures::k_FIELD_NAME,
+                    bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN,
+                    peerIdentity.features())) {
+                BALL_LOG_ERROR << ": Peer doesn't support "
+                               << "GRACEFUL_SHUTDOWN. Skip sending stopRequest"
+                               << " to [" << peerIdentity << "]";
+                continue;  // CONTINUE
+            }
+            if (!bmqp::ProtocolUtil::hasFeature(
+                    bmqp::HighAvailabilityFeatures::k_FIELD_NAME,
+                    bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2,
+                    peerIdentity.features())) {
+                // Abandon the attempt to shutdown V2
+                return false;  // RETURN
+            }
+            brokers.push_back(sessionSp);
+        }
+    }
+
+    // One arrival per client/proxy session, plus one for the StopRequest
+    // response callback.
+    bslmt::Latch latch(clients.size() + 1);
+    // The 'StopRequestManagerType::sendRequest' always calls 'd_responseCb'.
+
+    mqbblp::ClusterCatalog::StopRequestManagerType::RequestContextSp
+        contextSp =
+            d_clusterCatalog_mp->stopRequestManger().createRequestContext();
+
+    bmqp_ctrlmsg::StopRequest& request = contextSp->request()
+                                             .choice()
+                                             .makeClusterMessage()
+                                             .choice()
+                                             .makeStopRequest();
+
+    request.version() = 2;
+
+    bsls::TimeInterval shutdownTimeout;
+
+    shutdownTimeout.setTotalMilliseconds(k_STOP_REQUEST_TIMEOUT_MS);
+
+    contextSp->setDestinationNodes(brokers);
+
+    contextSp->setResponseCb(
+        bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch));
+
+    // Log the timeout in milliseconds so that the value matches the ' ms'
+    // suffix (streaming the 'bsls::TimeInterval' itself does not print
+    // milliseconds).
+    BALL_LOG_INFO << "Sending StopRequest V2 to " << brokers.size()
+                  << " brokers; timeout is "
+                  << shutdownTimeout.totalMilliseconds() << " ms";
+
+    d_clusterCatalog_mp->stopRequestManger().sendRequest(contextSp,
+                                                         shutdownTimeout);
+
+    BALL_LOG_INFO << "Shutting down " << clients.size()
+                  << " clients; timeout is "
+                  << shutdownTimeout.totalMilliseconds() << " ms";
+
+    for (Sessions::const_iterator cit = clients.begin(); cit != clients.end();
+         ++cit) {
+        (*cit)->initiateShutdown(bdlf::BindUtil::bind(&bslmt::Latch::arrive,
+                                                      &latch),
+                                 shutdownTimeout,
+                                 true);
+    }
+
+    // Need to wait for peers to update this node status to guarantee no new
+    // clusters.
+    latch.wait();
+
+    return true;
+}
+
mqbi::Cluster*
Application::getRelevantCluster(bsl::ostream& errorDescription,
const mqbcmd::Command& command) const
diff --git a/src/groups/mqb/mqba/mqba_application.h b/src/groups/mqb/mqba/mqba_application.h
index 1adb3c3f2..850af282b 100644
--- a/src/groups/mqb/mqba/mqba_application.h
+++ b/src/groups/mqb/mqba/mqba_application.h
@@ -169,6 +169,14 @@ class Application {
/// Pendant operation of the `oneTimeInit` one.
void oneTimeShutdown();
+ /// Attempt to execute graceful shutdown logic v2.
+ ///
+ /// If any node or proxy does not support the v2 graceful shutdown logic,
+ /// do not perform any shutdown actions and return `false`. Otherwise,
+ /// send v2 shutdown requests to all nodes, shutdown clients and proxies,
+ /// and return `true`.
+ bool initiateShutdown();
+
private:
// NOT IMPLEMENTED
Application(const Application& other) BSLS_CPP11_DELETED;
diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp
index 743ba7dc7..e630670d1 100644
--- a/src/groups/mqb/mqba/mqba_clientsession.cpp
+++ b/src/groups/mqb/mqba/mqba_clientsession.cpp
@@ -370,10 +370,21 @@ void ClientSession::sendErrorResponse(
<< " failure response: " << response;
// Send the response
- sendPacket(d_state.d_schemaEventBuilder.blob(), true);
+ sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true);
}
void ClientSession::sendPacket(const bdlbb::Blob& blob, bool flushBuilders)
+{
+ dispatcher()->execute(
+ bdlf::BindUtil::bind(&ClientSession::sendPacketDispatched,
+ this,
+ blob,
+ flushBuilders),
+ this);
+}
+
+void ClientSession::sendPacketDispatched(const bdlbb::Blob& blob,
+ bool flushBuilders)
{
// executed by the *CLIENT* dispatcher thread
@@ -620,12 +631,6 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore,
return; // RETURN
}
- // Set up the 'd_operationState' to indicate that the channel is dying and
- // we should not use it anymore trying to send any messages and should also
- // stop enqueuing 'callbacks' to the client dispatcher thread ...
- const bool doDeconfigure = d_operationState == e_RUNNING;
- d_operationState = e_DEAD;
-
// If stop request handling is in progress cancel checking for the
// unconfirmed messages.
if (d_periodicUnconfirmedCheckHandler) {
@@ -656,37 +661,32 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore,
// closeQueue request is in process in the queue thread, so the handle is
// still active at the time tearDown is processed).
- int numHandlesDropped = 0;
- for (QueueStateMapIter it = d_queueSessionManager.queues().begin();
- it != d_queueSessionManager.queues().end();
- ++it) {
- if (!it->second.d_hasReceivedFinalCloseQueue) {
- BSLS_ASSERT_SAFE(it->second.d_handle_p->queue());
+ const bool hasLostTheClient = (!isBrokerShutdown && !isProxy());
- if (isBrokerShutdown ||
- d_clientIdentity_p->clientType() ==
- bmqp_ctrlmsg::ClientType::E_TCPBROKER) {
- it->second.d_handle_p->clearClient(false);
- }
- else {
- it->second.d_handle_p->clearClient(true);
- }
+ // if (d_operationState == e_SHUTTING_DOWN_V2) {
+ // // Leave over queues and handles
+ // }
+ // else {
+ // Capture, before 'd_operationState' is overwritten below, whether the
+ // session is still in the 'e_RUNNING' state: only in that case is
+ // 'doDeconfigure' passed as 'true' when dropping the queue handles.
+ const bool doDeconfigure = d_operationState == e_RUNNING;
- it->second.d_handle_p->drop(doDeconfigure);
- it->second.d_handle_p = 0;
- it->second.d_hasReceivedFinalCloseQueue = true;
- ++numHandlesDropped;
- }
- }
+ int numHandlesDropped = dropAllQueueHandles(doDeconfigure,
+ hasLostTheClient);
+ BALL_LOG_INFO << description() << ": Dropped " << numHandlesDropped
+ << " queue handles.";
+ // }
+ // Set up the 'd_operationState' to indicate that the channel is dying and
+ // we should not use it anymore trying to send any messages and should also
+ // stop enqueuing 'callbacks' to the client dispatcher thread ...
+ d_operationState = e_DEAD;
// Invalidating 'd_queueSessionManager' _after_ calling 'clearClient',
// otherwise handle can get destructed because of
// 'QueueSessionManager::onHandleReleased' early exit.
d_queueSessionManager.tearDown();
- BALL_LOG_INFO << description() << ": Dropped " << numHandlesDropped
- << " queue handles.";
-
// Now that we enqueued a 'drop' for all applicable queue handles, we need
// to synchronize on all queues, to make sure this drop event has been
// processed. We do so by enqueuing an event to all queues dispatchers
@@ -839,14 +839,15 @@ void ClientSession::onHandleConfiguredDispatched(
<< " for request: " << streamParamsCtrlMsg;
// Send the response
- sendPacket(d_state.d_schemaEventBuilder.blob(), true);
+ sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true);
logOperationTime(d_currentOpDescription);
}
void ClientSession::initiateShutdownDispatched(
const ShutdownCb& callback,
- const bsls::TimeInterval& timeout)
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2)
{
// executed by the *CLIENT* dispatcher thread
@@ -862,15 +863,22 @@ void ClientSession::initiateShutdownDispatched(
callback();
return; // RETURN
}
+
if (d_operationState == e_SHUTTING_DOWN) {
// More than one cluster calls 'initiateShutdown'?
callback();
return; // RETURN
}
+ if (d_operationState == e_SHUTTING_DOWN_V2) {
+ // More than one cluster calls 'initiateShutdown'?
+ callback();
+ return; // RETURN
+ }
+
flush(); // Flush any pending messages
- // Wait for tearDown.
+ // 'tearDown' should invoke the 'callback'
d_shutdownCallback = callback;
if (d_operationState == e_DISCONNECTING) {
@@ -881,8 +889,36 @@ void ClientSession::initiateShutdownDispatched(
return; // RETURN
}
- // Wait for unconfirmed messages.
- // Wait for tearDown.
+ if (supportShutdownV2) {
+ d_operationState = e_SHUTTING_DOWN_V2;
+ d_queueSessionManager.shutDown();
+
+ callback();
+ }
+ else {
+ // After de-configuring (below), wait for unconfirmed messages.
+ // Once the wait for unconfirmed is over, close the channel
+
+ ShutdownContextSp context;
+ context.createInplace(
+ d_state.d_allocator_p,
+ bdlf::BindUtil::bind(&ClientSession::closeChannel, this),
+ timeout);
+
+ deconfigureAndWait(context);
+ }
+}
+
+void ClientSession::deconfigureAndWait(ShutdownContextSp& context)
+{
+ // executed by the *CLIENT* dispatcher thread
+
+ // PRECONDITIONS
+ BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));
+ BSLS_ASSERT_SAFE(context);
+
+ // Use the same 'e_SHUTTING_DOWN' state for both shutting down and
+ // StopRequest processing.
d_operationState = e_SHUTTING_DOWN;
@@ -902,13 +938,6 @@ void ClientSession::initiateShutdownDispatched(
// No-op if the link is empty
d_shutdownChain.append(&link);
- // Check unconfirmed messages once all the handles are deconfigured
- ShutdownContextSp context;
- context.createInplace(d_state.d_allocator_p,
- bdlf::BindUtil::bind(&ClientSession::closeChannel,
- this),
- timeout);
-
d_shutdownChain.appendInplace(
bdlf::BindUtil::bind(&ClientSession::checkUnconfirmed,
this,
@@ -1174,18 +1203,8 @@ void ClientSession::processDisconnectAllQueues(
// us back to notify the handle can be deleted and we also don't need to
// send a close queue response since this release is not initiated by the
// client, but by the broker upon client's disconnect request.
- int numHandlesDropped = 0;
- for (QueueStateMapIter it = d_queueSessionManager.queues().begin();
- it != d_queueSessionManager.queues().end();
- ++it) {
- if (!it->second.d_hasReceivedFinalCloseQueue) {
- it->second.d_handle_p->clearClient(false);
- it->second.d_handle_p->drop(doDeconfigure);
- it->second.d_handle_p = 0;
- it->second.d_hasReceivedFinalCloseQueue = true;
- ++numHandlesDropped;
- }
- }
+
+ int numHandlesDropped = dropAllQueueHandles(doDeconfigure, false);
BALL_LOG_INFO << description() << ": processing disconnect, dropped "
<< numHandlesDropped << " queue handles.";
@@ -1267,7 +1286,7 @@ void ClientSession::processDisconnect(
<< ": Sending disconnect response: " << response;
// Send the response
- sendPacket(d_state.d_schemaEventBuilder.blob(), true);
+ sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true);
// Setting the 'd_operationState' to 'e_DISCONNECTED' implies that no
// messages will be pushed to the client after this one: we set it now
@@ -1354,7 +1373,7 @@ void ClientSession::openQueueCb(
<< " for request: " << handleParamsCtrlMsg;
// Send the response
- sendPacket(d_state.d_schemaEventBuilder.blob(), true);
+ sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true);
const bsl::string& queueUri =
handleParamsCtrlMsg.choice().openQueue().handleParameters().uri();
@@ -1416,7 +1435,7 @@ void ClientSession::closeQueueCb(
<< ": Sending closeQueue response: " << response;
// Send the response.
- sendPacket(d_state.d_schemaEventBuilder.blob(), true);
+ sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true);
// Release the handle's ptr in the queue's context to guarantee that the
// handle will be destroyed after all ongoing queue events are handled.
@@ -2749,34 +2768,11 @@ void ClientSession::processEvent(
return; // RETURN
}
case MsgChoice::SELECTION_ID_CLUSTER_MESSAGE: {
- if (d_clientIdentity_p->clientType() ==
- bmqp_ctrlmsg::ClientType::E_TCPBROKER &&
- choice.clusterMessage().choice().isStopResponseValue()) {
- bsl::shared_ptr cluster;
-
- if (d_clusterCatalog_p->findCluster(&cluster,
- choice.clusterMessage()
- .choice()
- .stopResponse()
- .clusterName())) {
- BSLS_ASSERT_SAFE(cluster);
- cluster->processResponse(controlMessage);
- return; // RETURN
- }
- else {
- BALL_LOG_ERROR << "#CLIENT_IMPROPER_BEHAVIOR "
- << description()
- << ": unknown Cluster in ClusterMessage: "
- << controlMessage;
- }
- }
- else {
- BALL_LOG_ERROR
- << "#CLIENT_IMPROPER_BEHAVIOR " << description()
- << ": unexpected ClusterMessage: " << controlMessage;
- }
- return; // RETURN
- }
+ eventCallback = bdlf::BindUtil::bind(
+ &ClientSession::processClusterMessage,
+ this,
+ controlMessage);
+ } break;
case MsgChoice::SELECTION_ID_UNDEFINED:
case MsgChoice::SELECTION_ID_STATUS:
default: {
@@ -2871,7 +2867,8 @@ void ClientSession::tearDown(const bsl::shared_ptr& session,
}
void ClientSession::initiateShutdown(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout)
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2)
{
// executed by the *ANY* thread
@@ -2908,7 +2905,8 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback,
&ClientSession::initiateShutdownDispatched,
d_self.acquire()),
callback,
- timeout),
+ timeout,
+ supportShutdownV2),
this,
mqbi::DispatcherEventType::e_DISPATCHER);
// Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling
@@ -3074,7 +3072,7 @@ void ClientSession::flush()
BALL_LOG_TRACE << description() << ": Flushing "
<< d_state.d_pushBuilder.messageCount()
<< " PUSH messages";
- sendPacket(d_state.d_pushBuilder.blob(), false);
+ sendPacketDispatched(d_state.d_pushBuilder.blob(), false);
d_state.d_pushBuilder.reset();
}
@@ -3083,10 +3081,134 @@ void ClientSession::flush()
BALL_LOG_TRACE << description() << ": Flushing "
<< d_state.d_ackBuilder.messageCount()
<< " ACK messages";
- sendPacket(d_state.d_ackBuilder.blob(), false);
+ sendPacketDispatched(d_state.d_ackBuilder.blob(), false);
d_state.d_ackBuilder.reset();
}
}
+void ClientSession::processClusterMessage(
+    const bmqp_ctrlmsg::ControlMessage& message)
+{
+    // Process the specified cluster 'message' received on this session:
+    //: o a StopResponse is forwarded to the StopRequest manager of the
+    //:   cluster catalog;
+    //: o a StopRequest (expected to be version 2) is answered with a
+    //:   StopResponse carrying the same request id, sent once the shutdown
+    //:   context created here invokes its callback;
+    //: o any other cluster message is logged as an error and ignored.
+    //
+    // NOTE(review): must run in the client dispatcher thread, because
+    // 'processStopRequest' asserts it.
+
+    if (message.choice().clusterMessage().choice().isStopResponseValue()) {
+        BALL_LOG_INFO << description() << ": processStopResponse: " << message;
+        d_clusterCatalog_p->stopRequestManger().processResponse(message);
+    }
+    else if (message.choice().clusterMessage().choice().isStopRequestValue()) {
+        // This is StopRequest from Proxy
+        // Assume StopRequest V2
+
+        const bmqp_ctrlmsg::StopRequest& request =
+            message.choice().clusterMessage().choice().stopRequest();
+
+        BSLS_ASSERT_SAFE(request.version() == 2);
+
+        // Deconfigure all queues.  Do NOT wait for unconfirmed
+
+        BALL_LOG_INFO << description() << ": processing StopRequest.";
+
+        bmqp_ctrlmsg::ControlMessage response;
+
+        response.choice().makeClusterMessage().choice().makeStopResponse();
+        response.rId() = message.rId();
+
+        // Encode the response up front; the encoded blob is bound (by copy)
+        // into the shutdown context callback below.
+        d_state.d_schemaEventBuilder.reset();
+        int rc = d_state.d_schemaEventBuilder.setMessage(
+            response,
+            bmqp::EventType::e_CONTROL);
+
+        if (rc != 0) {
+            BALL_LOG_ERROR
+                << "#CLIENT_SEND_FAILURE " << description()
+                << ": Encoding StopRequest response has failed, [rc: " << rc
+                << "]: " << response;
+
+            return;  // RETURN
+        }
+
+        ShutdownContextSp context;
+        context.createInplace(
+            d_state.d_allocator_p,
+            bdlf::BindUtil::bind(&ClientSession::sendPacket,
+                                 this,
+                                 d_state.d_schemaEventBuilder.blob(),
+                                 false));
+
+        processStopRequest(context);
+    }
+    else {
+        // Neither a StopResponse nor a StopRequest: no other cluster message
+        // is expected on a client session.
+        BALL_LOG_ERROR << "#CLIENT_IMPROPER_BEHAVIOR " << description()
+                       << ": unexpected ClusterMessage: " << message;
+    }
+}
+
+void ClientSession::onDeconfiguredHandle(const ShutdownContextSp& contextSp)
+{
+    // Callback given to 'deconfigureAll' for each queue handle by
+    // 'processStopRequest'.  Intentionally a no-op: the specified
+    // 'contextSp' is not used here.
+    // NOTE(review): presumably binding 'contextSp' into this callback is what
+    // keeps the shutdown context alive until the handle has been
+    // deconfigured -- confirm.
+    static_cast<void>(contextSp);
+}
+
+void ClientSession::processStopRequest(ShutdownContextSp& contextSp)
+{
+    // executed by the *CLIENT* dispatcher thread
+
+    // Handle a StopRequest arriving from a downstream (otherwise,
+    // 'ClusterProxy' would receive it).  Acting as the upstream, this node
+    // deconfigures all queues of this session and then responds with a
+    // StopResponse.  The logic is the same as in 'initiateShutdown', except
+    // that the final step is sending the StopResponse instead of
+    // 'closeChannel'.
+    // NOTE(review): the final step is presumably triggered once the last
+    // reference to 'contextSp' is released ('~ShutdownContext' is not visible
+    // here) -- confirm.
+
+    // PRECONDITIONS
+    BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));
+
+    // There is nothing to deconfigure when the client is disconnected
+    // ('e_DEAD') or disconnecting ('e_DISCONNECTING'), or when the broker is
+    // already shutting down or processing a StopRequest ('e_SHUTTING_DOWN',
+    // 'e_SHUTTING_DOWN_V2'), in which case the de-configuring is done.
+    const bool nothingToDeconfigure =
+        d_operationState == e_DEAD || d_operationState == e_SHUTTING_DOWN ||
+        d_operationState == e_SHUTTING_DOWN_V2 ||
+        d_operationState == e_DISCONNECTING;
+
+    if (nothingToDeconfigure) {
+        return;  // RETURN
+    }
+
+    for (QueueStateMapCIter cit = d_queueSessionManager.queues().begin();
+         cit != d_queueSessionManager.queues().end();
+         ++cit) {
+        if (cit->second.d_hasReceivedFinalCloseQueue) {
+            // This queue is already closed; skip it.
+            continue;  // CONTINUE
+        }
+
+        // Each callback holds its own copy of 'contextSp'.
+        cit->second.d_handle_p->deconfigureAll(
+            bdlf::BindUtil::bind(&ClientSession::onDeconfiguredHandle,
+                                 this,
+                                 contextSp));
+    }
+}
+
+int ClientSession::dropAllQueueHandles(bool doDeconfigure, bool hasLostClient)
+{
+    // For every queue of this session which has not yet received its final
+    // close-queue, clear the client of the queue handle (passing the
+    // specified 'hasLostClient'), drop the handle (passing the specified
+    // 'doDeconfigure'), forget the handle pointer and mark the queue as
+    // having received its final close-queue.  Return the number of handles
+    // dropped.
+
+    int droppedCount = 0;
+
+    for (QueueStateMapIter it = d_queueSessionManager.queues().begin();
+         it != d_queueSessionManager.queues().end();
+         ++it) {
+        if (it->second.d_hasReceivedFinalCloseQueue) {
+            // Already closed: nothing to drop for this queue.
+            continue;  // CONTINUE
+        }
+
+        it->second.d_handle_p->clearClient(hasLostClient);
+        it->second.d_handle_p->drop(doDeconfigure);
+
+        // The handle must not be used through this session anymore.
+        it->second.d_handle_p                   = 0;
+        it->second.d_hasReceivedFinalCloseQueue = true;
+
+        ++droppedCount;
+    }
+
+    return droppedCount;
+}
+
} // close package namespace
} // close enterprise namespace
diff --git a/src/groups/mqb/mqba/mqba_clientsession.h b/src/groups/mqb/mqba/mqba_clientsession.h
index 53f41d5d3..ecc803395 100644
--- a/src/groups/mqb/mqba/mqba_clientsession.h
+++ b/src/groups/mqb/mqba/mqba_clientsession.h
@@ -273,8 +273,12 @@ class ClientSession : public mqbnet::Session,
enum OperationState {
e_RUNNING // Running normally
,
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
+ // V2.
e_SHUTTING_DOWN // Shutting down due to 'initiateShutdown' request
,
+ e_SHUTTING_DOWN_V2 // Shutting down per shutdown logic V2
+ ,
e_DISCONNECTING // Disconnecting due to the client disconnect request
,
e_DISCONNECTED // The session is disconnected and no more valid
@@ -295,6 +299,8 @@ class ClientSession : public mqbnet::Session,
ShutdownContext(const ShutdownCb& callback,
const bsls::TimeInterval& timeout);
+ ShutdownContext(const ShutdownCb& callback);
+
~ShutdownContext();
};
// Struct to be used as a context for shutdown operation.
@@ -423,6 +429,7 @@ class ClientSession : public mqbnet::Session,
/// guarantee strict serialization of events when sending a control
/// message.
void sendPacket(const bdlbb::Blob& blob, bool flushBuilders);
+ void sendPacketDispatched(const bdlbb::Blob& blob, bool flushBuilders);
/// Flush as much as possible of the content of the internal
/// `channelBufferQueue`.
@@ -476,10 +483,13 @@ class ClientSession : public mqbnet::Session,
/// `callback` upon completion of (asynchronous) shutdown sequence or
/// if the specified `timeout` is expired.
void initiateShutdownDispatched(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout);
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2);
void invalidateDispatched();
+ void deconfigureAndWait(ShutdownContextSp& context);
+
void checkUnconfirmed(const ShutdownContextSp& shutdownCtx,
const VoidFunctor& completionCb);
@@ -517,6 +527,12 @@ class ClientSession : public mqbnet::Session,
const ShutdownContextSp& shutdownCtx,
const VoidFunctor& completionCb);
+ void processClusterMessage(const bmqp_ctrlmsg::ControlMessage& message);
+ void processStopRequest(ShutdownContextSp& context);
+ void onDeconfiguredHandle(const ShutdownContextSp& contextSp);
+
+ int dropAllQueueHandles(bool doDeconfigure, bool hasLostClient);
+
void processDisconnect(const bmqp_ctrlmsg::ControlMessage& controlMessage);
void processDisconnectAllQueues(
@@ -616,6 +632,8 @@ class ClientSession : public mqbnet::Session,
/// Return true if the session is `e_DISCONNECTED` or worse (`e_DEAD`).
bool isDisconnected() const;
+ bool isProxy() const;
+
public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(ClientSession, bslma::UsesBslmaAllocator)
@@ -669,11 +687,15 @@ class ClientSession : public mqbnet::Session,
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
- /// if the specified `timeout` is expired.
+ /// if the specified `timeout` is expired. If the optional (temporary)
+ /// specified 'supportShutdownV2' is 'true' execute shutdown logic V2
+ /// where upstream (not downstream) nodes deconfigure queues and the
+ /// shutting down node (not downstream) waits for CONFIRMS.
/// The shutdown is complete when 'tearDownAllQueuesDone'.
void
initiateShutdown(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
/// Make the session abandon any work it has.
void invalidate() BSLS_KEYWORD_OVERRIDE;
@@ -782,6 +804,15 @@ inline ClientSession::ShutdownContext::ShutdownContext(
d_stopTime += timeout;
}
+inline ClientSession::ShutdownContext::ShutdownContext(
+ const ShutdownCb& callback)
+: d_callback(callback)
+, d_stopTime() // unused in V2
+, d_numUnconfirmedTotal(0) // unused in V2
+{
+ BSLS_ASSERT_SAFE(d_callback);
+}
+
inline ClientSession::ShutdownContext::~ShutdownContext()
{
// Assume 'd_callback' does not require specific thread
@@ -800,6 +831,12 @@ inline bool ClientSession::isDisconnected() const
return d_operationState == e_DISCONNECTED || d_operationState == e_DEAD;
}
+inline bool ClientSession::isProxy() const
+{
+    // Return 'true' if the client identity of this session reports the
+    // 'E_TCPBROKER' client type, and 'false' otherwise.
+    const bool isTcpBroker = (d_clientIdentity_p->clientType() ==
+                              bmqp_ctrlmsg::ClientType::E_TCPBROKER);
+    return isTcpBroker;
+}
+
inline bsl::shared_ptr ClientSession::channel() const
{
return d_channel_sp;
diff --git a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp
index 91d8313e9..0b85b4f95 100644
--- a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp
+++ b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp
@@ -133,7 +133,9 @@ void loadBrokerIdentity(bmqp_ctrlmsg::ClientIdentity* identity,
.append(";")
.append(bmqp::HighAvailabilityFeatures::k_FIELD_NAME)
.append(":")
- .append(bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN);
+ .append(bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN)
+ .append(",")
+ .append(bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2);
if (shouldBroadcastToProxies) {
features.append(",").append(
diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp
index a635b1a90..e7a143567 100644
--- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp
@@ -621,7 +621,8 @@ void Cluster::processCommandDispatched(mqbcmd::ClusterResult* result,
result->makeError().message() = os.str();
}
-void Cluster::initiateShutdownDispatched(const VoidFunctor& callback)
+void Cluster::initiateShutdownDispatched(const VoidFunctor& callback,
+ bool supportShutdownV2)
{
// executed by the *DISPATCHER* thread
@@ -632,84 +633,105 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback)
d_isStopping = true;
- // Send StopRequest to all nodes and proxies. The peers are expected not
- // to send any PUT msgs to this node after receiving StopRequest. For each
- // queue for which this node is the primary, peers (replicas and proxies)
- // will de-configure the queue, wait for configured timeout, close the
- // queue, and respond with StopResponse. The peers are expected not to
- // send any PUT/PUSH/ACK/CONFIRM msgs to this node after sending
- // StopResponse.
- //
- // Call 'initiateShutdown' for all client sessions.
- //
- // Also update self's status. Note that this node does not explicitly
- // issue a close-queue request for each of the queues.
-
d_clusterData.membership().setSelfNodeStatus(
bmqp_ctrlmsg::NodeStatus::E_STOPPING);
- mwcu::OperationChainLink link(d_shutdownChain.allocator());
- bsls::TimeInterval shutdownTimeout;
- shutdownTimeout.addMilliseconds(
- d_clusterData.clusterConfig().queueOperations().shutdownTimeoutMs());
-
- SessionSpVec sessions;
- for (mqbnet::TransportManagerIterator sessIt(
- &d_clusterData.transportManager());
- sessIt;
- ++sessIt) {
- bsl::shared_ptr sessionSp = sessIt.session().lock();
- if (!sessionSp) {
- continue; // CONTINUE
- }
+ if (supportShutdownV2) {
+ d_clusterOrchestrator.queueHelper().requestToStopPushing();
- const bmqp_ctrlmsg::NegotiationMessage& negoMsg =
- sessionSp->negotiationMessage();
+ bsls::TimeInterval whenToStop(
+ bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC));
+ whenToStop.addMilliseconds(d_clusterData.clusterConfig()
+ .queueOperations()
+ .shutdownTimeoutMs());
- const bmqp_ctrlmsg::ClientIdentity& peerIdentity =
- negoMsg.isClientIdentityValue()
- ? negoMsg.clientIdentity()
- : negoMsg.brokerResponse().brokerIdentity();
+ d_shutdownChain.appendInplace(
+ bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2,
+ &d_clusterOrchestrator.queueHelper(),
+ whenToStop,
+ bdlf::PlaceHolders::_1)); // completionCb
+ }
+ else {
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
+ // V2.
+ // Send StopRequest to all nodes and proxies. The peers are expected
+ // not to send any PUT msgs to this node after receiving StopRequest.
+ // For each queue for which this node is the primary, peers (replicas
+ // and proxies) will de-configure the queue, wait for configured
+ // timeout, close the queue, and respond with StopResponse. The peers
+ // are expected not to send any PUT/PUSH/ACK/CONFIRM msgs to this node
+ // after sending StopResponse.
+ //
+ // Call 'initiateShutdown' for all client sessions.
+
+ mwcu::OperationChainLink link(d_shutdownChain.allocator());
+ bsls::TimeInterval shutdownTimeout;
+ shutdownTimeout.addMilliseconds(d_clusterData.clusterConfig()
+ .queueOperations()
+ .shutdownTimeoutMs());
+
+ SessionSpVec sessions;
+ for (mqbnet::TransportManagerIterator sessIt(
+ &d_clusterData.transportManager());
+ sessIt;
+ ++sessIt) {
+ bsl::shared_ptr sessionSp =
+ sessIt.session().lock();
+ if (!sessionSp) {
+ continue; // CONTINUE
+ }
- if (peerIdentity.clusterNodeId() ==
- d_clusterData.membership().netCluster()->selfNodeId()) {
- continue; // CONTINUE
- }
+ const bmqp_ctrlmsg::NegotiationMessage& negoMsg =
+ sessionSp->negotiationMessage();
- if (mqbnet::ClusterUtil::isClient(negoMsg)) {
- link.insert(bdlf::BindUtil::bind(
- &mqbnet::Session::initiateShutdown,
- sessionSp,
- bdlf::PlaceHolders::_1, // completion callback
- shutdownTimeout));
- continue; // CONTINUE
- }
+ const bmqp_ctrlmsg::ClientIdentity& peerIdentity =
+ negoMsg.isClientIdentityValue()
+ ? negoMsg.clientIdentity()
+ : negoMsg.brokerResponse().brokerIdentity();
- if (peerIdentity.clusterName() == name()) {
- // Expect all proxies and nodes support this feature.
- if (!bmqp::ProtocolUtil::hasFeature(
- bmqp::HighAvailabilityFeatures::k_FIELD_NAME,
- bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN,
- peerIdentity.features())) {
- BALL_LOG_ERROR << description() << ": Peer doesn't support "
- << "GRACEFUL_SHUTDOWN. Skip sending stopRequest"
- << " to [" << peerIdentity << "]";
+ if (peerIdentity.clusterNodeId() ==
+ d_clusterData.membership().netCluster()->selfNodeId()) {
continue; // CONTINUE
}
- sessions.push_back(sessionSp);
+
+ if (mqbnet::ClusterUtil::isClient(negoMsg)) {
+ link.insert(bdlf::BindUtil::bind(
+ &mqbnet::Session::initiateShutdown,
+ sessionSp,
+ bdlf::PlaceHolders::_1, // completion callback
+ shutdownTimeout,
+ false));
+
+ continue; // CONTINUE
+ }
+
+ if (peerIdentity.clusterName() == name()) {
+ // Expect all proxies and nodes support this feature.
+ if (!bmqp::ProtocolUtil::hasFeature(
+ bmqp::HighAvailabilityFeatures::k_FIELD_NAME,
+ bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN,
+ peerIdentity.features())) {
+ BALL_LOG_ERROR
+ << description() << ": Peer doesn't support "
+ << "GRACEFUL_SHUTDOWN. Skip sending stopRequest"
+ << " to [" << peerIdentity << "]";
+ continue; // CONTINUE
+ }
+ sessions.push_back(sessionSp);
+ }
}
- }
- link.insert(
- bdlf::BindUtil::bind(&Cluster::sendStopRequest,
- this,
- sessions,
- bdlf::PlaceHolders::_1)); // completion callback
+ link.insert(bdlf::BindUtil::bind(
+ &Cluster::sendStopRequest,
+ this,
+ sessions,
+ bdlf::PlaceHolders::_1)); // completion callback
- d_shutdownChain.append(&link);
+ d_shutdownChain.append(&link);
+ }
- // Add callback to be invoked once all the client sessions are shut down
- // and stop responses are received
+ // Continue shutting down once the V1 or V2 operations above complete.
+ // This node does not explicitly issue close-queue requests for queues.
d_shutdownChain.appendInplace(
bdlf::BindUtil::bind(&Cluster::continueShutdown,
@@ -727,7 +749,7 @@ void Cluster::sendStopRequest(const SessionSpVec& sessions,
// Send a StopRequest to available cluster nodes and proxies connected to
// the cluster
StopRequestManagerType::RequestContextSp contextSp =
- d_stopRequestsManager.createRequestContext();
+ d_stopRequestsManager_p->createRequestContext();
bmqp_ctrlmsg::StopRequest& request = contextSp->request()
.choice()
.makeClusterMessage()
@@ -746,7 +768,7 @@ void Cluster::sendStopRequest(const SessionSpVec& sessions,
BALL_LOG_INFO << "Sending StopRequest to " << sessions.size()
<< " brokers; timeout is " << timeoutMs;
- d_stopRequestsManager.sendRequest(contextSp, timeoutMs);
+ d_stopRequestsManager_p->sendRequest(contextSp, timeoutMs);
// continue after receipt of all StopResponses or the timeout
}
@@ -2520,6 +2542,7 @@ Cluster::Cluster(const bslstl::StringRef& name,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
+ StopRequestManagerType* stopRequestsManager,
bslma::Allocator* allocator,
const mqbnet::Session::AdminCommandEnqueueCb& adminCb)
: d_allocator_p(allocator)
@@ -2557,7 +2580,7 @@ Cluster::Cluster(const bslstl::StringRef& name,
, d_throttledDroppedPushMessages(5000, 5) // 5 logs per 5s interval
, d_logSummarySchedulerHandle()
, d_queueGcSchedulerHandle()
-, d_stopRequestsManager(&d_clusterData.requestManager(), allocator)
+, d_stopRequestsManager_p(stopRequestsManager)
, d_shutdownChain(allocator)
, d_adminCb(adminCb)
{
@@ -2670,7 +2693,8 @@ int Cluster::start(bsl::ostream& errorDescription)
return rc;
}
-void Cluster::initiateShutdown(const VoidFunctor& callback)
+void Cluster::initiateShutdown(const VoidFunctor& callback,
+ bool supportShutdownV2)
{
// executed by *ANY* thread
@@ -2682,7 +2706,8 @@ void Cluster::initiateShutdown(const VoidFunctor& callback)
dispatcher()->execute(
bdlf::BindUtil::bind(&Cluster::initiateShutdownDispatched,
this,
- callback),
+ callback,
+ supportShutdownV2),
this);
// Wait for above event to complete. This is needed because
@@ -3030,8 +3055,7 @@ void Cluster::processClusterControlMessage(
case MsgChoice::SELECTION_ID_STORAGE_SYNC_RESPONSE:
case MsgChoice::SELECTION_ID_PARTITION_SYNC_STATE_QUERY_RESPONSE:
case MsgChoice::SELECTION_ID_PARTITION_SYNC_DATA_QUERY_RESPONSE:
- case MsgChoice::SELECTION_ID_CLUSTER_SYNC_RESPONSE:
- case MsgChoice::SELECTION_ID_STOP_RESPONSE: {
+ case MsgChoice::SELECTION_ID_CLUSTER_SYNC_RESPONSE: {
// NOTE: that we cant simply just check if the msg has an id, because
// in cluster, it can receive requests which will have an id; so
// only messages that are response type should be sent to the
@@ -3047,6 +3071,11 @@ void Cluster::processClusterControlMessage(
source),
this);
} break; // BREAK
+
+ case MsgChoice::SELECTION_ID_STOP_RESPONSE: {
+ BALL_LOG_INFO << description() << ": processStopResponse: " << message;
+ d_stopRequestsManager_p->processResponse(message);
+ } break;
case MsgChoice::SELECTION_ID_PARTITION_PRIMARY_ADVISORY: {
dispatcher()->execute(
bdlf::BindUtil::bind(
diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h
index c6f8319d1..5c51bffed 100644
--- a/src/groups/mqb/mqbblp/mqbblp_cluster.h
+++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h
@@ -334,7 +334,7 @@ class Cluster : public mqbi::Cluster,
// Scheduler handle for the recurring
// queue gc check.
- StopRequestManagerType d_stopRequestsManager;
+ StopRequestManagerType* d_stopRequestsManager_p;
mwcu::OperationChain d_shutdownChain;
// Mechanism used for the Cluster
@@ -414,7 +414,11 @@ class Cluster : public mqbi::Cluster,
const mqbcmd::ClusterCommand& command);
/// Executed by dispatcher thread.
- void initiateShutdownDispatched(const VoidFunctor& callback);
+ void initiateShutdownDispatched(const VoidFunctor& callback,
+ bool supportShutdownV2);
+
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
+ // V2.
/// Send stop request to proxies and nodes specified in `sessions` using
/// the specified `stopCb` as a callback to be called once all the
@@ -549,6 +553,7 @@ class Cluster : public mqbi::Cluster,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
+ StopRequestManagerType* stopRequestsManager,
bslma::Allocator* allocator,
const mqbnet::Session::AdminCommandEnqueueCb& adminCb);
@@ -566,8 +571,13 @@ class Cluster : public mqbi::Cluster,
/// Initiate the shutdown of the cluster. It is expected that `stop()`
/// will be called soon after this routine is invoked. Invoke the
/// specified `callback` upon completion of (asynchronous) shutdown
- /// sequence.
- void initiateShutdown(const VoidFunctor& callback) BSLS_KEYWORD_OVERRIDE;
+ /// sequence. If the optional (temporary) specified 'supportShutdownV2'
+ /// is 'true' execute shutdown logic V2 where upstream (not downstream)
+ /// nodes deconfigure queues and the shutting down node (not downstream)
+ /// waits for CONFIRMS.
+ void
+ initiateShutdown(const VoidFunctor& callback,
+ bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
/// Stop the `Cluster`.
void stop() BSLS_KEYWORD_OVERRIDE;
diff --git a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp
index f7ebef940..e693a53b3 100644
--- a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp
@@ -187,6 +187,7 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
d_blobSpPool_p,
d_bufferFactory_p,
d_transportManager_p,
+ &d_stopRequestsManager,
clusterAllocator,
d_adminCb);
@@ -230,6 +231,7 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
d_blobSpPool_p,
d_dispatcher_p,
d_transportManager_p,
+ &d_stopRequestsManager,
clusterAllocator);
info.d_cluster_sp.reset(cluster, clusterAllocator);
@@ -383,6 +385,12 @@ ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler,
, d_reversedClusterConnections(d_allocator_p)
, d_clusters(d_allocator_p)
, d_statContexts(statContexts)
+, d_requestManager(bmqp::EventType::e_CONTROL,
+ d_bufferFactory_p,
+ d_scheduler_p,
+ false, // lateResponseMode
+ d_allocator_p)
+, d_stopRequestsManager(&d_requestManager, d_allocator_p)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(scheduler->clockType() ==
@@ -396,7 +404,7 @@ ClusterCatalog::~ClusterCatalog()
"stop() must be called before destroying this object");
}
-int ClusterCatalog::loadBrokerClusterConfig(bsl::ostream& errorDescription)
+int ClusterCatalog::loadBrokerClusterConfig(bsl::ostream&)
{
// executed by the *MAIN* thread
diff --git a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h
index 4af84ab81..38d65e609 100644
--- a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h
+++ b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h
@@ -68,6 +68,7 @@
#include
#include
#include
+#include
#include
// MWC
@@ -178,6 +179,15 @@ class ClusterCatalog {
// cluster.
};
+ typedef bmqp::RequestManager
+ RequestManagerType;
+
+ typedef mqbnet::MultiRequestManager >
+ StopRequestManagerType;
+
private:
// PRIVATE TYPES
@@ -283,6 +293,14 @@ class ClusterCatalog {
mqbnet::Session::AdminCommandEnqueueCb d_adminCb;
// Callback function to enqueue admin commands
+ RequestManagerType d_requestManager;
+ // Request manager used by 'd_stopRequestsManager'
+
+ // Should be part of 'ClusterResources'
+ StopRequestManagerType d_stopRequestsManager;
+ // Request manager to send stop
+ // requests to connected brokers.
+
private:
// NOT IMPLEMENTED
ClusterCatalog(const ClusterCatalog&) BSLS_CPP11_DELETED;
@@ -411,6 +429,9 @@ class ClusterCatalog {
int processCommand(mqbcmd::ClustersResult* result,
const mqbcmd::ClustersCommand& command);
+ StopRequestManagerType& stopRequestManger();
+ void processStopResponse(const bmqp_ctrlmsg::ControlMessage& message);
+
/// Sets the callback, `value`, to pass to created clusters in this catalog
/// that runs when an admin command is received by the cluster.
void setAdminCommandEnqueueCallback(
@@ -512,6 +533,18 @@ inline bool ClusterCatalog::isMemberOf(const bsl::string& clusterName) const
return (d_myClusters.find(clusterName) != d_myClusters.end());
}
+inline ClusterCatalog::StopRequestManagerType&
+ClusterCatalog::stopRequestManger()
+{
+ return d_stopRequestsManager;
+}
+
+inline void ClusterCatalog::processStopResponse(
+ const bmqp_ctrlmsg::ControlMessage& message)
+{
+ d_requestManager.processResponse(message);
+}
+
// ----------------------------
// class ClusterCatalogIterator
// ----------------------------
diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
index e624c17e7..6ebb52e5f 100644
--- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
@@ -849,15 +849,6 @@ void ClusterOrchestrator::processStopRequest(
request.choice().clusterMessage().choice().stopRequest();
const bsl::string& name = d_cluster_p->name();
- if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(stopRequest.clusterName() !=
- name)) {
- BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
- BALL_LOG_ERROR << d_clusterData_p->identity().description()
- << ": invalid cluster name in the StopRequest from "
- << source->nodeDescription() << ", " << request;
- return; // RETURN
- }
-
mqbc::ClusterNodeSession* ns =
d_clusterData_p->membership().getClusterNodeSession(source);
BSLS_ASSERT_SAFE(ns);
@@ -889,6 +880,15 @@ void ClusterOrchestrator::processStopRequest(
<< ", current status: " << ns->nodeStatus()
<< ", new status: " << bmqp_ctrlmsg::NodeStatus::E_STOPPING;
+    // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
+    // The cluster name is validated for version 1 requests only; a mismatch
+    // is logged and the request is ignored.  The mismatch is not expected,
+    // hence the 'PREDICT_UNLIKELY' / 'UNLIKELY_HINT' pair.
+    if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
+            stopRequest.version() == 1 && stopRequest.clusterName() != name)) {
+        BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
+        BALL_LOG_ERROR << d_clusterData_p->identity().description()
+                       << ": invalid cluster name in the StopRequest from "
+                       << source->nodeDescription() << ", " << request;
+        return;  // RETURN
+    }
+
ns->setNodeStatus(bmqp_ctrlmsg::NodeStatus::E_STOPPING);
processNodeStoppingNotification(ns, &request);
@@ -1025,7 +1025,7 @@ void ClusterOrchestrator::processNodeStoppingNotification(
d_queueHelper.processNodeStoppingNotification(ns->clusterNode(),
request,
- &ns->primaryPartitions());
+ ns);
// For each partition for which self is primary, notify the StorageMgr
// about the status of a peer node. Self may end up issuing a
diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
index c44616d0a..bbe62bc74 100644
--- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
@@ -74,7 +74,7 @@ typedef bsl::function CompletionCallback;
/// Utility function used in `mwcu::OperationChain` as the operation
/// callback which just calls the completion callback.
-void allClientSessionsShutDown(const CompletionCallback& callback)
+void completeShutDown(const CompletionCallback& callback)
{
callback();
}
@@ -157,7 +157,8 @@ void ClusterProxy::startDispatched()
this));
}
-void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback)
+void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback,
+ bool supportShutdownV2)
{
// executed by the *DISPATCHER* thread
@@ -170,51 +171,73 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback)
// Mark self as stopping.
d_isStopping = true;
- // Fill the first link with client session shutdown operations
- mwcu::OperationChainLink link(d_shutdownChain.allocator());
- SessionSpVec sessions;
- bsls::TimeInterval shutdownTimeout;
- shutdownTimeout.addMilliseconds(
- clusterProxyConfig()->queueOperations().shutdownTimeoutMs());
-
- for (mqbnet::TransportManagerIterator sessIt(
- &d_clusterData.transportManager());
- sessIt;
- ++sessIt) {
- bsl::shared_ptr sessionSp = sessIt.session().lock();
- if (!sessionSp) {
- continue; // CONTINUE
- }
+ if (supportShutdownV2) {
+ d_queueHelper.requestToStopPushing();
+
+        // Deadline for waiting on unconfirmed messages: now (monotonic
+        // clock) plus the configured shutdown timeout.  The timeout is read
+        // through 'clusterProxyConfig()', the same source the V1 branch of
+        // this function uses.
+        bsls::TimeInterval whenToStop(
+            bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC));
+        whenToStop.addMilliseconds(
+            clusterProxyConfig()->queueOperations().shutdownTimeoutMs());
- const bmqp_ctrlmsg::NegotiationMessage& negoMsg =
- sessionSp->negotiationMessage();
- if (mqbnet::ClusterUtil::isClientOrProxy(negoMsg)) {
- if (mqbnet::ClusterUtil::isClient(negoMsg)) {
- link.insert(
- bdlf::BindUtil::bind(&mqbnet::Session::initiateShutdown,
- sessionSp,
- bdlf::PlaceHolders::_1,
- shutdownTimeout));
+ d_shutdownChain.appendInplace(
+ bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2,
+ &d_queueHelper,
+ whenToStop,
+ bdlf::PlaceHolders::_1)); // completionCb
+ }
+ else {
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
+ // V2.
+
+ // Fill the first link with client session shutdown operations
+ mwcu::OperationChainLink link(d_shutdownChain.allocator());
+ SessionSpVec sessions;
+ bsls::TimeInterval shutdownTimeout;
+ shutdownTimeout.addMilliseconds(
+ clusterProxyConfig()->queueOperations().shutdownTimeoutMs());
+
+ for (mqbnet::TransportManagerIterator sessIt(
+ &d_clusterData.transportManager());
+ sessIt;
+ ++sessIt) {
+ bsl::shared_ptr sessionSp =
+ sessIt.session().lock();
+ if (!sessionSp) {
+ continue; // CONTINUE
}
- else {
- sessions.push_back(sessionSp);
+
+ const bmqp_ctrlmsg::NegotiationMessage& negoMsg =
+ sessionSp->negotiationMessage();
+ if (mqbnet::ClusterUtil::isClientOrProxy(negoMsg)) {
+ if (mqbnet::ClusterUtil::isClient(negoMsg)) {
+ link.insert(bdlf::BindUtil::bind(
+ &mqbnet::Session::initiateShutdown,
+ sessionSp,
+ bdlf::PlaceHolders::_1,
+ shutdownTimeout,
+ false));
+ }
+ else {
+ sessions.push_back(sessionSp);
+ }
}
}
- }
- link.insert(
- bdlf::BindUtil::bind(&ClusterProxy::sendStopRequest,
- this,
- sessions,
- bdlf::PlaceHolders::_1)); // completion callback
+ link.insert(bdlf::BindUtil::bind(
+ &ClusterProxy::sendStopRequest,
+ this,
+ sessions,
+ bdlf::PlaceHolders::_1)); // completion callback
- d_shutdownChain.append(&link);
+ d_shutdownChain.append(&link);
+ }
- // Add callback to be invoked once all the client sessions are shut down
- d_shutdownChain.appendInplace(
- bdlf::BindUtil::bind(&allClientSessionsShutDown,
- bdlf::PlaceHolders::_1),
- callback);
+ // Add callback to be invoked once V1 shuts down all client sessions or
+ // V2 finishes waiting for unconfirmed
+ d_shutdownChain.appendInplace(bdlf::BindUtil::bind(&completeShutDown,
+ bdlf::PlaceHolders::_1),
+ callback);
d_shutdownChain.start();
}
@@ -681,6 +704,15 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
this);
return; // RETURN
}
+ if (clusterMessage.choice().isStopResponseValue()) {
+ dispatcher()->execute(
+ bdlf::BindUtil::bind(
+ &ClusterProxy::processPeerStopResponse,
+ this,
+ controlMessage),
+ this);
+ return; // RETURN
+ }
if (clusterMessage.choice().isNodeStatusAdvisoryValue()) {
dispatcher()->execute(
bdlf::BindUtil::bind(
@@ -832,6 +864,14 @@ void ClusterProxy::processResponse(
this);
}
+void ClusterProxy::processPeerStopResponse(
+ const bmqp_ctrlmsg::ControlMessage& response)
+{
+ BALL_LOG_INFO << description() << ": processStopResponse: " << response;
+
+ d_stopRequestsManager_p->processResponse(response);
+}
+
void ClusterProxy::processPeerStopRequest(
mqbnet::ClusterNode* clusterNode,
const bmqp_ctrlmsg::ControlMessage& request)
@@ -841,11 +881,10 @@ void ClusterProxy::processPeerStopRequest(
// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));
- const bsl::vector* noPartitions = 0;
d_queueHelper.processNodeStoppingNotification(
clusterNode,
&request,
- noPartitions,
+ 0,
bdlf::BindUtil::bind(&ClusterProxy::finishStopSequence,
this,
clusterNode));
@@ -859,6 +898,7 @@ void ClusterProxy::finishStopSequence(
// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));
+ // REVISIT
// Internal-ticket D169562052
// TODO: handle/eliminate the possibility of Multiple StopRequests.
// Currently, cannot switch the active node because another StopRequest can
@@ -966,7 +1006,7 @@ void ClusterProxy::sendStopRequest(const SessionSpVec& sessions,
{
// Send a StopRequest to available proxies connected to the virtual cluster
StopRequestManagerType::RequestContextSp contextSp =
- d_stopRequestsManager.createRequestContext();
+ d_stopRequestsManager_p->createRequestContext();
bmqp_ctrlmsg::StopRequest& request = contextSp->request()
.choice()
.makeClusterMessage()
@@ -985,7 +1025,7 @@ void ClusterProxy::sendStopRequest(const SessionSpVec& sessions,
BALL_LOG_INFO << "Sending StopRequest to " << sessions.size()
<< " proxies; timeout is " << timeoutMs;
- d_stopRequestsManager.sendRequest(contextSp, timeoutMs);
+ d_stopRequestsManager_p->sendRequest(contextSp, timeoutMs);
// continue after receipt of all StopResponses or the timeout
}
@@ -1012,6 +1052,7 @@ ClusterProxy::ClusterProxy(
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
+ StopRequestManagerType* stopRequestsManager,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_isStarted(false)
@@ -1042,7 +1083,7 @@ ClusterProxy::ClusterProxy(
, d_clusterMonitor(&d_clusterData, &d_state, d_allocator_p)
, d_activeNodeLookupEventHandle()
, d_shutdownChain(d_allocator_p)
-, d_stopRequestsManager(&d_clusterData.requestManager(), d_allocator_p)
+, d_stopRequestsManager_p(stopRequestsManager)
{
// PRECONDITIONS
mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster();
@@ -1109,7 +1150,8 @@ int ClusterProxy::start(BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription)
return 0;
}
-void ClusterProxy::initiateShutdown(const VoidFunctor& callback)
+void ClusterProxy::initiateShutdown(const VoidFunctor& callback,
+ bool supportShutdownV2)
{
// executed by *ANY* thread
@@ -1121,7 +1163,8 @@ void ClusterProxy::initiateShutdown(const VoidFunctor& callback)
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterProxy::initiateShutdownDispatched,
this,
- callback),
+ callback,
+ supportShutdownV2),
this);
dispatcher()->synchronize(this);
diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
index 4d41efbd3..2d81a926e 100644
--- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
+++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
@@ -251,7 +251,8 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
// execution of the shutdown callbacks
// from the client sessions.
- StopRequestManagerType d_stopRequestsManager;
+ // Should be part of 'ClusterResources'
+ StopRequestManagerType* d_stopRequestsManager_p;
// Request manager to send stop
// requests to connected proxies.
@@ -264,7 +265,8 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Initiate the shutdown of the cluster. The specified `callback` will
/// be called when the shutdown is completed. This routine is invoked
/// in the cluster-dispatcher thread.
- void initiateShutdownDispatched(const VoidFunctor& callback);
+ void initiateShutdownDispatched(const VoidFunctor& callback,
+ bool supportShutdownV2 = false);
/// Stop the `Cluster`.
void stopDispatched();
@@ -377,6 +379,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// transmitted request.
void processResponse(const bmqp_ctrlmsg::ControlMessage& response)
BSLS_KEYWORD_OVERRIDE;
+ void processPeerStopResponse(const bmqp_ctrlmsg::ControlMessage& response);
void processPeerStopRequest(mqbnet::ClusterNode* clusterNode,
const bmqp_ctrlmsg::ControlMessage& request);
@@ -398,6 +401,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
void
processResponseDispatched(const bmqp_ctrlmsg::ControlMessage& response);
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
/// Send stop request to proxies specified in `sessions` using the
/// specified `stopCb` as a callback to be called once all the requests
/// get responses.
@@ -438,6 +442,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
+ StopRequestManagerType* stopRequestsManager,
bslma::Allocator* allocator);
/// Destructor
@@ -451,11 +456,16 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// error.
int start(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE;
- /// Initiate the shutdown of the cluster. It is expected that `stop()`
- /// will be called soon after this routine is invoked. Invoke the
- /// specified `callback` upon completion of (asynchronous) shutdown
- /// sequence.
- void initiateShutdown(const VoidFunctor& callback) BSLS_KEYWORD_OVERRIDE;
+ /// Initiate the shutdown of the cluster and invoke the specified
+ /// `callback` upon completion of (asynchronous) shutdown sequence. It
+ /// is expected that `stop()` will be called soon after this routine is
+ /// invoked. If the optional (temporary) specified 'supportShutdownV2' is
+ /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes
+ /// deconfigure queues and the shutting down node (not downstream) waits
+ /// for CONFIRMS.
+ void
+ initiateShutdown(const VoidFunctor& callback,
+ bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
/// Stop the `Cluster`.
void stop() BSLS_KEYWORD_OVERRIDE;
diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
index a80aaffd5..d25d6ec68 100644
--- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
@@ -173,6 +173,12 @@ void handleHolderDummy(const bsl::shared_ptr& handle)
handle->queue()->dispatcher()->inDispatcherThread(handle->queue()));
}
+void countUnconfirmed(bsls::Types::Int64* result, mqbi::Queue* queue)
+{
+ *result += queue->countUnconfirmed(
+ bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID);
+}
+
} // close unnamed namespace
// -----------------------------------------
@@ -1438,6 +1444,9 @@ void ClusterQueueHelper::onReopenQueueResponse(
--d_numPendingReopenQueueRequests;
+ // Process Close request instead of parking it
+ sqit->value().d_state = SubQueueContext::k_CLOSED;
+
return; // RETURN
}
@@ -1468,7 +1477,8 @@ void ClusterQueueHelper::onReopenQueueResponse(
notifyQueue(queueContext.get(),
upstreamSubQueueId,
generationCount,
- false);
+ false,
+ false); // isWriterOnly
// No need to send a configure-queue request for this queue.
// Decrement the num pending reopen queue request counter though,
@@ -1746,7 +1756,8 @@ void ClusterQueueHelper::onConfigureQueueResponse(
notifyQueue(queueContext.get(),
itStream->subId(),
generationCount,
- true);
+ true,
+ false); // isWriterOnly
}
}
@@ -2013,7 +2024,8 @@ bool ClusterQueueHelper::createQueue(
notifyQueue(queueContext,
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID,
genCount,
- true);
+ true, // isOpen
+ true); // isWriterOnly
}
context.d_callback(status,
@@ -2625,7 +2637,8 @@ void ClusterQueueHelper::onGetQueueHandleDispatched(
void ClusterQueueHelper::notifyQueue(QueueContext* queueContext,
unsigned int upstreamSubQueueId,
bsls::Types::Uint64 generationCount,
- bool isOpen)
+ bool isOpen,
+ bool isWriterOnly)
{
mqbi::Queue* queue = queueContext->d_liveQInfo.d_queue_sp.get();
if (queue == 0) {
@@ -2644,7 +2657,8 @@ void ClusterQueueHelper::notifyQueue(QueueContext* queueContext,
bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream,
queue,
generationCount,
- upstreamSubQueueId),
+ upstreamSubQueueId,
+ isWriterOnly),
queue);
}
}
@@ -2687,6 +2701,22 @@ void ClusterQueueHelper::configureQueueDispatched(
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
+ if (d_supportShutdownV2) {
+ BMQ_LOGTHROTTLE_INFO()
+ << d_cluster_p->description()
+ << ": Shutting down and skipping configure queue [: " << uri
+ << "], queueId: " << queueId
+ << ", stream parameters: " << streamParameters;
+ if (callback) {
+ bmqp_ctrlmsg::Status status;
+ status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
+ status.message() = "Shutting down.";
+ callback(status, streamParameters);
+ }
+
+ return; // RETURN
+ }
+
QueueContextMapIter queueContextIt = d_queues.find(uri);
if (queueContextIt == d_queues.end()) {
@@ -2835,6 +2865,23 @@ void ClusterQueueHelper::releaseQueueDispatched(
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
+ if (d_supportShutdownV2) {
+ BMQ_LOGTHROTTLE_INFO()
+ << d_cluster_p->description()
+ << ": Shutting down and skipping close queue [: "
+ << handleParameters.uri()
+ << "], queueId: " << handleParameters.qId()
+ << ", handle parameters: " << handleParameters;
+ if (callback) {
+ bmqp_ctrlmsg::Status status;
+ status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
+ status.message() = "Shutting down.";
+ callback(status);
+ }
+
+ return; // RETURN
+ }
+
bmqt::Uri uri(handleParameters.uri());
QueueContextMapIter queueContextIt = d_queues.find(uri.canonical());
if (queueContextIt == d_queues.end()) {
@@ -4485,6 +4532,7 @@ ClusterQueueHelper::ClusterQueueHelper(
, d_numPendingReopenQueueRequests(0)
, d_primaryNotLeaderAlarmRaised(false)
, d_stopContexts(allocator)
+, d_supportShutdownV2(false)
{
BSLS_ASSERT(
d_clusterData_p->clusterConfig()
@@ -5227,17 +5275,56 @@ void ClusterQueueHelper::processShutdownEvent()
<< "], queueKey [" << queueContextSp->key()
<< "] which was assigned to PartitionId ["
<< queueContextSp->partitionId()
- << "], because self is going down and this queue has no "
- << "handles.";
+ << "], because self is going down.";
deleteQueue(queueContextSp.get());
}
}
+/// Stop sending PUSHes but continue receiving CONFIRMs, receiving and
+/// sending PUTs and ACKs.
+void ClusterQueueHelper::requestToStopPushing()
+{
+ // executed by the cluster *DISPATCHER* thread
+
+ // PRECONDITIONS
+ BSLS_ASSERT_SAFE(
+ d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
+
+ // Assume Shutdown V2
+ d_supportShutdownV2 = true;
+
+ // Prevent future queue operations from sending PUSHes.
+ for (QueueContextMapIter it = d_queues.begin(); it != d_queues.end();
+ ++it) {
+ QueueContextSp& queueContextSp = it->second;
+ QueueLiveState& qinfo = queueContextSp->d_liveQInfo;
+ mqbi::Queue* queue = qinfo.d_queue_sp.get();
+
+ if (!queue) {
+ continue; // CONTINUE
+ }
+
+ queue->dispatcher()->execute(
+ bdlf::BindUtil::bind(&mqbi::Queue::stopPushing, queue),
+ queue);
+ }
+}
+
+void ClusterQueueHelper::contextHolder(
+ const bsl::shared_ptr& contextSp,
+ const VoidFunctor& action)
+{
+ if (action) {
+ action();
+ }
+ (void)contextSp;
+}
+
void ClusterQueueHelper::processNodeStoppingNotification(
mqbnet::ClusterNode* clusterNode,
const bmqp_ctrlmsg::ControlMessage* request,
- const bsl::vector* partitions,
+ mqbc::ClusterNodeSession* ns,
const VoidFunctor& callback)
{
// executed by the cluster *DISPATCHER* thread
@@ -5250,6 +5337,13 @@ void ClusterQueueHelper::processNodeStoppingNotification(
// The 'shared_ptr' serves as a reference count of all pending queue
// operations. Once all functors complete, the 'finishStopSequence'
// deleter sends back StopResponse.
+
+ // TODO(shutdown-v2): TEMPORARY, remove 'timeout' when all switch to
+ // StopRequest V2.
+
+ // No need to wait for CONFIRMs, the waiting is done by the shutting down
+ // node.
+
int timeout =
d_clusterData_p->clusterConfig().queueOperations().stopTimeoutMs();
@@ -5287,12 +5381,146 @@ void ClusterQueueHelper::processNodeStoppingNotification(
<< clusterNode->nodeDescription()
<< " with timeout (ms) " << timeout;
- // Self node needs to issue close-queue requests for all the queues for
- // which specified 'source' node is the primary.
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
+ // V2.
+ bool supportShutdownV2 = true;
- if (!d_cluster_p->isRemote() ||
- d_clusterData_p->electorInfo().leaderNode() == contextSp->d_peer) {
- deconfigureQueues(contextSp, partitions);
+ if (request) {
+ const bmqp_ctrlmsg::StopRequest& stopRequest =
+ request->choice().clusterMessage().choice().stopRequest();
+
+ if (stopRequest.version() == 1) {
+ supportShutdownV2 = false;
+ }
+ else {
+ BSLS_ASSERT_SAFE(stopRequest.version() == 2);
+ }
+ }
+ // StopRequests have replaced E_STOPPING advisory.
+ // In any case, do minimal (V2) work unless explicitly requested
+
+ if (supportShutdownV2) {
+ if (ns) {
+ // As an Upstream, deconfigure queues of the (shutting down)
+ // ClusterNodeSession 'ns'.
+ // Call 'mqbi::QueueHandle::deconfigureAll' for each handle
+
+ const mqbc::ClusterNodeSession::QueueHandleMap& handles =
+ ns->queueHandles();
+
+ for (mqbc::ClusterNodeSession::QueueHandleMap::const_iterator
+ cit = handles.begin();
+ cit != handles.end();
+ ++cit) {
+ cit->second.d_handle_p->deconfigureAll(
+ bdlf::BindUtil::bind(
+ &ClusterQueueHelper::contextHolder,
+ this,
+ contextSp,
+ VoidFunctor()));
+ }
+ BALL_LOG_INFO << d_clusterData_p->identity().description()
+ << ": deconfigured " << handles.size()
+ << " handles while processing StopRequest from "
+ << clusterNode->nodeDescription() << " "
+ << contextSp.numReferences();
+ }
+ // else, this is a ClusterProxy (downstream) receiving request from
+ // an upstream Cluster Node (a request from a Proxy would arrive to
+ // ClientSession).
+ // Downstreams do not deconfigure queues in V2.
+ // See comment in 'ClusterProxy::processPeerStopRequest'
+
+ // As a Downstream, notify relevant queues about their shutting
+ // down upstream
+ for (QueueContextMapConstIter cit = d_queues.begin();
+ cit != d_queues.end();
+ ++cit) {
+ const QueueContextSp& queueContextSp = cit->second;
+ const QueueLiveState& queueLiveState =
+ queueContextSp->d_liveQInfo;
+ mqbi::Queue* queue = queueLiveState.d_queue_sp.get();
+
+ if (0 == queue || bmqp::QueueId::k_UNASSIGNED_QUEUE_ID ==
+ queueContextSp->d_liveQInfo.d_id) {
+ continue; // CONTINUE
+ }
+
+ if (!d_cluster_p->isRemote()) {
+ const int pid = queueContextSp->partitionId();
+
+ BSLS_ASSERT_SAFE(ns);
+
+ const bsl::vector& partitions =
+ ns->primaryPartitions();
+ if (partitions.end() ==
+ bsl::find(partitions.begin(), partitions.end(), pid)) {
+ continue; // CONTINUE
+ }
+ const ClusterStatePartitionInfo& pinfo =
+ d_clusterState_p->partition(pid);
+
+ if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE !=
+ pinfo.primaryStatus()) {
+ // It's possible for a primary node to be non-active
+ // when it is shutting down -- if it was stopped before
+ // the node had a chance to transition to active
+ // primary for this partition.
+
+ continue; // CONTINUE
+ }
+ BSLS_ASSERT(pinfo.primaryNode() == clusterNode);
+ }
+ else if (d_clusterData_p->electorInfo().leaderNode() !=
+ clusterNode) {
+ continue; // CONTINUE
+ }
+
+ if (queueLiveState.d_subQueueIds.findBySubIdSafe(
+ bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) ==
+ queueLiveState.d_subQueueIds.end()) {
+ // Only buffering PUTs. Still sending CONFIRMs
+ continue; // CONTINUE
+ }
+
+ VoidFunctor inner = bdlf::BindUtil::bind(
+ &mqbi::Queue::onOpenUpstream,
+ queue,
+ 0,
+ bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID,
+ true);
+
+ VoidFunctor outer = bdlf::BindUtil::bind(
+ &ClusterQueueHelper::contextHolder,
+ this,
+ contextSp,
+ inner);
+
+ queue->dispatcher()->execute(
+ outer,
+ queue,
+ mqbi::DispatcherEventType::e_DISPATCHER);
+
+ // Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid
+ // (re)enabling 'd_flushList'
+ }
+ }
+ else {
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to
+ // StopRequest V2.
+ // In V2, downstreams do not need to deconfigure queues for which
+ // the shutting down node is the upstream; the deconfiguring is done
+ // by the upstream of the shutting down node instead. Nor do they
+ // need to wait for CONFIRMs; the waiting is done by the shutting
+ // down node. This (V1) branch still deconfigures the queues.
+
+ if (ns) {
+ deconfigureQueues(contextSp, &ns->primaryPartitions());
+ }
+ else if (d_clusterData_p->electorInfo().leaderNode() ==
+ contextSp->d_peer) {
+ deconfigureQueues(contextSp, 0);
+ }
}
}
else {
@@ -5378,6 +5606,8 @@ void ClusterQueueHelper::deconfigureQueues(
const bsl::shared_ptr& contextSp,
const bsl::vector* partitions)
{
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
+
// executed by the cluster *DISPATCHER* thread
// PRECONDITIONS
@@ -5440,7 +5670,8 @@ void ClusterQueueHelper::deconfigureQueues(
bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream,
queue,
0,
- bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID),
+ bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID,
+ true), // isWriterOnly
queue);
queue->dispatcher()->synchronize(queue);
@@ -5659,6 +5890,92 @@ void ClusterQueueHelper::checkUnconfirmed(
queueSp.get());
}
+void ClusterQueueHelper::checkUnconfirmedV2(
+ const bsls::TimeInterval& whenToStop,
+ const bsl::function& completionCallback)
+{
+ d_cluster_p->dispatcher()->execute(
+ bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2Dispatched,
+ this,
+ whenToStop,
+ completionCallback),
+ d_cluster_p);
+}
+
+void ClusterQueueHelper::checkUnconfirmedV2Dispatched(
+ const bsls::TimeInterval& whenToStop,
+ const bsl::function& completionCallback)
+{
+ // executed by the cluster *DISPATCHER* thread
+
+ // PRECONDITIONS
+ BSLS_ASSERT_SAFE(
+ d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
+
+ bsls::Types::Int64 result = 0;
+ for (QueueContextMapIter it = d_queues.begin(); it != d_queues.end();
+ ++it) {
+ QueueContextSp& queueContextSp = it->second;
+ QueueLiveState& qinfo = queueContextSp->d_liveQInfo;
+ mqbi::Queue* queue = qinfo.d_queue_sp.get();
+
+ if (!queue) {
+ continue; // CONTINUE
+ }
+
+ queue->dispatcher()->execute(
+ bdlf::BindUtil::bind(&countUnconfirmed, &result, queue),
+ queue);
+ queue->dispatcher()->synchronize(queue);
+ }
+
+ // Synchronize with all Queue Dispatcher threads
+ bslmt::Latch latch(1);
+ d_cluster_p->dispatcher()->execute(
+ mqbi::Dispatcher::ProcessorFunctor(), // empty
+ mqbi::DispatcherClientType::e_QUEUE,
+ bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch));
+
+ latch.wait();
+
+ if (result == 0) {
+ BALL_LOG_INFO << d_cluster_p->description()
+ << ": no unconfirmed message(s)";
+
+ completionCallback();
+ return;
+ }
+
+ bsls::TimeInterval t = bsls::SystemTime::now(
+ bsls::SystemClockType::e_MONOTONIC);
+
+ if (t < whenToStop) {
+ BALL_LOG_INFO << d_cluster_p->description() << ": waiting for "
+ << result << " unconfirmed message(s)";
+
+ t.addSeconds(1);
+ if (t > whenToStop) {
+ t = whenToStop;
+ }
+ bdlmt::EventScheduler::EventHandle eventHandle;
+ // Never cancel the timer
+ d_clusterData_p->scheduler().scheduleEvent(
+ &eventHandle,
+ t,
+ bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2,
+ this,
+ whenToStop,
+ completionCallback));
+
+ return; // RETURN
+ }
+ else {
+ BALL_LOG_WARN << d_cluster_p->description() << ": giving up on "
+ << result << " unconfirmed message(s)";
+ completionCallback();
+ }
+}
+
void ClusterQueueHelper::checkUnconfirmedQueueDispatched(
const bsl::shared_ptr& contextSp,
const QueueContextSp& queueContextSp,
diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
index fd28eb0fa..bbc42d362 100644
--- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
+++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
@@ -166,6 +166,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
// State of the upstream
bdlmt::EventScheduler::EventHandle d_timer;
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
+ // V2.
// (timer handle 1s) when waiting for
// unconfirmed. This is to cancel the timer in
// the case when this broker stops while
@@ -483,6 +485,10 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
StopContexts d_stopContexts;
+ /// When `true`, all cluster nodes support StopRequest V2 and this node
+ /// executes shutdown V2 logic.
+ bool d_supportShutdownV2;
+
private:
// PRIVATE MANIPULATORS
@@ -755,7 +761,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
void notifyQueue(QueueContext* queueContext,
unsigned int upstreamSubQueueId,
bsls::Types::Uint64 generationCount,
- bool isOpen);
+ bool isOpen,
+ bool isWriterOnly = false);
void configureQueueDispatched(
const bmqt::Uri& uri,
@@ -851,6 +858,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
void deconfigureQueue(const bsl::shared_ptr& contextSp,
const QueueContextSp& queueContextSp);
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
/// Second step of StopRequest / CLOSING node advisory processing
/// (after de-configure response). Start timer to wait the configured
/// `stopTimeoutMs` is there are any pending PUSH messages to collect
@@ -868,6 +876,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
unsigned int subId,
bsls::TimeInterval& t);
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
void checkUnconfirmed(const bsl::shared_ptr& contextSp,
const QueueContextSp& queueContextSp,
unsigned int subId);
@@ -878,6 +887,10 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
const QueueContextSp& queueContextSp,
unsigned int subId);
+ void checkUnconfirmedV2Dispatched(
+ const bsls::TimeInterval& whenToStop,
+ const bsl::function& completionCallback);
+
void
waitForUnconfirmedDispatched(const bsl::shared_ptr& contextSp,
const QueueContextSp& queueContextSp,
@@ -902,6 +915,9 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// Send StopResponse to the request in the specified 'context.
void finishStopSequenceDispatched(StopContext* context);
+ void contextHolder(const bsl::shared_ptr& contextSp,
+ const VoidFunctor& action);
+
// PRIVATE ACCESSORS
/// Return true if for the specified `partitionId`, there is currently a
@@ -1079,6 +1095,13 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// Delete and unregister all queues which have no clients.
void processShutdownEvent();
+ /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and
+ /// sending PUTs and ACKs.
+ void requestToStopPushing();
+
+ void checkUnconfirmedV2(const bsls::TimeInterval& whenToStop,
+ const bsl::function& completionCallback);
+
/// Garbage-collect all queues which meet the criteria, and have
/// expired. If the optionally specified `immediate` flag is true,
/// delete the qualified queues immediately instead of marking them for
@@ -1109,8 +1132,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
void processNodeStoppingNotification(
mqbnet::ClusterNode* clusterNode,
const bmqp_ctrlmsg::ControlMessage* request,
- const bsl::vector* partitions = 0,
- const VoidFunctor& callback = VoidFunctor());
+ mqbc::ClusterNodeSession* ns,
+ const VoidFunctor& callback = VoidFunctor());
void onLeaderAvailable();
// Called upon leader becoming available.
diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.cpp b/src/groups/mqb/mqbblp/mqbblp_queue.cpp
index 19606582d..1834eeb35 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queue.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_queue.cpp
@@ -576,7 +576,8 @@ void Queue::onOpenFailure(unsigned int subQueueId)
}
void Queue::onOpenUpstream(bsls::Types::Uint64 genCount,
- unsigned int subQueueId)
+ unsigned int subQueueId,
+ bool isWriterOnly)
{
// executed by the *QUEUE* dispatcher thread
@@ -584,7 +585,7 @@ void Queue::onOpenUpstream(bsls::Types::Uint64 genCount,
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));
if (d_remoteQueue_mp) {
- d_remoteQueue_mp->onOpenUpstream(genCount, subQueueId);
+ d_remoteQueue_mp->onOpenUpstream(genCount, subQueueId, isWriterOnly);
}
}
@@ -926,6 +927,11 @@ bsls::Types::Int64 Queue::countUnconfirmed(unsigned int subId)
// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));
+ if (subId == bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID) {
+ return d_state.handleCatalog().countUnconfirmed(); // RETURN
+ }
+
+ // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
struct local {
static void sum(bsls::Types::Int64* sum,
mqbi::QueueHandle* handle,
@@ -949,5 +955,10 @@ bsls::Types::Int64 Queue::countUnconfirmed(unsigned int subId)
return result;
}
+void Queue::stopPushing()
+{
+ queueEngine()->resetState(true); // isShuttingDown
+}
+
} // close package namespace
} // close enterprise namespace
diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h
index 29411db51..20abac452 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queue.h
+++ b/src/groups/mqb/mqbblp/mqbblp_queue.h
@@ -265,6 +265,10 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue {
bsls::Types::Int64
countUnconfirmed(unsigned int subId) BSLS_KEYWORD_OVERRIDE;
+ /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and
+ /// sending PUTs and ACKs.
+ void stopPushing() BSLS_KEYWORD_OVERRIDE;
+
void onPushMessage(
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr& appData,
@@ -311,10 +315,13 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue {
/// and current upstream `genCount`, then the PUT message gets dropped
/// to avoid out of order PUTs. If the `upstreamSubQueueId` is
/// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen.
+ /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This
+ /// should be specified if the upstream is stopping.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
void onOpenUpstream(bsls::Types::Uint64 genCount,
- unsigned int upstreamSubQueueId) BSLS_KEYWORD_OVERRIDE;
+ unsigned int upstreamSubQueueId,
+ bool isWriterOnly = false) BSLS_KEYWORD_OVERRIDE;
/// Notify the (remote) queue about reopen failure. The queue NACKs all
/// pending and incoming PUTs and drops CONFIRMs related to to the
diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
index e9ab00da5..61617db15 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
@@ -875,8 +875,10 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState(
QueueEngineUtil_AppState::~QueueEngineUtil_AppState()
{
// PRECONDITIONS
- BSLS_ASSERT_SAFE(!hasConsumers());
BSLS_ASSERT_SAFE(!d_throttleEventHandle);
+
+ // In the case of `convertToLocal`, the new `RootQueueEngine` can reuse the
+ // existing `RelayQueueEngine` routing contexts.
}
size_t
@@ -1269,7 +1271,7 @@ void QueueEngineUtil_AppState::cancelThrottle()
}
}
-void QueueEngineUtil_AppState::reset()
+void QueueEngineUtil_AppState::undoRouting()
{
d_priorityCount = 0;
cancelThrottle();
@@ -1286,7 +1288,7 @@ void QueueEngineUtil_AppState::rebuildConsumers(
{
// Rebuild ConsumersState for this app
// Prepare the app for rebuilding consumers
- reset();
+ undoRouting();
bsl::shared_ptr previous = d_routing_sp;
d_routing_sp = replacement;
diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
index eb810de86..2912b1494 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
+++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
@@ -410,7 +410,7 @@ struct QueueEngineUtil_AppState {
bool isExpired);
/// Reset the internal state to have no consumers.
- void reset();
+ void undoRouting();
/// Deliver all messages in the storage to the consumer represented by
/// this instance. Load the message delay into the specified `delay`.
diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
index 2ae391ea6..ffcbeda0a 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
@@ -659,7 +659,7 @@ void QueueHandle::registerSubscription(unsigned int downstreamSubId,
// Ceil the limits values, so that if max redeliveries is 1, it will
// compute ok
const bsls::Types::Int64 lowWatermarkBytes =
- static_cast(
+ static_cast(
bsl::ceil(ci.maxUnconfirmedBytes() * k_WATERMARK_RATIO));
// We only care about whether we are at or above the `capacity`
@@ -1000,7 +1000,11 @@ void QueueHandle::deconfigureAll(
bdlf::BindUtil::bind(&QueueHandle::deconfigureDispatched,
this,
deconfiguredCb),
- d_queue_sp.get());
+ d_queue_sp.get(),
+ mqbi::DispatcherEventType::e_DISPATCHER);
+
+ // Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling
+ // 'd_flushList'
}
void QueueHandle::deconfigureDispatched(
diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp
index c23e4ae82..43b2ff6ee 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp
@@ -122,7 +122,7 @@ void QueueHandleCatalog::queueHandleDeleter(mqbi::QueueHandle* handle)
QueueHandleCatalog::QueueHandleCatalog(mqbi::Queue* queue,
bslma::Allocator* allocator)
: d_queue_p(queue)
-, d_handleFactory_mp(new (*allocator) DefaultHandleFactory(), allocator)
+, d_handleFactory_mp(new(*allocator) DefaultHandleFactory(), allocator)
, d_handles(allocator)
, d_allocator_p(allocator)
{
@@ -364,5 +364,25 @@ void QueueHandleCatalog::loadInternals(
}
}
+bsls::Types::Int64 QueueHandleCatalog::countUnconfirmed() const
+{
+ // executed by the *QUEUE* dispatcher thread
+
+ // PRECONDITIONS
+ BSLS_ASSERT_SAFE(d_queue_p->dispatcher()->inDispatcherThread(d_queue_p));
+
+ bsls::Types::Int64 result = 0;
+
+ for (HandleMap::const_iterator cit = d_handles.begin();
+ cit != d_handles.end();
+ ++cit) {
+ const mqbi::QueueHandle* handle(cit->value().get());
+
+ result += handle->countUnconfirmed(
+ bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID);
+ }
+ return result;
+}
+
} // close package namespace
} // close enterprise namespace
diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h
index dfe44cbc1..47dad6cd5 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h
+++ b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h
@@ -30,9 +30,9 @@
// must be executed by the dispatcher thread of the associated queue.
// MQB
-
#include
#include
+#include
// BMQ
#include
@@ -170,7 +170,8 @@ class QueueHandleCatalog {
// CREATOR
/// Create a new object associated to the specified `queue`. Use the
- /// specified `allocator` for any memory allocations.
+ /// specified `allocator` for any memory allocations. Unconfirmed
+ /// messages are counted per handle on demand by `countUnconfirmed()`.
QueueHandleCatalog(mqbi::Queue* queue, bslma::Allocator* allocator);
/// Destructor.
@@ -240,6 +241,8 @@ class QueueHandleCatalog {
/// Load into the specified `out` list the internal details about the
/// handles managed by this catalog.
void loadInternals(bsl::vector* out) const;
+
+ bsls::Types::Int64 countUnconfirmed() const;
};
// ============================================================================
diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.h b/src/groups/mqb/mqbblp/mqbblp_queuestate.h
index 3ac02ce32..2366c66ff 100644
--- a/src/groups/mqb/mqbblp/mqbblp_queuestate.h
+++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.h
@@ -203,7 +203,9 @@ class QueueState {
/// Create a new `QueueState` associated to the specified `queue` and
/// having the specified `uri`, `id`, `key`, `partitionId` and `domain`.
- /// Use the specified `allocator` for any memory allocations.
+ /// Use the specified `allocator` for any memory allocations. Use the
+ /// specified 'unconfirmedCounter' to aggregate the counting of
+ /// unconfirmed by each queue handle.
QueueState(mqbi::Queue* queue,
const bmqt::Uri& uri,
unsigned int id,
diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
index 6e7b7fafd..ff10791f1 100644
--- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
@@ -810,7 +810,7 @@ void RelayQueueEngine::applyConfiguration(App_State& app,
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));
- app.reset();
+ app.undoRouting();
app.d_routing_sp = context.d_routing_sp;
@@ -909,15 +909,18 @@ int RelayQueueEngine::configure(
return 0;
}
-void RelayQueueEngine::resetState()
+void RelayQueueEngine::resetState(bool isShuttingDown)
{
- d_self.reset(this);
-
for (AppsMap::iterator it = d_apps.begin(); it != d_apps.end(); ++it) {
- it->second->reset();
- it->second->d_routing_sp.reset();
+ it->second->undoRouting();
+ if (isShuttingDown) {
+ it->second->d_routing_sp->reset();
+ }
+ // else, keep the routing which new engine can reuse
+ }
+ if (!isShuttingDown) {
+ d_apps.clear();
}
- d_apps.clear();
}
int RelayQueueEngine::rebuildInternalState(
diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
index bb67b825f..3f306cf24 100644
--- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
+++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
@@ -376,8 +376,10 @@ class RelayQueueEngine : public mqbi::QueueEngine {
virtual int
configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE;
- /// Reset the internal state of this engine.
- virtual void resetState() BSLS_KEYWORD_OVERRIDE;
+ /// Reset the internal state of this engine. If the optionally specified
+ /// `isShuttingDown` is `true`, keep the apps (the data structures for
+ /// CONFIRMs processing) instead of clearing them.
+ virtual void resetState(bool isShuttingDown = false) BSLS_KEYWORD_OVERRIDE;
/// Rebuild the internal state of this engine. This method is invoked
/// when the queue this engine is associated with is created from an
diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
index fc11d89a4..7df2d78e6 100644
--- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
@@ -523,6 +523,7 @@ RemoteQueue::RemoteQueue(QueueState* state,
, d_unackedPutCounter(0)
, d_subStreams(allocator)
, d_statePool_p(statePool)
+, d_producerState()
, d_allocator_p(allocator)
{
// PRECONDITIONS
@@ -948,8 +949,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
return; // RETURN
}
- SubStreamContext& ctx = subStreamContext(
- bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID);
+ SubStreamContext& ctx = d_producerState;
if (ctx.d_state == SubStreamContext::e_NONE) {
BALL_LOG_WARN << "#CLIENT_IMPROPER_BEHAVIOR " << d_state_p->uri()
@@ -1590,6 +1590,9 @@ void RemoteQueue::onOpenFailure(unsigned int upstreamSubQueueId)
ctx.d_state = SubStreamContext::e_CLOSED;
ctx.d_genCount = 0;
+ d_producerState.d_state = SubStreamContext::e_CLOSED;
+ d_producerState.d_genCount = 0;
+
if (upstreamSubQueueId == bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) {
size_t numMessages = d_pendingMessages.size();
if (numMessages) {
@@ -1669,11 +1672,15 @@ void RemoteQueue::onLostUpstream()
i->d_genCount = 0;
}
+ d_producerState.d_state = SubStreamContext::e_STOPPED;
+ d_producerState.d_genCount = 0;
+
BALL_LOG_INFO << d_state_p->uri() << ": has lost the upstream";
}
void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount,
- unsigned int upstreamSubQueueId)
+ unsigned int upstreamSubQueueId,
+ bool isWriterOnly)
{
// executed by the *DISPATCHER* thread
@@ -1681,7 +1688,9 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount,
BSLS_ASSERT_SAFE(d_state_p->queue()->dispatcher()->inDispatcherThread(
d_state_p->queue()));
- SubStreamContext& ctx = subStreamContext(upstreamSubQueueId);
+ SubStreamContext& ctx = isWriterOnly
+ ? d_producerState
+ : subStreamContext(upstreamSubQueueId);
if (genCount == 0) {
// This is a result of StopRequest processing.
@@ -1689,7 +1698,7 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount,
// Until then, we buffer.
if (ctx.d_state == SubStreamContext::e_OPENED) {
if (upstreamSubQueueId == bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) {
- if (d_state_p->hasMultipleSubStreams()) {
+ if (isWriterOnly) {
BALL_LOG_INFO << d_state_p->uri()
<< ": buffering PUTs with generation count "
<< ctx.d_genCount
@@ -1700,6 +1709,8 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount,
<< ": buffering PUTs and CONFIRMs with"
<< " generation count " << ctx.d_genCount
<< " because upstream is stopping.";
+ d_producerState.d_state = SubStreamContext::e_STOPPED;
+ d_producerState.d_genCount = 0;
}
}
else {
@@ -1734,6 +1745,9 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount,
ctx.d_state = SubStreamContext::e_OPENED;
if (upstreamSubQueueId == bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) {
+ d_producerState.d_genCount = genCount;
+ d_producerState.d_state = SubStreamContext::e_OPENED;
+
retransmitPendingMessagesDispatched(genCount);
}
retransmitPendingConfirmsDispatched(upstreamSubQueueId);
diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.h b/src/groups/mqb/mqbblp/mqbblp_remotequeue.h
index 177c95b03..7d0360c41 100644
--- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.h
+++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.h
@@ -255,6 +255,10 @@ class RemoteQueue {
StateSpPool* d_statePool_p;
+ SubStreamContext d_producerState;
+ // To discern consumer and producer which share the same
+ // `k_DEFAULT_SUBQUEUE_ID` in the priority mode.
+
bslma::Allocator* d_allocator_p;
// Allocator to use
private:
@@ -454,10 +458,13 @@ class RemoteQueue {
/// and current upstream `genCount`, then the PUT message gets dropped
/// to avoid out of order PUTs. If the `upstreamSubQueueId` is
/// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen.
+ /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This
+ /// should be specified if the upstream is stopping.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
void onOpenUpstream(bsls::Types::Uint64 genCount,
- unsigned int upstreamSubQueueId);
+ unsigned int upstreamSubQueueId,
+ bool isWriterOnly = false);
/// Notify the (remote) queue about reopen failure. The queue NACKs all
/// pending and incoming PUTs and drops CONFIRMs related to to the
diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
index 2e5b853a7..c7fc0146a 100644
--- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
+++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
@@ -425,10 +425,18 @@ int RootQueueEngine::initializeAppId(const bsl::string& appId,
return 0;
}
-void RootQueueEngine::resetState()
+void RootQueueEngine::resetState(bool isShuttingDown)
{
+ for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) {
+ it->value()->undoRouting();
+ it->value()->d_routing_sp->reset();
+ }
+
d_consumptionMonitor.reset();
- d_apps.clear();
+
+ if (!isShuttingDown) {
+ d_apps.clear();
+ }
}
void RootQueueEngine::rebuildSelectedApp(
@@ -885,7 +893,7 @@ void RootQueueEngine::configureHandle(
const AppStateSp& affectedApp = iter->value();
// prepare the App for rebuilding consumers
- affectedApp->reset();
+ affectedApp->undoRouting();
// Rebuild the highest priority state for all affected apps.
diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h
index 8e7598247..4fe859db8 100644
--- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h
+++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h
@@ -258,8 +258,10 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
virtual int
configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE;
- /// Reset the internal state of this engine.
- virtual void resetState() BSLS_KEYWORD_OVERRIDE;
+ /// Reset the internal state of this engine. If the optionally specified
+ /// 'isShuttingDown' is 'true', clear the routing state but keep the Apps
+ /// state for CONFIRMs processing.
+ virtual void resetState(bool isShuttingDown = false) BSLS_KEYWORD_OVERRIDE;
/// Rebuild the internal state of this engine. This method is invoked
/// when the queue this engine is associated with is created from an
diff --git a/src/groups/mqb/mqbblp/mqbblp_routers.h b/src/groups/mqb/mqbblp/mqbblp_routers.h
index a7d814294..056f36193 100644
--- a/src/groups/mqb/mqbblp/mqbblp_routers.h
+++ b/src/groups/mqb/mqbblp/mqbblp_routers.h
@@ -842,7 +842,7 @@ inline Routers::QueueRoutingContext::QueueRoutingContext(
: d_expressions(allocator)
, d_nextSubscriptionId(0)
, d_groupIds(allocator)
-, d_preader(new (*allocator) MessagePropertiesReader(schemaLearner, allocator),
+, d_preader(new(*allocator) MessagePropertiesReader(schemaLearner, allocator),
allocator)
, d_evaluationContext(0, allocator)
, d_allocator_p(allocator)
diff --git a/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp
index 9322c5603..b0ef644e2 100644
--- a/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp
+++ b/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp
@@ -142,7 +142,7 @@ int ClusterStateLedgerUtil::validateFileHeader(
const ClusterStateFileHeader& header,
const mqbu::StorageKey& expectedLogId)
{
- if (static_cast(header.protocolVersion()) !=
+ if (static_cast(header.protocolVersion()) !=
ClusterStateLedgerProtocol::k_VERSION) {
return ClusterStateLedgerUtilRc::e_INVALID_PROTOCOL_VERSION; // RETURN
}
diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h
index 7cc16a1f1..0ab4d05c9 100644
--- a/src/groups/mqb/mqbi/mqbi_cluster.h
+++ b/src/groups/mqb/mqbi/mqbi_cluster.h
@@ -266,8 +266,12 @@ class Cluster : public DispatcherClient {
/// Initiate the shutdown of the cluster and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence. It
/// is expected that `stop()` will be called soon after this routine is
- /// invoked.
- virtual void initiateShutdown(const VoidFunctor& callback) = 0;
+ /// invoked. If the optional (temporary) specified 'supportShutdownV2' is
+ /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes
+ /// deconfigure queues and the shutting down node (not downstream) waits
+ /// for CONFIRMS.
+ virtual void initiateShutdown(const VoidFunctor& callback,
+ bool supportShutdownV2 = false) = 0;
/// Stop the `Cluster`; this is the counterpart of the `start()`
/// operation.
diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h
index 26547f5fd..2ae5d34c4 100644
--- a/src/groups/mqb/mqbi/mqbi_queue.h
+++ b/src/groups/mqb/mqbi/mqbi_queue.h
@@ -803,6 +803,10 @@ class Queue : public DispatcherClient {
/// `specified `subId'.
virtual bsls::Types::Int64 countUnconfirmed(unsigned int subId) = 0;
+ /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and
+ /// sending PUTs and ACKs.
+ virtual void stopPushing() = 0;
+
/// Called when a message with the specified `msgGUID`, `appData`,
/// `options`, `compressionAlgorithmType` payload is pushed to this
/// queue. Note that depending upon the location of the queue instance,
@@ -852,10 +856,13 @@ class Queue : public DispatcherClient {
/// and current upstream `genCount`, then the PUT message gets dropped
/// to avoid out of order PUTs. If the `upstreamSubQueueId` is
/// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen.
+ /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This
+ /// should be specified if the upstream is stopping.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual void onOpenUpstream(bsls::Types::Uint64 genCount,
- unsigned int upstreamSubQueueId) = 0;
+ unsigned int upstreamSubQueueId,
+ bool isWriterOnly = false) = 0;
/// Notify the (remote) queue about (re)open failure. The queue NACKs
/// all pending and incoming PUTs and drops CONFIRMs related to to the
diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.h b/src/groups/mqb/mqbi/mqbi_queueengine.h
index b6400d43c..ee3944f85 100644
--- a/src/groups/mqb/mqbi/mqbi_queueengine.h
+++ b/src/groups/mqb/mqbi/mqbi_queueengine.h
@@ -74,8 +74,10 @@ class QueueEngine {
/// otherwise and populate the specified `errorDescription`.
virtual int configure(bsl::ostream& errorDescription) = 0;
- /// Reset the internal state of this engine.
- virtual void resetState() = 0;
+ /// Reset the internal state of this engine. If the optionally specified
+ /// 'isShuttingDown' is 'true', clear the routing state but keep the Apps
+ /// state for CONFIRMs processing.
+ virtual void resetState(bool isShuttingDown = false) = 0;
/// Rebuild the internal state of this engine. This method is invoked
/// when the queue this engine is associated with is created from an
diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp
index 743c5f558..9b14f2e91 100644
--- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp
+++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp
@@ -325,7 +325,8 @@ int Cluster::start(BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription)
}
void Cluster::initiateShutdown(
- BSLS_ANNOTATION_UNUSED const VoidFunctor& callback)
+ BSLS_ANNOTATION_UNUSED const VoidFunctor& callback,
+ BSLS_ANNOTATION_UNUSED bool supportShutdownV2)
{
// PRECONDITIONS
BSLS_ASSERT_OPT(!d_isStarted &&
diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h
index 307fd33e6..bcdd63a34 100644
--- a/src/groups/mqb/mqbmock/mqbmock_cluster.h
+++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h
@@ -299,11 +299,16 @@ class Cluster : public mqbi::Cluster {
/// error.
int start(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE;
- /// Initiate the shutdown of the cluster. It is expected that `stop()`
- /// will be called soon after this routine is invoked. Invoke the
- /// specified `callback` upon completion of (asynchronous) shutdown
- /// sequence.
- void initiateShutdown(const VoidFunctor& callback) BSLS_KEYWORD_OVERRIDE;
+ /// Initiate the shutdown of the cluster and invoke the specified
+ /// `callback` upon completion of (asynchronous) shutdown sequence. It
+ /// is expected that `stop()` will be called soon after this routine is
+ /// invoked. If the optional (temporary) specified 'supportShutdownV2' is
+ /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes
+ /// deconfigure queues and the shutting down node (not downstream) waits
+ /// for CONFIRMS.
+ void
+ initiateShutdown(const VoidFunctor& callback,
+ bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
/// Stop the `Cluster`.
void stop() BSLS_KEYWORD_OVERRIDE;
diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.cpp b/src/groups/mqb/mqbmock/mqbmock_queue.cpp
index 26d6b4d75..76505fcf6 100644
--- a/src/groups/mqb/mqbmock/mqbmock_queue.cpp
+++ b/src/groups/mqb/mqbmock/mqbmock_queue.cpp
@@ -251,6 +251,11 @@ Queue::countUnconfirmed(BSLS_ANNOTATION_UNUSED unsigned int subId)
return 0;
}
+void Queue::stopPushing()
+{
+ // NOT IMPLEMENTED
+}
+
void Queue::onPushMessage(
BSLS_ANNOTATION_UNUSED const bmqt::MessageGUID& msgGUID,
BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& appData,
@@ -348,7 +353,8 @@ void Queue::onLostUpstream()
}
void Queue::onOpenUpstream(BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 genCount,
- BSLS_ANNOTATION_UNUSED unsigned int subQueueId)
+ BSLS_ANNOTATION_UNUSED unsigned int subQueueId,
+ BSLS_ANNOTATION_UNUSED bool isWriterOnly)
{
// NOTHING
}
diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h
index 4ce103c79..d9b5a6869 100644
--- a/src/groups/mqb/mqbmock/mqbmock_queue.h
+++ b/src/groups/mqb/mqbmock/mqbmock_queue.h
@@ -249,6 +249,10 @@ class Queue : public mqbi::Queue {
bsls::Types::Int64
countUnconfirmed(unsigned int subId) BSLS_KEYWORD_OVERRIDE;
+ /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and
+ /// sending PUTs and ACKs.
+ void stopPushing() BSLS_KEYWORD_OVERRIDE;
+
/// Called when a message with the specified `msgGUID`, `appData`,
/// `options` and compressionAlgorithmType payload is pushed to this
/// queue. Note that depending upon the location of the queue instance,
@@ -299,10 +303,13 @@ class Queue : public mqbi::Queue {
/// and current upstream `genCount`, then the PUT message gets dropped
/// to avoid out of order PUTs. If the `upstreamSubQueueId` is
/// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen.
+ /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This
+ /// should be specified if the upstream is stopping.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
void onOpenUpstream(bsls::Types::Uint64 genCount,
- unsigned int upstreamSubQueueId) BSLS_KEYWORD_OVERRIDE;
+ unsigned int upstreamSubQueueId,
+ bool isWriterOnly = false) BSLS_KEYWORD_OVERRIDE;
/// Notify the (remote) queue about reopen failure. The queue NACKs all
/// pending and incoming PUTs and drops CONFIRMs related to to the
diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp
index 4dcf068e5..208a65d22 100644
--- a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp
+++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp
@@ -46,7 +46,7 @@ int QueueEngine::configure(
return 0;
}
-void QueueEngine::resetState()
+void QueueEngine::resetState(BSLS_ANNOTATION_UNUSED bool keepConfirming)
{
// NOTHING
}
diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.h b/src/groups/mqb/mqbmock/mqbmock_queueengine.h
index a52a1ef51..b156ba56a 100644
--- a/src/groups/mqb/mqbmock/mqbmock_queueengine.h
+++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.h
@@ -88,8 +88,10 @@ class QueueEngine : public mqbi::QueueEngine {
virtual int
configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE;
- /// Reset the internal state of this engine.
- virtual void resetState() BSLS_KEYWORD_OVERRIDE;
+ /// Reset the internal state of this engine. If the optionally specified
+ /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs
+ /// processing.
+ virtual void resetState(bool keepConfirming = false) BSLS_KEYWORD_OVERRIDE;
/// Rebuild the internal state of this engine. This method is invoked
/// when the queue this engine is associated with is created from an
diff --git a/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp b/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp
index 2407963f8..340e64fc1 100644
--- a/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp
+++ b/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp
@@ -58,7 +58,8 @@ void DummySession::tearDown(
void DummySession::initiateShutdown(
BSLS_ANNOTATION_UNUSED const ShutdownCb& callback,
- BSLS_ANNOTATION_UNUSED const bsls::TimeInterval& timeout)
+ BSLS_ANNOTATION_UNUSED const bsls::TimeInterval& timeout,
+ BSLS_ANNOTATION_UNUSED bool supportShutdownV2)
{
// NOTHING
}
diff --git a/src/groups/mqb/mqbnet/mqbnet_dummysession.h b/src/groups/mqb/mqbnet/mqbnet_dummysession.h
index 402709d8f..8c192c827 100644
--- a/src/groups/mqb/mqbnet/mqbnet_dummysession.h
+++ b/src/groups/mqb/mqbnet/mqbnet_dummysession.h
@@ -135,10 +135,14 @@ class DummySession : public Session {
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
- /// if the specified `timeout` is expired.
+ /// if the specified `timeout` is expired. If the optional (temporary)
+ /// specified 'supportShutdownV2' is 'true' execute shutdown logic V2
+ /// where upstream (not downstream) nodes deconfigure queues and the
+ /// shutting down node (not downstream) waits for CONFIRMS.
void
initiateShutdown(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
/// Make the session abandon any work it has.
void invalidate() BSLS_KEYWORD_OVERRIDE;
diff --git a/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h b/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h
index db7a67b36..577ad3124 100644
--- a/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h
+++ b/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h
@@ -252,6 +252,8 @@ class MultiRequestManager {
void sendRequest(const RequestContextSp& context,
bsls::TimeInterval timeout);
+
+ void processResponse(const bmqp_ctrlmsg::ControlMessage& message);
};
// ============================================================================
@@ -486,6 +488,13 @@ void MultiRequestManager::sendRequest(
}
}
+template
+inline void MultiRequestManager::processResponse(
+ const bmqp_ctrlmsg::ControlMessage& response)
+{
+ d_requestManager_p->processResponse(response);  // forward to the underlying request manager
+}
+
template
inline const bsl::string&
MultiRequestManager::targetDescription(
diff --git a/src/groups/mqb/mqbnet/mqbnet_session.h b/src/groups/mqb/mqbnet/mqbnet_session.h
index 9a00219c1..a395bff47 100644
--- a/src/groups/mqb/mqbnet/mqbnet_session.h
+++ b/src/groups/mqb/mqbnet/mqbnet_session.h
@@ -129,9 +129,13 @@ class Session : public SessionEventProcessor {
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
- /// if the specified `timeout` is expired.
+ /// if the specified `timeout` is expired. If the optional (temporary)
+ /// specified 'supportShutdownV2' is 'true' execute shutdown logic V2
+ /// where upstream (not downstream) nodes deconfigure queues and the
+ /// shutting down node (not downstream) waits for CONFIRMS.
virtual void initiateShutdown(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout) = 0;
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2 = false) = 0;
/// Make the session abandon any work it has.
virtual void invalidate() = 0;
diff --git a/src/groups/mqb/mqbnet/mqbnet_session.t.cpp b/src/groups/mqb/mqbnet/mqbnet_session.t.cpp
index fb5c17b42..bf19fe2a7 100644
--- a/src/groups/mqb/mqbnet/mqbnet_session.t.cpp
+++ b/src/groups/mqb/mqbnet/mqbnet_session.t.cpp
@@ -69,9 +69,9 @@ struct SessionTestImp : bsls::ProtocolTestImp {
markDone();
}
- void
- initiateShutdown(const ShutdownCb& callback,
- const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE
+ void initiateShutdown(const ShutdownCb& callback,
+ const bsls::TimeInterval& timeout,
+ bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE
{
markDone();
}
diff --git a/src/integration-tests/test_graceful_shutdown.py b/src/integration-tests/test_graceful_shutdown.py
index 8f9e3cb28..68fdaf77d 100644
--- a/src/integration-tests/test_graceful_shutdown.py
+++ b/src/integration-tests/test_graceful_shutdown.py
@@ -186,7 +186,7 @@ def kill_wait_unconfirmed(self, peer):
# start graceful shutdown
peer.exit_gracefully()
- capture = peer.capture(r"Waiting for (-?\d+) unconfirmed messages", timeout=2)
+ capture = peer.capture(r"waiting for (-?\d+) unconfirmed message", timeout=2)
assert capture
num_messages = int(capture[1])
@@ -198,7 +198,7 @@ def kill_wait_unconfirmed(self, peer):
consumer.drain()
- capture = peer.capture(r"Waiting for (-?\d+) unconfirmed messages", timeout=2)
+ capture = peer.capture(r"waiting for (-?\d+) unconfirmed message", timeout=2)
assert capture
num_messages = int(capture[1])
@@ -209,14 +209,9 @@ def kill_wait_unconfirmed(self, peer):
consumer.drain()
- capture = peer.capture(
- r"finish shutdown sequence having (-?\d+) unconfirmed messages", timeout=2
- )
+ capture = peer.capture(r"no unconfirmed message", timeout=2)
assert capture
- num_messages = int(capture[1])
- assert num_messages == 0
-
peer.wait()
def setup_cluster(self, cluster):
@@ -259,11 +254,8 @@ def test_wait_unconfirmed_replica(
replica = cluster.process(self.replica_proxy.get_active_node())
self.kill_wait_unconfirmed(replica)
- @tweak.cluster.queue_operations.stop_timeout_ms(3000)
- @tweak.cluster.queue_operations.shutdown_timeout_ms(2000)
- def test_cancel_unconfirmed_timer(
- self, multi_node # pylint: disable=unused-argument
- ):
+ @tweak.cluster.queue_operations.shutdown_timeout_ms(1000)
+ def test_give_up_unconfirmed(self, multi_node): # pylint: disable=unused-argument
uriWrite = tc.URI_FANOUT
uriRead = tc.URI_FANOUT_FOO
@@ -287,23 +279,19 @@ def test_cancel_unconfirmed_timer(
# start graceful shutdown
leader.exit_gracefully()
- capture = replica.capture(r"waiting for 2 unconfirmed message", timeout=2)
+ capture = leader.capture(r"waiting for 2 unconfirmed message", timeout=2)
assert capture
- leader.force_stop()
+ capture = replica.capture(r"giving up on 2 unconfirmed message", timeout=2)
+ assert not capture
- replica.drain()
+ leader.force_stop()
# wait for the queue to recover
self.producer.post(uriWrite, payload=["msg3"], succeed=True)
consumer.wait_push_event()
- # the timer should be cancelled
- capture = replica.capture(r"giving up on 2 unconfirmed message", timeout=2)
- assert not capture
-
- @tweak.cluster.queue_operations.stop_timeout_ms(3000)
- @tweak.cluster.queue_operations.shutdown_timeout_ms(2000)
+ @tweak.cluster.queue_operations.shutdown_timeout_ms(1000)
def test_multiple_stop_requests(self, multi_cluster: Cluster):
cluster = multi_cluster
@@ -326,63 +314,13 @@ def test_multiple_stop_requests(self, multi_cluster: Cluster):
consumer.wait_push_event()
assert wait_until(lambda: len(consumer.list(uriRead, block=True)) == 2, 2)
- # start graceful shutdown
- for node in cluster.virtual_nodes():
- node.exit_gracefully()
-
- capture = self.replica_proxy.capture(
- r"waiting for 2 unconfirmed message", timeout=2
- )
- assert capture
-
- @tweak.cluster.queue_operations.stop_timeout_ms(999999)
- @tweak.cluster.queue_operations.shutdown_timeout_ms(999999)
- def test_active_node_down_stop_requests(self, multi_cluster: Cluster):
- """
- Ticket 169782591
- We have: Consumer -> Proxy -> active_node -> upstream_node.
- Start shutting down active_node (one of cluster.virtual_nodes())
- Because there are unconfirmed, Proxy lingers with StopResponse.
- Kill upstream_node. That event should not cancel StopRequest!
- """
- cluster = multi_cluster
-
- uriWrite = tc.URI_FANOUT
- uriRead = tc.URI_FANOUT_FOO
-
active_node = cluster.process(self.replica_proxy.get_active_node())
- assert active_node in cluster.virtual_nodes()
-
- upstream_node = cluster.process(active_node.get_active_node())
-
- # post 2 PUTs
- self.producer.post(uriWrite, payload=["msg1"], succeed=True)
- self.producer.post(uriWrite, payload=["msg2"], succeed=True)
-
- # start consumer
- consumer = self.replica_proxy.create_client("consumer")
-
- consumer.open(uriRead, flags=["read"], succeed=True)
-
- # receive messages
- consumer.wait_push_event()
- assert wait_until(lambda: len(consumer.list(uriRead, block=True)) == 2, 2)
# start graceful shutdown
- active_node.exit_gracefully()
-
- capture = self.replica_proxy.capture(
- r"waiting for 2 unconfirmed message", timeout=2
- )
- assert capture
-
- upstream_node.force_stop()
-
- consumer.confirm(uriRead, "*", succeed=True)
+ for node in cluster.virtual_nodes():
+ node.exit_gracefully()
- capture = active_node.capture(
- r"Received control message: \[ rId = (-?\d+) choice = \[ clusterMessage = \[ choice = \[ stopResponse"
- )
+ capture = active_node.capture(r"waiting for 2 unconfirmed message", timeout=2)
assert capture
@tweak.cluster.queue_operations.stop_timeout_ms(999999)
diff --git a/src/integration-tests/test_maxunconfirmed.py b/src/integration-tests/test_maxunconfirmed.py
index 2938e6eed..7cfdb7b8a 100644
--- a/src/integration-tests/test_maxunconfirmed.py
+++ b/src/integration-tests/test_maxunconfirmed.py
@@ -48,7 +48,7 @@ def post_n_msgs(self, uri, n):
)
return all(res == Client.e_SUCCESS for res in results)
- @tweak.cluster.queue_operations.stop_timeout_ms(1000)
+ @tweak.cluster.queue_operations.shutdown_timeout_ms(1000)
def test_maxunconfirmed(self, multi_node: Cluster):
# Post 100 messages
assert self.post_n_msgs(tc.URI_PRIORITY, 100)