Skip to content

Commit

Permalink
Merge pull request #9 from canonical/user_acls
Browse files Browse the repository at this point in the history
feat: add provider relatation for adding/deleting users of related clients
  • Loading branch information
marcoppenheimer authored Jul 18, 2022
2 parents e25e045 + da25c3b commit 09a051e
Show file tree
Hide file tree
Showing 13 changed files with 518 additions and 40 deletions.
15 changes: 3 additions & 12 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,6 @@ options:
clientPort=2181
listeners=SASL_PLAINTEXT://:9092
# replication
# forces 3 node minimum to allow for rolling restarts
default.replication.factor=3
# sensible default
num.partitions=3
# replication.factor - 1
min.insync.replicas=2
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
# offsets
offsets.topic.num.partitions=50
offsets.commit.required.acks=-1
Expand All @@ -45,6 +34,9 @@ options:
allow.everyone.if.no.acl.found=false
super.users=User:sync
listener.name.sasl_plaintext.sasl.enabled.mechanisms=SCRAM-SHA-512
# zookeeper.set.acl=true
## Backup
# background.threads=10
Expand Down Expand Up @@ -82,7 +74,6 @@ options:
# socket.request.max.bytes=104857600
# socket.send.buffer.bytes=102400
# zookeeper.session.timeout.ms=6000
# zookeeper.set.acl=false
# connections.max.idle.ms=600000
# controlled.shutdown.enable=true
# controlled.shutdown.max.retries=3
Expand Down
3 changes: 1 addition & 2 deletions lib/charms/kafka/v0/kafka_snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _on_start(self, event):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 2
LIBPATCH = 3


SNAP_CONFIG_PATH = "/var/snap/kafka/common/"
Expand Down Expand Up @@ -246,6 +246,5 @@ def run_bin_command(bin_keyword: str, bin_args: List[str], opts: List[str]) -> s
logger.debug(f"{output=}")
return output
except subprocess.CalledProcessError as e:
logger.exception(e)
logger.debug(f"cmd failed - cmd={e.cmd}, stdout={e.stdout}, stderr={e.stderr}")
raise e
4 changes: 4 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ peers:
requires:
zookeeper:
interface: zookeeper

provides:
kafka:
interface: kafka
12 changes: 9 additions & 3 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import logging
import secrets
import string
import subprocess

from charms.kafka.v0.kafka_snap import KafkaSnap
from kafka_provides import KafkaProvider
from ops.charm import CharmBase, RelationEvent, RelationJoinedEvent
from ops.framework import EventBase
from ops.main import main
Expand Down Expand Up @@ -38,6 +40,7 @@ def __init__(self, *args):
self.name = CHARM_KEY
self.snap = KafkaSnap()
self.kafka_config = KafkaConfig(self)
self.client_relations = KafkaProvider(self)

self.framework.observe(getattr(self.on, "install"), self._on_install)
self.framework.observe(getattr(self.on, "leader_elected"), self._on_leader_elected)
Expand Down Expand Up @@ -108,10 +111,13 @@ def _on_start(self, event: EventBase) -> None:
self.kafka_config.set_jaas_config()

# do not start units until SCRAM users have been added to ZooKeeper for server-server auth
if self.unit.is_leader():
if self.kafka_config.add_users_to_zookeeper():
if self.unit.is_leader() and self.kafka_config.sync_password:
try:
self.kafka_config.add_user_to_zookeeper(
username="sync", password=self.kafka_config.sync_password
)
self.peer_relation.data[self.app].update({"broker-creds": "added"})
else:
except subprocess.CalledProcessError:
# command to add users fails sometimes for unknown reasons. Retry seems to fix it.
event.defer()
return
Expand Down
87 changes: 64 additions & 23 deletions src/kafka_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
"""Manager for handling Kafka configuration."""

import logging
import subprocess
from typing import Dict, Optional
from typing import Dict, List, Optional

from charms.kafka.v0.kafka_snap import SNAP_CONFIG_PATH, KafkaSnap, safe_write_to_file
from ops.charm import CharmBase
Expand Down Expand Up @@ -76,40 +75,82 @@ def set_kafka_opts() -> None:
opts_string = " ".join(OPTS)
safe_write_to_file(content=f"KAFKA_OPTS={opts_string}", path="/etc/environment", mode="a")

def set_server_properties(self) -> None:
"""Sets all kafka config properties to the server.properties path."""
base_config = self.charm.config["server-properties"]
@property
def default_replication_properties(self) -> List[str]:
"""Builds replication-related properties based on the expected app size.
Returns:
List of properties to be set
"""
replication_factor = min([3, self.charm.app.planned_units()])
min_isr = max([1, replication_factor])

return [
f"default.replication.factor={replication_factor}",
f"num.partitions={replication_factor}",
f"transaction.state.log.replication.factor={replication_factor}",
f"offsets.topic.replication.factor={replication_factor}",
f"min.insync.replicas={min_isr}",
f"transaction.state.log.min.isr={min_isr}",
]

