Skip to content

Commit

Permalink
adding UT
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Aug 21, 2024
1 parent f0503a5 commit 777a3a3
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ class Cluster : public mqbi::Cluster,
mqbi::DomainFactory* domainFactory,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const mqbi::ClusterResources& resources,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator,
const mqbnet::Session::AdminCommandEnqueueCb& adminCb);

Expand Down
12 changes: 12 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,18 @@ VirtualPushStreamIterator::~VirtualPushStreamIterator()
// NOTHING
}

unsigned int VirtualPushStreamIterator::numApps() const
{
BSLS_ASSERT_SAFE(!atEnd());

return 1;
}

void VirtualPushStreamIterator::removeCurrentElement()
{
advance();
}

// MANIPULATORS
bool VirtualPushStreamIterator::advance()
{
Expand Down
16 changes: 13 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ class PushStreamIterator : public mqbi::StorageIterator {
mutable bsl::shared_ptr<bdlbb::Blob> d_options_sp;

protected:
PushStream* d_owner_p;
PushStream* d_owner_p;

/// Current (`mqbi::AppMessage`, `upstreamSubQueueId`) pair.
mutable PushStream::Element* d_currentElement;
Expand Down Expand Up @@ -388,11 +388,21 @@ class VirtualPushStreamIterator : public PushStreamIterator {
/// Destructor
virtual ~VirtualPushStreamIterator() BSLS_KEYWORD_OVERRIDE;

/// Remove the current element (`mqbi::AppMessage`, `upstreamSubQueueId`
/// pair) from the current PUSH GUID.
/// The behavior is undefined unless `atEnd` returns `false`.
void removeCurrentElement();

/// Return the number of elements (`mqbi::AppMessage`, `upstreamSubQueueId`
/// pairs) for the current PUSH GUID.
/// The behavior is undefined unless `atEnd` returns `false`.
unsigned int numApps() const;

/// Return the current element (`mqbi::AppMessage`, `upstreamSubQueueId`
/// pair).
/// The behavior is undefined unless `atEnd` returns `false`.
PushStream::Element* element(unsigned int appOrdinal) const
BSLS_KEYWORD_OVERRIDE;
PushStream::Element*
element(unsigned int appOrdinal) const BSLS_KEYWORD_OVERRIDE;

// MANIPULATORS
bool advance() BSLS_KEYWORD_OVERRIDE;
Expand Down
214 changes: 214 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2024 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// mqbblp_pushstream.t.cpp -*-C++-*-
#include <mqbblp_pushstream.h>

// BMQ
#include <bmqp_messageguidgenerator.h>

#include <mqbcfg_messages.h>
#include <mqbs_inmemorystorage.h>

#include <mwcu_memoutstream.h>

// TEST DRIVER
#include <mwctst_testhelper.h>

// CONVENIENCE
using namespace BloombergLP;
using namespace bsl;

// ============================================================================
// TESTS
// ----------------------------------------------------------------------------

static void test1_basic()
{
mwctst::TestHelper::printTestName("PushStream basic test");

bdlma::ConcurrentPool pushElementsPool(sizeof(mqbblp::PushStream::Element),
s_allocator_p);

mqbblp::PushStream ps(&pushElementsPool, s_allocator_p);
unsigned int subQueueId = 0;
bsl::shared_ptr<mqbblp::RelayQueueEngine_AppState> app; // unused
bmqp::SubQueueInfo subscription;

mqbblp::PushStream::iterator itGuid = ps.findOrAppendMessage(
bmqp::MessageGUIDGenerator::testGUID());

mqbblp::PushStream::Apps::iterator itApp =
ps.d_apps.emplace(subQueueId, app).first;

mqbblp::PushStream::Element* element = ps.create(subscription,
itGuid,
itApp);

ps.add(element);
ps.remove(element);
ps.destroy(element, false);
}

static void test2_iterations()
{
mwctst::TestHelper::printTestName("PushStream basic test");

// Imitate {m1, a1}, {m2, a2}, {m1, a2}, {m2, a1}

mqbblp::PushStream ps(bsl::nullptr_t(), s_allocator_p);
unsigned int subQueueId1 = 1;
unsigned int subQueueId2 = 2;

bsl::shared_ptr<mqbblp::RelayQueueEngine_AppState> unused;

bmqp::SubQueueInfo subscription1(1);
bmqp::SubQueueInfo subscription2(2);

mqbblp::PushStream::iterator itGuid1 = ps.findOrAppendMessage(
bmqp::MessageGUIDGenerator::testGUID());

mqbblp::PushStream::Apps::iterator itApp1 =
ps.d_apps.emplace(subQueueId1, unused).first;

mqbblp::PushStream::Element* element1 = ps.create(subscription1,
itGuid1,
itApp1);

ps.add(element1);

mqbblp::PushStream::iterator itGuid2 = ps.findOrAppendMessage(
bmqp::MessageGUIDGenerator::testGUID());

mqbblp::PushStream::Apps::iterator itApp2 =
ps.d_apps.emplace(subQueueId2, unused).first;

mqbblp::PushStream::Element* element2 = ps.create(subscription2,
itGuid2,
itApp2);

ps.add(element2);

mqbblp::PushStream::Element* element3 = ps.create(subscription2,
itGuid1,
itApp2);

ps.add(element3);

mqbblp::PushStream::Element* element4 = ps.create(subscription1,
itGuid2,
itApp1);

ps.add(element4);

mqbu::CapacityMeter dummyCapacityMeter("dummy", s_allocator_p);
bmqt::Uri dummyUri("dummy", s_allocator_p);
mqbconfm::Domain dummyDomain(s_allocator_p);

mqbs::InMemoryStorage dummyStorage(dummyUri,
mqbu::StorageKey::k_NULL_KEY,
mqbs::DataStore::k_INVALID_PARTITION_ID,
dummyDomain,
&dummyCapacityMeter,
s_allocator_p);

mqbconfm::Storage config;
mqbconfm::Limits limits;

config.makeInMemory();

limits.messages() = bsl::numeric_limits<bsls::Types::Int64>::max();
limits.bytes() = bsl::numeric_limits<bsls::Types::Int64>::max();

mwcu::MemOutStream errorDescription(s_allocator_p);
dummyStorage.configure(errorDescription,
config,
limits,
bsl::numeric_limits<bsls::Types::Int64>::max(),
0);

{
mqbblp::PushStreamIterator pit(&dummyStorage,
&ps,
ps.d_stream.begin());

ASSERT(!pit.atEnd());
ASSERT_EQ(pit.numApps(), 2);

ASSERT_EQ(element1, pit.element(0));
ASSERT_EQ(element3, pit.element(1));

ASSERT(pit.advance());
ASSERT_EQ(pit.numApps(), 2);

ASSERT_EQ(element2, pit.element(0));
ASSERT_EQ(element4, pit.element(1));

ASSERT(!pit.advance());
}

{
mqbblp::VirtualPushStreamIterator vit(subQueueId1,
&dummyStorage,
&ps,
ps.d_stream.begin());

ASSERT(!vit.atEnd());
ASSERT_EQ(vit.numApps(), 1);

ASSERT_EQ(element1, vit.element(0));

vit.advance();

ASSERT(!vit.atEnd());
ASSERT_EQ(vit.numApps(), 1);

ASSERT_EQ(element4, vit.element(0));

vit.advance();

ASSERT(vit.atEnd());
}

ps.remove(element2);
ps.destroy(element2, false);
ps.remove(element3);
ps.destroy(element3, false);
}

// ============================================================================
// MAIN PROGRAM
// ----------------------------------------------------------------------------

int main(int argc, char* argv[])
{
TEST_PROLOG(mwctst::TestHelper::e_DEFAULT);

bmqt::UriParser::initialize(s_allocator_p);

switch (_testCase) {
case 0:
case 1: test1_basic(); break;
case 2: test2_iterations(); break;
default: {
cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl;
s_testStatus = -1;
} break;
}

bmqt::UriParser::shutdown();

TEST_EPILOG(mwctst::TestHelper::e_CHECK_DEF_GBL_ALLOC);
}
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,7 @@ QueueEngineUtil_AppState::processDeliveryList(bsls::TimeInterval* delay,
<< "' (not in the storage)";
}
else if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
!reader->appMessageView(ordinal()).isPending())) {

!reader->appMessageView(ordinal()).isPending())) {
BMQ_LOGTHROTTLE_INFO()
<< "#STORAGE_UNKNOWN_MESSAGE " << "Queue: '"
<< d_queue_p->description() << "', app: '" << appId()
Expand Down
12 changes: 6 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ void QueueHandle::rejectMessageDispatched(const bmqt::MessageGUID& msgGUID,
// Update unconfirmed messages list and stats

// Inform the queue about that reject.
d_queue_sp->rejectMessage(msgGUID,
subStream->d_upstreamSubQueueId,
this);
d_queue_sp->rejectMessage(msgGUID, subStream->d_upstreamSubQueueId, this);

updateMonitor(subStream, msgGUID, bmqp::EventType::e_REJECT);
// That erases 'msgGUID' from 'd_unconfirmedMessages'
Expand Down Expand Up @@ -1217,9 +1215,11 @@ bool QueueHandle::canDeliver(unsigned int downstreamSubscriptionId) const

BSLS_ASSERT_SAFE(cit != d_subscriptions.end());

BALL_LOG_INFO
<< "QueueHandle [" << (d_clientContext_sp ? d_clientContext_sp->description() : "none")
<< "] canDeliver " << cit->second->d_unconfirmedMonitor.messages();
BALL_LOG_INFO << "QueueHandle ["
<< (d_clientContext_sp ? d_clientContext_sp->description()
: "none")
<< "] canDeliver "
<< cit->second->d_unconfirmedMonitor.messages();

return d_clientContext_sp &&
(cit->second->d_unconfirmedMonitor.state() !=
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1519,7 +1519,7 @@ void RelayQueueEngine::afterQueuePurged(const bsl::string& appId,

// Purge all virtual storages, and reset all iterators.

unsigned int numMessages = d_pushStream.removeAll();
d_pushStream.removeAll();

for (AppsMap::iterator it = d_apps.begin(); it != d_apps.end(); ++it) {
it->second->clear();
Expand Down
51 changes: 0 additions & 51 deletions src/integration-tests/test_poison_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,54 +510,3 @@ def test_poison_no_reject_broker_graceful_shutdown(self, multi_node: Cluster):
next(proxies)
proxy = next(proxies)
self._stop_proxy(multi_node, proxy, tc.DOMAIN_PRIORITY, False)

def test_poison_update_monitor(self, multi_node: Cluster):
uri = tc.URI_PRIORITY

proxies = multi_node.proxy_cycle()
# pick proxy in datacenter opposite to the primary's
next(proxies)
proxy = next(proxies)

consumer1 = proxy.create_client(f"consumer_1")
consumer1.open(uri, flags=["read"], succeed=True, max_unconfirmed_messages = 2)

consumer2 = proxy.create_client(f"consumer_2")
consumer2.open(uri, flags=["read"], succeed=True, max_unconfirmed_messages = 2)

producer = proxy.create_client("producer")
producer.open(uri, flags=["write", "ack"], succeed=True)
producer.post(uri, payload=["for consumer_1"], succeed=True)
producer.post(uri, payload=["for consumer_2"], succeed=True)
producer.post(uri, payload=["for consumer_1"], succeed=True)
producer.post(uri, payload=["for consumer_2"], succeed=True)

consumer1.wait_push_event()
consumer2.wait_push_event()

assert len(consumer1.list(uri, True)) == 2
assert len(consumer2.list(uri, True)) == 2

consumer1.check_exit_code = False
consumer1.kill()

replica = multi_node.process(proxy.get_active_node())
consumer3 = replica.create_client(f"consumer_3")
consumer3.open(uri, flags=["read"], succeed=True, max_unconfirmed_messages = 2)

producer.post(uri, payload=["for consumer_3"], succeed=True)

consumer3.wait_push_event()

# ensure the message still got delivered
assert len(consumer3.list(uri, True)) == 1

producer.post(uri, payload=["for consumer_3"], succeed=True)

consumer3.wait_push_event()

# ensure the message still got delivered
assert len(consumer3.list(uri, True)) == 2



0 comments on commit 777a3a3

Please sign in to comment.