test_reconfigure_domains.py: Improve comments and readability #537

Merged · 2 commits · Dec 11, 2024
84 changes: 45 additions & 39 deletions src/integration-tests/test_reconfigure_domains.py
@@ -33,8 +33,8 @@

INITIAL_MSG_QUOTA = 10

-URI_PRIORITY_1 = f"bmq://{tc.DOMAIN_PRIORITY}/abcd-queue"
-URI_PRIORITY_2 = f"bmq://{tc.DOMAIN_PRIORITY}/qrst-queue"
+URI_PRIORITY_1 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/abcd-queue"
+URI_PRIORITY_2 = f"bmq://{tc.DOMAIN_PRIORITY_SC}/qrst-queue"
678098 (Collaborator) commented on Dec 11, 2024:

Don't we want to test the non-SC domain here anymore?
I am thinking we could parametrize this with fixtures.
I am also OK if we just switch to SC here without extra work, or we can add an issue to support non-SC if it's worth it.

kaikulimu (Collaborator, Author) replied:

This problem of parameterizing tests over non-SC and SC domains exists in many integration test files. I will create an issue to support all of them.

678098 (Collaborator) replied:

Is there a significant difference in the broker logic that makes it worth supporting both? If we don't expect differences in the logic, we might just keep the SC one.

kaikulimu (Collaborator, Author) replied on Dec 11, 2024:

The difference is that SC domains are more asynchronous: they need to wait for a quorum of receipts from replicas, so they are more susceptible to race conditions. I will create an issue.
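For illustration only (not part of this PR), the fixture-based parametrization suggested above might look roughly like the sketch below. The import path mirrors what this test suite already uses (an assumption), the `domain` fixture and `test_open_queue` names are hypothetical, and it assumes both `tc.DOMAIN_PRIORITY` and `tc.DOMAIN_PRIORITY_SC` remain defined:

```python
# Sketch: parametrize tests over both domain flavors with a pytest fixture,
# so every test that accepts `domain` runs once per flavor (non-SC and SC).
import pytest

import blazingmq.dev.it.testconstants as tc  # assumed: same module this file imports


@pytest.fixture(params=[tc.DOMAIN_PRIORITY, tc.DOMAIN_PRIORITY_SC])
def domain(request):
    return request.param


def test_open_queue(domain):
    # Hypothetical test body: build the queue URI from the parametrized domain,
    # then open/post/verify against it the same way the existing tests do.
    uri = f"bmq://{domain}/abcd-queue"
    ...
```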


class TestReconfigureDomains:
@@ -49,24 +49,28 @@ def setup_cluster(self, cluster: Cluster):
self.reader.open(URI_PRIORITY_1, flags=["read"], succeed=True)
self.reader.open(URI_PRIORITY_2, flags=["read"], succeed=True)

-# Instruct 'writer to 'POST 'n' messages to the domain.
-# Returns 'True' if all of them succeed, and a false-y value otherwise.
+# Instruct writer to POST `n` messages to the `uri`. Return true if all of
+# them succeed, else a false-y value.
def post_n_msgs(self, uri, n):
results = (
self.writer.post(uri, payload=[f"msg{i}"], wait_ack=True)
for i in range(0, n)
)
return all(res == Client.e_SUCCESS for res in results)

-# Helper method which tells 'leader' to reload the domain config.
-def reconfigure_to_n_msgs(self, cluster: Cluster, num_msgs, leader_only=True):
+# Instruct the leader to reconfigure the domain config in `cluster` to a
+# limit of `max_num_msgs` messages. The 'DOMAINS RECONFIGURE' command is
+# issued only to the leader. Return true if the reconfigure succeeds,
+# else false.
+def reconfigure_to_limit_n_msgs(self, cluster: Cluster, max_num_msgs: int) -> bool:
cluster.config.domains[
-tc.DOMAIN_PRIORITY
-].definition.parameters.storage.domain_limits.messages = num_msgs
+tc.DOMAIN_PRIORITY_SC
+].definition.parameters.storage.domain_limits.messages = max_num_msgs
return cluster.reconfigure_domain(
-tc.DOMAIN_PRIORITY, leader_only=leader_only, succeed=True
+tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

+# Verify that reconfiguring domain message limits works as expected.
@tweak.domain.storage.domain_limits.messages(INITIAL_MSG_QUOTA)
def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
assert self.post_n_msgs(URI_PRIORITY_1, INITIAL_MSG_QUOTA)
@@ -83,8 +87,8 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

-# Modify the domain configuration to hold 2 more messages.
-self.reconfigure_to_n_msgs(multi_node, INITIAL_MSG_QUOTA + 10)
+# Modify the domain configuration to hold 10 more messages.
+self.reconfigure_to_limit_n_msgs(multi_node, INITIAL_MSG_QUOTA + 10)

# Observe that posting two more messages succeeds.
assert self.post_n_msgs(URI_PRIORITY_1, 5)
@@ -96,13 +100,13 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

# Reconfigure limit back down to the initial value.
-self.reconfigure_to_n_msgs(multi_node, INITIAL_MSG_QUOTA)
+self.reconfigure_to_limit_n_msgs(multi_node, INITIAL_MSG_QUOTA)

# Observe that posting continues to fail.
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

-# Confirm 5 messages from the 'reader'.
+# Confirm 10 messages from the 'reader'.
self.reader.confirm(URI_PRIORITY_1, "+10", succeed=True)

# Observe that posting still fails, since we are still at capacity.
@@ -119,6 +123,7 @@ def test_reconfigure_domain_message_limits(self, multi_node: Cluster):
self.reader.confirm(URI_PRIORITY_1, "+1", succeed=True)
assert self.post_n_msgs(URI_PRIORITY_2, 1)

+# Verify that reconfiguring queue message limits works as expected.
@tweak.domain.storage.queue_limits.messages(INITIAL_MSG_QUOTA)
def test_reconfigure_queue_message_limits(self, multi_node: Cluster):
# Resource monitor allows exceeding message quota exactly once before
@@ -135,11 +140,11 @@ def test_reconfigure_queue_message_limits(self, multi_node: Cluster):
# Again observe that posting once more fails.
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

-# Modify the domain configuration to hold 2 more messages per queue.
+# Modify the domain configuration to hold 1 more message per queue.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.storage.queue_limits.messages = (INITIAL_MSG_QUOTA + 1)
-multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
+multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

# Observe that posting one more message now succeeds for each queue.
assert self.post_n_msgs(URI_PRIORITY_1, 1)
Expand All @@ -149,6 +154,7 @@ def test_reconfigure_queue_message_limits(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_2, 1)

+# Verify that domain reconfiguration persists after leader change.
@tweak.domain.storage.domain_limits.messages(1)
def test_reconfigure_with_leader_change(self, multi_node: Cluster):
leader = multi_node.last_known_leader
@@ -158,10 +164,10 @@ def test_reconfigure_with_leader_change(self, multi_node: Cluster):
assert not self.post_n_msgs(URI_PRIORITY_1, 1)

# Reconfigure every node to accept an additional message.
-self.reconfigure_to_n_msgs(multi_node, 3, leader_only=False)
+self.reconfigure_to_limit_n_msgs(multi_node, 2)

# Ensure the capacity increased as expected, then confirm one message.
-assert self.post_n_msgs(URI_PRIORITY_1, 2)
+assert self.post_n_msgs(URI_PRIORITY_1, 1)
assert not self.post_n_msgs(URI_PRIORITY_1, 1)
self.reader.confirm(URI_PRIORITY_1, "+1", succeed=True)

@@ -193,10 +199,10 @@ def test_reconfigure_max_clients(self, multi_node: Cluster):

# Reconfigure the domain to allow for one more producer to connect.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_producers = 2
multi_node.reconfigure_domain(
-tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
+tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Confirm that the queue can be opened for writing, but not reading.
@@ -205,10 +211,10 @@

# Reconfigure the domain to allow for one more consumer to connect.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_consumers = 2
multi_node.reconfigure_domain(
-tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
+tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Confirm that the queue can be opened for reading.
@@ -229,10 +235,10 @@ def test_reconfigure_max_queues(self, multi_node: Cluster):

# Reconfigure the domain to allow for one more producer to connect.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_queues = 3
multi_node.reconfigure_domain(
-tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
+tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Confirm that one more queue can be opened.
@@ -261,10 +267,10 @@ def test_reconfigure_max_idle_time(self, multi_node: Cluster):

# Reconfigure domain to tolerate as much as two seconds of idleness.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_idle_time = 2
multi_node.reconfigure_domain(
-tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
+tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Write two further messages to the queue.
@@ -294,10 +300,10 @@ def test_reconfigure_message_ttl(self, multi_node: Cluster):

# Reconfigure the domain to wait 3 seconds before GC'ing messages.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.message_ttl = 10
multi_node.reconfigure_domain(
-tc.DOMAIN_PRIORITY, leader_only=True, succeed=True
+tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
)

# Write two further messages to the queue.
@@ -314,7 +320,7 @@

@tweak.domain.max_delivery_attempts(0)
def test_reconfigure_max_delivery_attempts(self, multi_node: Cluster):
-URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda"
+URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda"
proxy = next(multi_node.proxy_cycle())

# Open the queue through the writer.
@@ -348,16 +354,16 @@ def do_test(expect_success):

# Reconfigure messages to expire after 5 delivery attempts.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_delivery_attempts = 5
-multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
+multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

# Expect that message will expire after failed deliveries.
do_test(False)

@tweak.domain.max_delivery_attempts(1)
def test_reconfigure_max_delivery_attempts_finite(self, multi_node: Cluster):
-URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda"
+URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda"
proxy = next(multi_node.proxy_cycle())

# Open the queue through the writer.
@@ -392,9 +398,9 @@ def do_test(expect_success, delivery_attempts):
for max_delivery_attempts in range(2, 7):
# Reconfigure messages to expire after max_delivery_attempts delivery attempts.
multi_node.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_delivery_attempts = max_delivery_attempts
-multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
+multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

# Attempt to deliver message max_delivery_attempts times,
# client confirms at the last attempt.
@@ -414,7 +420,7 @@ def test_reconfigure_max_delivery_attempts_on_existing_messages(
# On this stage, we open a producer to a queue and post 5 messages
# with serial payloads: ["msg0", "msg1", "msg2", "msg3", "msg4"].
# We consider the very first message "msg0" poisonous.
-URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda-on-existing-msgs"
+URI = f"bmq://{tc.DOMAIN_PRIORITY_SC}/reconf-rda-on-existing-msgs"
proxy = next(multi_node.proxy_cycle())

self.writer.open(URI, flags=["write,ack"], succeed=True)
@@ -479,15 +485,15 @@ def run_consumers(max_attempts: int) -> int:
admin = AdminClient()
admin.connect(*cluster.admin_endpoint)

-res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS")
+res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY_SC} INFOS")
assert '"maxDeliveryAttempts" : 0' in res

cluster.config.domains[
-tc.DOMAIN_PRIORITY
+tc.DOMAIN_PRIORITY_SC
].definition.parameters.max_delivery_attempts = 5
-cluster.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)
+cluster.reconfigure_domain(tc.DOMAIN_PRIORITY_SC, succeed=True)

-res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS")
+res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY_SC} INFOS")
assert '"maxDeliveryAttempts" : 5' in res

admin.stop()