diff --git a/config.yaml b/config.yaml
index a74ec01c..88e1428b 100644
--- a/config.yaml
+++ b/config.yaml
@@ -12,17 +12,6 @@ options:
       clientPort=2181
       listeners=SASL_PLAINTEXT://:9092
 
-      # replication
-      # forces 3 node minimum to allow for rolling restarts
-      default.replication.factor=3
-      # sensible default
-      num.partitions=3
-      # replication.factor - 1
-      min.insync.replicas=2
-      transaction.state.log.replication.factor=3
-      transaction.state.log.min.isr=2
-      offsets.topic.replication.factor=3
-
       # offsets
       offsets.topic.num.partitions=50
       offsets.commit.required.acks=-1
@@ -45,6 +34,9 @@ options:
       allow.everyone.if.no.acl.found=false
       super.users=User:sync
       listener.name.sasl_plaintext.sasl.enabled.mechanisms=SCRAM-SHA-512
+      # zookeeper.set.acl=true
+
+
 
       ## Backup
       # background.threads=10
@@ -82,7 +74,6 @@ options:
       # socket.request.max.bytes=104857600
       # socket.send.buffer.bytes=102400
      # zookeeper.session.timeout.ms=6000
-      # zookeeper.set.acl=false
       # connections.max.idle.ms=600000
       # controlled.shutdown.enable=true
       # controlled.shutdown.max.retries=3
diff --git a/lib/charms/kafka/v0/kafka_snap.py b/lib/charms/kafka/v0/kafka_snap.py
index 4ece20ad..3670e508 100644
--- a/lib/charms/kafka/v0/kafka_snap.py
+++ b/lib/charms/kafka/v0/kafka_snap.py
@@ -52,7 +52,7 @@ def _on_start(self, event):
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 2
+LIBPATCH = 3
 
 SNAP_CONFIG_PATH = "/var/snap/kafka/common/"
 
@@ -246,6 +246,5 @@ def run_bin_command(bin_keyword: str, bin_args: List[str], opts: List[str]) -> s
         logger.debug(f"{output=}")
         return output
     except subprocess.CalledProcessError as e:
-        logger.exception(e)
         logger.debug(f"cmd failed - cmd={e.cmd}, stdout={e.stdout}, stderr={e.stderr}")
         raise e
diff --git a/metadata.yaml b/metadata.yaml
index 097d0930..58bd0443 100644
--- a/metadata.yaml
+++ b/metadata.yaml
@@ -17,3 +17,7 @@ peers:
 requires:
   zookeeper:
     interface: zookeeper
+
+provides:
+  kafka:
+    interface: kafka
diff --git a/src/charm.py b/src/charm.py
index 2eece09e..da2e55eb 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -7,8 +7,10 @@
 import logging
 import secrets
 import string
+import subprocess
 
 from charms.kafka.v0.kafka_snap import KafkaSnap
+from kafka_provider import KafkaProvider
 from ops.charm import CharmBase, RelationEvent, RelationJoinedEvent
 from ops.framework import EventBase
 from ops.main import main
@@ -38,6 +40,7 @@ def __init__(self, *args):
         self.name = CHARM_KEY
         self.snap = KafkaSnap()
         self.kafka_config = KafkaConfig(self)
+        self.client_relations = KafkaProvider(self)
 
         self.framework.observe(getattr(self.on, "install"), self._on_install)
         self.framework.observe(getattr(self.on, "leader_elected"), self._on_leader_elected)
@@ -108,10 +111,13 @@ def _on_start(self, event: EventBase) -> None:
         self.kafka_config.set_jaas_config()
 
         # do not start units until SCRAM users have been added to ZooKeeper for server-server auth
-        if self.unit.is_leader():
-            if self.kafka_config.add_users_to_zookeeper():
+        if self.unit.is_leader() and self.kafka_config.sync_password:
+            try:
+                self.kafka_config.add_user_to_zookeeper(
+                    username="sync", password=self.kafka_config.sync_password
+                )
                 self.peer_relation.data[self.app].update({"broker-creds": "added"})
