diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index d5d5ca242..95b6d6990 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -702,6 +702,124 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, return 0; // RETURN } } + else if (command.isRemoveValue()) { + const bsl::string& name = command.remove().domain(); + + DomainSp domainSp; + + if (0 != locateOrCreateDomain(&domainSp, name)) { + bmqu::MemOutStream os; + os << "Trying to remove a nonexistent domain '" << name << "'"; + result->makeError().message() = os.str(); + return -1; // RETURN + } + + // First pass + if (command.remove().finalize().isNull()) { + BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name + << "' called!!!"; + + // TODO: broadcast to all other nodes + + // 1. Reject if there's any opened queue + if (domainSp->hasActiveQueue()) { + bmqu::MemOutStream os; + os << "Trying to remove the domain '" << name + << "' while there are queues open"; + result->makeError().message() = os.str(); + return -1; // RETURN + } + + BALL_LOG_INFO << "BEFORE CHEKCING CLUSTER STATUS"; + + // 2. Reject if the state of cluster is not healthy + // Notice that the bad state can happen anywhere down the road, + // so this check is not enough to prevent a partial execution. + // It's inevitable, so we can only make sure the code doesn't + // break if we run this command again + // TODO: ask if this is necessary??? + mqbi::Cluster* cluster = domainSp->cluster(); + mqbcmd::ClusterResult clusterStatusResult; + mqbcmd::ClusterCommand clusterStatusCommand; + clusterStatusCommand.makeStatus(); + + int rc = cluster->processCommand(&clusterStatusResult, + clusterStatusCommand); + if (clusterStatusResult.isErrorValue()) { + result->makeError(clusterStatusResult.error()); + return rc; // RETURN + } + + BALL_LOG_INFO << "AFTER CHEKCING CLUSTER STATUS"; + BALL_LOG_INFO << clusterStatusResult.clusterStatus(); + + BSLS_ASSERT_SAFE(clusterStatusResult.isClusterStatusValue()); + + if (!clusterStatusResult.clusterStatus().isHealthy()) { + bmqu::MemOutStream os; + os << "Domain '" << name << "' in cluster '" << name + << "' is not healthy"; + result->makeError().message() = os.str(); + return -1; // RETURN + } + + BALL_LOG_INFO << "BEFORE SETTING 'DELETED' FLAG"; + + // 3. Mark DOMAIN REMOVING to Block all incoming openQueue requests + // TODO: this idea requires a lot of getters and setters exposed in + // mqbi - better idea? + domainSp->removeDomain(); + + BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE"; + + // 4. Mark domain for delete in domainResolver + d_domainResolver_mp->clearCache(name); + + BALL_LOG_INFO << "BEFORE PURGE"; + + // 5. Purge inactive queues + // remove virtual storage; add a record in journal file + mqbcmd::DomainCommand domainCommand; + domainCommand.makePurge(); + + mqbcmd::DomainResult domainResult; + rc = domainSp->processCommand(&domainResult, domainCommand); + + if (domainResult.isErrorValue()) { + result->makeError(domainResult.error()); + return rc; // RETURN + } + else if (domainResult.isSuccessValue()) { + result->makeSuccess(domainResult.success()); + return rc; // RETURN + } + result->makeDomainResult(domainResult); + + // 6. Force GC queues + // remove Queue from domain; remove storage from partition + // CLUSTERS CLUSTER FORCE_GC_QUEUES + // TODO: Do we want to have add another type of result? + mqbcmd::ClusterResult clusterForceGCResult; + rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name); + if (clusterForceGCResult.isErrorValue()) { + result->makeError(clusterForceGCResult.error()); + return -1; // RETURN + } + + // 7. Mark DOMAIN REMOVED to accecpt the second pass + domainSp->removeDomainCompleted(); + } + // Second pass + else { + // TODO: remove the domain object + + BALL_LOG_INFO << "[Second pass] DOMAINS REMOVE '" << name + << "' finalize called!!!"; + result->makeSuccess(); + return 0; // RETURN + } + return 0; + } bmqu::MemOutStream os; os << "Unknown command '" << command << "'"; diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 2723c0830..ef247ca9e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -441,10 +441,12 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, d_throttledFailedAckMessages, BALL_LOG_INFO << description() << ": failed Ack " << "[status: " << status << ", source: '" << source - << "'" << ", correlationId: " << correlationId + << "'" + << ", correlationId: " << correlationId << ", GUID: " << messageGUID << ", queue: '" << (found ? uri : "** null **") << "' " - << "(id: " << queueId << ")] " << "to node " + << "(id: " << queueId << ")] " + << "to node " << nodeSession->clusterNode()->nodeDescription();); } @@ -492,7 +494,8 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, d_throttledDroppedAckMessages, BALL_LOG_ERROR << description() << ": dropping ACK message " << "[status: " << status << ", source: '" << source - << "'" << ", correlationId: " << correlationId + << "'" + << ", correlationId: " << correlationId << ", GUID: " << messageGUID << ", queueId: " << queueId << "] to node " << nodeSession->clusterNode()->nodeDescription() @@ -3656,6 +3659,47 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result) storageResult.clusterStorageSummary(); } +int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) +{ + // exected by *ANY* thread + + dispatcher()->execute( + bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched, + this, + result, + domainName), + this); + + dispatcher()->synchronize(this); + + return 0; +} + +void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName) +{ + // executed by the *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + + // 'true' implies immediate + if (const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues( + true, + domainName)) { + BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: " + << rc << ")"; + result->makeError().message() = "Failed to execute command (rc: " + + bsl::to_string(rc) + ")"; + } + else { + // Otherwise the command succeeded. + BALL_LOG_INFO << "SUCCESS in Cluster::gcQueueOnDomainDispatched"; + result->makeSuccess(); + } +} + void Cluster::printClusterStateSummary(bsl::ostream& out, int level, int spacesPerLevel) const diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 13fe60889..57168c7e7 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -673,6 +673,14 @@ class Cluster : public mqbi::Cluster, /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain. + int gcQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + + /// Executed by dispatcher thread. + void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName); + // MANIPULATORS // (virtual: mqbnet::SessionEventProcessor) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 94cca4401..2a0568fa8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -780,7 +780,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event, << "Received unexpected event: " << event; BSLS_ASSERT_SAFE(false && "Unexpected event received"); return; // RETURN - } // break; + } // break; default: { BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description() << "Received unknown event: " << event; @@ -1338,6 +1338,20 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out) loadQueuesInfo(&clusterProxyStatus.queuesInfo()); } +int ClusterProxy::gcQueueOnDomain( + mqbcmd::ClusterResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + // exected by *ANY* thread + + bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p); + bmqu::MemOutStream os(&localAllocator); + os << "GC Queue not supported on a Proxy."; + result->makeError().message() = os.str(); + + return 0; +} + // MANIPULATORS // (virtual: mqbi::DispatcherClient) void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event) @@ -1401,31 +1415,31 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event) BSLS_ASSERT_OPT(false && "'DISPATCHER' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_CLUSTER_STATE: { BSLS_ASSERT_OPT(false && "'CLUSTER_STATE' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_STORAGE: { BSLS_ASSERT_OPT(false && "'STORAGE' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_RECOVERY: { BSLS_ASSERT_OPT(false && "'RECOVERY' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_UNDEFINED: { BSLS_ASSERT_OPT(false && "'UNDEFINED' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_REPLICATION_RECEIPT: { BSLS_ASSERT_OPT( false && "'REPLICATION_RECEIPT' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; default: { BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description() << ": received unexpected dispatcher event: " << event; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 667a052fe..8e0483074 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -539,6 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Load the cluster state in the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain. + int gcQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + void getPrimaryNodes(int* rc, bsl::ostream& errorDescription, bsl::vector* nodes, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 609df8b01..daa1aafa6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -6140,7 +6140,8 @@ void ClusterQueueHelper::onCloseQueueResponse( << contextSp->d_peer->nodeDescription(); } -int ClusterQueueHelper::gcExpiredQueues(bool immediate) +int ClusterQueueHelper::gcExpiredQueues(bool immediate, + const bsl::string& domainName) { // executed by the cluster *DISPATCHER* thread @@ -6175,6 +6176,11 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate) QueueLiveState& qinfo = queueContextSp->d_liveQInfo; const int pid = queueContextSp->partitionId(); + if (domainName != "" && + it->second->uri().qualifiedDomain().compare(domainName) != 0) { + continue; // CONTINUE + } + if (!isQueueAssigned(*queueContextSp)) { continue; // CONTINUE } diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index f698d0e2b..7e53b8ec2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -579,7 +579,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// Process the open queue request represented by the specified /// `context`: that is, depending on the cluster mode and queue - /// assignment, either send an open queue request of create the queue. + /// assignment, either send an open queue request or create the queue. /// The queue must have been assigned at this point. void processOpenQueueRequest(const OpenQueueContext& context); @@ -1107,7 +1107,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// delete the qualified queues immediately instead of marking them for /// deletion in future. Returns 0 on success or a non-zero error code on /// failure. - int gcExpiredQueues(bool immediate = false); + int gcExpiredQueues(bool immediate = false, + const bsl::string& domainName = ""); ClusterQueueHelper& setOnQueueAssignedCb(const OnQueueAssignedCb& value); diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 1f1928ffd..3c394f960 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -56,7 +56,8 @@ namespace mqbblp { namespace { const char k_LOG_CATEGORY[] = "MQBBLP.DOMAIN"; -const char k_NODE_IS_STOPPING[] = "Node is stopping"; +const char k_NODE_IS_STOPPING[] = "Node is stopping"; +const char k_DOMAIN_IS_REMOVING_OR_REMOVED[] = "Domain is removing or removed"; /// This method does nothing.. it's just used so that we can control the /// destruction of the specified `queue` to happen once we guarantee the @@ -371,7 +372,7 @@ Domain::Domain(const bsl::string& name, Domain::~Domain() { - BSLS_ASSERT_SAFE(e_STARTED != d_state && + BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state) && "'teardown' must be called before the destructor"); } @@ -528,9 +529,17 @@ void Domain::openQueue( // Reject this open-queue request with a soft failure status. bmqp_ctrlmsg::Status status; - status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; - status.code() = mqbi::ClusterErrorCode::e_STOPPING; - status.message() = k_NODE_IS_STOPPING; + + if (d_state == e_REMOVING || d_state == e_REMOVED) { + status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; + status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; + status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; + } + else { + status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; + status.code() = mqbi::ClusterErrorCode::e_STOPPING; + status.message() = k_NODE_IS_STOPPING; + } callback(status, static_cast(0), @@ -865,6 +874,20 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } +void Domain::removeDomainStart() +{ + bslmt::LockGuard guard(&d_mutex); // LOCK + + d_state = e_REMOVING; +} + +void Domain::removeDomainCompleted() +{ + bslmt::LockGuard guard(&d_mutex); // LOCK + + d_state = e_REMOVED; +} + // ACCESSORS int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const @@ -960,5 +983,24 @@ void Domain::loadRoutingConfiguration( } } +bool Domain::hasActiveQueue() const +{ + // Queues are created before handles, so if d_queue is empty, + // there's shouldn't be any active handle + if (d_queues.empty()) { + return false; + } + + // If there's queue in this domain, check to see if there's + // any handle to it + for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { + if (it->second->hasActiveHandle()) { + return true; + } + } + + return false; +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index dc1b76004..0bd822e96 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -108,7 +108,13 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, typedef mqbi::Storage::AppInfos AppInfos; typedef AppInfos::const_iterator AppInfosCIter; - enum DomainState { e_STARTED = 0, e_STOPPING = 1, e_STOPPED = 2 }; + enum DomainState { + e_STARTED = 0, + e_STOPPING = 1, + e_STOPPED = 2, + e_REMOVING = 3, + e_REMOVED = 4 + }; private: // DATA @@ -314,6 +320,12 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; + /// Mark the state of domain to be REMOVING + void removeDomainStart() BSLS_KEYWORD_OVERRIDE; + + /// Mark the state of domain to be REMOVED + void removeDomainCompleted() BSLS_KEYWORD_OVERRIDE; + // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -349,6 +361,10 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, /// should be used by all queues under this domain. void loadRoutingConfiguration(bmqp_ctrlmsg::RoutingConfiguration* config) const BSLS_KEYWORD_OVERRIDE; + + /// Check the state of the queues in this domain, return true if + /// there's queue with valid queue handles. + bool hasActiveQueue() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h index 12a053a80..c9bc0d31b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.h +++ b/src/groups/mqb/mqbblp/mqbblp_queue.h @@ -443,6 +443,9 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; + + /// Return true if there's queue handle and they're actively used. + bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ @@ -588,6 +591,11 @@ inline bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } +inline bool Queue::hasActiveHandle() const +{ + return d_state.handleCatalog().handlesCount() != 0; +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index fc0a44a85..7904aeacc 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -371,6 +371,10 @@ class Cluster : public DispatcherClient { /// Load the cluster state to the specified `out` object. virtual void loadClusterStatus(mqbcmd::ClusterResult* out) = 0; + /// Force GC queues in this cluster on a given domain. + virtual int gcQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) = 0; + // ACCESSORS /// Return the name of this cluster. diff --git a/src/groups/mqb/mqbi/mqbi_domain.h b/src/groups/mqb/mqbi/mqbi_domain.h index 6f9ac5112..183724f32 100644 --- a/src/groups/mqb/mqbi/mqbi_domain.h +++ b/src/groups/mqb/mqbi/mqbi_domain.h @@ -177,6 +177,12 @@ class Domain { virtual int processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) = 0; + /// Mark the state of domain to be REMOVING + virtual void removeDomainStart() = 0; + + /// Mark the state of domain to be REMOVED + virtual void removeDomainCompleted() = 0; + // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -212,6 +218,10 @@ class Domain { /// should be used by all queues under this domain. virtual void loadRoutingConfiguration( bmqp_ctrlmsg::RoutingConfiguration* config) const = 0; + + /// Check the state of the queues in this domain, return true if + /// there's queue with valid queue handles. + virtual bool hasActiveQueue() const = 0; }; // =================== diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h index b1a0e321e..bfaca3dba 100644 --- a/src/groups/mqb/mqbi/mqbi_queue.h +++ b/src/groups/mqb/mqbi/mqbi_queue.h @@ -951,6 +951,9 @@ class Queue : public DispatcherClient { /// Return the Schema Leaner associated with this queue. virtual bmqp::SchemaLearner& schemaLearner() const = 0; + + /// Return true if there's queue handle and they're actively used. + virtual bool hasActiveHandle() const = 0; }; // ======================== diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index 0d57dc8d7..14c63e8e2 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -489,6 +489,16 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* out) out->makeClusterStatus(); } +int Cluster::gcQueueOnDomain( + mqbcmd::ClusterResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + bmqu::MemOutStream os; + os << "MockCluster::gcQueueOnDomain not implemented!"; + result->makeError().message() = os.str(); + return -1; +} + // MANIPULATORS // (specific to mqbmock::Cluster) Cluster& Cluster::_setIsClusterMember(bool value) diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index 23f03dbe1..89c35d283 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -402,6 +402,10 @@ class Cluster : public mqbi::Cluster { /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain. + int gcQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + // MANIPULATORS // (specific to mqbmock::Cluster) diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.cpp b/src/groups/mqb/mqbmock/mqbmock_domain.cpp index 7839bd262..699e03b94 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_domain.cpp @@ -149,6 +149,16 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } +void Domain::removeDomainStart() +{ + BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); +} + +void Domain::removeDomainCompleted() +{ + BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); +} + int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const { @@ -220,6 +230,11 @@ void Domain::loadRoutingConfiguration( // NOTHING } +bool Domain::hasActiveQueue() const +{ + return true; +} + // ------------------- // class DomainFactory // ------------------- diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.h b/src/groups/mqb/mqbmock/mqbmock_domain.h index b79335bcd..9d52f210f 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.h +++ b/src/groups/mqb/mqbmock/mqbmock_domain.h @@ -209,6 +209,12 @@ class Domain : public mqbi::Domain { processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; + /// Mark the state of domain to be REMOVING + void removeDomainStart() BSLS_KEYWORD_OVERRIDE; + + /// Mark the state of domain to be REMOVED + void removeDomainCompleted() BSLS_KEYWORD_OVERRIDE; + /// Load into the specified `out`, if `out` is not 0, the queue /// corresponding to the specified `uri`, if found. Return 0 on success, /// or a non-zero return code otherwise. @@ -242,6 +248,10 @@ class Domain : public mqbi::Domain { /// should be used by all queues under this domain. void loadRoutingConfiguration(bmqp_ctrlmsg::RoutingConfiguration* config) const BSLS_KEYWORD_OVERRIDE; + + /// Check the state of the current domain, return true if it's + /// active and accepts incoming connections. + bool hasActiveQueue() const BSLS_KEYWORD_OVERRIDE; }; // =================== diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.cpp b/src/groups/mqb/mqbmock/mqbmock_queue.cpp index fda477727..203a5a5c9 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queue.cpp @@ -519,6 +519,13 @@ bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } +bool Queue::hasActiveHandle() const +{ + BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); + + return false; +} + // ------------------- // class HandleFactory // ------------------- diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h index 3f5ff769d..70013fbbb 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.h +++ b/src/groups/mqb/mqbmock/mqbmock_queue.h @@ -425,6 +425,9 @@ class Queue : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; + /// Return true if there's queue handle and they're actively used. + bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; + // ACCESSORS // (specific to mqbi::MockQueue) }; diff --git a/src/groups/mqb/mqbs/mqbs_storagecollectionutil.cpp b/src/groups/mqb/mqbs/mqbs_storagecollectionutil.cpp index 5e0b5c1bd..8bf534c9a 100644 --- a/src/groups/mqb/mqbs/mqbs_storagecollectionutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_storagecollectionutil.cpp @@ -14,6 +14,7 @@ // limitations under the License. // mqbs_storagecollectionutil.cpp -*-C++-*- +#include #include // BDE @@ -70,9 +71,8 @@ createStorageComparator(StorageCollectionUtilSortMetric::Enum metricType) bool byDomainFilter(const ReplicatedStorage* storage, const bsl::string& domainName) { - return strncmp(storage->queueUri().qualifiedDomain().data(), - domainName.c_str(), - storage->queueUri().qualifiedDomain().length()) == 0; + return storage->queueUri().qualifiedDomain().compare( + bslstl::StringRef(domainName)) == 0; } bool byMessageCountFilter(const ReplicatedStorage* storage, diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py new file mode 100644 index 000000000..11c89d5f1 --- /dev/null +++ b/src/integration-tests/test_domain_remove.py @@ -0,0 +1,277 @@ +# Copyright 2024 Bloomberg Finance L.P. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +This suite of test cases verifies the admin command +"DOMAINS REMOVE [finalize]" work as expected +""" + +import blazingmq.dev.it.testconstants as tc +from blazingmq.dev.it.fixtures import ( + multi_node, + single_node, + cluster, + Cluster, +) +from blazingmq.dev.it.process.admin import AdminClient +from blazingmq.dev.it.process.client import Client +import time + + +def test_remove_domain_with_queue_close(cluster: Cluster): + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer and consumer open the queue, + # post and confirm messages and both close + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + producer.close(tc.URI_PRIORITY, succeed=True) + consumer.close(tc.URI_PRIORITY, succeed=True) + + # send remove domain admin command + # command couldn't go through since there's a queue open + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + cluster._logger.info("=========================") + cluster._logger.info(res) + assert "while there are queues open" not in res + + +def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): + proxies = multi_node.proxy_cycle() + proxy = next(proxies) + + # find the two nodes which are not the known leader + leader = multi_node.last_known_leader + replicas = multi_node.nodes(exclude=leader) + member = replicas[0] + + # producer send a message, client confirm, then both close connection + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True + ) + + consumer.confirm(tc.URI_PRIORITY, "+1", succeed=True) + + producer.close(tc.URI_PRIORITY, succeed=True) + consumer.close(tc.URI_PRIORITY, succeed=True) + + # set quorum to make it impossible to select a leader + for node in multi_node.nodes(): + node.set_quorum(99, succeed=True) + + # klll the leader to make the cluster unhealthy + leader.check_exit_code = False + leader.kill() + leader.wait() + + # wait for the state to catch up + time.sleep(11) + + # send remove domain admin command + # command couldn't go through since state is unhealthy + admin = AdminClient() + admin.connect(member.config.host, int(member.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "is not healthy" in res + + +def test_remove_different_domain(cluster: Cluster): + proxies = cluster.proxy_cycle() + + # producer produces messages and then closes connection + producer = next(proxies).create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + producer.close(tc.URI_PRIORITY) + + # send remove domain admin command + # for a different domain + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY_SC}") + assert "No queue purged." in res + + # do the same things for a different pair reversely + producer.open(tc.URI_FANOUT_SC, flags=["write"], succeed=True) + + producer.post( + tc.URI_FANOUT_SC, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + producer.close(tc.URI_FANOUT_SC) + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_FANOUT}") + assert "No queue purged." in res + + +def test_open_queue_after_remove_domain(cluster: Cluster): + proxies = cluster.proxy_cycle() + next(proxies) # eastp + proxy = next(proxies) # westp + + # producer produces messages and consumer confirms + # then both close connections + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + producer.close(tc.URI_PRIORITY, succeed=True) + consumer.close(tc.URI_PRIORITY, succeed=True) + + # send remove domain admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + + # open queues on the removed domain should fail + assert producer.open(tc.URI_PRIORITY, flags=["write"], block=True) < 0 + + +def test_remove_domain_with_queue_open(cluster: Cluster): + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + # then both close connections + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # send admin command + # when both producer and consumer open + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert ( + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + in res + ) + + # close producer and send the command again + producer.close(tc.URI_PRIORITY, succeed=True) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert ( + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + in res + ) + + # open producer and close consumer and send the command again + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + consumer.close(tc.URI_PRIORITY, succeed=True) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert ( + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + in res + ) + + # close both and send the command again + producer.close(tc.URI_PRIORITY, succeed=True) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "while there are queues open" not in res + + +def test_remove_domain_with_unconfirmed_message(cluster: Cluster): + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer open the queue, + # produce messages and close the queue + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + producer.close(tc.URI_PRIORITY, succeed=True) + + # send admin command + # unconfirmed messages will be purged + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 3 message(s)" in res + + +def test_remove_domain_not_on_disk(cluster: Cluster): + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin("DOMAINS REMOVE domain.foo") + assert "Trying to remove a nonexistent domain" in res + + +def test_remove_domain_on_disk_not_in_cache(cluster: Cluster): + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_BROADCAST}") + assert "Trying to remove a nonexistent domain" not in res