Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance[MQB,BMQ]: return constructed blobs as pointers #471

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/groups/bmq/bmqa/bmqa_message.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ static void test2_validPushMessagePrint()
typedef bsl::shared_ptr<bmqimp::Event> EventImplSp;

bdlbb::PooledBlobBufferFactory bufferFactory(4 * 1024, s_allocator_p);
bmqp::BlobPoolUtil::BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p));
bmqa::Event event;

EventImplSp& implPtr = reinterpret_cast<EventImplSp&>(event);
Expand All @@ -183,7 +185,7 @@ static void test2_validPushMessagePrint()
bsl::strlen(buffer));

// Create PushEventBuilder
bmqp::PushEventBuilder peb(&bufferFactory, s_allocator_p);
bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p);
ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast<size_t>(peb.eventSize()));
ASSERT_EQ(sizeof(bmqp::EventHeader),
static_cast<size_t>(peb.blob().length()));
Expand Down Expand Up @@ -240,6 +242,8 @@ static void test3_messageProperties()
// allocate memory for an automatically generated CorrelationId.

bdlbb::PooledBlobBufferFactory bufferFactory(4 * 1024, s_allocator_p);
bmqp::BlobPoolUtil::BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p));

const int queueId = 4321;
const bmqt::MessageGUID guid;
Expand Down Expand Up @@ -269,7 +273,7 @@ static void test3_messageProperties()
bdlbb::BlobUtil::append(&payload, buffer, bsl::strlen(buffer));

// Create PushEventBuilder
bmqp::PushEventBuilder peb(&bufferFactory, s_allocator_p);
bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p);

// Add SubQueueInfo option
bmqt::EventBuilderResult::Enum rc = peb.addSubQueueInfosOption(
Expand Down Expand Up @@ -403,6 +407,8 @@ static void test4_subscriptionHandle()
bsl::shared_ptr<bmqimp::Queue> queueSp = bsl::make_shared<bmqimp::Queue>(
s_allocator_p);
bdlbb::PooledBlobBufferFactory bufferFactory(4 * 1024, s_allocator_p);
bmqp::BlobPoolUtil::BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p));
bdlbb::Blob payload(&bufferFactory, s_allocator_p);

queueSp->setId(queueId);
Expand All @@ -424,7 +430,7 @@ static void test4_subscriptionHandle()
s_allocator_p);

// Create PushEventBuilder
bmqp::PushEventBuilder peb(&bufferFactory, s_allocator_p);
bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p);
ASSERT_EQ(0, peb.messageCount());

// Add SubQueueInfo option (subscription Id)
Expand Down Expand Up @@ -485,7 +491,7 @@ static void test4_subscriptionHandle()
s_allocator_p);

// Create PushEventBuilder
bmqp::PushEventBuilder peb(&bufferFactory, s_allocator_p);
bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p);
ASSERT_EQ(0, peb.messageCount());

// Add message
Expand Down Expand Up @@ -529,7 +535,7 @@ static void test4_subscriptionHandle()
s_allocator_p);

// Create PutEventBuilder
bmqp::PutEventBuilder builder(&bufferFactory, s_allocator_p);
bmqp::PutEventBuilder builder(&blobSpPool, s_allocator_p);
ASSERT_EQ(0, builder.messageCount());

// Add message
Expand Down Expand Up @@ -567,7 +573,7 @@ static void test4_subscriptionHandle()
s_allocator_p);

// Create AckEventBuilder
bmqp::AckEventBuilder builder(&bufferFactory, s_allocator_p);
bmqp::AckEventBuilder builder(&blobSpPool, s_allocator_p);
ASSERT_EQ(0, builder.messageCount());

bmqt::EventBuilderResult::Enum rc =
Expand Down
8 changes: 6 additions & 2 deletions src/groups/bmq/bmqa/bmqa_messageevent.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ static void test2_ackMesageIteratorTest()
const int k_NUM_MSGS = 5;

bdlbb::PooledBlobBufferFactory bufferFactory(256, s_allocator_p);
bmqp::AckEventBuilder builder(&bufferFactory, s_allocator_p);
bmqp::BlobPoolUtil::BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p));
bmqp::AckEventBuilder builder(&blobSpPool, s_allocator_p);
bsl::vector<AckData> messages(s_allocator_p);

PVV("Appending messages");
Expand Down Expand Up @@ -222,7 +224,9 @@ static void test3_putMessageIteratorTest()
const int k_NUM_MSGS = 5;

bdlbb::PooledBlobBufferFactory bufferFactory(256, s_allocator_p);
bmqp::PutEventBuilder builder(&bufferFactory, s_allocator_p);
bmqp::BlobPoolUtil::BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p));
bmqp::PutEventBuilder builder(&blobSpPool, s_allocator_p);
bsl::vector<PutData> messages(s_allocator_p);

