Skip to content

Commit

Permalink
Rename auto-subscriptions to application subscriptions
Browse files Browse the repository at this point in the history
This patch renames all instances of “auto subscriptions” in the code to
“application subscriptions”, which is what we use externally to describe this
feature.  Because application subscriptions are only evaluated and referenced
in the broker, there is no chance of an API-breaking change.  Indeed, only the
`mqb` package group is affected.

This patch leaves log messages that reference auto subscriptions;
this will be handled separately, as tools that parse the log for lines about
application subscriptions may depend on this older output.

Signed-off-by: Patrick M. Niedzielski <[email protected]>
  • Loading branch information
pniedzielski committed Dec 12, 2024
1 parent 8625cab commit 36a1a57
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 69 deletions.
11 changes: 5 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,12 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
bsls::Types::Uint64 timestamp = bdlt::EpochUtil::convertToTimeT64(
bdlt::CurrentTime::utc());

// EXPERIMENTAL:
// Evaluate 'auto' subscriptions
// Evaluate application subscriptions
mqbi::StorageResult::Enum res =
d_queueEngine_mp->evaluateAutoSubscriptions(putHeader,
appData,
translation,
timestamp);
d_queueEngine_mp->evaluateAppSubscriptions(putHeader,
appData,
translation,
timestamp);

