Skip to content

Commit

Permalink
refactor: fix storage attached failures, tighten up status checks (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoppenheimer authored Mar 16, 2023
1 parent 2eb1780 commit f4088b6
Show file tree
Hide file tree
Showing 17 changed files with 703 additions and 431 deletions.
368 changes: 194 additions & 174 deletions src/charm.py

Large diffs are not rendered by default.

55 changes: 40 additions & 15 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from literals import (
ADMIN_USER,
INTER_BROKER_USER,
INTERNAL_USERS,
PEER,
REL_NAME,
SECURITY_PROTOCOL_PORTS,
Expand Down Expand Up @@ -114,12 +115,17 @@ def internal_user_credentials(self) -> Dict[str, str]:
Returns:
Dict of usernames and passwords
"""
return {
credentials = {
user: password
for user in [INTER_BROKER_USER, ADMIN_USER]
for user in INTERNAL_USERS
if (password := self.charm.get_secret(scope="app", key=f"{user}-password"))
}

if not len(credentials) == len(INTERNAL_USERS):
return {}

return credentials

@property
def zookeeper_config(self) -> Dict[str, str]:
"""The config from current ZooKeeper relations for data necessary for broker connection.
Expand Down Expand Up @@ -153,13 +159,22 @@ def zookeeper_config(self) -> Dict[str, str]:

return zookeeper_config

@property
def zookeeper_related(self) -> bool:
    """Report whether the charm currently has a ZooKeeper relation.

    Returns:
        True if the model's relation lookup for `ZK` is non-empty.
        Otherwise False
    """
    zk_relations = self.charm.model.relations[ZK]
    return bool(zk_relations)

@property
def zookeeper_connected(self) -> bool:
"""Checks if there is an active ZooKeeper relation.
"""Checks if there is an active ZooKeeper relation with all necessary data.
Returns:
True if ZooKeeper is currently related with sufficient relation data
for a broker to connect with. False otherwise.
for a broker to connect with. Otherwise False
"""
if self.zookeeper_config.get("connect", None):
return True
Expand Down Expand Up @@ -199,7 +214,7 @@ def bootstrap_server(self) -> List[str]:
]
port = (
SECURITY_PROTOCOL_PORTS["SASL_SSL"].client
if self.charm.tls.enabled
if (self.charm.tls.enabled and self.charm.tls.certificate)
else SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT"].client
)
return [f"{host}:{port}" for host in hosts]
Expand Down Expand Up @@ -274,15 +289,18 @@ def scram_properties(self) -> List[str]:
Returns:
list of scram properties to be set
"""
username = INTER_BROKER_USER
password = self.internal_user_credentials.get(INTER_BROKER_USER, "")

scram_properties = [
f'listener.name.{self.internal_listener.name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{INTER_BROKER_USER}" password="{self.internal_user_credentials[INTER_BROKER_USER]}";'
f'listener.name.{self.internal_listener.name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";'
]
client_scram = [
auth.name for auth in self.client_listeners if auth.protocol.startswith("SASL_")
]
for name in client_scram:
scram_properties.append(
f'listener.name.{name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{INTER_BROKER_USER}" password="{self.internal_user_credentials[INTER_BROKER_USER]}";'
f'listener.name.{name.lower()}.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";'
)

return scram_properties
Expand All @@ -291,7 +309,11 @@ def scram_properties(self) -> List[str]:
def security_protocol(self) -> AuthMechanism:
"""Infers current charm security.protocol based on current relations."""
# FIXME: When we have multiple auth_mechanisms/listeners, remove this method
return "SASL_SSL" if self.charm.tls.enabled else "SASL_PLAINTEXT"
return (
"SASL_SSL"
if (self.charm.tls.enabled and self.charm.tls.certificate)
else "SASL_PLAINTEXT"
)

@property
def auth_mechanisms(self) -> List[AuthMechanism]:
Expand Down Expand Up @@ -335,7 +357,7 @@ def super_users(self) -> str:
Returns:
Semicolon delimited string of current super users
"""
super_users = [INTER_BROKER_USER, ADMIN_USER]
super_users = set(INTERNAL_USERS)
for relation in self.charm.model.relations[REL_NAME]:
extra_user_roles = relation.data[relation.app].get("extra-user-roles", "")
password = (
Expand All @@ -345,9 +367,9 @@ def super_users(self) -> str:
)
# if passwords are set for client admins, they're good to load
if "admin" in extra_user_roles and password is not None:
super_users.append(f"relation-{relation.id}")
super_users.add(f"relation-{relation.id}")

super_users_arg = [f"User:{user}" for user in super_users]
super_users_arg = sorted([f"User:{user}" for user in super_users])

return ";".join(super_users_arg)

Expand All @@ -371,14 +393,17 @@ def client_properties(self) -> List[str]:
Returns:
List of properties to be set
"""
username = ADMIN_USER
password = self.internal_user_credentials.get(ADMIN_USER, "")

client_properties = [
f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{ADMIN_USER}" password="{self.internal_user_credentials[ADMIN_USER]}";',
f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";',
"sasl.mechanism=SCRAM-SHA-512",
f"security.protocol={self.security_protocol}", # FIXME: will need changing once multiple listener auth schemes
f"bootstrap.servers={','.join(self.bootstrap_server)}",
]

if self.charm.tls.enabled:
if self.charm.tls.enabled and self.charm.tls.certificate:
client_properties += self.tls_properties

return client_properties
Expand Down Expand Up @@ -415,9 +440,9 @@ def server_properties(self) -> List[str]:
+ DEFAULT_CONFIG_OPTIONS.split("\n")
)

if self.charm.tls.enabled:
if self.charm.tls.enabled and self.charm.tls.certificate:
properties += self.tls_properties + self.zookeeper_tls_properties
logger.debug(f"server properties: {properties}")

return properties

@property
Expand Down
42 changes: 42 additions & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
"""Collection of globals common to the KafkaCharm."""

from dataclasses import dataclass
from enum import Enum
from typing import Dict, Literal

from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, StatusBase, WaitingStatus

CHARM_KEY = "kafka"
SNAP_NAME = "charmed-kafka"
NODE_EXPORTER_SNAP_NAME = "node-exporter"
Expand All @@ -19,9 +22,11 @@
TLS_RELATION = "certificates"
TRUSTED_CERTIFICATE_RELATION = "trusted-certificate"
TRUSTED_CA_RELATION = "trusted-ca"
INTERNAL_USERS = [INTER_BROKER_USER, ADMIN_USER]

AuthMechanism = Literal["SASL_PLAINTEXT", "SASL_SSL", "SSL"]
Scope = Literal["INTERNAL", "CLIENT"]
DebugLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]