-            else:
+            except subprocess.CalledProcessError:
+                # command to add users fails sometimes for unknown reasons. Retry seems to fix it.
                 event.defer()
                 return
diff --git a/src/kafka_config.py b/src/kafka_config.py
index dca8dc2f..424caa30 100644
--- a/src/kafka_config.py
+++ b/src/kafka_config.py
@@ -5,8 +5,7 @@
 """Manager for handling Kafka configuration."""
 
 import logging
-import subprocess
-from typing import Dict, Optional
+from typing import Dict, List, Optional
 
 from charms.kafka.v0.kafka_snap import SNAP_CONFIG_PATH, KafkaSnap, safe_write_to_file
 from ops.charm import CharmBase
@@ -76,40 +75,82 @@ def set_kafka_opts() -> None:
         opts_string = " ".join(OPTS)
         safe_write_to_file(content=f"KAFKA_OPTS={opts_string}", path="/etc/environment", mode="a")
 
-    def set_server_properties(self) -> None:
-        """Sets all kafka config properties to the server.properties path."""
-        base_config = self.charm.config["server-properties"]
+    @property
+    def default_replication_properties(self) -> List[str]:
+        """Builds replication-related properties based on the expected app size.
+
+        Returns:
+            List of properties to be set
+        """
+        replication_factor = min([3, self.charm.app.planned_units()])
+        min_isr = max([1, replication_factor - 1])
+
+        return [
+            f"default.replication.factor={replication_factor}",
+            f"num.partitions={replication_factor}",
+            f"transaction.state.log.replication.factor={replication_factor}",
+            f"offsets.topic.replication.factor={replication_factor}",
+            f"min.insync.replicas={min_isr}",
+            f"transaction.state.log.min.isr={min_isr}",
+        ]
+
+    @property
+    def auth_properties(self) -> List[str]:
+        """Builds properties necessary for inter-broker authorization through ZooKeeper.
+
+        Returns:
+            List of properties to be set
+        """
         broker_id = self.charm.unit.name.split("/")[1]
         host = (
             self.charm.model.get_relation(PEER).data[self.charm.unit].get("private-address", None)
         )
