From 8347139184ffd7a1f973a3f071645d553e77e2bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Zamora=20Mart=C3=ADnez?= <76525382+zmraul@users.noreply.github.com> Date: Tue, 6 Sep 2022 17:34:13 +0200 Subject: [PATCH] Add password rotation action (#20) * add password rotation action * add rotation test * fix: wait longer for ZK to update brokers Co-authored-by: Marc Oppenheimer --- .gitignore | 1 + actions.yaml | 14 +++++ src/charm.py | 53 +++++++++++++++- src/literals.py | 1 + src/utils.py | 4 +- tests/integration/helpers.py | 26 ++++++++ tests/integration/test_charm.py | 2 +- tests/integration/test_password_rotation.py | 67 +++++++++++++++++++++ tests/integration/test_provider.py | 6 +- tests/integration/test_scaling.py | 2 +- 10 files changed, 167 insertions(+), 9 deletions(-) create mode 100644 tests/integration/test_password_rotation.py diff --git a/.gitignore b/.gitignore index 89b61c91..10162fd3 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ build/ .coverage __pycache__/ *.py[cod] +.vscode \ No newline at end of file diff --git a/actions.yaml b/actions.yaml index d1e54838..b82d53c7 100644 --- a/actions.yaml +++ b/actions.yaml @@ -1,3 +1,5 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. # In Juju 3 this will be easier to copy get-server-properties: description: | @@ -6,3 +8,15 @@ get-snap-apps: description: | Action to retrieve the available commands within the kafka snap +set-password: + description: Change the system user's password, which is used by the charm. + It is for internal charm users and SHOULD NOT be used by applications. + This action must be called on the leader unit. + params: + username: + type: string + description: The username, the default value 'operator'. + Possible values - operator + password: + type: string + description: The password will be auto-generated if this option is not specified. diff --git a/src/charm.py b/src/charm.py index 0eccbdb7..dd8fb666 100755 --- a/src/charm.py +++ b/src/charm.py @@ -9,14 +9,20 @@ from charms.kafka.v0.kafka_snap import KafkaSnap from charms.rolling_ops.v0.rollingops import RollingOpsManager -from ops.charm import CharmBase, ConfigChangedEvent, RelationEvent, RelationJoinedEvent +from ops.charm import ( + ActionEvent, + CharmBase, + ConfigChangedEvent, + RelationEvent, + RelationJoinedEvent, +) from ops.framework import EventBase from ops.main import main from ops.model import ActiveStatus, BlockedStatus, Relation, WaitingStatus from auth import KafkaAuth from config import KafkaConfig -from literals import CHARM_KEY, PEER, ZK +from literals import CHARM_KEY, CHARM_USERS, PEER, ZK from provider import KafkaProvider from utils import broker_active, generate_password, safe_get_file @@ -45,6 +51,8 @@ def __init__(self, *args): self.framework.observe(self.on[ZK].relation_changed, self._on_config_changed) self.framework.observe(self.on[ZK].relation_broken, self._on_zookeeper_broken) + self.framework.observe(self.on.set_password_action, self._set_password_action) + @property def peer_relation(self) -> Relation: """The cluster peer relation.""" @@ -159,6 +167,47 @@ def _restart(self, event: EventBase) -> None: self.snap.restart_snap_service("kafka") + def _set_password_action(self, event: ActionEvent): + """Handler for set-password action. + + Set the password for a specific user, if no passwords are passed, generate them. + """ + if not self.unit.is_leader(): + msg = "Password rotation must be called on leader unit" + logger.error(msg) + event.fail(msg) + return + + username = event.params.get("username", "sync") + if username not in CHARM_USERS: + msg = f"The action can be run only for users used by the charm: {CHARM_USERS} not {username}." + logger.error(msg) + event.fail(msg) + return + + new_password = event.params.get("password", generate_password()) + if new_password == self.kafka_config.sync_password: + event.log("The old and new passwords are equal.") + event.set_results({f"{username}-password": new_password}) + return + + # Update the user + kafka_auth = KafkaAuth( + opts=self.kafka_config.extra_args, + zookeeper=self.kafka_config.zookeeper_config.get("connect", ""), + ) + try: + kafka_auth.add_user(username=username, password=new_password) + except subprocess.CalledProcessError as e: + # command to add users fails if attempted too early + logger.debug(str(e)) + event.fail(str(e)) + return + + # Store the password on application databag + self.peer_relation.data[self.app].update({f"{username}_password": new_password}) + event.set_results({f"{username}-password": new_password}) + @property def ready_to_start(self) -> bool: """Check for active ZooKeeper relation and adding of inter-broker auth username. diff --git a/src/literals.py b/src/literals.py index cbc381b3..336c07a3 100644 --- a/src/literals.py +++ b/src/literals.py @@ -8,3 +8,4 @@ PEER = "cluster" ZK = "zookeeper" REL_NAME = "kafka-client" +CHARM_USERS = ["sync"] diff --git a/src/utils.py b/src/utils.py index 17c61ff3..9b571766 100644 --- a/src/utils.py +++ b/src/utils.py @@ -23,8 +23,8 @@ @retry( # retry to give ZK time to update its broker zNodes before failing - wait=wait_fixed(5), - stop=stop_after_attempt(6), + wait=wait_fixed(6), + stop=stop_after_attempt(10), retry_error_callback=(lambda state: state.outcome.result()), retry=retry_if_not_result(lambda result: True if result else False), ) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 61f36cd3..d5781e3d 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -7,11 +7,13 @@ from typing import Any, Dict, List, Set, Tuple import yaml +from pytest_operator.plugin import OpsTest from auth import Acl, KafkaAuth METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) APP_NAME = METADATA["name"] +ZK_NAME = "zookeeper" def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]: @@ -52,6 +54,17 @@ def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None: assert "SCRAM-SHA-512" in result +def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> str: + result = check_output( + f"JUJU_MODEL={model_full_name} juju ssh kafka/0 'kafka.configs --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return result + + def show_unit(unit_name: str, model_full_name: str) -> Any: result = check_output( f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}", @@ -97,3 +110,16 @@ def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str zk_relation_data["uris"] = info["application-data"]["uris"] zk_relation_data["username"] = info["application-data"]["username"] return zk_relation_data + + +async def set_password(ops_test: OpsTest, username="sync", password=None, num_unit=0) -> str: + """Use the charm action to start a password rotation.""" + params = {"username": username} + if password: + params["password"] = password + + action = await ops_test.model.units.get(f"{APP_NAME}/{num_unit}").run_action( + "set-password", **params + ) + password = await action.wait() + return password.results diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 161341e4..ad2f9a9d 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -16,7 +16,7 @@ async def test_build_and_deploy(ops_test: OpsTest): kafka_charm = await ops_test.build_charm(".") await asyncio.gather( ops_test.model.deploy( - "zookeeper", channel="edge", application_name="zookeeper", num_units=1 + "zookeeper", channel="edge", application_name="zookeeper", num_units=3 ), ops_test.model.deploy(kafka_charm, application_name="kafka", num_units=1), ) diff --git a/tests/integration/test_password_rotation.py b/tests/integration/test_password_rotation.py new file mode 100644 index 00000000..5044bfb6 --- /dev/null +++ b/tests/integration/test_password_rotation.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging + +import pytest +from helpers import ( + APP_NAME, + ZK_NAME, + get_kafka_zk_relation_data, + get_user, + set_password, +) +from pytest_operator.plugin import OpsTest + +logger = logging.getLogger(__name__) + + +@pytest.mark.abort_on_fail +@pytest.mark.skip_if_deployed +async def test_build_and_deploy(ops_test: OpsTest): + kafka_charm = await ops_test.build_charm(".") + await asyncio.gather( + ops_test.model.deploy(ZK_NAME, channel="edge", application_name=ZK_NAME, num_units=3), + ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1), + ) + await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3) + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) + assert ops_test.model.applications[APP_NAME].status == "waiting" + assert ops_test.model.applications[ZK_NAME].status == "active" + + await ops_test.model.add_relation(APP_NAME, ZK_NAME) + + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) + + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[ZK_NAME].status == "active" + + +async def test_password_rotation(ops_test: OpsTest): + """Check that password stored on ZK has changed after a password rotation.""" + relation_data = get_kafka_zk_relation_data( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + uri = relation_data["uris"].split(",")[-1] + + initial_sync_user = get_user( + username="sync", + zookeeper_uri=uri, + model_full_name=ops_test.model_full_name, + ) + + result = await set_password(ops_test, username="sync", num_unit=0) + assert "sync-password" in result.keys() + + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME]) + + new_sync_user = get_user( + username="sync", + zookeeper_uri=uri, + model_full_name=ops_test.model_full_name, + ) + + assert initial_sync_user != new_sync_user diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py index b1a44007..73309a9e 100644 --- a/tests/integration/test_provider.py +++ b/tests/integration/test_provider.py @@ -30,14 +30,14 @@ def usernames(): @pytest.mark.abort_on_fail async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames): - zk_charm = await ops_test.build_charm(".") + charm = await ops_test.build_charm(".") app_charm = await ops_test.build_charm("tests/integration/app-charm") await asyncio.gather( ops_test.model.deploy( - "zookeeper", channel="edge", application_name="zookeeper", num_units=1 + "zookeeper", channel="edge", application_name="zookeeper", num_units=3 ), - ops_test.model.deploy(zk_charm, application_name=APP_NAME, num_units=1), + ops_test.model.deploy(charm, application_name=APP_NAME, num_units=1), ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1), ) await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1, ZK]) diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py index 0bc8f61f..b412cddf 100644 --- a/tests/integration/test_scaling.py +++ b/tests/integration/test_scaling.py @@ -28,7 +28,7 @@ async def test_kafka_simple_scale_up(ops_test: OpsTest): await asyncio.gather( ops_test.model.deploy( - "zookeeper", channel="edge", application_name="zookeeper", num_units=1 + "zookeeper", channel="edge", application_name="zookeeper", num_units=3 ), ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1), )