Skip to content

Commit

Permalink
Add password rotation action (#20)
Browse files Browse the repository at this point in the history
* add password rotation action

* add rotation test

* fix: wait longer for ZK to update brokers

Co-authored-by: Marc Oppenheimer <[email protected]>
  • Loading branch information
zmraul and marcoppenheimer authored Sep 6, 2022
1 parent c44a29d commit 8347139
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ build/
.coverage
__pycache__/
*.py[cod]
.vscode
14 changes: 14 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
# In Juju 3 this will be easier to copy
get-server-properties:
description: |
Expand All @@ -6,3 +8,15 @@ get-snap-apps:
description: |
Action to retrieve the available commands within the kafka snap
set-password:
description: Change the system user's password, which is used by the charm.
It is for internal charm users and SHOULD NOT be used by applications.
This action must be called on the leader unit.
params:
username:
type: string
description: The username, the default value 'operator'.
Possible values - operator
password:
type: string
description: The password will be auto-generated if this option is not specified.
53 changes: 51 additions & 2 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@

from charms.kafka.v0.kafka_snap import KafkaSnap
from charms.rolling_ops.v0.rollingops import RollingOpsManager
from ops.charm import CharmBase, ConfigChangedEvent, RelationEvent, RelationJoinedEvent
from ops.charm import (
ActionEvent,
CharmBase,
ConfigChangedEvent,
RelationEvent,
RelationJoinedEvent,
)
from ops.framework import EventBase
from ops.main import main
from ops.model import ActiveStatus, BlockedStatus, Relation, WaitingStatus

from auth import KafkaAuth
from config import KafkaConfig
from literals import CHARM_KEY, PEER, ZK
from literals import CHARM_KEY, CHARM_USERS, PEER, ZK
from provider import KafkaProvider
from utils import broker_active, generate_password, safe_get_file

Expand Down Expand Up @@ -45,6 +51,8 @@ def __init__(self, *args):
self.framework.observe(self.on[ZK].relation_changed, self._on_config_changed)
self.framework.observe(self.on[ZK].relation_broken, self._on_zookeeper_broken)

self.framework.observe(self.on.set_password_action, self._set_password_action)

@property
def peer_relation(self) -> Relation:
"""The cluster peer relation."""
Expand Down Expand Up @@ -159,6 +167,47 @@ def _restart(self, event: EventBase) -> None:

self.snap.restart_snap_service("kafka")

def _set_password_action(self, event: ActionEvent):
"""Handler for set-password action.
Set the password for a specific user, if no passwords are passed, generate them.
"""
if not self.unit.is_leader():
msg = "Password rotation must be called on leader unit"
logger.error(msg)
event.fail(msg)
return

username = event.params.get("username", "sync")
if username not in CHARM_USERS:
msg = f"The action can be run only for users used by the charm: {CHARM_USERS} not {username}."
logger.error(msg)
event.fail(msg)
return

new_password = event.params.get("password", generate_password())
if new_password == self.kafka_config.sync_password:
event.log("The old and new passwords are equal.")
event.set_results({f"{username}-password": new_password})
return

# Update the user
kafka_auth = KafkaAuth(
opts=self.kafka_config.extra_args,
zookeeper=self.kafka_config.zookeeper_config.get("connect", ""),
)
try:
kafka_auth.add_user(username=username, password=new_password)
except subprocess.CalledProcessError as e:
# command to add users fails if attempted too early
logger.debug(str(e))
event.fail(str(e))
return

# Store the password on application databag
self.peer_relation.data[self.app].update({f"{username}_password": new_password})
event.set_results({f"{username}-password": new_password})

@property
def ready_to_start(self) -> bool:
"""Check for active ZooKeeper relation and adding of inter-broker auth username.
Expand Down
1 change: 1 addition & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
PEER = "cluster"
ZK = "zookeeper"
REL_NAME = "kafka-client"
CHARM_USERS = ["sync"]
4 changes: 2 additions & 2 deletions src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

@retry(
# retry to give ZK time to update its broker zNodes before failing
wait=wait_fixed(5),
stop=stop_after_attempt(6),
wait=wait_fixed(6),
stop=stop_after_attempt(10),
retry_error_callback=(lambda state: state.outcome.result()),
retry=retry_if_not_result(lambda result: True if result else False),
)
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
from typing import Any, Dict, List, Set, Tuple

import yaml
from pytest_operator.plugin import OpsTest

from auth import Acl, KafkaAuth

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
APP_NAME = METADATA["name"]
ZK_NAME = "zookeeper"


def load_acls(model_full_name: str, zookeeper_uri: str) -> Set[Acl]:
Expand Down Expand Up @@ -52,6 +54,17 @@ def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None:
assert "SCRAM-SHA-512" in result


def get_user(model_full_name: str, username: str, zookeeper_uri: str) -> str:
result = check_output(
f"JUJU_MODEL={model_full_name} juju ssh kafka/0 'kafka.configs --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

return result


def show_unit(unit_name: str, model_full_name: str) -> Any:
result = check_output(
f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}",
Expand Down Expand Up @@ -97,3 +110,16 @@ def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str
zk_relation_data["uris"] = info["application-data"]["uris"]
zk_relation_data["username"] = info["application-data"]["username"]
return zk_relation_data


async def set_password(ops_test: OpsTest, username="sync", password=None, num_unit=0) -> str:
"""Use the charm action to start a password rotation."""
params = {"username": username}
if password:
params["password"] = password

action = await ops_test.model.units.get(f"{APP_NAME}/{num_unit}").run_action(
"set-password", **params
)
password = await action.wait()
return password.results
2 changes: 1 addition & 1 deletion tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def test_build_and_deploy(ops_test: OpsTest):
kafka_charm = await ops_test.build_charm(".")
await asyncio.gather(
ops_test.model.deploy(
"zookeeper", channel="edge", application_name="zookeeper", num_units=1
"zookeeper", channel="edge", application_name="zookeeper", num_units=3
),
ops_test.model.deploy(kafka_charm, application_name="kafka", num_units=1),
)
Expand Down
67 changes: 67 additions & 0 deletions tests/integration/test_password_rotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python3
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

import asyncio
import logging

import pytest
from helpers import (
APP_NAME,
ZK_NAME,
get_kafka_zk_relation_data,
get_user,
set_password,
)
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)


@pytest.mark.abort_on_fail
@pytest.mark.skip_if_deployed
async def test_build_and_deploy(ops_test: OpsTest):
kafka_charm = await ops_test.build_charm(".")
await asyncio.gather(
ops_test.model.deploy(ZK_NAME, channel="edge", application_name=ZK_NAME, num_units=3),
ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1),
)
await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])
assert ops_test.model.applications[APP_NAME].status == "waiting"
assert ops_test.model.applications[ZK_NAME].status == "active"

await ops_test.model.add_relation(APP_NAME, ZK_NAME)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])

assert ops_test.model.applications[APP_NAME].status == "active"
assert ops_test.model.applications[ZK_NAME].status == "active"


async def test_password_rotation(ops_test: OpsTest):
"""Check that password stored on ZK has changed after a password rotation."""
relation_data = get_kafka_zk_relation_data(
unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name
)
uri = relation_data["uris"].split(",")[-1]

initial_sync_user = get_user(
username="sync",
zookeeper_uri=uri,
model_full_name=ops_test.model_full_name,
)

result = await set_password(ops_test, username="sync", num_unit=0)
assert "sync-password" in result.keys()

await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])

new_sync_user = get_user(
username="sync",
zookeeper_uri=uri,
model_full_name=ops_test.model_full_name,
)

assert initial_sync_user != new_sync_user
6 changes: 3 additions & 3 deletions tests/integration/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ def usernames():

@pytest.mark.abort_on_fail
async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames):
zk_charm = await ops_test.build_charm(".")
charm = await ops_test.build_charm(".")
app_charm = await ops_test.build_charm("tests/integration/app-charm")

await asyncio.gather(
ops_test.model.deploy(
"zookeeper", channel="edge", application_name="zookeeper", num_units=1
"zookeeper", channel="edge", application_name="zookeeper", num_units=3
),
ops_test.model.deploy(zk_charm, application_name=APP_NAME, num_units=1),
ops_test.model.deploy(charm, application_name=APP_NAME, num_units=1),
ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1),
)
await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1, ZK])
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def test_kafka_simple_scale_up(ops_test: OpsTest):

await asyncio.gather(
ops_test.model.deploy(
"zookeeper", channel="edge", application_name="zookeeper", num_units=1
"zookeeper", channel="edge", application_name="zookeeper", num_units=3
),
ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1),
)
Expand Down

0 comments on commit 8347139

Please sign in to comment.