bool haveReceipt = true;
unsigned int refCount = d_queueEngine_mp->messageReferenceCount();
Expand Down
10 changes: 5 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState(
// nodes don't load the domain config.
BSLS_ASSERT_SAFE(d_scheduler_p);

d_autoSubscription.d_evaluationContext_p =
d_appSubscription.d_evaluationContext_p =
&d_routing_sp->d_queue.d_evaluationContext;

const mqbcfg::AppConfig& brkrCfg = mqbcfg::BrokerConfig::get();
Expand Down Expand Up @@ -1372,21 +1372,21 @@ int QueueEngineUtil_AppState::setSubscription(

if (mqbconfm::ExpressionVersion::E_VERSION_1 == value.version()) {
if (d_subcriptionExpression.text().length()) {
int rc = d_autoSubscription.d_evaluator.compile(
int rc = d_appSubscription.d_evaluator.compile(
d_subcriptionExpression.text(),
d_routing_sp->d_compilationContext);
return rc; // RETURN
}
}
// Reset
d_autoSubscription.d_evaluator = bmqeval::SimpleEvaluator();
d_appSubscription.d_evaluator = bmqeval::SimpleEvaluator();

return 0;
}

bool QueueEngineUtil_AppState::evaluateAutoSubcription()
bool QueueEngineUtil_AppState::evaluateAppSubcription()
{
return d_autoSubscription.evaluate();
return d_appSubscription.evaluate();
}

void QueueEngineUtil_AppState::authorize(const mqbu::StorageKey& appKey,
Expand Down
12 changes: 6 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,10 @@ struct QueueEngineUtil_AppState {
bsls::AtomicBool d_isScheduled;

mqbconfm::Expression d_subcriptionExpression;
// The auto subscription expression if any.
// The application subscription expression if any.

Routers::Expression d_autoSubscription;
// Evaluator of the auto subscription
Routers::Expression d_appSubscription;
// Evaluator of the application subscription

unsigned int d_appOrdinal;

Expand Down Expand Up @@ -503,11 +503,11 @@ struct QueueEngineUtil_AppState {
const mqbi::StorageIterator* currentMessage,
unsigned int ordinal);

// Set the auto subscription
// Set the application subscription
int setSubscription(const mqbconfm::Expression& value);

// Evaluate the auto subscription
bool evaluateAutoSubcription();
// Evaluate the application subscription
bool evaluateAppSubcription();

/// Change the state to authorized, thus enabling delivery
void authorize(const mqbu::StorageKey& appKey, unsigned int appOrdinal);
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1562,7 +1562,7 @@ void RelayQueueEngine::onTimer(
// NOTHING
}

mqbi::StorageResult::Enum RelayQueueEngine::evaluateAutoSubscriptions(
mqbi::StorageResult::Enum RelayQueueEngine::evaluateAppSubscriptions(
BSLS_ANNOTATION_UNUSED const bmqp::PutHeader& putHeader,
BSLS_ANNOTATION_UNUSED const bsl::shared_ptr<bdlbb::Blob>& appData,
BSLS_ANNOTATION_UNUSED const bmqp::MessagePropertiesInfo& mpi,
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
unsigned int appOrdinal) BSLS_KEYWORD_OVERRIDE;

/// Not valid for 'RelayQueueEngine'
mqbi::StorageResult::Enum evaluateAutoSubscriptions(
mqbi::StorageResult::Enum evaluateAppSubscriptions(
const bmqp::PutHeader& putHeader,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::MessagePropertiesInfo& mpi,
Expand Down
26 changes: 14 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ RootQueueEngine::RootQueueEngine(QueueState* queueState,
bdlf::PlaceHolders::_2), // enableLog
allocator)
, d_apps(allocator)
, d_hasAutoSubscriptions(false)
, d_hasAppSubscriptions(false)
, d_isFanout(domainConfig.mode().isFanoutValue())
, d_scheduler_p(queueState->scheduler())
, d_miscWorkThreadPool_p(queueState->miscWorkThreadPool())
Expand Down Expand Up @@ -304,17 +304,18 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription,
,
rc_APP_INITIALIZATION_ERROR = -1 // No Virtual Storage
,
rc_AUTO_SUBSCRIPTION_ERROR = -2 // Wrong expression
rc_APP_SUBSCRIPTION_ERROR = -2 // Wrong expression
,
rc_AUTO_SUBSCRIPTIONS_ERROR = -3 // Wrong number of auto subscriptions
rc_APP_SUBSCRIPTIONS_ERROR = -3 // Wrong number of application
// subscriptions
};

// Populate map of appId to appKey for statically registered consumers
size_t numApps = 0;

const bsl::vector<mqbconfm::Subscription>& subscriptions =
d_queueState_p->domain()->config().subscriptions();
d_hasAutoSubscriptions = !subscriptions.empty();
d_hasAppSubscriptions = !subscriptions.empty();

if (d_isFanout) {
const bsl::vector<bsl::string>& cfgAppIds =
Expand Down Expand Up @@ -345,7 +346,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription,
<< "' for the '" << itApp->first << "' app, rc: " << rc
<< ", reason: '"
<< bmqeval::ErrorType::toString(errorType) << "'";
return rc_AUTO_SUBSCRIPTION_ERROR; // RETURN
return rc_APP_SUBSCRIPTION_ERROR; // RETURN
}
}
else {
Expand All @@ -364,13 +365,14 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription,
isReconfigure)) {
return rc_APP_INITIALIZATION_ERROR; // RETURN
}
// TODO: what is auto subscription "appId" for priority/broadcast?
// TODO: what is application subscription "appId" for
// priority/broadcast?
if (subscriptions.size() > 1) {
BALL_LOG_ERROR << "#QUEUE_CONFIGURE_FAILURE Queue '"
<< d_queueState_p->queue()->description()
<< "' Cannot have more than 1 auto subscription";

return rc_AUTO_SUBSCRIPTIONS_ERROR; // RETURN
return rc_APP_SUBSCRIPTIONS_ERROR; // RETURN
}

Apps::iterator itApp = d_apps.begin();
Expand All @@ -395,7 +397,7 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription,
<< subscriptions[0].expression().text()
<< "', rc: " << rc << ", reason: '"
<< bmqeval::ErrorType::toString(errorType) << "'";
return rc_AUTO_SUBSCRIPTION_ERROR; // RETURN
return rc_APP_SUBSCRIPTION_ERROR; // RETURN
}
}

Expand Down Expand Up @@ -1993,14 +1995,14 @@ void RootQueueEngine::unregisterStorage(
iter->second->unauthorize();
}

mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions(
mqbi::StorageResult::Enum RootQueueEngine::evaluateAppSubscriptions(
const bmqp::PutHeader& putHeader,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::MessagePropertiesInfo& mpi,
bsls::Types::Uint64 timestamp)
{
if (!d_hasAutoSubscriptions) {
// No-op if no auto subscriptions configured
if (!d_hasAppSubscriptions) {
// No-op if no application subscriptions configured
return mqbi::StorageResult::e_SUCCESS;
}

Expand All @@ -2021,7 +2023,7 @@ mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions(

for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) {
AppStateSp& app = it->second;
if (!app->evaluateAutoSubcription()) {
if (!app->evaluateAppSubcription()) {
result = d_queueState_p->storage()->autoConfirm(app->appKey(),
timestamp);

Expand Down
12 changes: 6 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
Apps d_apps;
// Map of appId to AppState

bool d_hasAutoSubscriptions;
// Does this queue engine have any auto subscriptions configured
bool d_hasAppSubscriptions;
// Does this queue engine have any application subscriptions configured

const bool d_isFanout;

Expand Down Expand Up @@ -407,12 +407,12 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
unsigned int appOrdinal) BSLS_KEYWORD_OVERRIDE;

/// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp',
/// evaluate all Auto (Application) subscriptions and exclude applications
/// with negative results from message delivery.
/// Return 0 on success or an non-zero error code on failure.
/// evaluate all application subscriptions and exclude applications with
/// negative results from message delivery. Return 0 on success or an
/// non-zero error code on failure.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
mqbi::StorageResult::Enum evaluateAutoSubscriptions(
mqbi::StorageResult::Enum evaluateAppSubscriptions(
const bmqp::PutHeader& putHeader,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::MessagePropertiesInfo& mpi,
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbconfm/mqbconf.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
message for the purpose of detecting duplicate
PUTs.
consistency.........: optional consistency mode.
subscriptions.......: optional Auto (Application) subscriptions
subscriptions.......: optional application subscriptions
</documentation>
</annotation>
<sequence>
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbconfm/mqbconfm_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -4763,7 +4763,7 @@ class Domain {
// queue. Zero (the default) means unlimited deduplicationTimeMs.:
// timeout, in milliseconds, to keep GUID of PUT message for the purpose of
// detecting duplicate PUTs. consistency.........: optional consistency
// mode. subscriptions.......: optional Auto (Application) subscriptions
// mode. subscriptions.......: optional application subscriptions

// INSTANCE DATA
bsls::Types::Int64 d_messageTtl;
Expand Down
14 changes: 7 additions & 7 deletions src/groups/mqb/mqbi/mqbi_queueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,16 @@ class QueueEngine {
unsigned int appOrdinal);

/// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp',
/// evaluate all Auto (Application) subscriptions and exclude applications
/// with negative results from message delivery.
/// Return 0 on success or an non-zero error code on failure.
/// evaluate all application subscriptions and exclude applications with
/// negative results from message delivery. Return 0 on success or an
/// non-zero error code on failure.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual StorageResult::Enum
evaluateAutoSubscriptions(const bmqp::PutHeader& putHeader,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::MessagePropertiesInfo& mpi,
bsls::Types::Uint64 timestamp) = 0;
evaluateAppSubscriptions(const bmqp::PutHeader& putHeader,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::MessagePropertiesInfo& mpi,
bsls::Types::Uint64 timestamp) = 0;

// ACCESSORS

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbmock/mqbmock_queueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void QueueEngine::onTimer(
// NOTHING
}

mqbi::StorageResult::Enum QueueEngine::evaluateAutoSubscriptions(
mqbi::StorageResult::Enum QueueEngine::evaluateAppSubscriptions(
BSLS_ANNOTATION_UNUSED const bmqp::PutHeader& putHeader,
BSLS_ANNOTATION_UNUSED const bsl::shared_ptr<bdlbb::Blob>& appData,
BSLS_ANNOTATION_UNUSED const bmqp::MessagePropertiesInfo& mpi,
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbmock/mqbmock_queueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ class QueueEngine : public mqbi::QueueEngine {
void onTimer(bsls::Types::Int64 currentTimer) BSLS_KEYWORD_OVERRIDE;

/// Given the specified 'putHeader', 'appData', 'mpi', and 'timestamp',
/// evaluate all Auto (Application) subscriptions and exclude applications
/// with negative results from message delivery.
/// Return 0 on success or an non-zero error code on failure.
/// evaluate all application subscriptions and exclude applications with
/// negative results from message delivery. Return 0 on success or an
/// non-zero error code on failure.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
mqbi::StorageResult::Enum evaluateAutoSubscriptions(
mqbi::StorageResult::Enum evaluateAppSubscriptions(
const bmqp::PutHeader& putHeader,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::MessagePropertiesInfo& mpi,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
from blazingmq.dev.configurator.configurator import Configurator


class TestAutoSubscriptions:
class TestAppSubscriptions:
"""
This test verifies auto subscription for one or more substreams (apps)
This test verifies application subscription for one or more substreams
(apps)
"""

def _start_client(self, broker, uri, name, subscriptions=[]):
Expand Down Expand Up @@ -77,11 +78,11 @@ def _verify_delivery(self, consumer, num):
{"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x==2"}},
]
)
def test_auto_subscription_fanout(self, cluster: Cluster):
def test_app_subscription_fanout(self, cluster: Cluster):
proxies = cluster.proxy_cycle()

"""
Out of the 3 apps, configure one to evaluate auto subscription
Out of the 3 apps, configure one to evaluate application subscription
negatively, another to evaluate positively, and another do not
configure.
Make sure the first does not get the message, and the rest do.
Expand Down Expand Up @@ -162,9 +163,10 @@ def test_auto_subscription_fanout(self, cluster: Cluster):
@tweak.domain.subscriptions(
[{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}]
)
def test_auto_subscription_priority(self, cluster: Cluster):
def test_app_subscription_priority(self, cluster: Cluster):
"""
Configure the priority queue to evaluate auto subscription negatively.
Configure the priority queue to evaluate application subscription
negatively.
Make sure the queue does not get the message.
Make sure the same is the case after restarts.
"""
Expand Down Expand Up @@ -217,12 +219,12 @@ def test_auto_subscription_priority(self, cluster: Cluster):
{"appId": "bar", "expression": {"version": "E_VERSION_1", "text": "x > 2"}},
]
)
def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster):
def test_app_subscription_with_consumer_subscription(self, cluster: Cluster):
"""
Out of the 3 apps, configure two to evaluate auto subscriptions.
Out of the 3 apps, configure two to evaluate application subscriptions.
Configure consumsers with consumer subscriptions.
Make sure outcome of message delivery is logical AND of both auto and
consumer subscriptions.
Make sure outcome of message delivery is logical AND of both
application and consumer subscriptions.
"""

proxies = cluster.proxy_cycle()
Expand Down Expand Up @@ -332,9 +334,10 @@ def test_auto_subscription_with_consumer_subscription(self, cluster: Cluster):
@tweak.domain.subscriptions(
[{"appId": "", "expression": {"version": "E_VERSION_1", "text": "x==1"}}]
)
def test_auto_subscription_broadcast(self, cluster: Cluster):
def test_app_subscription_broadcast(self, cluster: Cluster):
"""
Configure the boadcast queue to evaluate auto subscription negatively.
Configure the boadcast queue to evaluate application subscription
negatively.
Make sure the queue does not get the message.
"""

Expand Down Expand Up @@ -377,9 +380,10 @@ def test_auto_subscription_broadcast(self, cluster: Cluster):
{"appId": "baz", "expression": {"version": "E_VERSION_1", "text": "x==3"}},
]
)
def test_auto_subscription_fanout_all_negative(self, cluster: Cluster):
def test_app_subscription_fanout_all_negative(self, cluster: Cluster):
"""
Configure all fanout Apps to evaluate auto subscriptions negatively.
Configure all fanout Apps to evaluate application subscriptions
negatively.
Make sure none receives a message.
"""
proxies = cluster.proxy_cycle()
Expand Down Expand Up @@ -434,11 +438,11 @@ def test_auto_subscription_fanout_all_negative(self, cluster: Cluster):
)
def test_invalid_configuration(self, cluster: Cluster):
"""
Configure priority domain with invalid auto subscription.
Configure priority domain with invalid application subscription.
Make sure a queue fails to open.
Reconfigure the domain with valid auto subscription.
Reconfigure the domain with valid application subscription.
Make sure a queue opens successfully.
Reconfigure the domain with invalid auto subscription.
Reconfigure the domain with invalid application subscription.
Make sure the reconfigure command fails.
Make sure a queue opens successfully.
"""
Expand Down
2 changes: 1 addition & 1 deletion src/python/blazingmq/schemas/mqbconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ class Domain:
message for the purpose of detecting duplicate
PUTs.
consistency.........: optional consistency mode.
subscriptions.......: optional Auto (Application) subscriptions
subscriptions.......: optional application subscriptions
"""

name: Optional[str] = field(
Expand Down

0 comments on commit 36a1a57

Please sign in to comment.