Skip to content

Commit

Permalink
add branch in DomainManager::processCommand to deal with REMOVE command
Browse files Browse the repository at this point in the history
implement the first pass where the config file of the domain is still on the disk,
and the admin command can only be sent to leader/primary

Signed-off-by: Emelia Lei <[email protected]>
  • Loading branch information
emelialei88 committed Dec 23, 2024
1 parent 74c7fd4 commit c3ba4ff
Show file tree
Hide file tree
Showing 21 changed files with 626 additions and 22 deletions.
118 changes: 118 additions & 0 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,124 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
return 0; // RETURN
}
}
else if (command.isRemoveValue()) {
const bsl::string& name = command.remove().domain();

DomainSp domainSp;

if (0 != locateOrCreateDomain(&domainSp, name)) {
bmqu::MemOutStream os;
os << "Trying to remove a nonexistent domain '" << name << "'";
result->makeError().message() = os.str();
return -1; // RETURN
}

// First pass
if (command.remove().finalize().isNull()) {
BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name
<< "' called!!!";

// TODO: broadcast to all other nodes

// 1. Reject if there's any opened queue
if (domainSp->hasActiveQueue()) {
bmqu::MemOutStream os;
os << "Trying to remove the domain '" << name
<< "' while there are queues open";
result->makeError().message() = os.str();
return -1; // RETURN
}

BALL_LOG_INFO << "BEFORE CHEKCING CLUSTER STATUS";

// 2. Reject if the state of cluster is not healthy
// Notice that the bad state can happen anywhere down the road,
// so this check is not enough to prevent a partial execution.
// It's inevitable, so we can only make sure the code doesn't
// break if we run this command again
// TODO: ask if this is necessary???
mqbi::Cluster* cluster = domainSp->cluster();
mqbcmd::ClusterResult clusterStatusResult;
mqbcmd::ClusterCommand clusterStatusCommand;
clusterStatusCommand.makeStatus();

int rc = cluster->processCommand(&clusterStatusResult,
clusterStatusCommand);
if (clusterStatusResult.isErrorValue()) {
result->makeError(clusterStatusResult.error());
return rc; // RETURN
}

BALL_LOG_INFO << "AFTER CHEKCING CLUSTER STATUS";
BALL_LOG_INFO << clusterStatusResult.clusterStatus();

BSLS_ASSERT_SAFE(clusterStatusResult.isClusterStatusValue());

if (!clusterStatusResult.clusterStatus().isHealthy()) {
bmqu::MemOutStream os;
os << "Domain '" << name << "' in cluster '" << name
<< "' is not healthy";
result->makeError().message() = os.str();
return -1; // RETURN
}

BALL_LOG_INFO << "BEFORE SETTING 'DELETED' FLAG";

// 3. Mark DOMAIN REMOVING to Block all incoming openQueue requests
// TODO: this idea requires a lot of getters and setters exposed in
// mqbi - better idea?
domainSp->removeDomain();

BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE";

// 4. Mark domain for delete in domainResolver
d_domainResolver_mp->clearCache(name);

BALL_LOG_INFO << "BEFORE PURGE";

// 5. Purge inactive queues
// remove virtual storage; add a record in journal file
mqbcmd::DomainCommand domainCommand;
domainCommand.makePurge();

mqbcmd::DomainResult domainResult;
rc = domainSp->processCommand(&domainResult, domainCommand);

if (domainResult.isErrorValue()) {
result->makeError(domainResult.error());
return rc; // RETURN
}
else if (domainResult.isSuccessValue()) {
result->makeSuccess(domainResult.success());
return rc; // RETURN
}
result->makeDomainResult(domainResult);

// 6. Force GC queues
// remove Queue from domain; remove storage from partition
// CLUSTERS CLUSTER <name> FORCE_GC_QUEUES
// TODO: Do we want to have add another type of result?
mqbcmd::ClusterResult clusterForceGCResult;
rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
}

// 7. Mark DOMAIN REMOVED to accecpt the second pass
domainSp->removeDomainCompleted();
}
// Second pass
else {
// TODO: remove the domain object

BALL_LOG_INFO << "[Second pass] DOMAINS REMOVE '" << name
<< "' finalize called!!!";
result->makeSuccess();
return 0; // RETURN
}
return 0;
}

bmqu::MemOutStream os;
os << "Unknown command '" << command << "'";
Expand Down
50 changes: 47 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,12 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,
d_throttledFailedAckMessages,
BALL_LOG_INFO << description() << ": failed Ack "
<< "[status: " << status << ", source: '" << source
<< "'" << ", correlationId: " << correlationId
<< "'"
<< ", correlationId: " << correlationId
<< ", GUID: " << messageGUID << ", queue: '"
<< (found ? uri : "** null **") << "' "
<< "(id: " << queueId << ")] " << "to node "
<< "(id: " << queueId << ")] "
<< "to node "
<< nodeSession->clusterNode()->nodeDescription(););
}