PVV("Appending messages");
Expand Down
22 changes: 18 additions & 4 deletions src/groups/bmq/bmqa/bmqa_mocksession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,12 @@ Event MockSessionUtil::createAckEvent(const bsl::vector<AckParams>& acks,
implPtr = EventImplSp(new (*alloc) bmqimp::Event(g_bufferFactory_p, alloc),
alloc);

bmqp::AckEventBuilder ackBuilder(bufferFactory, alloc);
// TODO: deprecate `createAckEvent` with bufferFactory arg and introduce
// another function with BlobSpPool arg.
bmqa::Session::BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(bufferFactory, allocator));

bmqp::AckEventBuilder ackBuilder(&blobSpPool, alloc);
for (size_t i = 0; i != acks.size(); ++i) {
const AckParams& params = acks[i];
const QueueImplSp& impQueue = reinterpret_cast<const QueueImplSp&>(
Expand Down Expand Up @@ -336,7 +341,12 @@ Event MockSessionUtil::createPushEvent(
implPtr = EventImplSp(new (*alloc) bmqimp::Event(g_bufferFactory_p, alloc),
alloc);

bmqp::PushEventBuilder pushBuilder(bufferFactory, alloc);
// TODO: deprecate `createPushEvent` with bufferFactory arg and introduce
// another function with BlobSpPool arg.
bmqa::Session::BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(bufferFactory, allocator));

bmqp::PushEventBuilder pushBuilder(&blobSpPool, alloc);

for (size_t i = 0; i != pushEventParams.size(); ++i) {
const QueueImplSp& queueImplPtr = reinterpret_cast<const QueueImplSp&>(
Expand Down Expand Up @@ -946,6 +956,8 @@ int MockSession::start(const bsls::TimeInterval& timeout)
MockSession::MockSession(const bmqt::SessionOptions& options,
bslma::Allocator* allocator)
: d_blobBufferFactory(1024, allocator)
, d_blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&d_blobBufferFactory, allocator))
, d_eventHandler_mp(0)
, d_calls(allocator)
, d_eventsAndJobs(allocator)
Expand Down Expand Up @@ -990,6 +1002,8 @@ MockSession::MockSession(bslma::ManagedPtr<SessionEventHandler> eventHandler,
const bmqt::SessionOptions& options,
bslma::Allocator* allocator)
: d_blobBufferFactory(1024, allocator)
, d_blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&d_blobBufferFactory, allocator))
, d_eventHandler_mp(eventHandler)
, d_calls(bslma::Default::allocator(allocator))
, d_eventsAndJobs(bslma::Default::allocator(allocator))
Expand Down Expand Up @@ -1514,7 +1528,7 @@ void MockSession::loadMessageEventBuilder(MessageEventBuilder* builder)
g_bufferFactory_p,
d_allocator_p);

eventImplSpRef->configureAsMessageEvent(&d_blobBufferFactory);
eventImplSpRef->configureAsMessageEvent(&d_blobSpPool);
eventImplSpRef->setMessageCorrelationIdContainer(
d_corrIdContainer_sp.get());
}
Expand All @@ -1538,7 +1552,7 @@ void MockSession::loadConfirmEventBuilder(ConfirmEventBuilder* builder)
}

new (builderImplRef.d_buffer.buffer())
bmqp::ConfirmEventBuilder(&d_blobBufferFactory, d_allocator_p);
bmqp::ConfirmEventBuilder(&d_blobSpPool, d_allocator_p);

builderImplRef.d_builder_p = reinterpret_cast<bmqp::ConfirmEventBuilder*>(
builderImplRef.d_buffer.buffer());
Expand Down
8 changes: 8 additions & 0 deletions src/groups/bmq/bmqa/bmqa_mocksession.h
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,11 @@ struct MockSessionUtil {
/// Mechanism to mock a `bmqa::Session`
class MockSession : public AbstractSession {
public:
// TYPES

/// Pool of shared pointers to Blobs
typedef bmqa::Session::BlobSpPool BlobSpPool;

// CLASS METHODS

/// Perform a one time initialization needed by components used in
Expand Down Expand Up @@ -1010,6 +1015,9 @@ class MockSession : public AbstractSession {
/// Buffer factory
bdlbb::PooledBlobBufferFactory d_blobBufferFactory;

/// Pool of shared pointers to blobs
BlobSpPool d_blobSpPool;

/// Event handler (set only in asynchronous mode)
bslma::ManagedPtr<SessionEventHandler> d_eventHandler_mp;

Expand Down
6 changes: 3 additions & 3 deletions src/groups/bmq/bmqa/bmqa_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ MessageEventBuilder Session::createMessageEventBuilder()
eventImplSpRef = d_impl.d_application_mp->brokerSession().createEvent();

eventImplSpRef->configureAsMessageEvent(
d_impl.d_application_mp->bufferFactory());
d_impl.d_application_mp->blobSpPool());

return builder;
}
Expand All @@ -814,7 +814,7 @@ void Session::loadMessageEventBuilder(MessageEventBuilder* builder)
eventImplSpRef = d_impl.d_application_mp->brokerSession().createEvent();

eventImplSpRef->configureAsMessageEvent(
d_impl.d_application_mp->bufferFactory());
d_impl.d_application_mp->blobSpPool());
}