@dataclass
Expand All @@ -35,3 +40,40 @@ class Ports:
"SASL_SSL": Ports(9093, 19093),
"SSL": Ports(9094, 19094),
}


@dataclass
class StatusLevel:
    """Bundle a status object with the name of a log level."""

    # the status instance itself (a `StatusBase` subclass instance)
    status: StatusBase
    # one of the `DebugLevel` literal names
    # NOTE(review): how callers consume this value is not visible here
    log_level: DebugLevel


class Status(Enum):
    """Named charm statuses, each carrying a `StatusLevel` value.

    Every member pairs a status object (active, maintenance, blocked or
    waiting) with the log-level name associated with it.
    """

    ACTIVE = StatusLevel(status=ActiveStatus(), log_level="DEBUG")

    # peer relation and snap lifecycle
    NO_PEER_RELATION = StatusLevel(
        status=MaintenanceStatus("no peer relation yet"),
        log_level="DEBUG",
    )
    SNAP_NOT_INSTALLED = StatusLevel(
        status=BlockedStatus(f"unable to install {SNAP_NAME} snap"),
        log_level="ERROR",
    )
    SNAP_NOT_RUNNING = StatusLevel(
        status=BlockedStatus("snap service not running"),
        log_level="WARNING",
    )

    # ZooKeeper relation state
    ZK_NOT_RELATED = StatusLevel(
        status=BlockedStatus("missing required zookeeper relation"),
        log_level="ERROR",
    )
    ZK_NOT_CONNECTED = StatusLevel(
        status=BlockedStatus("unit not connected to zookeeper"),
        log_level="ERROR",
    )
    ZK_TLS_MISMATCH = StatusLevel(
        status=BlockedStatus("tls must be enabled on both kafka and zookeeper"),
        log_level="ERROR",
    )
    ZK_NO_DATA = StatusLevel(
        status=WaitingStatus("zookeeper credentials not created yet"),
        log_level="INFO",
    )

    # storage changes: the unit stays active, the message carries the warning
    ADDED_STORAGE = StatusLevel(
        status=ActiveStatus(
            "manual partition reassignment may be needed to utilize new storage volumes"
        ),
        log_level="WARNING",
    )
    REMOVED_STORAGE = StatusLevel(
        status=ActiveStatus(
            "manual partition reassignment from replicated brokers recommended due to lost partitions on removed storage volumes"
        ),
        log_level="ERROR",
    )
    REMOVED_STORAGE_NO_REPL = StatusLevel(
        status=ActiveStatus("potential log-data loss due to storage removal without replication"),
        log_level="ERROR",
    )

    # credentials and certificates not yet available
    NO_BROKER_CREDS = StatusLevel(
        status=WaitingStatus("internal broker credentials not yet added"),
        log_level="INFO",
    )
    NO_CERT = StatusLevel(
        status=WaitingStatus("unit waiting for signed certificates"),
        log_level="INFO",
    )