Expand Down Expand Up @@ -492,7 +494,8 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,
d_throttledDroppedAckMessages,
BALL_LOG_ERROR << description() << ": dropping ACK message "
<< "[status: " << status << ", source: '" << source
<< "'" << ", correlationId: " << correlationId
<< "'"
<< ", correlationId: " << correlationId
<< ", GUID: " << messageGUID
<< ", queueId: " << queueId << "] to node "
<< nodeSession->clusterNode()->nodeDescription()
Expand Down Expand Up @@ -3656,6 +3659,47 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result)
storageResult.clusterStorageSummary();
}

int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

dispatcher()->execute(
bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched,
this,
result,
domainName),
this);

dispatcher()->synchronize(this);

return 0;
}

void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// executed by the *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

// 'true' implies immediate
if (const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues(
true,
domainName)) {
BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: "
<< rc << ")";
result->makeError().message() = "Failed to execute command (rc: " +
bsl::to_string(rc) + ")";
}
else {
// Otherwise the command succeeded.
BALL_LOG_INFO << "SUCCESS in Cluster::gcQueueOnDomainDispatched";
result->makeSuccess();
}
}

void Cluster::printClusterStateSummary(bsl::ostream& out,
int level,
int spacesPerLevel) const
Expand Down
8 changes: 8 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,14 @@ class Cluster : public mqbi::Cluster,
/// Load the cluster state to the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Executed by dispatcher thread.
void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);

// MANIPULATORS
// (virtual: mqbnet::SessionEventProcessor)

Expand Down
28 changes: 21 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
<< "Received unexpected event: " << event;
BSLS_ASSERT_SAFE(false && "Unexpected event received");
return; // RETURN
} // break;
} // break;
default: {
BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description()
<< "Received unknown event: " << event;
Expand Down Expand Up @@ -1338,6 +1338,20 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out)
loadQueuesInfo(&clusterProxyStatus.queuesInfo());
}

int ClusterProxy::gcQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
// exected by *ANY* thread

bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "GC Queue not supported on a Proxy.";
result->makeError().message() = os.str();

return 0;
}

// MANIPULATORS
// (virtual: mqbi::DispatcherClient)
void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event)
Expand Down Expand Up @@ -1401,31 +1415,31 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event)
BSLS_ASSERT_OPT(false &&
"'DISPATCHER' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_CLUSTER_STATE: {
BSLS_ASSERT_OPT(false &&
"'CLUSTER_STATE' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_STORAGE: {
BSLS_ASSERT_OPT(false && "'STORAGE' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_RECOVERY: {
BSLS_ASSERT_OPT(false &&
"'RECOVERY' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_UNDEFINED: {
BSLS_ASSERT_OPT(false &&
"'UNDEFINED' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_REPLICATION_RECEIPT: {
BSLS_ASSERT_OPT(
false && "'REPLICATION_RECEIPT' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
default: {
BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description()
<< ": received unexpected dispatcher event: " << event;
Expand Down
4 changes: 4 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

void getPrimaryNodes(int* rc,
bsl::ostream& errorDescription,
bsl::vector<mqbnet::ClusterNode*>* nodes,
Expand Down
8 changes: 7 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6140,7 +6140,8 @@ void ClusterQueueHelper::onCloseQueueResponse(
<< contextSp->d_peer->nodeDescription();
}

int ClusterQueueHelper::gcExpiredQueues(bool immediate)
int ClusterQueueHelper::gcExpiredQueues(bool immediate,
const bsl::string& domainName)
{
// executed by the cluster *DISPATCHER* thread

Expand Down Expand Up @@ -6175,6 +6176,11 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate)
QueueLiveState& qinfo = queueContextSp->d_liveQInfo;
const int pid = queueContextSp->partitionId();

if (domainName != "" &&
it->second->uri().qualifiedDomain().compare(domainName) != 0) {
continue; // CONTINUE
}

if (!isQueueAssigned(*queueContextSp)) {
continue; // CONTINUE
}
Expand Down
5 changes: 3 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL

/// Process the open queue request represented by the specified
/// `context`: that is, depending on the cluster mode and queue
/// assignment, either send an open queue request of create the queue.
/// assignment, either send an open queue request or create the queue.
/// The queue must have been assigned at this point.
void processOpenQueueRequest(const OpenQueueContext& context);

Expand Down Expand Up @@ -1107,7 +1107,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
/// delete the qualified queues immediately instead of marking them for
/// deletion in future. Returns 0 on success or a non-zero error code on
/// failure.
int gcExpiredQueues(bool immediate = false);
int gcExpiredQueues(bool immediate = false,
const bsl::string& domainName = "");

ClusterQueueHelper& setOnQueueAssignedCb(const OnQueueAssignedCb& value);

Expand Down
Loading

0 comments on commit c3ba4ff

Please sign in to comment.