diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py index df3fdd2b..9f72626b 100644 --- a/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -280,7 +280,6 @@ def _on_topic_requested(self, event: TopicRequestedEvent): import json import logging -import os from abc import ABC, abstractmethod from collections import namedtuple from datetime import datetime @@ -304,7 +303,9 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 5 +LIBPATCH = 8 + +PYDEPS = ["ops>=2.0.0"] logger = logging.getLogger(__name__) @@ -501,12 +502,6 @@ def fetch_relation_data(self) -> dict: a dict of the values stored in the relation data bag for all relation instances (indexed by the relation ID). """ - if "-relation-broken" in os.environ.get("JUJU_HOOK_NAME", ""): - # read more in https://bugs.launchpad.net/juju/+bug/1960934 - raise RuntimeError( - "`fetch_relation_data` cannot be used in `*-relation-broken` events" - ) - data = {} for relation in self.relations: data[relation.id] = { @@ -544,7 +539,19 @@ def _diff(self, event: RelationChangedEvent) -> Diff: @property def relations(self) -> List[Relation]: """The list of Relation instances associated with this relation_name.""" - return list(self.charm.model.relations[self.relation_name]) + return [ + relation + for relation in self.charm.model.relations[self.relation_name] + if self._is_relation_active(relation) + ] + + @staticmethod + def _is_relation_active(relation: Relation): + try: + _ = repr(relation.data) + return True + except RuntimeError: + return False @staticmethod def _is_resource_created_for_relation(relation: Relation): @@ -564,12 +571,28 @@ def is_resource_created(self, relation_id: Optional[int] = None) -> bool: Returns: True or False + + Raises: + IndexError: If relation_id is provided but that relation does not exist """ - if relation_id: - return self._is_resource_created_for_relation(self.relations[relation_id]) + if relation_id is not None: + try: + relation = [relation for relation in self.relations if relation.id == relation_id][ + 0 + ] + return self._is_resource_created_for_relation(relation) + except IndexError: + raise IndexError(f"relation id {relation_id} cannot be accessed") else: - return all( - [self._is_resource_created_for_relation(relation) for relation in self.relations] + return ( + all( + [ + self._is_resource_created_for_relation(relation) + for relation in self.relations + ] + ) + if self.relations + else False ) @@ -637,6 +660,11 @@ class DatabaseProvidesEvents(CharmEvents): class DatabaseRequiresEvent(RelationEvent): """Base class for database events.""" + @property + def database(self) -> Optional[str]: + """Returns the database name.""" + return self.relation.data[self.relation.app].get("database") + @property def endpoints(self) -> Optional[str]: """Returns a comma separated list of read/write endpoints.""" @@ -720,6 +748,18 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: if "database" in diff.added: self.on.database_requested.emit(event.relation, app=event.app, unit=event.unit) + def set_database(self, relation_id: int, database_name: str) -> None: + """Set database name. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + database_name: database name. + """ + self._update_relation_data(relation_id, {"database": database_name}) + def set_endpoints(self, relation_id: int, connection_strings: str) -> None: """Set database primary connections. @@ -950,6 +990,11 @@ def topic(self) -> Optional[str]: """Returns the topic that was requested.""" return self.relation.data[self.relation.app].get("topic") + @property + def consumer_group_prefix(self) -> Optional[str]: + """Returns the consumer-group-prefix that was requested.""" + return self.relation.data[self.relation.app].get("consumer-group-prefix") + class TopicRequestedEvent(KafkaProvidesEvent, ExtraRoleEvent): """Event emitted when a new topic is requested for use on this relation.""" @@ -967,6 +1012,11 @@ class KafkaProvidesEvents(CharmEvents): class KafkaRequiresEvent(RelationEvent): """Base class for Kafka events.""" + @property + def topic(self) -> Optional[str]: + """Returns the topic.""" + return self.relation.data[self.relation.app].get("topic") + @property def bootstrap_server(self) -> Optional[str]: """Returns a a comma-seperated list of broker uris.""" @@ -1026,6 +1076,15 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: if "topic" in diff.added: self.on.topic_requested.emit(event.relation, app=event.app, unit=event.unit) + def set_topic(self, relation_id: int, topic: str) -> None: + """Set topic name in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + topic: the topic name. + """ + self._update_relation_data(relation_id, {"topic": topic}) + def set_bootstrap_server(self, relation_id: int, bootstrap_server: str) -> None: """Set the bootstrap server in the application relation databag. @@ -1059,26 +1118,30 @@ class KafkaRequires(DataRequires): on = KafkaRequiresEvents() - def __init__(self, charm, relation_name: str, topic: str, extra_user_roles: str = None): + def __init__( + self, + charm, + relation_name: str, + topic: str, + extra_user_roles: Optional[str] = None, + consumer_group_prefix: Optional[str] = None, + ): """Manager of Kafka client relations.""" # super().__init__(charm, relation_name) super().__init__(charm, relation_name, extra_user_roles) self.charm = charm self.topic = topic + self.consumer_group_prefix = consumer_group_prefix or "" def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: """Event emitted when the application joins the Kafka relation.""" - # Sets both topic and extra user roles in the relation - # if the roles are provided. Otherwise, sets only the topic. - self._update_relation_data( - event.relation.id, - { - "topic": self.topic, - "extra-user-roles": self.extra_user_roles, - } - if self.extra_user_roles is not None - else {"topic": self.topic}, - ) + # Sets topic, extra user roles, and "consumer-group-prefix" in the relation + relation_data = { + f: getattr(self, f.replace("-", "_"), "") + for f in ["consumer-group-prefix", "extra-user-roles", "topic"] + } + + self._update_relation_data(event.relation.id, relation_data) def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: """Event emitted when the Kafka relation has changed.""" @@ -1104,4 +1167,4 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: self.on.bootstrap_server_changed.emit( event.relation, app=event.app, unit=event.unit ) # here check if this is the right design - return \ No newline at end of file + return diff --git a/src/provider.py b/src/provider.py index 1a579ddb..9178aaa2 100644 --- a/src/provider.py +++ b/src/provider.py @@ -67,7 +67,9 @@ def on_topic_requested(self, event: TopicRequestedEvent): zookeeper_uris = self.charm.kafka_config.zookeeper_config.get("connect", "") tls = "enabled" if self.charm.tls.enabled else "disabled" - consumer_group_prefix = f"{username}-" if "consumer" in extra_user_roles else "" + consumer_group_prefix = ( + event.consumer_group_prefix or f"{username}-" if "consumer" in extra_user_roles else "" + ) self.kafka_auth.add_user( username=username, @@ -96,6 +98,7 @@ def on_topic_requested(self, event: TopicRequestedEvent): self.kafka_provider.set_credentials(relation.id, username, password) self.kafka_provider.set_tls(relation.id, tls) self.kafka_provider.set_zookeeper_uris(relation.id, zookeeper_uris) + self.kafka_provider.set_topic(relation.id, topic) @property def peer_relation(self) -> Relation: diff --git a/tests/integration/app-charm/lib/charms/data_platform_libs/v0/data_interfaces.py b/tests/integration/app-charm/lib/charms/data_platform_libs/v0/data_interfaces.py index 0dab99ac..9f72626b 100644 --- a/tests/integration/app-charm/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/tests/integration/app-charm/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -280,7 +280,6 @@ def _on_topic_requested(self, event: TopicRequestedEvent): import json import logging -import os from abc import ABC, abstractmethod from collections import namedtuple from datetime import datetime @@ -304,7 +303,9 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 5 +LIBPATCH = 8 + +PYDEPS = ["ops>=2.0.0"] logger = logging.getLogger(__name__) @@ -501,12 +502,6 @@ def fetch_relation_data(self) -> dict: a dict of the values stored in the relation data bag for all relation instances (indexed by the relation ID). """ - if "-relation-broken" in os.environ.get("JUJU_HOOK_NAME", ""): - # read more in https://bugs.launchpad.net/juju/+bug/1960934 - raise RuntimeError( - "`fetch_relation_data` cannot be used in `*-relation-broken` events" - ) - data = {} for relation in self.relations: data[relation.id] = { @@ -544,7 +539,19 @@ def _diff(self, event: RelationChangedEvent) -> Diff: @property def relations(self) -> List[Relation]: """The list of Relation instances associated with this relation_name.""" - return list(self.charm.model.relations[self.relation_name]) + return [ + relation + for relation in self.charm.model.relations[self.relation_name] + if self._is_relation_active(relation) + ] + + @staticmethod + def _is_relation_active(relation: Relation): + try: + _ = repr(relation.data) + return True + except RuntimeError: + return False @staticmethod def _is_resource_created_for_relation(relation: Relation): @@ -564,12 +571,28 @@ def is_resource_created(self, relation_id: Optional[int] = None) -> bool: Returns: True or False + + Raises: + IndexError: If relation_id is provided but that relation does not exist """ - if relation_id: - return self._is_resource_created_for_relation(self.relations[relation_id]) + if relation_id is not None: + try: + relation = [relation for relation in self.relations if relation.id == relation_id][ + 0 + ] + return self._is_resource_created_for_relation(relation) + except IndexError: + raise IndexError(f"relation id {relation_id} cannot be accessed") else: - return all( - [self._is_resource_created_for_relation(relation) for relation in self.relations] + return ( + all( + [ + self._is_resource_created_for_relation(relation) + for relation in self.relations + ] + ) + if self.relations + else False ) @@ -637,6 +660,11 @@ class DatabaseProvidesEvents(CharmEvents): class DatabaseRequiresEvent(RelationEvent): """Base class for database events.""" + @property + def database(self) -> Optional[str]: + """Returns the database name.""" + return self.relation.data[self.relation.app].get("database") + @property def endpoints(self) -> Optional[str]: """Returns a comma separated list of read/write endpoints.""" @@ -720,6 +748,18 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: if "database" in diff.added: self.on.database_requested.emit(event.relation, app=event.app, unit=event.unit) + def set_database(self, relation_id: int, database_name: str) -> None: + """Set database name. + + This function writes in the application data bag, therefore, + only the leader unit can call it. + + Args: + relation_id: the identifier for a particular relation. + database_name: database name. + """ + self._update_relation_data(relation_id, {"database": database_name}) + def set_endpoints(self, relation_id: int, connection_strings: str) -> None: """Set database primary connections. @@ -950,6 +990,11 @@ def topic(self) -> Optional[str]: """Returns the topic that was requested.""" return self.relation.data[self.relation.app].get("topic") + @property + def consumer_group_prefix(self) -> Optional[str]: + """Returns the consumer-group-prefix that was requested.""" + return self.relation.data[self.relation.app].get("consumer-group-prefix") + class TopicRequestedEvent(KafkaProvidesEvent, ExtraRoleEvent): """Event emitted when a new topic is requested for use on this relation.""" @@ -967,6 +1012,11 @@ class KafkaProvidesEvents(CharmEvents): class KafkaRequiresEvent(RelationEvent): """Base class for Kafka events.""" + @property + def topic(self) -> Optional[str]: + """Returns the topic.""" + return self.relation.data[self.relation.app].get("topic") + @property def bootstrap_server(self) -> Optional[str]: """Returns a a comma-seperated list of broker uris.""" @@ -1026,6 +1076,15 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: if "topic" in diff.added: self.on.topic_requested.emit(event.relation, app=event.app, unit=event.unit) + def set_topic(self, relation_id: int, topic: str) -> None: + """Set topic name in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + topic: the topic name. + """ + self._update_relation_data(relation_id, {"topic": topic}) + def set_bootstrap_server(self, relation_id: int, bootstrap_server: str) -> None: """Set the bootstrap server in the application relation databag. @@ -1059,26 +1118,30 @@ class KafkaRequires(DataRequires): on = KafkaRequiresEvents() - def __init__(self, charm, relation_name: str, topic: str, extra_user_roles: str = None): + def __init__( + self, + charm, + relation_name: str, + topic: str, + extra_user_roles: Optional[str] = None, + consumer_group_prefix: Optional[str] = None, + ): """Manager of Kafka client relations.""" # super().__init__(charm, relation_name) super().__init__(charm, relation_name, extra_user_roles) self.charm = charm self.topic = topic + self.consumer_group_prefix = consumer_group_prefix or "" def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: """Event emitted when the application joins the Kafka relation.""" - # Sets both topic and extra user roles in the relation - # if the roles are provided. Otherwise, sets only the topic. - self._update_relation_data( - event.relation.id, - { - "topic": self.topic, - "extra-user-roles": self.extra_user_roles, - } - if self.extra_user_roles is not None - else {"topic": self.topic}, - ) + # Sets topic, extra user roles, and "consumer-group-prefix" in the relation + relation_data = { + f: getattr(self, f.replace("-", "_"), "") + for f in ["consumer-group-prefix", "extra-user-roles", "topic"] + } + + self._update_relation_data(event.relation.id, relation_data) def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: """Event emitted when the Kafka relation has changed.""" diff --git a/tests/integration/app-charm/src/charm.py b/tests/integration/app-charm/src/charm.py index aad8f645..458e87df 100755 --- a/tests/integration/app-charm/src/charm.py +++ b/tests/integration/app-charm/src/charm.py @@ -24,6 +24,7 @@ REL_NAME_PRODUCER = "kafka-client-producer" REL_NAME_ADMIN = "kafka-client-admin" ZK = "zookeeper" +CONSUMER_GROUP_PREFIX = "test-prefix" class ApplicationCharm(CharmBase): @@ -35,7 +36,11 @@ def __init__(self, *args): self.framework.observe(getattr(self.on, "start"), self._on_start) self.kafka_requirer_consumer = KafkaRequires( - self, relation_name=REL_NAME_CONSUMER, topic="test-topic", extra_user_roles="consumer" + self, + relation_name=REL_NAME_CONSUMER, + topic="test-topic", + extra_user_roles="consumer", + consumer_group_prefix=CONSUMER_GROUP_PREFIX, ) self.kafka_requirer_producer = KafkaRequires( self, relation_name=REL_NAME_PRODUCER, topic="test-topic", extra_user_roles="producer" diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index d51a7cae..03cea7a4 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -138,6 +138,11 @@ def get_provider_data( provider_relation_data["endpoints"] = info["application-data"]["endpoints"] provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"] provider_relation_data["tls"] = info["application-data"]["tls"] + if "consumer-group-prefix" in info["application-data"]: + provider_relation_data["consumer-group-prefix"] = info["application-data"][ + "consumer-group-prefix" + ] + provider_relation_data["topic"] = info["application-data"]["topic"] return provider_relation_data diff --git a/tests/integration/test_provider.py b/tests/integration/test_provider.py index 477ccf86..bfd6dd41 100644 --- a/tests/integration/test_provider.py +++ b/tests/integration/test_provider.py @@ -75,6 +75,8 @@ async def test_deploy_charms_relate_active( assert acl.resource_type in ["GROUP", "TOPIC"] if acl.resource_type == "TOPIC": assert acl.resource_name == "test-topic" + if acl.resource_type == "GROUP": + assert acl.resource_name == "test-prefix" @pytest.mark.abort_on_fail @@ -245,6 +247,9 @@ async def test_connection_updated_on_tls_enabled(ops_test: OpsTest, app_charm): model_full_name=ops_test.model_full_name, endpoint="kafka-client-consumer", ) + assert provider_data["tls"] == "enabled" assert "9093" in provider_data["endpoints"] assert "2182" in provider_data["zookeeper-uris"] + assert "test-prefix" in provider_data["consumer-group-prefix"] + assert "test-topic" in provider_data["topic"] diff --git a/tests/unit/test_provider.py b/tests/unit/test_provider.py index 1a828ac2..9e6e57c9 100644 --- a/tests/unit/test_provider.py +++ b/tests/unit/test_provider.py @@ -160,6 +160,7 @@ def test_client_relation_joined_sets_necessary_relation_data(harness): "zookeeper-uris", "consumer-group-prefix", "tls", + "topic", ] ) == sorted(client_relation.data[harness.charm.app].keys())