void Session::loadConfirmEventBuilder(ConfirmEventBuilder* builder)
Expand All @@ -837,7 +837,7 @@ void Session::loadConfirmEventBuilder(ConfirmEventBuilder* builder)
}

new (builderImplRef.d_buffer.buffer())
bmqp::ConfirmEventBuilder(d_impl.d_application_mp->bufferFactory(),
bmqp::ConfirmEventBuilder(d_impl.d_application_mp->blobSpPool(),
d_impl.d_allocator_p);

builderImplRef.d_builder_p = reinterpret_cast<bmqp::ConfirmEventBuilder*>(
Expand Down
8 changes: 8 additions & 0 deletions src/groups/bmq/bmqa/bmqa_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@

// BDE
#include <ball_log.h>
#include <bdlcc_sharedobjectpool.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bslma_allocator.h>
Expand Down Expand Up @@ -679,6 +680,13 @@ class Session : public AbstractSession {
public:
// TYPES

/// Pool of shared pointers to Blobs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not typedef bmqp::BlobPoolUtil::BlobSpPool?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't expose bmqp package in bmqa, since bmqa is a client package. And we need to declare BlobSpPool somewhere to be used in bmqa::MockSession. I moved it directly to private types in bmqa::MockSession

typedef bdlcc::SharedObjectPool<
bdlbb::Blob,
bdlcc::ObjectPoolFunctors::DefaultCreator,
bdlcc::ObjectPoolFunctors::RemoveAll<bdlbb::Blob> >
BlobSpPool;

/// Invoked as a response to an asynchronous open queue operation,
/// `OpenQueueCallback` is an alias for a callback function object
/// (functor) that takes as an argument the specified `result`,
Expand Down
5 changes: 5 additions & 0 deletions src/groups/bmq/bmqimp/bmqimp_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ Application::Application(
, d_channelsTip(&d_allocator)
, d_blobBufferFactory(sessionOptions.blobBufferSize(),
d_allocators.get("BlobBufferFactory"))
, d_blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(&d_blobBufferFactory,
d_allocators.get("BlobSpPool")))
, d_scheduler(bsls::SystemClockType::e_MONOTONIC, &d_allocator)
, d_channelFactory(ntcCreateInterfaceConfig(sessionOptions, allocator),
&d_blobBufferFactory,
Expand Down Expand Up @@ -582,11 +585,13 @@ Application::Application(
negotiationMessage,
sessionOptions.connectTimeout(),
&d_blobBufferFactory,
&d_blobSpPool,
allocator),
allocator)
, d_connectHandle_mp()
, d_brokerSession(&d_scheduler,
&d_blobBufferFactory,
&d_blobSpPool,
d_sessionOptions,
eventHandlerCB,
bdlf::MemFnUtil::memFn(&Application::stateCb, this),
Expand Down
18 changes: 15 additions & 3 deletions src/groups/bmq/bmqimp/bmqimp_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,18 @@ namespace bmqimp {

/// Top level object to manipulate a session with bmqbrkr
class Application {
public:
// PUBLIC TYPES
typedef bmqp::BlobPoolUtil::BlobSpPool BlobSpPool;

private:
// PRIVATE TYPES
typedef bslma::ManagedPtr<bmqio::ChannelFactory::OpHandle>
ChannelFactoryOpHandleMp;

private:
// CLASS-SCOPE CATEGORY
BALL_LOG_SET_CLASS_CATEGORY("BMQIMP.APPLICATION");

private:
// DATA
bmqst::StatContext d_allocatorStatContext;
// Stat context for counting allocators
Expand Down Expand Up @@ -116,6 +118,9 @@ class Application {
bdlbb::PooledBlobBufferFactory d_blobBufferFactory;
// Factory for blob buffers

/// Pool of shared pointers to blobs.
BlobSpPool d_blobSpPool;

bdlmt::EventScheduler d_scheduler;
// Scheduler

Expand Down Expand Up @@ -259,7 +264,9 @@ class Application {
/// instance.
bdlbb::BlobBufferFactory* bufferFactory();

// MANIPULATORS
/// Return a pointer to the blob shared pointer pool used by this instance.
/// Note that lifetime of the pointed-to pool is bound by this instance.
BlobSpPool* blobSpPool();

/// Start the session and the session pool. Return 0 on success or a
/// non-zero negative code otherwise. Calling start on an already
Expand Down Expand Up @@ -325,6 +332,11 @@ inline bdlbb::BlobBufferFactory* Application::bufferFactory()
return &d_blobBufferFactory;
}

inline Application::BlobSpPool* Application::blobSpPool()
{
return &d_blobSpPool;
}

} // close package namespace
} // close enterprise namespace

Expand Down
Loading
Loading