diff --git a/src/integration-tests/test_reconfigure_domains.py b/src/integration-tests/test_reconfigure_domains.py index 001f5ed70..82f2c25b2 100644 --- a/src/integration-tests/test_reconfigure_domains.py +++ b/src/integration-tests/test_reconfigure_domains.py @@ -33,8 +33,8 @@ INITIAL_MSG_QUOTA = 10 -URI_PRIORITY_1 = f"bmq://{tc.DOMAIN_PRIORITY}/abcd-queue" -URI_PRIORITY_2 = f"bmq://{tc.DOMAIN_PRIORITY}/qrst-queue" +URI_PRIORITY_1 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/abcd-queue" +URI_PRIORITY_2 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/qrst-queue" class TestReconfigureDomains: @@ -49,8 +49,8 @@ def setup_cluster(self, cluster: Cluster): self.reader.open(URI_PRIORITY_1, flags=["read"], succeed=True) self.reader.open(URI_PRIORITY_2, flags=["read"], succeed=True) - # Instruct 'writer to 'POST 'n' messages to the domain. - # Returns 'True' if all of them succeed, and a false-y value otherwise. + # Instruct writer to POST `n` messages to the `uri`. Return true if all of + # them succeed, else a false-y value. def post_n_msgs(self, uri, n): results = ( self.writer.post(uri, payload=[f"msg{i}"], wait_ack=True) @@ -58,15 +58,19 @@ def post_n_msgs(self, uri, n): ) return all(res == Client.e_SUCCESS for res in results) - # Helper method which tells 'leader' to reload the domain config. - def reconfigure_to_n_msgs(self, cluster: Cluster, num_msgs, leader_only=True): + # Instruct the leader to reconfigure domain config in `cluster` to have a + # limit of `max_num_msgs`. If 'leader_only' is true, the 'DOMAINS + # RECONFIGURE' command will only be issued to the leader. Return true if + # reconfigure succeeds, else false. + def reconfigure_to_limit_n_msgs(self, cluster: Cluster, max_num_msgs): cluster.config.domains[ - tc.DOMAIN_PRIORITY - ].definition.parameters.storage.domain_limits.messages = num_msgs + tc.DOMAIN_PRIORITY_SC + ].definition.parameters.storage.domain_limits.messages = max_num_msgs return cluster.reconfigure_domain( - tc.DOMAIN_PRIORITY, leader_only=leader_only, succeed=True + tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True ) + # Verify that reconfiguring domain message limits work as expected. @tweak.domain.storage.domain_limits.messages(INITIAL_MSG_QUOTA) def test_reconfigure_domain_message_limits(self, multi_node: Cluster): assert self.post_n_msgs(URI_PRIORITY_1, INITIAL_MSG_QUOTA) @@ -84,7 +88,7 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster): assert not self.post_n_msgs(URI_PRIORITY_2, 1) # Modify the domain configuration to hold 2 more messages. - self.reconfigure_to_n_msgs(multi_node, INITIAL_MSG_QUOTA + 10) + self.reconfigure_to_limit_n_msgs(multi_node, INITIAL_MSG_QUOTA + 10) # Observe that posting two more messages succeeds. assert self.post_n_msgs(URI_PRIORITY_1, 5) @@ -96,13 +100,13 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster): assert not self.post_n_msgs(URI_PRIORITY_2, 1) # Reconfigure limit back down to the initial value. - self.reconfigure_to_n_msgs(multi_node, INITIAL_MSG_QUOTA) + self.reconfigure_to_limit_n_msgs(multi_node, INITIAL_MSG_QUOTA) # Observe that posting continues to fail. assert not self.post_n_msgs(URI_PRIORITY_1, 1) assert not self.post_n_msgs(URI_PRIORITY_2, 1) - # Confirm 5 messages from the 'reader'. + # Confirm 10 messages from the 'reader'. self.reader.confirm(URI_PRIORITY_1, "+10", succeed=True) # Observe that posting still fails, since we are still at capacity. @@ -119,6 +123,7 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster): self.reader.confirm(URI_PRIORITY_1, "+1", succeed=True) assert self.post_n_msgs(URI_PRIORITY_2, 1) + # Verify that reconfiguring queue message limits work as expected. @tweak.domain.storage.queue_limits.messages(INITIAL_MSG_QUOTA) def test_reconfigure_queue_message_limits(self, multi_node: Cluster): # Resource monitor allows exceeding message quota exactly once before @@ -135,11 +140,11 @@ def test_reconfigure_queue_message_limits(self, multi_node: Cluster): # Again observe that posting once more fails. assert not self.post_n_msgs(URI_PRIORITY_2, 1) - # Modify the domain configuration to hold 2 more messages per queue. + # Modify the domain configuration to hold 1 more message per queue. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.storage.queue_limits.messages = (INITIAL_MSG_QUOTA + 1) - multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True) + multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True) # Observe that posting one more message now succeeds for each queue. assert self.post_n_msgs(URI_PRIORITY_1, 1) @@ -149,6 +154,7 @@ def test_reconfigure_queue_message_limits(self, multi_node: Cluster): assert not self.post_n_msgs(URI_PRIORITY_1, 1) assert not self.post_n_msgs(URI_PRIORITY_2, 1) + # Verify that domain reconfiguration persists after leader change. @tweak.domain.storage.domain_limits.messages(1) def test_reconfigure_with_leader_change(self, multi_node: Cluster): leader = multi_node.last_known_leader @@ -158,10 +164,10 @@ def test_reconfigure_with_leader_change(self, multi_node: Cluster): assert not self.post_n_msgs(URI_PRIORITY_1, 1) # Reconfigure every node to accept an additional message. - self.reconfigure_to_n_msgs(multi_node, 3, leader_only=False) + self.reconfigure_to_limit_n_msgs(multi_node, 2) # Ensure the capacity increased as expected, then confirm one message. - assert self.post_n_msgs(URI_PRIORITY_1, 2) + assert self.post_n_msgs(URI_PRIORITY_1, 1) assert not self.post_n_msgs(URI_PRIORITY_1, 1) self.reader.confirm(URI_PRIORITY_1, "+1", succeed=True) @@ -193,10 +199,10 @@ def test_reconfigure_max_clients(self, multi_node: Cluster): # Reconfigure the domain to allow for one more producer to connect. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.max_producers = 2 multi_node.reconfigure_domain( - tc.DOMAIN_PRIORITY, leader_only=True, succeed=True + tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True ) # Confirm that the queue can be opened for writing, but not reading. @@ -205,10 +211,10 @@ def test_reconfigure_max_clients(self, multi_node: Cluster): # Reconfigure the domain to allow for one more consumer to connect. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.max_consumers = 2 multi_node.reconfigure_domain( - tc.DOMAIN_PRIORITY, leader_only=True, succeed=True + tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True ) # Confirm that the queue can be opened for reading. @@ -229,10 +235,10 @@ def test_reconfigure_max_queues(self, multi_node: Cluster): # Reconfigure the domain to allow for one more producer to connect. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.max_queues = 3 multi_node.reconfigure_domain( - tc.DOMAIN_PRIORITY, leader_only=True, succeed=True + tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True ) # Confirm that one more queue can be opened. @@ -261,10 +267,10 @@ def test_reconfigure_max_idle_time(self, multi_node: Cluster): # Reconfigure domain to tolerate as much as two seconds of idleness. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.max_idle_time = 2 multi_node.reconfigure_domain( - tc.DOMAIN_PRIORITY, leader_only=True, succeed=True + tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True ) # Write two further messages to the queue. @@ -294,10 +300,10 @@ def test_reconfigure_message_ttl(self, multi_node: Cluster): # Reconfigure the domain to wait 3 seconds before GC'ing messages. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.message_ttl = 10 multi_node.reconfigure_domain( - tc.DOMAIN_PRIORITY, leader_only=True, succeed=True + tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True ) # Write two further messages to the queue. @@ -314,7 +320,7 @@ def test_reconfigure_message_ttl(self, multi_node: Cluster): @tweak.domain.max_delivery_attempts(0) def test_reconfigure_max_delivery_attempts(self, multi_node: Cluster): - URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda" + URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda" proxy = next(multi_node.proxy_cycle()) # Open the queue through the writer. @@ -348,16 +354,16 @@ def do_test(expect_success): # Reconfigure messages to expire after 5 delivery attempts. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.max_delivery_attempts = 5 - multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True) + multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True) # Expect that message will expire after failed deliveries. do_test(False) @tweak.domain.max_delivery_attempts(1) def test_reconfigure_max_delivery_attempts_finite(self, multi_node: Cluster): - URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda" + URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda" proxy = next(multi_node.proxy_cycle()) # Open the queue through the writer. @@ -392,9 +398,9 @@ def do_test(expect_success, delivery_attempts): for max_delivery_attempts in range(2, 7): # Reconfigure messages to expire after max_delivery_attempts delivery attempts. multi_node.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.max_delivery_attempts = max_delivery_attempts - multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True) + multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True) # Attempt to deliver message max_delivery_attempts times, # client confirms at the last attempt. @@ -414,7 +420,7 @@ def test_reconfigure_max_delivery_attempts_on_existing_messages( # On this stage, we open a producer to a queue and post 5 messages # with serial payloads: ["msg0", "msg1", "msg2", "msg3", "msg4"]. # We consider the very first message "msg0" poisonous. - URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda-on-existing-msgs" + URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda-on-existing-msgs" proxy = next(multi_node.proxy_cycle()) self.writer.open(URI, flags=["write,ack"], succeed=True) @@ -479,15 +485,15 @@ def run_consumers(max_attempts: int) -> int: admin = AdminClient() admin.connect(*cluster.admin_endpoint) - res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS") + res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY_SC} INFOS") assert '"maxDeliveryAttempts" : 0' in res cluster.config.domains[ - tc.DOMAIN_PRIORITY + tc.DOMAIN_PRIORITY_SC ].definition.parameters.max_delivery_attempts = 5 - cluster.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True) + cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True) - res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS") + res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY_SC} INFOS") assert '"maxDeliveryAttempts" : 5' in res admin.stop()