15 changes: 6 additions & 9 deletions src/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,7 @@ def __init__(self, charm) -> None:

def on_topic_requested(self, event: TopicRequestedEvent):
"""Handle the on topic requested event."""
if not self.charm.ready_to_start:
logger.debug("cannot add user, ZooKeeper not yet connected")
event.defer()
return

if not self.charm.kafka_config.zookeeper_connected:
logger.debug("cannot update ACLs, ZooKeeper not yet connected")
if not self.charm.healthy:
event.defer()
return

Expand Down Expand Up @@ -113,11 +107,14 @@ def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
Args:
event: the event from a related client application needing a user
"""
# don't remove anything if app is going down
if self.charm.app.planned_units == 0:
return

if not self.charm.unit.is_leader():
return

if not self.charm.ready_to_start:
logger.debug("cannot remove user, ZooKeeper not yet connected")
if not self.charm.healthy:
event.defer()
return

Expand Down
29 changes: 28 additions & 1 deletion src/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

from charms.operator_libs_linux.v0 import apt
from charms.operator_libs_linux.v1 import snap
from tenacity import retry
from tenacity.retry import retry_if_not_result
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from literals import NODE_EXPORTER_SNAP_NAME, SNAP_NAME

Expand All @@ -27,7 +31,7 @@ def __init__(self) -> None:
self.kafka = snap.SnapCache()[SNAP_NAME]

def install(self) -> bool:
"""Loads the Kafka snap from LP, returning a StatusBase for the Charm to set.
"""Loads the Kafka snap from LP.
Returns:
True if successfully installed. False otherwise.
Expand Down Expand Up @@ -126,6 +130,29 @@ def disable_enable(self, snap_service: str) -> None:
subprocess.run(f"snap disable {snap_service}", shell=True)
subprocess.run(f"snap enable {snap_service}", shell=True)

@retry(
    wait=wait_fixed(1),
    stop=stop_after_attempt(5),
    # once attempts are exhausted, hand back the last result instead of raising RetryError
    retry_error_callback=lambda state: state.outcome.result(),
    # keep retrying for as long as the check returns a falsy result
    retry=retry_if_not_result(bool),
)
def active(self, snap_service: str) -> bool:
    """Checks if service is active.

    The check is attempted up to 5 times with a fixed wait between attempts,
    stopping early as soon as the service is reported active.

    Args:
        snap_service: The desired service to check active

    Returns:
        True if service is active. False if it is still not active after the
        final attempt, or if the service is not found among the snap's services.
        A missing service does not raise; the KeyError is handled internally.
    """
    # NOTE(review): assumes `self.kafka.services` reflects live snap state on
    # each access; if it is cached, the retries cannot observe a change - confirm
    try:
        return bool(self.kafka.services[snap_service]["active"])
    except KeyError:
        # unknown service (or an entry without an "active" key) counts as inactive
        return False

@staticmethod
def run_bin_command(bin_keyword: str, bin_args: List[str], opts: List[str]) -> str:
"""Runs kafka bin command with desired args.
Expand Down
15 changes: 6 additions & 9 deletions src/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ def _trusted_relation_broken(self, event: RelationBrokenEvent) -> None:
return

