diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index c3a674a78f..6672f1173e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -441,13 +441,12 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, bsls::Types::Uint64 timestamp = bdlt::EpochUtil::convertToTimeT64( bdlt::CurrentTime::utc()); - // EXPERIMENTAL: - // Evaluate 'auto' subscriptions + // Evaluate application subscriptions mqbi::StorageResult::Enum res = - d_queueEngine_mp->evaluateAutoSubscriptions(putHeader, - appData, - translation, - timestamp); + d_queueEngine_mp->evaluateAppSubscriptions(putHeader, + appData, + translation, + timestamp); bool haveReceipt = true; unsigned int refCount = d_queueEngine_mp->messageReferenceCount(); diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index 023b3169f9..c1bd5fc1ec 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -871,7 +871,7 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState( // nodes don't load the domain config. BSLS_ASSERT_SAFE(d_scheduler_p); - d_autoSubscription.d_evaluationContext_p = + d_appSubscription.d_evaluationContext_p = &d_routing_sp->d_queue.d_evaluationContext; const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get(); @@ -1372,21 +1372,21 @@ int QueueEngineUtil_AppState::setSubscription( if (mqbconfm::ExpressionVersion::E_VERSION_1 == value.version()) { if (d_subcriptionExpression.text().length()) { - int rc = d_autoSubscription.d_evaluator.compile( + int rc = d_appSubscription.d_evaluator.compile( d_subcriptionExpression.text(), d_routing_sp->d_compilationContext); return rc; // RETURN } } // Reset - d_autoSubscription.d_evaluator = bmqeval::SimpleEvaluator(); + d_appSubscription.d_evaluator = bmqeval::SimpleEvaluator(); return 0; } -bool QueueEngineUtil_AppState::evaluateAutoSubcription() +bool QueueEngineUtil_AppState::evaluateAppSubcription() { - return d_autoSubscription.evaluate(); + return d_appSubscription.evaluate(); } void QueueEngineUtil_AppState::authorize(const mqbu::StorageKey& appKey, diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h index 9f9dfe2d3f..5548cfbc3d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h @@ -374,10 +374,10 @@ struct QueueEngineUtil_AppState { bsls::AtomicBool d_isScheduled; mqbconfm::Expression d_subcriptionExpression; - // The auto subscription expression if any. + // The application subscription expression if any. - Routers::Expression d_autoSubscription; - // Evaluator of the auto subscription + Routers::Expression d_appSubscription; + // Evaluator of the application subscription unsigned int d_appOrdinal; @@ -503,11 +503,11 @@ struct QueueEngineUtil_AppState { const mqbi::StorageIterator* currentMessage, unsigned int ordinal); - // Set the auto subscription + // Set the application subscription int setSubscription(const mqbconfm::Expression& value); - // Evaluate the auto subscription - bool evaluateAutoSubcription(); + // Evaluate the application subscription + bool evaluateAppSubcription(); /// Change the state to authorized, thus enabling delivery void authorize(const mqbu::StorageKey& appKey, unsigned int appOrdinal); diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 4a7b1bf6f4..fd300f0161 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1562,7 +1562,7 @@ void RelayQueueEngine::onTimer( // NOTHING } -mqbi::StorageResult::Enum RelayQueueEngine::evaluateAutoSubscriptions( +mqbi::StorageResult::Enum RelayQueueEngine::evaluateAppSubscriptions( BSLS_ANNOTATION_UNUSED const bmqp::PutHeader& putHeader, BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& appData, BSLS_ANNOTATION_UNUSED const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h index ac4cd31ae0..710e13182f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h @@ -527,7 +527,7 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { unsigned int appOrdinal) BSLS_KEYWORD_OVERRIDE; /// Not valid for 'RelayQueueEngine' - mqbi::StorageResult::Enum evaluateAutoSubscriptions( + mqbi::StorageResult::Enum evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 9c7993c7ed..ab164fb566 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -265,7 +265,7 @@ RootQueueEngine::RootQueueEngine(QueueState* queueState, bdlf::PlaceHolders::_2), // enableLog allocator) , d_apps(allocator) -, d_hasAutoSubscriptions(false) +, d_hasAppSubscriptions(false) , d_isFanout(domainConfig.mode().isFanoutValue()) , d_scheduler_p(queueState->scheduler()) , d_miscWorkThreadPool_p(queueState->miscWorkThreadPool()) @@ -304,9 +304,10 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, , rc_APP_INITIALIZATION_ERROR = -1 // No Virtual Storage , - rc_AUTO_SUBSCRIPTION_ERROR = -2 // Wrong expression + rc_APP_SUBSCRIPTION_ERROR = -2 // Wrong expression , - rc_AUTO_SUBSCRIPTIONS_ERROR = -3 // Wrong number of auto subscriptions + rc_APP_SUBSCRIPTIONS_ERROR = -3 // Wrong number of application + // subscriptions }; // Populate map of appId to appKey for statically registered consumers @@ -314,7 +315,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, const bsl::vector& subscriptions = d_queueState_p->domain()->config().subscriptions(); - d_hasAutoSubscriptions = !subscriptions.empty(); + d_hasAppSubscriptions = !subscriptions.empty(); if (d_isFanout) { const bsl::vector& cfgAppIds = @@ -345,7 +346,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, << "' for the '" << itApp->first << "' app, rc: " << rc << ", reason: '" << bmqeval::ErrorType::toString(errorType) << "'"; - return rc_AUTO_SUBSCRIPTION_ERROR; // RETURN + return rc_APP_SUBSCRIPTION_ERROR; // RETURN } } else { @@ -364,13 +365,14 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, isReconfigure)) { return rc_APP_INITIALIZATION_ERROR; // RETURN } - // TODO: what is auto subscription "appId" for priority/broadcast? + // TODO: what is application subscription "appId" for + // priority/broadcast? if (subscriptions.size() > 1) { BALL_LOG_ERROR << "#QUEUE_CONFIGURE_FAILURE Queue '" << d_queueState_p->queue()->description() << "' Cannot have more than 1 auto subscription"; - return rc_AUTO_SUBSCRIPTIONS_ERROR; // RETURN + return rc_APP_SUBSCRIPTIONS_ERROR; // RETURN } Apps::iterator itApp = d_apps.begin(); @@ -395,7 +397,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription, << subscriptions[0].expression().text() << "', rc: " << rc << ", reason: '" << bmqeval::ErrorType::toString(errorType) << "'"; - return rc_AUTO_SUBSCRIPTION_ERROR; // RETURN + return rc_APP_SUBSCRIPTION_ERROR; // RETURN } } @@ -1993,14 +1995,14 @@ void RootQueueEngine::unregisterStorage( iter->second->unauthorize(); } -mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions( +mqbi::StorageResult::Enum RootQueueEngine::evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, bsls::Types::Uint64 timestamp) { - if (!d_hasAutoSubscriptions) { - // No-op if no auto subscriptions configured + if (!d_hasAppSubscriptions) { + // No-op if no application subscriptions configured return mqbi::StorageResult::e_SUCCESS; } @@ -2021,7 +2023,7 @@ mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions( for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { AppStateSp& app = it->second; - if (!app->evaluateAutoSubcription()) { + if (!app->evaluateAppSubcription()) { result = d_queueState_p->storage()->autoConfirm(app->appKey(), timestamp); diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index a9567fd9f3..19cc3ada02 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -100,8 +100,8 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { Apps d_apps; // Map of appId to AppState - bool d_hasAutoSubscriptions; - // Does this queue engine have any auto subscriptions configured + bool d_hasAppSubscriptions; + // Does this queue engine have any application subscriptions configured const bool d_isFanout; @@ -407,12 +407,12 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { unsigned int appOrdinal) BSLS_KEYWORD_OVERRIDE; /// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp', - /// evaluate all Auto (Application) subscriptions and exclude applications - /// with negative results from message delivery. - /// Return 0 on success or an non-zero error code on failure. + /// evaluate all application subscriptions and exclude applications with + /// negative results from message delivery. Return 0 on success or an + /// non-zero error code on failure. /// /// THREAD: This method is called from the Queue's dispatcher thread. - mqbi::StorageResult::Enum evaluateAutoSubscriptions( + mqbi::StorageResult::Enum evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbconfm/mqbconf.xsd b/src/groups/mqb/mqbconfm/mqbconf.xsd index d41238a354..3027465cd2 100644 --- a/src/groups/mqb/mqbconfm/mqbconf.xsd +++ b/src/groups/mqb/mqbconfm/mqbconf.xsd @@ -198,7 +198,7 @@ message for the purpose of detecting duplicate PUTs. consistency.........: optional consistency mode. - subscriptions.......: optional Auto (Application) subscriptions + subscriptions.......: optional application subscriptions diff --git a/src/groups/mqb/mqbconfm/mqbconfm_messages.h b/src/groups/mqb/mqbconfm/mqbconfm_messages.h index 828f0fae53..ac2af9ec19 100644 --- a/src/groups/mqb/mqbconfm/mqbconfm_messages.h +++ b/src/groups/mqb/mqbconfm/mqbconfm_messages.h @@ -4763,7 +4763,7 @@ class Domain { // queue. Zero (the default) means unlimited deduplicationTimeMs.: // timeout, in milliseconds, to keep GUID of PUT message for the purpose of // detecting duplicate PUTs. consistency.........: optional consistency - // mode. subscriptions.......: optional Auto (Application) subscriptions + // mode. subscriptions.......: optional application subscriptions // INSTANCE DATA bsls::Types::Int64 d_messageTtl; diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.h b/src/groups/mqb/mqbi/mqbi_queueengine.h index 5a858bb328..421a2f5d36 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.h +++ b/src/groups/mqb/mqbi/mqbi_queueengine.h @@ -226,16 +226,16 @@ class QueueEngine { unsigned int appOrdinal); /// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp', - /// evaluate all Auto (Application) subscriptions and exclude applications - /// with negative results from message delivery. - /// Return 0 on success or an non-zero error code on failure. + /// evaluate all application subscriptions and exclude applications with + /// negative results from message delivery. Return 0 on success or an + /// non-zero error code on failure. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual StorageResult::Enum - evaluateAutoSubscriptions(const bmqp::PutHeader& putHeader, - const bsl::shared_ptr& appData, - const bmqp::MessagePropertiesInfo& mpi, - bsls::Types::Uint64 timestamp) = 0; + evaluateAppSubscriptions(const bmqp::PutHeader& putHeader, + const bsl::shared_ptr& appData, + const bmqp::MessagePropertiesInfo& mpi, + bsls::Types::Uint64 timestamp) = 0; // ACCESSORS diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp index 51b0a1c10d..2ef1733114 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp @@ -143,7 +143,7 @@ void QueueEngine::onTimer( // NOTHING } -mqbi::StorageResult::Enum QueueEngine::evaluateAutoSubscriptions( +mqbi::StorageResult::Enum QueueEngine::evaluateAppSubscriptions( BSLS_ANNOTATION_UNUSED const bmqp::PutHeader& putHeader, BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& appData, BSLS_ANNOTATION_UNUSED const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.h b/src/groups/mqb/mqbmock/mqbmock_queueengine.h index 7c7461e22b..16af00459a 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.h +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.h @@ -200,12 +200,12 @@ class QueueEngine : public mqbi::QueueEngine { void onTimer(bsls::Types::Int64 currentTimer) BSLS_KEYWORD_OVERRIDE; /// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp', - /// evaluate all Auto (Application) subscriptions and exclude applications - /// with negative results from message delivery. - /// Return 0 on success or an non-zero error code on failure. + /// evaluate all application subscriptions and exclude applications with + /// negative results from message delivery. Return 0 on success or an + /// non-zero error code on failure. /// /// THREAD: This method is called from the Queue's dispatcher thread. - mqbi::StorageResult::Enum evaluateAutoSubscriptions( + mqbi::StorageResult::Enum evaluateAppSubscriptions( const bmqp::PutHeader& putHeader, const bsl::shared_ptr& appData, const bmqp::MessagePropertiesInfo& mpi, diff --git a/src/integration-tests/test_auto_subscriptions.py b/src/integration-tests/test_app_subscriptions.py similarity index 92% rename from src/integration-tests/test_auto_subscriptions.py rename to src/integration-tests/test_app_subscriptions.py index 7f832a5811..5aa5db3834 100644 --- a/src/integration-tests/test_auto_subscriptions.py +++ b/src/integration-tests/test_app_subscriptions.py @@ -31,9 +31,10 @@ from blazingmq.dev.configurator.configurator import Configurator -class TestAutoSubscriptions: +class TestAppSubscriptions: """ - This test verifies auto subscription for one or more substreams (apps) + This test verifies application subscription for one or more substreams + (apps) """ def _start_client(self, broker, uri, name, subscriptions=[]): @@ -77,11 +78,11 @@ def _verify_delivery(self, consumer, num): {"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x==2"}}, ] ) - def test_auto_subscription_fanout(self, cluster: Cluster): + def test_app_subscription_fanout(self, cluster: Cluster): proxies = cluster.proxy_cycle() """ - Out of the 3 apps, configure one to evaluate auto subscription + Out of the 3 apps, configure one to evaluate application subscription negatively, another to evaluate positively, and another do not configure. Make sure the first does not get the message, and the rest do. @@ -162,9 +163,10 @@ def test_auto_subscription_fanout(self, cluster: Cluster): @tweak.domain.subscriptions( [{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}] ) - def test_auto_subscription_priority(self, cluster: Cluster): + def test_app_subscription_priority(self, cluster: Cluster): """ - Configure the priority queue to evaluate auto subscription negatively. + Configure the priority queue to evaluate application subscription + negatively. Make sure the queue does not get the message. Make sure the same is the case after restarts. """ @@ -217,12 +219,12 @@ def test_auto_subscription_priority(self, cluster: Cluster): {"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x > 2"}}, ] ) - def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster): + def test_app_subscription_with_consumer_subscription(self, cluster: Cluster): """ - Out of the 3 apps, configure two to evaluate auto subscriptions. + Out of the 3 apps, configure two to evaluate application subscriptions. Configure consumsers with consumer subscriptions. - Make sure outcome of message delivery is logical AND of both auto and - consumer subscriptions. + Make sure outcome of message delivery is logical AND of both + application and consumer subscriptions. """ proxies = cluster.proxy_cycle() @@ -332,9 +334,10 @@ def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster): @tweak.domain.subscriptions( [{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}] ) - def test_auto_subscription_broadcast(self, cluster: Cluster): + def test_app_subscription_broadcast(self, cluster: Cluster): """ - Configure the boadcast queue to evaluate auto subscription negatively. + Configure the boadcast queue to evaluate application subscription + negatively. Make sure the queue does not get the message. """ @@ -377,9 +380,10 @@ def test_auto_subscription_broadcast(self, cluster: Cluster): {"appId": "baz", "expression": {"version": "E_VERSION_1", "text": "x==3"}}, ] ) - def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): + def test_app_subscription_fanout_all_negative(self, cluster: Cluster): """ - Configure all fanout Apps to evaluate auto subscriptions negatively. + Configure all fanout Apps to evaluate application subscriptions + negatively. Make sure none receives a message. """ proxies = cluster.proxy_cycle() @@ -434,11 +438,11 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster): ) def test_invalid_configuration(self, cluster: Cluster): """ - Configure priority domain with invalid auto subscription. + Configure priority domain with invalid application subscription. Make sure a queue fails to open. - Reconfigure the domain with valid auto subscription. + Reconfigure the domain with valid application subscription. Make sure a queue opens successfully. - Reconfigure the domain with invalid auto subscription. + Reconfigure the domain with invalid application subscription. Make sure the reconfigure command fails. Make sure a queue opens successfully. """ diff --git a/src/python/blazingmq/schemas/mqbconf.py b/src/python/blazingmq/schemas/mqbconf.py index b16a744d00..a475a1ec66 100644 --- a/src/python/blazingmq/schemas/mqbconf.py +++ b/src/python/blazingmq/schemas/mqbconf.py @@ -603,7 +603,7 @@ class Domain: message for the purpose of detecting duplicate PUTs. consistency.........: optional consistency mode. - subscriptions.......: optional Auto (Application) subscriptions + subscriptions.......: optional application subscriptions """ name: Optional[str] = field(