From 94810a17367586fd8887df91cf7e3e083bdb86f3 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Tue, 17 Oct 2023 16:08:12 +0100 Subject: [PATCH 1/3] chore: ACTUALLY use storage --- metadata.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metadata.yaml b/metadata.yaml index d0cb938..6e83dd6 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -13,6 +13,9 @@ maintainers: containers: zookeeper: resource: zookeeper-image + mounts: + - storage: zookeeper + location: /var/lib/zookeeper resources: zookeeper-image: @@ -44,7 +47,7 @@ requires: optional: true storage: - data: + zookeeper: type: filesystem description: Directories where snapshot and transaction data is stored minimum-size: 10G From 799c7943828804aae2a85c6c6a1c8a47f6941533 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Tue, 17 Oct 2023 16:12:16 +0100 Subject: [PATCH 2/3] test: add scale down/up storage re-use test --- tests/integration/ha/helpers.py | 31 ++++++++++++ tests/integration/ha/test_ha.py | 84 +++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+) diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py index 51119fc..f41c736 100644 --- a/tests/integration/ha/helpers.py +++ b/tests/integration/ha/helpers.py @@ -524,3 +524,34 @@ async def delete_pod(ops_test, unit_name: str) -> None: ) await wait_idle(ops_test) + + +def get_transaction_logs_and_snapshots( + ops_test, unit_name: str, container_name: str = CONTAINER +) -> dict[str, list[str]]: + """Gets the most recent transaction log and snapshot files. + + Args: + ops_test: OpsTest + unit_name: the Juju unit to get timestamps from + container_name: the container to run command on + Defaults to '{container_name}' + + Returns: + Dict of keys "transactions", "snapshots" and value of list of filenames + """ + transaction_files = subprocess.check_output( + f"kubectl exec {unit_name.replace('/', '-')} -c {container_name} -n {ops_test.model.info.name} -- ls -1 /var/lib/zookeeper/data-log/version-2", + stderr=subprocess.PIPE, + shell=True, + universal_newlines=True, + ).splitlines() + + snapshot_files = subprocess.check_output( + f"kubectl exec {unit_name.replace('/', '-')} -c {container_name} -n {ops_test.model.info.name} -- ls -1 /var/lib/zookeeper/data/version-2", + stderr=subprocess.PIPE, + shell=True, + universal_newlines=True, + ).splitlines() + + return {"transactions": transaction_files, "snapshots": snapshot_files} diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index 5b2296f..d852048 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -24,6 +24,90 @@ async def test_deploy_active(ops_test: OpsTest): await helpers.wait_idle(ops_test) +@pytest.mark.abort_on_fail +async def test_scale_down_up_data(ops_test: OpsTest, request): + """Tests unit scale-down + up returns with data.""" + hosts = helpers.get_hosts(ops_test) + password = helpers.get_super_password(ops_test) + parent = request.node.name + current_scale = len(hosts.split(",")) + scaling_unit_name = sorted( + [unit.name for unit in ops_test.model.applications[helpers.APP_NAME].units] + )[-1] + + logger.info("Starting continuous_writes...") + cw.start_continuous_writes( + parent=parent, hosts=hosts, username=helpers.USERNAME, password=password + ) + await asyncio.sleep(CLIENT_TIMEOUT * 3) # letting client set up and start writing + + logger.info("Checking writes are running at all...") + assert cw.count_znodes( + parent=parent, hosts=hosts, username=helpers.USERNAME, password=password + ) + + logger.info("Getting transaction and snapshot files...") + current_files = helpers.get_transaction_logs_and_snapshots( + ops_test, unit_name=scaling_unit_name + ) + + logger.info(f"Scaling down to {current_scale - 1} units...") + await ops_test.model.applications[helpers.APP_NAME].scale(current_scale - 1) + await helpers.wait_idle(ops_test, units=current_scale - 1) + + surviving_hosts = helpers.get_hosts(ops_test) + + logger.info("Checking writes are increasing...") + writes = cw.count_znodes( + parent=parent, hosts=surviving_hosts, username=helpers.USERNAME, password=password + ) + await asyncio.sleep(CLIENT_TIMEOUT * 3) # increasing writes + new_writes = cw.count_znodes( + parent=parent, hosts=surviving_hosts, username=helpers.USERNAME, password=password + ) + assert new_writes > writes, "writes not continuing to ZK" + + logger.info(f"Scaling back up to {current_scale} units...") + await ops_test.model.applications[helpers.APP_NAME].scale(current_scale) + await helpers.wait_idle(ops_test, units=current_scale) + + logger.info("Counting writes on surviving units...") + last_write = cw.get_last_znode( + parent=parent, hosts=surviving_hosts, username=helpers.USERNAME, password=password + ) + total_writes = cw.count_znodes( + parent=parent, hosts=surviving_hosts, username=helpers.USERNAME, password=password + ) + assert last_write == total_writes + + new_host = max(set(helpers.get_hosts(ops_test).split(",")) - set(surviving_hosts.split(","))) + + logger.info("Checking new unit caught up...") + last_write_new = cw.get_last_znode( + parent=parent, hosts=new_host, username=helpers.USERNAME, password=password + ) + total_writes_new = cw.count_znodes( + parent=parent, hosts=new_host, username=helpers.USERNAME, password=password + ) + assert last_write == last_write_new + assert total_writes == total_writes_new + + logger.info("Stopping continuous_writes...") + cw.stop_continuous_writes() + + logger.info("Getting new transaction and snapshot files...") + new_files = helpers.get_transaction_logs_and_snapshots(ops_test, unit_name=scaling_unit_name) + + # zookeeper rolls snapshots + txn logs when a unit re-joins, meaning we can't check log timestamps + # checking file existence ensures re-use, as new files will have a different file suffix + # if storage wasn't re-used, there would be no files with the original suffix + for txn_log in current_files["transactions"]: + assert txn_log in new_files["transactions"], "storage not re-used, missing txn logs" + + for snapshot in current_files["snapshots"]: + assert snapshot in new_files["snapshots"], "storage not re-used, missing snapshots" + + @pytest.mark.abort_on_fail async def test_pod_reschedule(ops_test: OpsTest, request): """Forcefully reschedules ZooKeeper pod.""" From 28da02a639794ace986116b636dba0138dc18f54 Mon Sep 17 00:00:00 2001 From: Marc Oppenheimer Date: Tue, 17 Oct 2023 21:03:14 +0100 Subject: [PATCH 3/3] chore: stop writes before comparing new unit --- tests/integration/ha/test_ha.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index d852048..8c0b212 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -71,6 +71,9 @@ async def test_scale_down_up_data(ops_test: OpsTest, request): await ops_test.model.applications[helpers.APP_NAME].scale(current_scale) await helpers.wait_idle(ops_test, units=current_scale) + logger.info("Stopping continuous_writes...") + cw.stop_continuous_writes() + logger.info("Counting writes on surviving units...") last_write = cw.get_last_znode( parent=parent, hosts=surviving_hosts, username=helpers.USERNAME, password=password @@ -80,9 +83,8 @@ async def test_scale_down_up_data(ops_test: OpsTest, request): ) assert last_write == total_writes - new_host = max(set(helpers.get_hosts(ops_test).split(",")) - set(surviving_hosts.split(","))) - logger.info("Checking new unit caught up...") + new_host = max(set(helpers.get_hosts(ops_test).split(",")) - set(surviving_hosts.split(","))) last_write_new = cw.get_last_znode( parent=parent, hosts=new_host, username=helpers.USERNAME, password=password ) @@ -92,9 +94,6 @@ async def test_scale_down_up_data(ops_test: OpsTest, request): assert last_write == last_write_new assert total_writes == total_writes_new - logger.info("Stopping continuous_writes...") - cw.stop_continuous_writes() - logger.info("Getting new transaction and snapshot files...") new_files = helpers.get_transaction_logs_and_snapshots(ops_test, unit_name=scaling_unit_name)