From 36a1a57827d8dd03cadcda0b360fcd78d78d6ec4 Mon Sep 17 00:00:00 2001 From: "Patrick M. Niedzielski" Date: Thu, 12 Dec 2024 18:30:10 -0500 Subject: [PATCH] Rename auto-subscriptions to application subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch renames all instances of “auto subscriptions” in the code to “application subscriptions”, which is what we use externally to describe this feature. Because application subscriptions are only evaluated and referenced in the broker, there is no chance of an API-breaking change. Indeed, only the `mqb` package group is affected. This patch leaves log messages that reference auto subscriptions; this will be handled separately, as tools that parse the log for lines about application subscriptions may depend on this older output. Signed-off-by: Patrick M. Niedzielski --- src/groups/mqb/mqbblp/mqbblp_localqueue.cpp | 11 +++--- .../mqb/mqbblp/mqbblp_queueengineutil.cpp | 10 ++--- .../mqb/mqbblp/mqbblp_queueengineutil.h | 12 +++--- .../mqb/mqbblp/mqbblp_relayqueueengine.cpp | 2 +- .../mqb/mqbblp/mqbblp_relayqueueengine.h | 2 +- .../mqb/mqbblp/mqbblp_rootqueueengine.cpp | 26 +++++++------ .../mqb/mqbblp/mqbblp_rootqueueengine.h | 12 +++--- src/groups/mqb/mqbconfm/mqbconf.xsd | 2 +- src/groups/mqb/mqbconfm/mqbconfm_messages.h | 2 +- src/groups/mqb/mqbi/mqbi_queueengine.h | 14 +++---- .../mqb/mqbmock/mqbmock_queueengine.cpp | 2 +- src/groups/mqb/mqbmock/mqbmock_queueengine.h | 8 ++-- ...criptions.py => test_app_subscriptions.py} | 38 ++++++++++--------- src/python/blazingmq/schemas/mqbconf.py | 2 +- 14 files changed, 74 insertions(+), 69 deletions(-) rename src/integration-tests/{test_auto_subscriptions.py => test_app_subscriptions.py} (92%) diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index c3a674a78f..6672f1173e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -441,13 +441,12 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, bsls::Types::Uint64 timestamp = bdlt::EpochUtil::convertToTimeT64( bdlt::CurrentTime::utc()); - // EXPERIMENTAL: - // Evaluate 'auto' subscriptions + // Evaluate application subscriptions mqbi::StorageResult::Enum res = - d_queueEngine_mp->evaluateAutoSubscriptions(putHeader, - appData, - translation, - timestamp); + d_queueEngine_mp->evaluateAppSubscriptions(putHeader, + appData, + translation, + timestamp); bool haveReceipt = true; unsigned int refCount = d_queueEngine_mp->messageReferenceCount(); diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index 023b3169f9..c1bd5fc1ec 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -871,7 +871,7 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState( // nodes don't load the domain config. BSLS_ASSERT_SAFE(d_scheduler_p); - d_autoSubscription.d_evaluationContext_p = + d_appSubscription.d_evaluationContext_p = &d_routing_sp->d_queue.d_evaluationContext; const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get(); @@ -1372,21 +1372,21 @@ int QueueEngineUtil_AppState::setSubscription( if (mqbconfm::ExpressionVersion::E_VERSION_1 == value.version()) { if (d_subcriptionExpression.text().length()) { - int rc = d_autoSubscription.d_evaluator.compile( + int rc = d_appSubscription.d_evaluator.compile( d_subcriptionExpression.text(), d_routing_sp->d_compilationContext); return rc; // RETURN } } // Reset - d_autoSubscription.d_evaluator = bmqeval::SimpleEvaluator(); + d_appSubscription.d_evaluator = bmqeval::SimpleEvaluator(); return 0; } -bool QueueEngineUtil_AppState::evaluateAutoSubcription() +bool QueueEngineUtil_AppState::evaluateAppSubcription() { - return d_autoSubscription.evaluate(); + return d_appSubscription.evaluate(); } void QueueEngineUtil_AppState::authorize(const mqbu::StorageKey& appKey, diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h index 9f9dfe2d3f..5548cfbc3d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h @@ -374,10 +374,10 @@ struct QueueEngineUtil_AppState { bsls::AtomicBool d_isScheduled; mqbconfm::Expression d_subcriptionExpression; - // The auto subscription expression if any. + // The application subscription expression if any. - Routers::Expression d_autoSubscription; - // Evaluator of the auto subscription + Routers::Expression d_appSubscription; + // Evaluator of the application subscription unsigned int d_appOrdinal; @@ -503,11 +503,11 @@ struct QueueEngineUtil_AppState { const mqbi::StorageIterator* currentMessage, unsigned int ordinal); - // Set the auto subscription + // Set the application subscription int setSubscription(const mqbconfm::Expression& value); - // Evaluate the auto subscription - bool evaluateAutoSubcription(); + // Evaluate the application subscription + bool evaluateAppSubcription(); /// Change the state to authorized, thus enabling delivery void authorize(const mqbu::StorageKey& appKey, unsigned int appOrdinal); diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 4a7b1bf6f4..fd300f0161 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1562,7 +1562,7 @@ void RelayQueueEngine::onTimer( // NOTHING } -mqbi::StorageResult::Enum RelayQueueEngine::evaluateAutoSubscriptions( +mqbi::StorageResult::Enum RelayQueueEngine::evaluateAppSubscriptions( BSLS_ANNOTATION_UNUSED const bmqp::PutHeader& putHeader, BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& appData, BSLS_ANNOTATION_UNUSED const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h index ac4cd31ae0..710e13182f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h @@ -527,7 +527,7 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { unsigned int appOrdinal) BSLS_KEYWORD_OVERRIDE; /// Not valid for 'RelayQueueEngine' - mqbi::StorageResult::Enum evaluateAutoSubscriptions( + mqbi::StorageResult::Enum evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 9c7993c7ed..ab164fb566 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -265,7 +265,7 @@ RootQueueEngine::RootQueueEngine(QueueState* queueState, bdlf::PlaceHolders::_2), // enableLog allocator) , d_apps(allocator) -, d_hasAutoSubscriptions(false) +, d_hasAppSubscriptions(false) , d_isFanout(domainConfig.mode().isFanoutValue()) , d_scheduler_p(queueState->scheduler()) , d_miscWorkThreadPool_p(queueState->miscWorkThreadPool()) @@ -304,9 +304,10 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, , rc_APP_INITIALIZATION_ERROR = -1 // No Virtual Storage , - rc_AUTO_SUBSCRIPTION_ERROR = -2 // Wrong expression + rc_APP_SUBSCRIPTION_ERROR = -2 // Wrong expression , - rc_AUTO_SUBSCRIPTIONS_ERROR = -3 // Wrong number of auto subscriptions + rc_APP_SUBSCRIPTIONS_ERROR = -3 // Wrong number of application + // subscriptions }; // Populate map of appId to appKey for statically registered consumers @@ -314,7 +315,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, const bsl::vector& subscriptions = d_queueState_p->domain()->config().subscriptions(); - d_hasAutoSubscriptions = !subscriptions.empty(); + d_hasAppSubscriptions = !subscriptions.empty(); if (d_isFanout) { const bsl::vector& cfgAppIds = @@ -345,7 +346,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, << "' for the '" << itApp->first << "' app, rc: " << rc << ", reason: '" << bmqeval::ErrorType::toString(errorType) << "'"; - return rc_AUTO_SUBSCRIPTION_ERROR; // RETURN + return rc_APP_SUBSCRIPTION_ERROR; // RETURN } } else { @@ -364,13 +365,14 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, isReconfigure)) { return rc_APP_INITIALIZATION_ERROR; // RETURN } - // TODO: what is auto subscription "appId" for priority/broadcast? + // TODO: what is application subscription "appId" for + // priority/broadcast? if (subscriptions.size() > 1) { BALL_LOG_ERROR << "#QUEUE_CONFIGURE_FAILURE Queue '" << d_queueState_p->queue()->description() << "' Cannot have more than 1 auto subscription"; - return rc_AUTO_SUBSCRIPTIONS_ERROR; // RETURN + return rc_APP_SUBSCRIPTIONS_ERROR; // RETURN } Apps::iterator itApp = d_apps.begin(); @@ -395,7 +397,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, << subscriptions[0].expression().text() << "', rc: " << rc << ", reason: '" << bmqeval::ErrorType::toString(errorType) << "'"; - return rc_AUTO_SUBSCRIPTION_ERROR; // RETURN + return rc_APP_SUBSCRIPTION_ERROR; // RETURN } } @@ -1993,14 +1995,14 @@ void RootQueueEngine::unregisterStorage( iter->second->unauthorize(); } -mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions( +mqbi::StorageResult::Enum RootQueueEngine::evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, bsls::Types::Uint64 timestamp) { - if (!d_hasAutoSubscriptions) { - // No-op if no auto subscriptions configured + if (!d_hasAppSubscriptions) { + // No-op if no application subscriptions configured return mqbi::StorageResult::e_SUCCESS; } @@ -2021,7 +2023,7 @@ mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions( for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { AppStateSp& app = it->second; - if (!app->evaluateAutoSubcription()) { + if (!app->evaluateAppSubcription()) { result = d_queueState_p->storage()->autoConfirm(app->appKey(), timestamp); diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index a9567fd9f3..19cc3ada02 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -100,8 +100,8 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { Apps d_apps; // Map of appId to AppState - bool d_hasAutoSubscriptions; - // Does this queue engine have any auto subscriptions configured + bool d_hasAppSubscriptions; + // Does this queue engine have any application subscriptions configured const bool d_isFanout; @@ -407,12 +407,12 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { unsigned int appOrdinal) BSLS_KEYWORD_OVERRIDE; /// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp', - /// evaluate all Auto (Application) subscriptions and exclude applications - /// with negative results from message delivery. - /// Return 0 on success or an non-zero error code on failure. + /// evaluate all application subscriptions and exclude applications with + /// negative results from message delivery. Return 0 on success or an + /// non-zero error code on failure. /// /// THREAD: This method is called from the Queue's dispatcher thread. - mqbi::StorageResult::Enum evaluateAutoSubscriptions( + mqbi::StorageResult::Enum evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbconfm/mqbconf.xsd b/src/groups/mqb/mqbconfm/mqbconf.xsd index d41238a354..3027465cd2 100644 --- a/src/groups/mqb/mqbconfm/mqbconf.xsd +++ b/src/groups/mqb/mqbconfm/mqbconf.xsd @@ -198,7 +198,7 @@ message for the purpose of detecting duplicate PUTs. consistency.........: optional consistency mode. - subscriptions.......: optional Auto (Application) subscriptions + subscriptions.......: optional application subscriptions diff --git a/src/groups/mqb/mqbconfm/mqbconfm_messages.h b/src/groups/mqb/mqbconfm/mqbconfm_messages.h index 828f0fae53..ac2af9ec19 100644 --- a/src/groups/mqb/mqbconfm/mqbconfm_messages.h +++ b/src/groups/mqb/mqbconfm/mqbconfm_messages.h @@ -4763,7 +4763,7 @@ class Domain { // queue. Zero (the default) means unlimited deduplicationTimeMs.: // timeout, in milliseconds, to keep GUID of PUT message for the purpose of // detecting duplicate PUTs. consistency.........: optional consistency - // mode. subscriptions.......: optional Auto (Application) subscriptions + // mode. subscriptions.......: optional application subscriptions // INSTANCE DATA bsls::Types::Int64 d_messageTtl; diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.h b/src/groups/mqb/mqbi/mqbi_queueengine.h index 5a858bb328..421a2f5d36 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.h +++ b/src/groups/mqb/mqbi/mqbi_queueengine.h @@ -226,16 +226,16 @@ class QueueEngine { unsigned int appOrdinal); /// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp', - /// evaluate all Auto (Application) subscriptions and exclude applications - /// with negative results from message delivery. - /// Return 0 on success or an non-zero error code on failure. + /// evaluate all application subscriptions and exclude applications with + /// negative results from message delivery. Return 0 on success or an + /// non-zero error code on failure. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual StorageResult::Enum - evaluateAutoSubscriptions(const bmqp::PutHeader& putHeader, - const bsl::shared_ptr& appData, - const bmqp::MessagePropertiesInfo& mpi, - bsls::Types::Uint64 timestamp) = 0; + evaluateAppSubscriptions(const bmqp::PutHeader& putHeader, + const bsl::shared_ptr& appData, + const bmqp::MessagePropertiesInfo& mpi, + bsls::Types::Uint64 timestamp) = 0; // ACCESSORS diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp index 51b0a1c10d..2ef1733114 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp @@ -143,7 +143,7 @@ void QueueEngine::onTimer( // NOTHING } -mqbi::StorageResult::Enum QueueEngine::evaluateAutoSubscriptions( +mqbi::StorageResult::Enum QueueEngine::evaluateAppSubscriptions( BSLS_ANNOTATION_UNUSED const bmqp::PutHeader& putHeader, BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& appData, BSLS_ANNOTATION_UNUSED const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.h b/src/groups/mqb/mqbmock/mqbmock_queueengine.h index 7c7461e22b..16af00459a 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.h +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.h @@ -200,12 +200,12 @@ class QueueEngine : public mqbi::QueueEngine { void onTimer(bsls::Types::Int64 currentTimer) BSLS_KEYWORD_OVERRIDE; /// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp', - /// evaluate all Auto (Application) subscriptions and exclude applications - /// with negative results from message delivery. - /// Return 0 on success or an non-zero error code on failure. + /// evaluate all application subscriptions and exclude applications with + /// negative results from message delivery. Return 0 on success or an + /// non-zero error code on failure. /// /// THREAD: This method is called from the Queue's dispatcher thread. - mqbi::StorageResult::Enum evaluateAutoSubscriptions( + mqbi::StorageResult::Enum evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/integration-tests/test_auto_subscriptions.py b/src/integration-tests/test_app_subscriptions.py similarity index 92% rename from src/integration-tests/test_auto_subscriptions.py rename to src/integration-tests/test_app_subscriptions.py index 7f832a5811..5aa5db3834 100644 --- a/src/integration-tests/test_auto_subscriptions.py +++ b/src/integration-tests/test_app_subscriptions.py @@ -31,9 +31,10 @@ from blazingmq.dev.configurator.configurator import Configurator -class TestAutoSubscriptions: +class TestAppSubscriptions: """ - This test verifies auto subscription for one or more substreams (apps) + This test verifies application subscription for one or more substreams + (apps) """ def _start_client(self, broker, uri, name, subscriptions=[]): @@ -77,11 +78,11 @@ def _verify_delivery(self, consumer, num): {"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x==2"}}, ] ) - def test_auto_subscription_fanout(self, cluster: Cluster): + def test_app_subscription_fanout(self, cluster: Cluster): proxies = cluster.proxy_cycle() """ - Out of the 3 apps, configure one to evaluate auto subscription + Out of the 3 apps, configure one to evaluate application subscription negatively, another to evaluate positively, and another do not configure. Make sure the first does not get the message, and the rest do. @@ -162,9 +163,10 @@ def test_auto_subscription_fanout(self, cluster: Cluster): @tweak.domain.subscriptions( [{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}] ) - def test_auto_subscription_priority(self, cluster: Cluster): + def test_app_subscription_priority(self, cluster: Cluster): """ - Configure the priority queue to evaluate auto subscription negatively. + Configure the priority queue to evaluate application subscription + negatively. Make sure the queue does not get the message. Make sure the same is the case after restarts. """ @@ -217,12 +219,12 @@ def test_auto_subscription_priority(self, cluster: Cluster): {"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x > 2"}}, ] ) - def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster): + def test_app_subscription_with_consumer_subscription(self, cluster: Cluster): """ - Out of the 3 apps, configure two to evaluate auto subscriptions. + Out of the 3 apps, configure two to evaluate application subscriptions. Configure consumsers with consumer subscriptions. - Make sure outcome of message delivery is logical AND of both auto and - consumer subscriptions. + Make sure outcome of message delivery is logical AND of both + application and consumer subscriptions. """ proxies = cluster.proxy_cycle() @@ -332,9 +334,10 @@ def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster): @tweak.domain.subscriptions( [{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}] ) - def test_auto_subscription_broadcast(self, cluster: Cluster): + def test_app_subscription_broadcast(self, cluster: Cluster): """ - Configure the boadcast queue to evaluate auto subscription negatively. + Configure the boadcast queue to evaluate application subscription + negatively. Make sure the queue does not get the message. """ @@ -377,9 +380,10 @@ def test_auto_subscription_broadcast(self, cluster: Cluster): {"appId": "baz", "expression": {"version": "E_VERSION_1", "text": "x==3"}}, ] ) - def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): + def test_app_subscription_fanout_all_negative(self, cluster: Cluster): """ - Configure all fanout Apps to evaluate auto subscriptions negatively. + Configure all fanout Apps to evaluate application subscriptions + negatively. Make sure none receives a message. """ proxies = cluster.proxy_cycle() @@ -434,11 +438,11 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): ) def test_invalid_configuration(self, cluster: Cluster): """ - Configure priority domain with invalid auto subscription. + Configure priority domain with invalid application subscription. Make sure a queue fails to open. - Reconfigure the domain with valid auto subscription. + Reconfigure the domain with valid application subscription. Make sure a queue opens successfully. - Reconfigure the domain with invalid auto subscription. + Reconfigure the domain with invalid application subscription. Make sure the reconfigure command fails. Make sure a queue opens successfully. """ diff --git a/src/python/blazingmq/schemas/mqbconf.py b/src/python/blazingmq/schemas/mqbconf.py index b16a744d00..a475a1ec66 100644 --- a/src/python/blazingmq/schemas/mqbconf.py +++ b/src/python/blazingmq/schemas/mqbconf.py @@ -603,7 +603,7 @@ class Domain: message for the purpose of detecting duplicate PUTs. consistency.........: optional consistency mode. - subscriptions.......: optional Auto (Application) subscriptions + subscriptions.......: optional application subscriptions """ name: Optional[str] = field(