# All units will need to remove the cert from their truststore
relation = event.relation
alias = self.generate_alias(app_name=relation.app.name, relation_id=relation.id)
filename = f"{alias}.pem"
self.remove_cert(alias=alias, filename=filename)
alias = self.generate_alias(
app_name=event.relation.app.name, relation_id=event.relation.id
)
self.remove_cert(alias=alias)

# The leader will also handle removing the "mtls" flag if needed
if not self.charm.unit.is_leader():
Expand Down Expand Up @@ -366,12 +366,9 @@ def _request_certificate(self):
@property
def _sans(self) -> Dict[str, List[str]]:
"""Builds a SAN dict of DNS names and IPs for the unit."""
unit_host = self.peer_relation.data[self.charm.unit].get("private-address", "")
unit_name = self.charm.unit.name

return {
"sans_ip": [unit_host],
"sans_dns": [unit_name, socket.getfqdn()],
"sans_ip": [self.charm.unit_host],
"sans_dns": [self.charm.unit.name, socket.getfqdn()],
}

def generate_alias(self, app_name: str, relation_id: int) -> str:
Expand Down
4 changes: 2 additions & 2 deletions src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import string
from typing import Dict, List, Optional, Set

from charms.zookeeper.v0.client import ZooKeeperManager
from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager
from kazoo.exceptions import AuthFailedError, NoNodeError
from ops.model import Unit
from tenacity import retry
Expand Down Expand Up @@ -66,7 +66,7 @@ def get_active_brokers(zookeeper_config: Dict[str, str]) -> Set[str]:
try:
brokers = zk.leader_znodes(path=path)
# auth might not be ready with ZK after relation yet
except (NoNodeError, AuthFailedError) as e:
except (NoNodeError, AuthFailedError, QuorumLeaderNotFoundError) as e:
logger.debug(str(e))
return set()

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
# Copyright 2022 Canonical Ltd.
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import json
Expand Down
9 changes: 5 additions & 4 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm):
ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"),
)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])
assert ops_test.model.applications[APP_NAME].status == "waiting"
assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK_NAME].status == "active"

await ops_test.model.add_relation(APP_NAME, ZK_NAME)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])
assert ops_test.model.applications[APP_NAME].status == "active"
assert ops_test.model.applications[ZK_NAME].status == "active"
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30)
assert ops_test.model.applications[APP_NAME].status == "active"
assert ops_test.model.applications[ZK_NAME].status == "active"


@pytest.mark.abort_on_fail
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_password_rotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm):
)
await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])
assert ops_test.model.applications[APP_NAME].status == "waiting"
assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK_NAME].status == "active"

await ops_test.model.add_relation(APP_NAME, ZK_NAME)
Expand Down
Loading

0 comments on commit f4088b6

Please sign in to comment.