Skip to content

Commit

Permalink
Set of consumer-prefix-group from relation data (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
welpaolo authored Mar 8, 2023
1 parent acd6b7c commit 16dd4d9
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 53 deletions.
115 changes: 89 additions & 26 deletions lib/charms/data_platform_libs/v0/data_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ def _on_topic_requested(self, event: TopicRequestedEvent):

import json
import logging
import os
from abc import ABC, abstractmethod
from collections import namedtuple
from datetime import datetime
Expand All @@ -304,7 +303,9 @@ def _on_topic_requested(self, event: TopicRequestedEvent):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 5
LIBPATCH = 8

PYDEPS = ["ops>=2.0.0"]

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -501,12 +502,6 @@ def fetch_relation_data(self) -> dict:
a dict of the values stored in the relation data bag
for all relation instances (indexed by the relation ID).
"""
if "-relation-broken" in os.environ.get("JUJU_HOOK_NAME", ""):
# read more in https://bugs.launchpad.net/juju/+bug/1960934
raise RuntimeError(
"`fetch_relation_data` cannot be used in `*-relation-broken` events"
)

data = {}
for relation in self.relations:
data[relation.id] = {
Expand Down Expand Up @@ -544,7 +539,19 @@ def _diff(self, event: RelationChangedEvent) -> Diff:
@property
def relations(self) -> List[Relation]:
"""The list of Relation instances associated with this relation_name."""
return list(self.charm.model.relations[self.relation_name])
return [
relation
for relation in self.charm.model.relations[self.relation_name]
if self._is_relation_active(relation)
]

@staticmethod
def _is_relation_active(relation: Relation):
try:
_ = repr(relation.data)
return True
except RuntimeError:
return False

@staticmethod
def _is_resource_created_for_relation(relation: Relation):
Expand All @@ -564,12 +571,28 @@ def is_resource_created(self, relation_id: Optional[int] = None) -> bool:
Returns:
True or False
Raises:
IndexError: If relation_id is provided but that relation does not exist
"""
if relation_id:
return self._is_resource_created_for_relation(self.relations[relation_id])
if relation_id is not None:
try:
relation = [relation for relation in self.relations if relation.id == relation_id][
0
]
return self._is_resource_created_for_relation(relation)
except IndexError:
raise IndexError(f"relation id {relation_id} cannot be accessed")
else:
return all(
[self._is_resource_created_for_relation(relation) for relation in self.relations]
return (
all(
[
self._is_resource_created_for_relation(relation)
for relation in self.relations
]
)
if self.relations
else False
)


Expand Down Expand Up @@ -637,6 +660,11 @@ class DatabaseProvidesEvents(CharmEvents):
class DatabaseRequiresEvent(RelationEvent):
"""Base class for database events."""

@property
def database(self) -> Optional[str]:
"""Returns the database name."""
return self.relation.data[self.relation.app].get("database")

@property
def endpoints(self) -> Optional[str]:
"""Returns a comma separated list of read/write endpoints."""
Expand Down Expand Up @@ -720,6 +748,18 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
if "database" in diff.added:
self.on.database_requested.emit(event.relation, app=event.app, unit=event.unit)

def set_database(self, relation_id: int, database_name: str) -> None:
"""Set database name.
This function writes in the application data bag, therefore,
only the leader unit can call it.
Args:
relation_id: the identifier for a particular relation.
database_name: database name.
"""
self._update_relation_data(relation_id, {"database": database_name})

def set_endpoints(self, relation_id: int, connection_strings: str) -> None:
"""Set database primary connections.
Expand Down Expand Up @@ -950,6 +990,11 @@ def topic(self) -> Optional[str]:
"""Returns the topic that was requested."""
return self.relation.data[self.relation.app].get("topic")

@property
def consumer_group_prefix(self) -> Optional[str]:
"""Returns the consumer-group-prefix that was requested."""
return self.relation.data[self.relation.app].get("consumer-group-prefix")


class TopicRequestedEvent(KafkaProvidesEvent, ExtraRoleEvent):
"""Event emitted when a new topic is requested for use on this relation."""
Expand All @@ -967,6 +1012,11 @@ class KafkaProvidesEvents(CharmEvents):
class KafkaRequiresEvent(RelationEvent):
"""Base class for Kafka events."""

@property
def topic(self) -> Optional[str]:
"""Returns the topic."""
return self.relation.data[self.relation.app].get("topic")

@property
def bootstrap_server(self) -> Optional[str]:
"""Returns a a comma-seperated list of broker uris."""
Expand Down Expand Up @@ -1026,6 +1076,15 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None:
if "topic" in diff.added:
self.on.topic_requested.emit(event.relation, app=event.app, unit=event.unit)

def set_topic(self, relation_id: int, topic: str) -> None:
"""Set topic name in the application relation databag.
Args:
relation_id: the identifier for a particular relation.
topic: the topic name.
"""
self._update_relation_data(relation_id, {"topic": topic})

def set_bootstrap_server(self, relation_id: int, bootstrap_server: str) -> None:
"""Set the bootstrap server in the application relation databag.
Expand Down Expand Up @@ -1059,26 +1118,30 @@ class KafkaRequires(DataRequires):

on = KafkaRequiresEvents()

def __init__(self, charm, relation_name: str, topic: str, extra_user_roles: str = None):
def __init__(
self,
charm,
relation_name: str,
topic: str,
extra_user_roles: Optional[str] = None,
consumer_group_prefix: Optional[str] = None,
):
"""Manager of Kafka client relations."""
# super().__init__(charm, relation_name)
super().__init__(charm, relation_name, extra_user_roles)
self.charm = charm
self.topic = topic
self.consumer_group_prefix = consumer_group_prefix or ""

def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None:
"""Event emitted when the application joins the Kafka relation."""
# Sets both topic and extra user roles in the relation
# if the roles are provided. Otherwise, sets only the topic.
self._update_relation_data(
event.relation.id,
{
"topic": self.topic,
"extra-user-roles": self.extra_user_roles,
}
if self.extra_user_roles is not None
else {"topic": self.topic},
)
# Sets topic, extra user roles, and "consumer-group-prefix" in the relation
relation_data = {
f: getattr(self, f.replace("-", "_"), "")
for f in ["consumer-group-prefix", "extra-user-roles", "topic"]
}

self._update_relation_data(event.relation.id, relation_data)

def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
"""Event emitted when the Kafka relation has changed."""
Expand All @@ -1104,4 +1167,4 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
self.on.bootstrap_server_changed.emit(
event.relation, app=event.app, unit=event.unit
) # here check if this is the right design
return
return
5 changes: 4 additions & 1 deletion src/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ def on_topic_requested(self, event: TopicRequestedEvent):
zookeeper_uris = self.charm.kafka_config.zookeeper_config.get("connect", "")
tls = "enabled" if self.charm.tls.enabled else "disabled"

consumer_group_prefix = f"{username}-" if "consumer" in extra_user_roles else ""
consumer_group_prefix = (
event.consumer_group_prefix or f"{username}-" if "consumer" in extra_user_roles else ""
)

self.kafka_auth.add_user(
username=username,
Expand Down Expand Up @@ -96,6 +98,7 @@ def on_topic_requested(self, event: TopicRequestedEvent):
self.kafka_provider.set_credentials(relation.id, username, password)
self.kafka_provider.set_tls(relation.id, tls)
self.kafka_provider.set_zookeeper_uris(relation.id, zookeeper_uris)
self.kafka_provider.set_topic(relation.id, topic)

@property
def peer_relation(self) -> Relation:
Expand Down
Loading

0 comments on commit 16dd4d9

Please sign in to comment.