-        properties = (
-            f"{base_config}\n"
-            f"broker.id={broker_id}\n"
-            f"advertised.listeners=SASL_PLAINTEXT://{host}:9092\n"
-            f'zookeeper.connect={self.zookeeper_config["connect"]}\n'
-            f'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="{self.sync_password}";'
+        return [
+            f"broker.id={broker_id}",
+            f"advertised.listeners=SASL_PLAINTEXT://{host}:9092",
+            f'zookeeper.connect={self.zookeeper_config["connect"]}',
+            f'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="{self.sync_password}";',
+        ]
+
+    def set_server_properties(self) -> None:
+        """Sets all kafka config properties to the server.properties path."""
+        base_config = self.charm.config["server-properties"]
+        server_properties = (
+            [f"{base_config}"] + self.default_replication_properties + self.auth_properties
         )
         safe_write_to_file(
-            content=properties, path=f"{SNAP_CONFIG_PATH}/server.properties", mode="w"
+            content="\n".join(server_properties),
+            path=f"{SNAP_CONFIG_PATH}/server.properties",
+            mode="w",
         )
 
-    def add_users_to_zookeeper(self) -> bool:
-        """Adds user credentials to ZooKeeper to support inter-broker auth.
+    def add_user_to_zookeeper(self, username: str, password: str) -> None:
+        """Adds user credentials to ZooKeeper for authorising clients and brokers.
 
-        Returns:
-            True if successful. False otherwise
+        Raises:
+            subprocess.CalledProcessError: If the command failed
+        """
+        command = [
+            f"--zookeeper={self.zookeeper_config['connect']}",
+            "--alter",
+            "--entity-type=users",
+            f"--entity-name={username}",
+            f"--add-config=SCRAM-SHA-512=[password={password}]",
+        ]
+        KafkaSnap.run_bin_command(bin_keyword="configs", bin_args=command, opts=OPTS)
+
+    def delete_user_from_zookeeper(self, username: str) -> None:
+        """Deletes user credentials from ZooKeeper for authorising clients and brokers.
+
+        Raises:
+            subprocess.CalledProcessError: If the command failed
         """
         command = [
             f"--zookeeper={self.zookeeper_config['connect']}",
             "--alter",
             "--entity-type=users",
-            "--entity-name=sync",
-            f"--add-config=SCRAM-SHA-512=[password={self.sync_password}]",
+            f"--entity-name={username}",
+            "--delete-config=SCRAM-SHA-512",
         ]
-        try:
-            KafkaSnap.run_bin_command(bin_keyword="configs", bin_args=command, opts=OPTS)
-            return True
-        except subprocess.CalledProcessError:
-            return False
+        KafkaSnap.run_bin_command(bin_keyword="configs", bin_args=command, opts=OPTS)
diff --git a/src/kafka_provider.py b/src/kafka_provider.py
new file mode 100644
index 00000000..2b07c47e
--- /dev/null
+++ b/src/kafka_provider.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python3
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""KafkaProvider class and methods."""
+
+import logging
+import secrets
+import string
+from typing import Dict
+
+from ops.charm import RelationBrokenEvent, RelationJoinedEvent
+from ops.framework import Object
+from ops.model import Relation
+
+REL_NAME = "kafka"
+PEER = "cluster"
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaProvider(Object):
+    """Implements the provider-side logic for client applications relating to Kafka."""
+
+    def __init__(self, charm) -> None:
+        super().__init__(charm, "client")
+
+        self.charm = charm
+
+        self.framework.observe(
+            self.charm.on[REL_NAME].relation_joined, self._on_client_relation_joined
+        )
+        self.framework.observe(
+            self.charm.on[REL_NAME].relation_broken, self._on_client_relation_broken
+        )
+
+    @property
+    def app_relation(self) -> Relation:
+        """The Kafka cluster's peer relation."""
+        return self.charm.model.get_relation(PEER)
+
+    def relation_config(self, relation: Relation) -> Dict[str, str]:
+        """Builds necessary relation data for a given relation.
+
+        Args:
+            relation: the relation needing config
+
+        Returns:
+            Dict of `username`, `password` and `endpoints` data for the related app
+        """
+        username = f"relation-{relation.id}"
+        password = self.app_relation.data[self.charm.app].get(username, self.generate_password())
+        units = set([self.charm.unit] + list(self.app_relation.units))
+        endpoints = [self.app_relation.data[unit]["private-address"] for unit in units]
+
+        return {"username": username, "password": password, "endpoints": ",".join(endpoints)}
+
+    def _on_client_relation_joined(self, event: RelationJoinedEvent) -> None:
+        """Handler for `relation_joined` events."""
+        if not self.charm.unit.is_leader():
+            return
+
+        relation_config = self.relation_config(relation=event.relation)
+
+        self.add_user(username=relation_config["username"], password=relation_config["password"])
+        event.relation.data[self.charm.app].update(relation_config)
+
+    def _on_client_relation_broken(self, event: RelationBrokenEvent) -> None:
+        """Handler for `relation_broken` events."""
+        if not self.charm.unit.is_leader():
+            return
+
+        relation_config = self.relation_config(relation=event.relation)
+
+        self.delete_user(username=relation_config["username"])
+
+    def add_user(self, username: str, password: str) -> None:
+        """Adds/updates users' SCRAM credentials to ZooKeeper.
+
+        Args:
+            username: the user's username
+            password: the user's password
+
+        Raises:
+            subprocess.CalledProcessError: if the command failed
+        """
+        self.charm.kafka_config.add_user_to_zookeeper(username=username, password=password)
+        self.app_relation.data[self.charm.app].update({username: password})
+
+    def delete_user(self, username: str) -> None:
+        """Deletes users' SCRAM credentials from ZooKeeper.
+
+        Args:
+            username: the user's username
+
+        Raises:
+            subprocess.CalledProcessError: if the command failed
+        """
+        self.charm.kafka_config.delete_user_from_zookeeper(username=username)
+        self.app_relation.data[self.charm.app].update({username: ""})
+
+    @staticmethod
+    def generate_password() -> str:
+        """Creates randomized string for use as app passwords.
+
+        Returns:
+            String of 32 randomized letter+digit characters
+        """
+        return "".join([secrets.choice(string.ascii_letters + string.digits) for _ in range(32)])
diff --git a/tests/integration/app-charm/charmcraft.yaml b/tests/integration/app-charm/charmcraft.yaml
new file mode 100644
index 00000000..e109b8b2
--- /dev/null
+++ b/tests/integration/app-charm/charmcraft.yaml
@@ -0,0 +1,11 @@
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+type: charm
+bases:
+  - build-on:
+      - name: "ubuntu"
+        channel: "20.04"
+    run-on:
+      - name: "ubuntu"
+        channel: "20.04"
diff --git a/tests/integration/app-charm/metadata.yaml b/tests/integration/app-charm/metadata.yaml
new file mode 100644
index 00000000..de52389d
--- /dev/null
+++ b/tests/integration/app-charm/metadata.yaml
@@ -0,0 +1,17 @@
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+name: application
+description: |
+  Dummy charm used in integration tests for Kafka.
+summary: |
+  Dummy charm application meant to be used
+  only for testing of the libs in this repository.
+
+peers:
+  cluster:
+    interface: cluster
+
+requires:
+  kafka:
+    interface: kafka
diff --git a/tests/integration/app-charm/requirements.txt b/tests/integration/app-charm/requirements.txt
new file mode 100644
index 00000000..56f5f642
--- /dev/null
+++ b/tests/integration/app-charm/requirements.txt
@@ -0,0 +1 @@
+ops >= 1.5.0
diff --git a/tests/integration/app-charm/src/charm.py b/tests/integration/app-charm/src/charm.py
new file mode 100755
index 00000000..c6235cd7
--- /dev/null
+++ b/tests/integration/app-charm/src/charm.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""Application charm that connects to the Kafka charm.
+
+This charm is meant to be used only for testing
+of the libraries in this repository.
+"""
+
+import logging
+
+from ops.charm import CharmBase, RelationEvent
+from ops.main import main
+from ops.model import ActiveStatus
+
+logger = logging.getLogger(__name__)
+
+
+CHARM_KEY = "app"
+PEER = "cluster"
+REL_NAME = "kafka"
+
+
+class ApplicationCharm(CharmBase):
+    """Application charm that connects to the Kafka charm."""
+
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.name = CHARM_KEY
+
+        self.framework.observe(getattr(self.on, "start"), self._on_start)
+        self.framework.observe(self.on[REL_NAME].relation_changed, self._log)
+        self.framework.observe(self.on[REL_NAME].relation_broken, self._log)
+        self.framework.observe(self.on[REL_NAME].relation_joined, self._set_data)
+
+    @property
+    def relation(self):
+        return self.model.get_relation(REL_NAME)
+
+    def _on_start(self, _) -> None:
+        self.unit.status = ActiveStatus()
+
+    def _set_data(self, _) -> None:
+        if not self.unit.is_leader():
+            return
+
+        # dummy app data only; the real credentials are generated on the provider side
+        self.relation.data[self.app].update({"password": "thisisatestpassword"})
+
+    def _log(self, event: RelationEvent):
+        return
+
+
+if __name__ == "__main__":
+    main(ApplicationCharm)
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
new file mode 100644
index 00000000..efe5b67b
--- /dev/null
+++ b/tests/integration/helpers.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+import re
+from pathlib import Path
+from subprocess import PIPE, check_output
+from typing import Any, List, Tuple
+
+import yaml
+
+METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
+APP_NAME = METADATA["name"]
+
+
+def check_user(model_full_name: str, username: str, zookeeper_uri: str) -> None:
+    result = check_output(
+        f"JUJU_MODEL={model_full_name} juju ssh kafka/0 'kafka.configs --zookeeper {zookeeper_uri} --describe --entity-type users --entity-name {username}'",
+        stderr=PIPE,
+        shell=True,
+        universal_newlines=True,
+    )
+
+    assert "SCRAM-SHA-512" in result
+
+
+def show_unit(unit_name: str, model_full_name: str) -> Any:
+    result = check_output(
+        f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}",
+        stderr=PIPE,
+        shell=True,
+        universal_newlines=True,
+    )
+
+    return yaml.safe_load(result)
+
+
+def get_zookeeper_connection(unit_name: str, model_full_name: str) -> Tuple[List[str], str]:
+    result = show_unit(unit_name=unit_name, model_full_name=model_full_name)
+
+    relations_info = result[unit_name]["relation-info"]
+
+    usernames = []
+    zookeeper_uri = ""
+    for info in relations_info:
+        if info["endpoint"] == "cluster":
+            for key in info["application-data"].keys():
+                if re.match(r"(relation\-\d+)", key):
+                    usernames.append(key)
+        if info["endpoint"] == "zookeeper":
+            zookeeper_uri = info["application-data"]["uris"]
+
+    if zookeeper_uri and usernames:
+        return usernames, zookeeper_uri
+    else:
+        raise Exception("config not found")
diff --git a/tests/integration/test_kafka_provider.py b/tests/integration/test_kafka_provider.py
new file mode 100644
index 00000000..15380409
--- /dev/null
+++ b/tests/integration/test_kafka_provider.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+import asyncio
+import logging
+
+import pytest
+from pytest_operator.plugin import OpsTest
+
+from tests.integration.helpers import check_user, get_zookeeper_connection
+
+logger = logging.getLogger(__name__)
+
+APP_NAME = "kafka"
+ZK = "zookeeper"
+DUMMY_NAME_1 = "app"
+DUMMY_NAME_2 = "appii"
+
+
+@pytest.fixture(scope="module")
+def usernames():
+    return set()
+
+
+@pytest.mark.abort_on_fail
+async def test_deploy_charms_relate_active(ops_test: OpsTest, usernames):
+    zk_charm = await ops_test.build_charm(".")
+    app_charm = await ops_test.build_charm("tests/integration/app-charm")
+
+    await asyncio.gather(
+        ops_test.model.deploy(
+            "zookeeper", channel="edge", application_name="zookeeper", num_units=3
+        ),
+        ops_test.model.deploy(zk_charm, application_name=APP_NAME, num_units=3),
+        ops_test.model.deploy(app_charm, application_name=DUMMY_NAME_1, num_units=1),
+    )
+    await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1, ZK])
+    await ops_test.model.add_relation(APP_NAME, ZK)
+    await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK])
+    await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_1)
+    await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_1])
+    assert ops_test.model.applications[APP_NAME].status == "active"
+    assert ops_test.model.applications[DUMMY_NAME_1].status == "active"
+
+    # implicitly tests setting of kafka app data
+    returned_usernames, zookeeper_uri = get_zookeeper_connection(
+        unit_name="kafka/0", model_full_name=ops_test.model_full_name
+    )
+    usernames.update(returned_usernames)
+
+    for username in usernames:
+        check_user(
+            username=username,
+            zookeeper_uri=zookeeper_uri,
+            model_full_name=ops_test.model_full_name,
+        )
+
+
+@pytest.mark.abort_on_fail
+async def test_deploy_multiple_charms_relate_active(ops_test: OpsTest, usernames):
+    appii_charm = await ops_test.build_charm("tests/integration/app-charm")
+    await ops_test.model.deploy(appii_charm, application_name=DUMMY_NAME_2, num_units=1)
+    await ops_test.model.wait_for_idle(apps=[DUMMY_NAME_2])
+    await ops_test.model.add_relation(APP_NAME, DUMMY_NAME_2)
+    await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME_2])
+    assert ops_test.model.applications[APP_NAME].status == "active"
+    assert ops_test.model.applications[DUMMY_NAME_1].status == "active"
+    assert ops_test.model.applications[DUMMY_NAME_2].status == "active"
+
+    returned_usernames, zookeeper_uri = get_zookeeper_connection(
+        unit_name="kafka/0", model_full_name=ops_test.model_full_name
+    )
+    usernames.update(returned_usernames)
+
+    for username in usernames:
+        check_user(
+            username=username,
+            zookeeper_uri=zookeeper_uri,
+            model_full_name=ops_test.model_full_name,
+        )
+
+
+@pytest.mark.abort_on_fail
+async def test_remove_application_removes_user(ops_test: OpsTest, usernames):
+    await ops_test.model.applications[DUMMY_NAME_1].remove()
+    await ops_test.model.wait_for_idle(apps=[APP_NAME])
+    assert ops_test.model.applications[APP_NAME].status == "active"
+
+    _, zookeeper_uri = get_zookeeper_connection(
+        unit_name="kafka/0", model_full_name=ops_test.model_full_name
+    )
+
+    # checks that past usernames no longer exist in ZooKeeper
+    with pytest.raises(AssertionError):
+        for username in usernames:
+            check_user(
+                username=username,
+                zookeeper_uri=zookeeper_uri,
+                model_full_name=ops_test.model_full_name,
+            )
diff --git a/tests/unit/test_kafka_provider.py b/tests/unit/test_kafka_provider.py
new file mode 100644
index 00000000..73d6a4d1
--- /dev/null
+++ b/tests/unit/test_kafka_provider.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python3
+# Copyright 2022 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+import logging
+import unittest
+from collections import namedtuple
+
+import ops.testing
+from ops.charm import CharmBase
+from ops.testing import Harness
+
+from kafka_provider import KafkaProvider
+
+ops.testing.SIMULATE_CAN_CONNECT = True
+
+logger = logging.getLogger(__name__)
+
+METADATA = """
+    name: kafka
+    peers:
+        cluster:
+            interface: cluster
+    provides:
+        kafka:
+            interface: kafka
+"""
+
+CustomRelation = namedtuple("Relation", ["id"])
+
+
+class DummyKafkaCharm(CharmBase):
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.client_relation = KafkaProvider(self)
+
+
+class TestProvider(unittest.TestCase):
+    def setUp(self):
+        self.harness = Harness(DummyKafkaCharm, meta=METADATA)
+        self.addCleanup(self.harness.cleanup)
+        self.harness.begin_with_initial_hooks()
+
+    @property
+    def provider(self):
+        return self.harness.charm.client_relation
+
+    def test_relation_config_new_relation_no_password(self):
+        self.harness.set_leader(True)
+        relation_id = self.harness.add_relation("kafka", "client_app")
+        self.harness.update_relation_data(
+            self.provider.app_relation.id, "kafka/0", {"private-address": "treebeard"}
+        )
+        self.harness.add_relation_unit(self.provider.app_relation.id, "kafka/1")
+        self.harness.update_relation_data(
+            self.provider.app_relation.id, "kafka/1", {"private-address": "shelob"}
+        )
+
+        config = self.harness.charm.client_relation.relation_config(
+            relation=self.harness.charm.model.get_relation(
+                relation_name="kafka", relation_id=relation_id
+            )
+        )
+
+        self.assertEqual(sorted(["endpoints", "password", "username"]), sorted(config.keys()))
+        self.assertEqual(sorted(config["endpoints"].split(",")), ["shelob", "treebeard"])
+        self.assertEqual(len(config["password"]), 32)
+
+    def test_relation_config_existing_relation_password(self):
+        self.harness.set_leader(True)
+        relation_id = self.harness.add_relation("kafka", "client_app")
+        self.harness.update_relation_data(
+            self.harness.charm.model.get_relation("cluster").id,
+            "kafka",
+            {"relation-1": "keepitsecret"},
+        )
+        self.harness.update_relation_data(
+            self.provider.app_relation.id, "kafka/0", {"private-address": "treebeard"}
+        )
+
+        config = self.harness.charm.client_relation.relation_config(
+            relation=self.harness.charm.model.get_relation(
+                relation_name="kafka", relation_id=relation_id
+            )
+        )
+
+        self.assertEqual(config["password"], "keepitsecret")