Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat[MQB]: Monolithic Virtual Storage #334

Merged
merged 14 commits into from
Oct 1, 2024
Merged
12 changes: 9 additions & 3 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mqba_domainmanager.h>
#include <mqba_sessionnegotiator.h>
#include <mqbblp_clustercatalog.h>
#include <mqbblp_relayqueueengine.h>
#include <mqbcfg_brokerconfig.h>
#include <mqbcfg_messages.h>
#include <mqbcmd_commandlist.h>
Expand Down Expand Up @@ -159,6 +160,8 @@ Application::Application(bdlmt::EventScheduler* scheduler,
bdlf::PlaceHolders::_2), // allocator
k_BLOB_POOL_GROWTH_STRATEGY,
d_allocators.get("BlobSpPool"))
, d_pushElementsPool(sizeof(mqbblp::PushStream::Element),
d_allocators.get("PushElementsPool"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the default implementation of this concurrent pool uses geometric growth limited by 32. Does it worth it to set a larger const growth?

https://github.com/bloomberg/bde/blob/6ec108c973912eea7f009df971c553e7b8fa5ed5/groups/bdl/bdlma/bdlma_concurrentpool.cpp#L45

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not expect large volume here as it is highly volatile. Once message is pushed, it gets removed

, d_allocatorsStatContext_p(allocatorsStatContext)
, d_pluginManager_mp()
, d_statController_mp()
Expand Down Expand Up @@ -269,6 +272,11 @@ int Application::start(bsl::ostream& errorDescription)
}
}

mqbi::ClusterResources resources(d_scheduler_p,
&d_bufferFactory,
&d_blobSpPool,
&d_pushElementsPool);