@property
def auth_properties(self) -> List[str]:
"""Builds properties necessary for inter-broker authorization through ZooKeeper.
Returns:
List of properties to be set
"""
broker_id = self.charm.unit.name.split("/")[1]
host = (
self.charm.model.get_relation(PEER).data[self.charm.unit].get("private-address", None)
)
properties = (
f"{base_config}\n"
f"broker.id={broker_id}\n"
f"advertised.listeners=SASL_PLAINTEXT://{host}:9092\n"
f'zookeeper.connect={self.zookeeper_config["connect"]}\n'
f'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="{self.sync_password}";'
return [
f"broker.id={broker_id}",
f"advertised.listeners=SASL_PLAINTEXT://{host}:9092",
f'zookeeper.connect={self.zookeeper_config["connect"]}',
f'listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="sync" password="{self.sync_password}";',
]

def set_server_properties(self) -> None:
"""Sets all kafka config properties to the server.properties path."""
base_config = self.charm.config["server-properties"]
server_properties = (
[f"{base_config}"] + self.default_replication_properties + self.auth_properties
)

safe_write_to_file(
content=properties, path=f"{SNAP_CONFIG_PATH}/server.properties", mode="w"
content="\n".join(server_properties),
path=f"{SNAP_CONFIG_PATH}/server.properties",
mode="w",
)

def add_users_to_zookeeper(self) -> bool:
"""Adds user credentials to ZooKeeper to support inter-broker auth.
def add_user_to_zookeeper(self, username: str, password: str) -> None:
"""Adds user credentials to ZooKeeper for authorising clients and brokers.
Returns:
True if successful. False otherwise
Raises:
subprocess.CalledProcessError: If the command failed
"""
command = [
f"--zookeeper={self.zookeeper_config['connect']}",
"--alter",
"--entity-type=users",
f"--entity-name={username}",
f"--add-config=SCRAM-SHA-512=[password={password}]",
]
KafkaSnap.run_bin_command(bin_keyword="configs", bin_args=command, opts=OPTS)

def delete_user_from_zookeeper(self, username: str) -> None:
"""Deletes user credentials from ZooKeeper for authorising clients and brokers.
Raises:
subprocess.CalledProcessError: If the command failed
"""
command = [
f"--zookeeper={self.zookeeper_config['connect']}",
"--alter",
"--entity-type=users",
"--entity-name=sync",
f"--add-config=SCRAM-SHA-512=[password={self.sync_password}]",
f"--entity-name={username}",
"--delete-config=SCRAM-SHA-512",
]
try:
KafkaSnap.run_bin_command(bin_keyword="configs", bin_args=command, opts=OPTS)
return True
except subprocess.CalledProcessError:
return False
KafkaSnap.run_bin_command(bin_keyword="configs", bin_args=command, opts=OPTS)
109 changes: 109 additions & 0 deletions src/kafka_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python3
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

"""KafkaProvider class and methods."""

import logging
import secrets
import string
from typing import Dict

from ops.charm import RelationBrokenEvent, RelationJoinedEvent
from ops.framework import Object
from ops.model import Relation

REL_NAME = "kafka"
PEER = "cluster"

logger = logging.getLogger(__name__)


class KafkaProvider(Object):
"""Implements the provider-side logic for client applications relating to Kafka."""

def __init__(self, charm) -> None:
super().__init__(charm, "client")

self.charm = charm

self.framework.observe(
self.charm.on[REL_NAME].relation_joined, self._on_client_relation_joined
)
self.framework.observe(
self.charm.on[REL_NAME].relation_broken, self._on_client_relation_broken
)

@property
def app_relation(self) -> Relation:
"""The Kafka cluster's peer relation."""
return self.charm.model.get_relation(PEER)

def relation_config(self, relation: Relation) -> Dict[str, str]:
"""Builds necessary relation data for a given relation.
Args:
event: the event needing config
Returns:
Dict of `username`, `password` and `endpoints` data for the related app
"""
username = f"relation-{relation.id}"
password = self.app_relation.data[self.charm.app].get(username, self.generate_password())
units = set([self.charm.unit] + list(self.app_relation.units))
endpoints = [self.app_relation.data[unit]["private-address"] for unit in units]

return {"username": username, "password": password, "endpoints": ",".join(endpoints)}

def _on_client_relation_joined(self, event: RelationJoinedEvent) -> None:
"""Handler for `relation_joined` events."""
if not self.charm.unit.is_leader():
return

relation_config = self.relation_config(relation=event.relation)

self.add_user(username=relation_config["username"], password=relation_config["password"])
event.relation.data[self.charm.app].update(relation_config)

def _on_client_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Handler for `relation_broken` events."""
if not self.charm.unit.is_leader():
return

relation_config = self.relation_config(relation=event.relation)

self.delete_user(username=relation_config["username"])

def add_user(self, username: str, password: str) -> None:
"""Adds/updates users' SCRAM credentials to ZooKeeper.
Args:
username: the user's username
password: the user's password
Raises:
subprocess.CalledProcessError: if the command failed
"""
self.charm.kafka_config.add_user_to_zookeeper(username=username, password=password)
self.app_relation.data[self.charm.app].update({username: password})

def delete_user(self, username: str) -> None:
"""Deletes users' SCRAM credentials from ZooKeeper.
Args:
username: the user's username
Raises:
subprocess.CalledProcessError: if the command failed
"""
self.charm.kafka_config.delete_user_from_zookeeper(username=username)
self.app_relation.data[self.charm.app].update({username: ""})

@staticmethod
def generate_password():
"""Creates randomized string for use as app passwords.
Returns:
String of 32 randomized letter+digit characters
"""
return "".join([secrets.choice(string.ascii_letters + string.digits) for _ in range(32)])
11 changes: 11 additions & 0 deletions tests/integration/app-charm/charmcraft.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

type: charm
bases:
- build-on:
- name: "ubuntu"
channel: "20.04"
run-on:
- name: "ubuntu"
channel: "20.04"
17 changes: 17 additions & 0 deletions tests/integration/app-charm/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

name: application
description: |
Dummy charm used in integration tests for Kafka.
summary: |
Dummy charm application meant to be used
only for testing of the libs in this repository.
peers:
cluster:
interface: cluster

requires:
kafka:
interface: kafka
1 change: 1 addition & 0 deletions tests/integration/app-charm/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ops >= 1.5.0
Loading

0 comments on commit 09a051e

Please sign in to comment.