Skip to content

Commit

Permalink
[DPE-3926] Enforce zookeeper client interface (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
Batalex authored May 28, 2024
1 parent 06c8bbe commit 47c661c
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 13 deletions.
21 changes: 17 additions & 4 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ def endpoints(self) -> str:
or ""
)

@property
def database(self) -> str:
"""Path allocated for Kafka on ZooKeeper."""
if not self.relation:
return ""

return (
self.data_interface.fetch_relation_field(
relation_id=self.relation.id, field="database"
)
or self.chroot
)

@property
def chroot(self) -> str:
"""Path allocated for Kafka on ZooKeeper."""
Expand Down Expand Up @@ -320,8 +333,8 @@ def tls(self) -> bool:
@property
def connect(self) -> str:
"""Full connection string of sorted uris."""
sorted_uris = sorted(self.uris.replace(self.chroot, "").split(","))
sorted_uris[-1] = sorted_uris[-1] + self.chroot
sorted_uris = sorted(self.uris.replace(self.database, "").split(","))
sorted_uris[-1] = sorted_uris[-1] + self.database
return ",".join(sorted_uris)

@property
Expand All @@ -332,7 +345,7 @@ def zookeeper_connected(self) -> bool:
True if ZooKeeper is currently related with sufficient relation data
for a broker to connect with. Otherwise False
"""
if not all([self.username, self.password, self.endpoints, self.chroot, self.uris]):
if not all([self.username, self.password, self.endpoints, self.database, self.uris]):
return False

return True
Expand All @@ -356,7 +369,7 @@ def broker_active(self) -> bool:
"""Checks if broker id is recognised as active by ZooKeeper."""
broker_id = self.data_interface.local_unit.name.split("/")[1]
hosts = self.endpoints.split(",")
path = f"{self.chroot}/brokers/ids/"
path = f"{self.database}/brokers/ids/"

zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password)
try:
Expand Down
8 changes: 7 additions & 1 deletion src/events/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ def __init__(self, charm) -> None:
def _on_zookeeper_created(self, _) -> None:
"""Handler for `zookeeper_relation_created` events."""
if self.model.unit.is_leader():
self.charm.state.zookeeper.update({"chroot": "/" + self.model.app.name})
self.charm.state.zookeeper.update(
{
"database": "/" + self.model.app.name,
"requested-secrets": '["username","password","tls","tls-ca","uris"]',
"chroot": "/" + self.model.app.name,
}
)

def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None:
"""Handler for `zookeeper_relation_created/joined/changed` events, ensuring internal users get created."""
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/ha/ha_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def is_up(ops_test: OpsTest, broker_id: int) -> bool:
ops_test=ops_test, owner=ZK, unit_name=f"{APP_NAME}/0"
)
active_brokers = get_active_brokers(config=kafka_zk_relation_data)
chroot = kafka_zk_relation_data.get("chroot", "")
chroot = kafka_zk_relation_data.get("database", kafka_zk_relation_data.get("chroot", ""))
return f"{chroot}/brokers/ids/{broker_id}" in active_brokers


Expand Down
2 changes: 1 addition & 1 deletion tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def get_active_brokers(config: Dict) -> Set[str]:
Returns:
Set of active broker ids
"""
chroot = config.get("chroot", "")
chroot = config.get("database", config.get("chroot", ""))
hosts = config.get("endpoints", "").split(",")
username = config.get("username", "")
password = config.get("password", "")
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def test_kafka_simple_scale_up(ops_test: OpsTest, kafka_charm):
)

active_brokers = get_active_brokers(config=kafka_zk_relation_data)
chroot = kafka_zk_relation_data.get("chroot", "")
chroot = kafka_zk_relation_data.get("database", kafka_zk_relation_data.get("chroot", ""))

assert f"{chroot}/brokers/ids/0" in active_brokers
assert f"{chroot}/brokers/ids/1" in active_brokers
Expand All @@ -72,7 +72,7 @@ async def test_kafka_simple_scale_down(ops_test: OpsTest):
)

active_brokers = get_active_brokers(config=kafka_zk_relation_data)
chroot = kafka_zk_relation_data.get("chroot", "")
chroot = kafka_zk_relation_data.get("database", kafka_zk_relation_data.get("chroot", ""))

assert f"{chroot}/brokers/ids/0" in active_brokers
assert f"{chroot}/brokers/ids/1" not in active_brokers
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async def test_kafka_tls_scaling(ops_test: OpsTest):
owner=ZK,
)
active_brokers = get_active_brokers(config=kafka_zk_relation_data)
chroot = kafka_zk_relation_data.get("chroot", "")
chroot = kafka_zk_relation_data.get("database", kafka_zk_relation_data.get("chroot", ""))
assert f"{chroot}/brokers/ids/0" in active_brokers
assert f"{chroot}/brokers/ids/1" in active_brokers
assert f"{chroot}/brokers/ids/2" in active_brokers
Expand Down
1 change: 1 addition & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def zk_data() -> dict[str, str]:
"username": "glorfindel",
"password": "mellon",
"endpoints": "10.10.10.10",
"database": "/kafka",
"chroot": "/kafka",
"uris": "10.10.10.10:2181",
"tls": "disabled",
Expand Down
5 changes: 2 additions & 3 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,8 @@ def test_zookeeper_joined_sets_chroot(harness: Harness):
zk_rel_id = harness.add_relation(ZK, ZK)
harness.add_relation_unit(zk_rel_id, f"{ZK}/0")

assert CHARM_KEY in harness.charm.model.relations[ZK][0].data[harness.charm.app].get(
"chroot", ""
)
rel = harness.charm.model.relations[ZK][0].data[harness.charm.app]
assert CHARM_KEY in rel.get("database", rel.get("chroot", ""))


def test_zookeeper_broken_stops_service_and_removes_meta_properties(harness: Harness):
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def test_log_dirs_in_server_properties(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down Expand Up @@ -119,6 +120,7 @@ def test_listeners_in_server_properties(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down Expand Up @@ -162,6 +164,7 @@ def test_ssl_listeners_in_server_properties(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down Expand Up @@ -206,6 +209,7 @@ def test_zookeeper_config_succeeds_fails_config(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"endpoints": "1.1.1.1,2.2.2.2",
Expand All @@ -223,6 +227,7 @@ def test_zookeeper_config_succeeds_valid_config(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down Expand Up @@ -336,6 +341,7 @@ def test_ssl_principal_mapping_rules(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down Expand Up @@ -376,6 +382,7 @@ def test_auth_properties(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand All @@ -400,6 +407,7 @@ def test_rack_properties(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down Expand Up @@ -427,6 +435,7 @@ def test_inter_broker_protocol_version(harness: Harness):
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def harness():
zk_relation_id,
harness.charm.app.name,
{
"database": "/kafka",
"chroot": "/kafka",
"username": "moria",
"password": "mellon",
Expand Down

0 comments on commit 47c661c

Please sign in to comment.