// Start the StatController
d_statController_mp.load(
new (*d_allocator_p) mqbstat::StatController(
Expand Down Expand Up @@ -355,12 +363,10 @@ int Application::start(bsl::ostream& errorDescription)

// Start the ClusterCatalog
d_clusterCatalog_mp.load(new (*d_allocator_p) mqbblp::ClusterCatalog(
d_scheduler_p,
d_dispatcher_mp.get(),
d_transportManager_mp.get(),
statContextsMap,
&d_bufferFactory,
&d_blobSpPool,
resources,
d_allocators.get("ClusterCatalog")),
d_allocator_p);

Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqba/mqba_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class Application {

BlobSpPool d_blobSpPool;

bdlma::ConcurrentPool d_pushElementsPool;

mwcst::StatContext* d_allocatorsStatContext_p;
// Stat context of the counting allocators,
// if used
Expand Down
11 changes: 4 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2537,22 +2537,18 @@ Cluster::Cluster(const bslstl::StringRef& name,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
mqbi::DomainFactory* domainFactory,
bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator,
const mqbnet::Session::AdminCommandEnqueueCb& adminCb)
: d_allocator_p(allocator)
, d_allocators(d_allocator_p)
, d_isStarted(false)
, d_isStopping(false)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
resources,
clusterConfig,
mqbcfg::ClusterProxyDefinition(allocator),
netCluster,
Expand Down Expand Up @@ -2588,7 +2584,8 @@ Cluster::Cluster(const bslstl::StringRef& name,
BSLS_ASSERT(d_allocator_p);
mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster();
BSLS_ASSERT(netCluster_p && "NetCluster not set !");
BSLS_ASSERT(scheduler->clockType() == bsls::SystemClockType::e_MONOTONIC);
BSLS_ASSERT(resources.scheduler()->clockType() ==
bsls::SystemClockType::e_MONOTONIC);
BSLS_ASSERT_SAFE(d_clusterData.membership().selfNode() &&
"SelfNode not found in cluster!");

Expand Down
18 changes: 8 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,17 +467,17 @@ class Cluster : public mqbi::Cluster,
const bmqp::QueueId& queueId,
mqbc::ClusterNodeSession* ns,
bmqp::EventType::Enum eventType);
// Validate a message of the specified 'eventType' using the specified
// 'queueId' and 'ns'. Return one of `ValidationResult` values. Populate
// the specified 'queueHandle' if the queue is found.
// Validate a message of the specified `eventType` using the specified
// `queueId` and `ns`. Return one of `ValidationResult` values. Populate
// the specified `queueHandle` if the queue is found.

bool validateRelayMessage(mqbc::ClusterNodeSession** ns,
bsl::ostream* errorStream,
const int pid);
// Validate a relay message using the specified 'pid'. Return true if the
// message is valid and false otherwise. Populate the specified 'ns' if the
// Validate a relay message using the specified `pid`. Return true if the
// message is valid and false otherwise. Populate the specified `ns` if the
// message is valid or load a descriptive error message into the
// 'errorStream' if the message is invalid.
// `errorStream` if the message is invalid.

/// Executes in any thread.
void
Expand Down Expand Up @@ -541,19 +541,17 @@ class Cluster : public mqbi::Cluster,
/// Create a new object representing a cluster having the specified
/// `name`, `clusterConfig` and `statContexts`, associated to the
/// specified `netCluster` and using the specified `domainFactory`,
/// `scheduler`, `dispatcher`, `blobSpPool` and `bufferFactory`. Use
/// `scheduler`, `dispatcher`, `transportManager`, and `resources`. Use
/// the specified `allocator` for any memory allocation.
Cluster(const bslstl::StringRef& name,
const mqbcfg::ClusterDefinition& clusterConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
mqbi::DomainFactory* domainFactory,
bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator,
const mqbnet::Session::AdminCommandEnqueueCb& adminCb);

Expand Down
30 changes: 11 additions & 19 deletions src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,10 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
netCluster,
d_statContexts,
d_domainFactory_p,
d_scheduler_p,
d_dispatcher_p,
d_blobSpPool_p,
d_bufferFactory_p,
d_transportManager_p,
&d_stopRequestsManager,
d_resources,
clusterAllocator,
d_adminCb);

Expand Down Expand Up @@ -226,12 +224,10 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
clusterProxyDefinition,
netCluster,
d_statContexts,
d_scheduler_p,
d_bufferFactory_p,
d_blobSpPool_p,
d_dispatcher_p,
d_transportManager_p,
&d_stopRequestsManager,
d_resources,
clusterAllocator);

info.d_cluster_sp.reset(cluster, clusterAllocator);
Expand Down Expand Up @@ -362,20 +358,15 @@ int ClusterCatalog::initiateReversedClusterConnectionsImp(
return rc;
}

ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
bslma::Allocator* allocator)
ClusterCatalog::ClusterCatalog(mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_allocators(d_allocator_p)
, d_isStarted(false)
, d_scheduler_p(scheduler)
, d_dispatcher_p(dispatcher)
, d_bufferFactory_p(bufferFactory)
, d_blobSpPool_p(blobSpPool)
, d_transportManager_p(transportManager)
, d_domainFactory_p(0)
, d_clustersDefinition(d_allocator_p)
Expand All @@ -386,14 +377,15 @@ ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler,
, d_clusters(d_allocator_p)
, d_statContexts(statContexts)
, d_requestManager(bmqp::EventType::e_CONTROL,
d_bufferFactory_p,
d_scheduler_p,
resources.bufferFactory(),
resources.scheduler(),
false, // lateResponseMode
d_allocator_p)
, d_stopRequestsManager(&d_requestManager, d_allocator_p)
, d_resources(resources)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(scheduler->clockType() ==
BSLS_ASSERT_SAFE(d_resources.scheduler()->clockType() ==
bsls::SystemClockType::e_MONOTONIC);
}

Expand Down
40 changes: 10 additions & 30 deletions src/groups/mqb/mqbblp/mqbblp_clustercatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,6 @@ class ClusterCatalog {
// FRIENDS
friend class ClusterCatalogIterator;

public:
// TYPES

/// Pool of shared pointers to Blobs
typedef bdlcc::SharedObjectPool<
bdlbb::Blob,
bdlcc::ObjectPoolFunctors::DefaultCreator,
bdlcc::ObjectPoolFunctors::RemoveAll<bdlbb::Blob> >
BlobSpPool;

public:
// TYPES

Expand Down Expand Up @@ -225,19 +215,9 @@ class ClusterCatalog {
bool d_isStarted;
// True if this component is started

bdlmt::EventScheduler* d_scheduler_p;
// EventScheduler to use

mqbi::Dispatcher* d_dispatcher_p;
// Dispatcher to use

bdlbb::BlobBufferFactory* d_bufferFactory_p;
// Blob buffer factory to use

BlobSpPool* d_blobSpPool_p;
// Pool of shared pointers to blob to
// use.

mqbnet::TransportManager* d_transportManager_p;
// TransportManager for creating
// mqbnet::Cluster
Expand Down Expand Up @@ -290,6 +270,8 @@ class ClusterCatalog {
StatContextsMap d_statContexts;
// Map of stat contexts

const mqbi::ClusterResources d_resources;

mqbnet::Session::AdminCommandEnqueueCb d_adminCb;
// Callback function to enqueue admin commands

Expand Down Expand Up @@ -352,16 +334,14 @@ class ClusterCatalog {

// CREATORS

/// Create a new object using the specified `scheduler`, `dispatcher`,
/// `transportManager`, `bufferFactory`, `blobSpPool`, and the specified
/// `allocator`.
ClusterCatalog(bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
bslma::Allocator* allocator);
/// Create a new object using the specified 'dispatcher',
/// 'transportManager', 'statContexts', 'resources', and the specified
/// 'allocator'.
ClusterCatalog(mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator);

/// Destructor.
~ClusterCatalog();
Expand Down
10 changes: 3 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1047,20 +1047,16 @@ ClusterProxy::ClusterProxy(
const mqbcfg::ClusterProxyDefinition& clusterProxyConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
bdlmt::EventScheduler* scheduler,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_isStarted(false)
, d_isStopping(false)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
resources,
mqbcfg::ClusterDefinition(allocator),
clusterProxyConfig,
netCluster,
Expand Down Expand Up @@ -1088,7 +1084,7 @@ ClusterProxy::ClusterProxy(
// PRECONDITIONS
mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster();
BSLS_ASSERT_SAFE(netCluster_p && "NetCluster not set !");
BSLS_ASSERT_SAFE(scheduler->clockType() ==
BSLS_ASSERT_SAFE(resources.scheduler()->clockType() ==
bsls::SystemClockType::e_MONOTONIC);

d_clusterData.clusterConfig().queueOperations() =
Expand Down
19 changes: 7 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,8 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
mqbnet::ClusterNode* node,
BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::ClientIdentity& identity);

// Executed by the dispatcher thread when there is a change in the
// specified 'node' connectivity. If not empty, the specified
// 'session' points to newly connected 'Session'. Empty 'session'
// indicates loss of connectivity.

/// Executed by the dispatcher thread when the specified `node` becomes
/// unavailable.
void onNodeDownDispatched(mqbnet::ClusterNode* node);

/// Callback method when the `activeNodeLookupEvent` has expired.
Expand Down Expand Up @@ -429,20 +426,18 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
// CREATORS

/// Create a new object representing a cluster having the specified
/// `name`, `clusterProxyConfig` and `statContexts`, associated to the
/// specified `netCluster` and using the specified `scheduler`,
/// `bufferFactory`, `blobSpPool` and `dispatcher`. Use the specified
/// `allocator` for any memory allocation.
/// `name`, `clusterConfig` and `statContexts`, associated to the
/// specified `netCluster` and using the specified `domainFactory`,
/// `scheduler`, `dispatcher`, `transportManager`, and `resources`. Use
/// the specified `allocator` for any memory allocation.
ClusterProxy(const bslstl::StringRef& name,
const mqbcfg::ClusterProxyDefinition& clusterProxyConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
bdlmt::EventScheduler* scheduler,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator);

/// Destructor
Expand Down
Loading
Loading