Skip to content

Commit

Permalink
test_reconfigure_domains.py: Improve comments and readability (#537)
Browse files Browse the repository at this point in the history
* test_reconfigure_domains.py: Improve comments and readability

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* Address feedback

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

---------

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Dec 11, 2024
1 parent fae5ec1 commit 35cf678
Showing 1 changed file with 45 additions and 39 deletions.
84 changes: 45 additions & 39 deletions src/integration-tests/test_reconfigure_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

INITIAL_MSG_QUOTA = 10

URI_PRIORITY_1 = f"bmq://{tc.DOMAIN_PRIORITY}/abcd-queue"
URI_PRIORITY_2 = f"bmq://{tc.DOMAIN_PRIORITY}/qrst-queue"
URI_PRIORITY_1 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/abcd-queue"
URI_PRIORITY_2 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/qrst-queue"


class TestReconfigureDomains:
Expand All @@ -49,24 +49,28 @@ def setup_cluster(self, cluster: Cluster):
self.reader.open(URI_PRIORITY_1, flags=["read"], succeed=True)
self.reader.open(URI_PRIORITY_2, flags=["read"], succeed=True)

# Instruct 'writer to 'POST 'n' messages to the domain.
# Returns 'True' if all of them succeed, and a false-y value otherwise.
# Instruct writer to POST `n` messages to the `uri`. Return true if all of
# them succeed, else a false-y value.
def post_n_msgs(self, uri, n):
results = (
self.writer.post(uri, payload=[f"msg{i}"], wait_ack=True)
for i in range(0, n)
)
return all(res == Client.e_SUCCESS for res in results)

# Helper method which tells 'leader' to reload the domain config.
def reconfigure_to_n_msgs(self, cluster: Cluster, num_msgs, leader_only=True):
# Instruct the leader to reconfigure domain config in `cluster` to have a
# limit of `max_num_msgs`. If 'leader_only' is true, the 'DOMAINS
# RECONFIGURE' command will only be issued to the leader. Return true if
# reconfigure succeeds, else false.
def reconfigure_to_limit_n_msgs(self, cluster: Cluster, max_num_msgs: int) -> bool:
cluster.config.domains[
tc.DOMAIN_PRIORITY
].definition.parameters.storage.domain_limits.messages = num_msgs
tc.DOMAIN_PRIORITY_SC
].definition.parameters.storage.domain_limits.messages = max_num_msgs
return cluster.reconfigure_domain(
tc.DOMAIN_PRIORITY, leader_only=leader_only, succeed=True
tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Verify that reconfiguring domain message limits works as expected.
@tweak.domain.storage.domain_limits.messages(INITIAL_MSG_QUOTA)
def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
assert self.post_n_msgs(URI_PRIORITY_1, INITIAL_MSG_QUOTA)
Expand All @@ -83,8 +87,8 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

# Modify the domain configuration to hold 2 more messages.
self.reconfigure_to_n_msgs(multi_node, INITIAL_MSG_QUOTA + 10)
# Modify the domain configuration to hold 10 more messages.
self.reconfigure_to_limit_n_msgs(multi_node, INITIAL_MSG_QUOTA + 10)

# Observe that posting two more messages succeeds.
assert self.post_n_msgs(URI_PRIORITY_1, 5)
Expand All @@ -96,13 +100,13 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

# Reconfigure limit back down to the initial value.
self.reconfigure_to_n_msgs(multi_node, INITIAL_MSG_QUOTA)
self.reconfigure_to_limit_n_msgs(multi_node, INITIAL_MSG_QUOTA)

# Observe that posting continues to fail.
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

# Confirm 5 messages from the 'reader'.
# Confirm 10 messages from the 'reader'.
self.reader.confirm(URI_PRIORITY_1, "+10", succeed=True)

# Observe that posting still fails, since we are still at capacity.
Expand All @@ -119,6 +123,7 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
self.reader.confirm(URI_PRIORITY_1, "+1", succeed=True)
assert self.post_n_msgs(URI_PRIORITY_2, 1)

# Verify that reconfiguring queue message limits works as expected.
@tweak.domain.storage.queue_limits.messages(INITIAL_MSG_QUOTA)
def test_reconfigure_queue_message_limits(self, multi_node: Cluster):
# Resource monitor allows exceeding message quota exactly once before
Expand All @@ -135,11 +140,11 @@ def test_reconfigure_queue_message_limits(self, multi_node: Cluster):
# Again observe that posting once more fails.
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

# Modify the domain configuration to hold 2 more messages per queue.
# Modify the domain configuration to hold 1 more message per queue.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.storage.queue_limits.messages = (INITIAL_MSG_QUOTA + 1)
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

# Observe that posting one more message now succeeds for each queue.
assert self.post_n_msgs(URI_PRIORITY_1, 1)
Expand All @@ -149,6 +154,7 @@ def test_reconfigure_queue_message_limits(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

# Verify that domain reconfiguration persists after leader change.
@tweak.domain.storage.domain_limits.messages(1)
def test_reconfigure_with_leader_change(self, multi_node: Cluster):
leader = multi_node.last_known_leader
Expand All @@ -158,10 +164,10 @@ def test_reconfigure_with_leader_change(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_1, 1)

# Reconfigure every node to accept an additional message.
self.reconfigure_to_n_msgs(multi_node, 3, leader_only=False)
self.reconfigure_to_limit_n_msgs(multi_node, 2)

# Ensure the capacity increased as expected, then confirm one message.
assert self.post_n_msgs(URI_PRIORITY_1, 2)
assert self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
self.reader.confirm(URI_PRIORITY_1, "+1", succeed=True)

Expand Down Expand Up @@ -193,10 +199,10 @@ def test_reconfigure_max_clients(self, multi_node: Cluster):

# Reconfigure the domain to allow for one more producer to connect.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_producers = 2
multi_node.reconfigure_domain(
tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Confirm that the queue can be opened for writing, but not reading.
Expand All @@ -205,10 +211,10 @@ def test_reconfigure_max_clients(self, multi_node: Cluster):

# Reconfigure the domain to allow for one more consumer to connect.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_consumers = 2
multi_node.reconfigure_domain(
tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Confirm that the queue can be opened for reading.
Expand All @@ -229,10 +235,10 @@ def test_reconfigure_max_queues(self, multi_node: Cluster):

# Reconfigure the domain to allow for one more producer to connect.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_queues = 3
multi_node.reconfigure_domain(
tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Confirm that one more queue can be opened.
Expand Down Expand Up @@ -261,10 +267,10 @@ def test_reconfigure_max_idle_time(self, multi_node: Cluster):

# Reconfigure domain to tolerate as much as two seconds of idleness.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_idle_time = 2
multi_node.reconfigure_domain(
tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Write two further messages to the queue.
Expand Down Expand Up @@ -294,10 +300,10 @@ def test_reconfigure_message_ttl(self, multi_node: Cluster):

# Reconfigure the domain to wait 3 seconds before GC'ing messages.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.message_ttl = 10
multi_node.reconfigure_domain(
tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Write two further messages to the queue.
Expand All @@ -314,7 +320,7 @@ def test_reconfigure_message_ttl(self, multi_node: Cluster):

@tweak.domain.max_delivery_attempts(0)
def test_reconfigure_max_delivery_attempts(self, multi_node: Cluster):
URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda"
URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda"
proxy = next(multi_node.proxy_cycle())

# Open the queue through the writer.
Expand Down Expand Up @@ -348,16 +354,16 @@ def do_test(expect_success):

# Reconfigure messages to expire after 5 delivery attempts.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_delivery_attempts = 5
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

# Expect that message will expire after failed deliveries.
do_test(False)

@tweak.domain.max_delivery_attempts(1)
def test_reconfigure_max_delivery_attempts_finite(self, multi_node: Cluster):
URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda"
URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda"
proxy = next(multi_node.proxy_cycle())

# Open the queue through the writer.
Expand Down Expand Up @@ -392,9 +398,9 @@ def do_test(expect_success, delivery_attempts):
for max_delivery_attempts in range(2, 7):
# Reconfigure messages to expire after max_delivery_attempts delivery attempts.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_delivery_attempts = max_delivery_attempts
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

# Attempt to deliver message max_delivery_attempts times,
# client confirms at the last attempt.
Expand All @@ -414,7 +420,7 @@ def test_reconfigure_max_delivery_attempts_on_existing_messages(
# On this stage, we open a producer to a queue and post 5 messages
# with serial payloads: ["msg0", "msg1", "msg2", "msg3", "msg4"].
# We consider the very first message "msg0" poisonous.
URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda-on-existing-msgs"
URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda-on-existing-msgs"
proxy = next(multi_node.proxy_cycle())

self.writer.open(URI, flags=["write,ack"], succeed=True)
Expand Down Expand Up @@ -479,15 +485,15 @@ def run_consumers(max_attempts: int) -> int:
admin = AdminClient()
admin.connect(*cluster.admin_endpoint)

res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS")
res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY_SC} INFOS")
assert '"maxDeliveryAttempts" : 0' in res

cluster.config.domains[
tc.DOMAIN_PRIORITY
tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_delivery_attempts = 5
cluster.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS")
res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY_SC} INFOS")
assert '"maxDeliveryAttempts" : 5' in res

admin.stop()
Expand Down

0 comments on commit 35cf678

Please sign in to comment.