diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index e0953db0a..e7839323f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -545,7 +545,7 @@ class Cluster : public mqbi::Cluster, mqbi::DomainFactory* domainFactory, mqbi::Dispatcher* dispatcher, mqbnet::TransportManager* transportManager, - const mqbi::ClusterResources& resources, + const mqbi::ClusterResources& resources, bslma::Allocator* allocator, const mqbnet::Session::AdminCommandEnqueueCb& adminCb); diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp b/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp index 59db176ed..dc849e937 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp @@ -263,6 +263,18 @@ VirtualPushStreamIterator::~VirtualPushStreamIterator() // NOTHING } +unsigned int VirtualPushStreamIterator::numApps() const +{ + BSLS_ASSERT_SAFE(!atEnd()); + + return 1; +} + +void VirtualPushStreamIterator::removeCurrentElement() +{ + advance(); +} + // MANIPULATORS bool VirtualPushStreamIterator::advance() { diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.h b/src/groups/mqb/mqbblp/mqbblp_pushstream.h index e01520df3..c6a188419 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.h +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.h @@ -245,7 +245,7 @@ class PushStreamIterator : public mqbi::StorageIterator { mutable bsl::shared_ptr d_options_sp; protected: - PushStream* d_owner_p; + PushStream* d_owner_p; /// Current (`mqbi::AppMessage`, `upstreamSubQueueId`) pair. mutable PushStream::Element* d_currentElement; @@ -388,11 +388,21 @@ class VirtualPushStreamIterator : public PushStreamIterator { /// Destructor virtual ~VirtualPushStreamIterator() BSLS_KEYWORD_OVERRIDE; + /// Remove the current element (`mqbi::AppMessage`, `upstreamSubQueueId` + /// pair) from the current PUSH GUID. + /// The behavior is undefined unless `atEnd` returns `false`. + void removeCurrentElement(); + + /// Return the number of elements (`mqbi::AppMessage`, `upstreamSubQueueId` + /// pairs) for the current PUSH GUID. + /// The behavior is undefined unless `atEnd` returns `false`. + unsigned int numApps() const; + /// Return the current element (`mqbi::AppMessage`, `upstreamSubQueueId` /// pair). /// The behavior is undefined unless `atEnd` returns `false`. - PushStream::Element* element(unsigned int appOrdinal) const - BSLS_KEYWORD_OVERRIDE; + PushStream::Element* + element(unsigned int appOrdinal) const BSLS_KEYWORD_OVERRIDE; // MANIPULATORS bool advance() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp b/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp new file mode 100644 index 000000000..19f09b24f --- /dev/null +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp @@ -0,0 +1,214 @@ +// Copyright 2024 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbblp_pushstream.t.cpp -*-C++-*- +#include + +// BMQ +#include + +#include +#include + +#include + +// TEST DRIVER +#include + +// CONVENIENCE +using namespace BloombergLP; +using namespace bsl; + +// ============================================================================ +// TESTS +// ---------------------------------------------------------------------------- + +static void test1_basic() +{ + mwctst::TestHelper::printTestName("PushStream basic test"); + + bdlma::ConcurrentPool pushElementsPool(sizeof(mqbblp::PushStream::Element), + s_allocator_p); + + mqbblp::PushStream ps(&pushElementsPool, s_allocator_p); + unsigned int subQueueId = 0; + bsl::shared_ptr app; // unused + bmqp::SubQueueInfo subscription; + + mqbblp::PushStream::iterator itGuid = ps.findOrAppendMessage( + bmqp::MessageGUIDGenerator::testGUID()); + + mqbblp::PushStream::Apps::iterator itApp = + ps.d_apps.emplace(subQueueId, app).first; + + mqbblp::PushStream::Element* element = ps.create(subscription, + itGuid, + itApp); + + ps.add(element); + ps.remove(element); + ps.destroy(element, false); +} + +static void test2_iterations() +{ + mwctst::TestHelper::printTestName("PushStream basic test"); + + // Imitate {m1, a1}, {m2, a2}, {m1, a2}, {m2, a1} + + mqbblp::PushStream ps(bsl::nullptr_t(), s_allocator_p); + unsigned int subQueueId1 = 1; + unsigned int subQueueId2 = 2; + + bsl::shared_ptr unused; + + bmqp::SubQueueInfo subscription1(1); + bmqp::SubQueueInfo subscription2(2); + + mqbblp::PushStream::iterator itGuid1 = ps.findOrAppendMessage( + bmqp::MessageGUIDGenerator::testGUID()); + + mqbblp::PushStream::Apps::iterator itApp1 = + ps.d_apps.emplace(subQueueId1, unused).first; + + mqbblp::PushStream::Element* element1 = ps.create(subscription1, + itGuid1, + itApp1); + + ps.add(element1); + + mqbblp::PushStream::iterator itGuid2 = ps.findOrAppendMessage( + bmqp::MessageGUIDGenerator::testGUID()); + + mqbblp::PushStream::Apps::iterator itApp2 = + ps.d_apps.emplace(subQueueId2, unused).first; + + mqbblp::PushStream::Element* element2 = ps.create(subscription2, + itGuid2, + itApp2); + + ps.add(element2); + + mqbblp::PushStream::Element* element3 = ps.create(subscription2, + itGuid1, + itApp2); + + ps.add(element3); + + mqbblp::PushStream::Element* element4 = ps.create(subscription1, + itGuid2, + itApp1); + + ps.add(element4); + + mqbu::CapacityMeter dummyCapacityMeter("dummy", s_allocator_p); + bmqt::Uri dummyUri("dummy", s_allocator_p); + mqbconfm::Domain dummyDomain(s_allocator_p); + + mqbs::InMemoryStorage dummyStorage(dummyUri, + mqbu::StorageKey::k_NULL_KEY, + mqbs::DataStore::k_INVALID_PARTITION_ID, + dummyDomain, + &dummyCapacityMeter, + s_allocator_p); + + mqbconfm::Storage config; + mqbconfm::Limits limits; + + config.makeInMemory(); + + limits.messages() = bsl::numeric_limits::max(); + limits.bytes() = bsl::numeric_limits::max(); + + mwcu::MemOutStream errorDescription(s_allocator_p); + dummyStorage.configure(errorDescription, + config, + limits, + bsl::numeric_limits::max(), + 0); + + { + mqbblp::PushStreamIterator pit(&dummyStorage, + &ps, + ps.d_stream.begin()); + + ASSERT(!pit.atEnd()); + ASSERT_EQ(pit.numApps(), 2); + + ASSERT_EQ(element1, pit.element(0)); + ASSERT_EQ(element3, pit.element(1)); + + ASSERT(pit.advance()); + ASSERT_EQ(pit.numApps(), 2); + + ASSERT_EQ(element2, pit.element(0)); + ASSERT_EQ(element4, pit.element(1)); + + ASSERT(!pit.advance()); + } + + { + mqbblp::VirtualPushStreamIterator vit(subQueueId1, + &dummyStorage, + &ps, + ps.d_stream.begin()); + + ASSERT(!vit.atEnd()); + ASSERT_EQ(vit.numApps(), 1); + + ASSERT_EQ(element1, vit.element(0)); + + vit.advance(); + + ASSERT(!vit.atEnd()); + ASSERT_EQ(vit.numApps(), 1); + + ASSERT_EQ(element4, vit.element(0)); + + vit.advance(); + + ASSERT(vit.atEnd()); + } + + ps.remove(element2); + ps.destroy(element2, false); + ps.remove(element3); + ps.destroy(element3, false); +} + +// ============================================================================ +// MAIN PROGRAM +// ---------------------------------------------------------------------------- + +int main(int argc, char* argv[]) +{ + TEST_PROLOG(mwctst::TestHelper::e_DEFAULT); + + bmqt::UriParser::initialize(s_allocator_p); + + switch (_testCase) { + case 0: + case 1: test1_basic(); break; + case 2: test2_iterations(); break; + default: { + cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl; + s_testStatus = -1; + } break; + } + + bmqt::UriParser::shutdown(); + + TEST_EPILOG(mwctst::TestHelper::e_CHECK_DEF_GBL_ALLOC); +} diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index 45a90fd30..89163d27b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -1130,7 +1130,6 @@ QueueEngineUtil_AppState::processDeliveryList(bsls::TimeInterval* delay, } else if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( !reader->appMessageView(ordinal()).isPending())) { - BMQ_LOGTHROTTLE_INFO() << "#STORAGE_UNKNOWN_MESSAGE " << "Queue: '" << d_queue_p->description() << "', app: '" << appId() diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp index 5e470fce0..5006924b3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp @@ -235,9 +235,7 @@ void QueueHandle::rejectMessageDispatched(const bmqt::MessageGUID& msgGUID, // Update unconfirmed messages list and stats // Inform the queue about that reject. - d_queue_sp->rejectMessage(msgGUID, - subStream->d_upstreamSubQueueId, - this); + d_queue_sp->rejectMessage(msgGUID, subStream->d_upstreamSubQueueId, this); updateMonitor(subStream, msgGUID, bmqp::EventType::e_REJECT); // That erases 'msgGUID' from 'd_unconfirmedMessages' @@ -1217,9 +1215,11 @@ bool QueueHandle::canDeliver(unsigned int downstreamSubscriptionId) const BSLS_ASSERT_SAFE(cit != d_subscriptions.end()); - BALL_LOG_INFO - << "QueueHandle [" << (d_clientContext_sp ? d_clientContext_sp->description() : "none") - << "] canDeliver " << cit->second->d_unconfirmedMonitor.messages(); + BALL_LOG_INFO << "QueueHandle [" + << (d_clientContext_sp ? d_clientContext_sp->description() + : "none") + << "] canDeliver " + << cit->second->d_unconfirmedMonitor.messages(); return d_clientContext_sp && (cit->second->d_unconfirmedMonitor.state() != diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 1764c4e4c..d06d9ef60 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1519,7 +1519,7 @@ void RelayQueueEngine::afterQueuePurged(const bsl::string& appId, // Purge all virtual storages, and reset all iterators. - unsigned int numMessages = d_pushStream.removeAll(); + d_pushStream.removeAll(); for (AppsMap::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { it->second->clear(); diff --git a/src/integration-tests/test_poison_messages.py b/src/integration-tests/test_poison_messages.py index 956757563..60eff2374 100644 --- a/src/integration-tests/test_poison_messages.py +++ b/src/integration-tests/test_poison_messages.py @@ -510,54 +510,3 @@ def test_poison_no_reject_broker_graceful_shutdown(self, multi_node: Cluster): next(proxies) proxy = next(proxies) self._stop_proxy(multi_node, proxy, tc.DOMAIN_PRIORITY, False) - - def test_poison_update_monitor(self, multi_node: Cluster): - uri = tc.URI_PRIORITY - - proxies = multi_node.proxy_cycle() - # pick proxy in datacenter opposite to the primary's - next(proxies) - proxy = next(proxies) - - consumer1 = proxy.create_client(f"consumer_1") - consumer1.open(uri, flags=["read"], succeed=True, max_unconfirmed_messages = 2) - - consumer2 = proxy.create_client(f"consumer_2") - consumer2.open(uri, flags=["read"], succeed=True, max_unconfirmed_messages = 2) - - producer = proxy.create_client("producer") - producer.open(uri, flags=["write", "ack"], succeed=True) - producer.post(uri, payload=["for consumer_1"], succeed=True) - producer.post(uri, payload=["for consumer_2"], succeed=True) - producer.post(uri, payload=["for consumer_1"], succeed=True) - producer.post(uri, payload=["for consumer_2"], succeed=True) - - consumer1.wait_push_event() - consumer2.wait_push_event() - - assert len(consumer1.list(uri, True)) == 2 - assert len(consumer2.list(uri, True)) == 2 - - consumer1.check_exit_code = False - consumer1.kill() - - replica = multi_node.process(proxy.get_active_node()) - consumer3 = replica.create_client(f"consumer_3") - consumer3.open(uri, flags=["read"], succeed=True, max_unconfirmed_messages = 2) - - producer.post(uri, payload=["for consumer_3"], succeed=True) - - consumer3.wait_push_event() - - # ensure the message still got delivered - assert len(consumer3.list(uri, True)) == 1 - - producer.post(uri, payload=["for consumer_3"], succeed=True) - - consumer3.wait_push_event() - - # ensure the message still got delivered - assert len(consumer3.list(uri, True)) == 2 - - -