From 6d610988833c02993fc18c2a03090517809fa8de Mon Sep 17 00:00:00 2001 From: Kevin Kreitner Date: Sat, 18 Jan 2020 18:24:51 +0100 Subject: [PATCH 1/5] Always apply whole config dict --- esque/controller/topic_controller.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/esque/controller/topic_controller.py b/esque/controller/topic_controller.py index f361bc08..87ec8056 100644 --- a/esque/controller/topic_controller.py +++ b/esque/controller/topic_controller.py @@ -96,10 +96,22 @@ def create_topics(self, topics: List[Topic]): @invalidate_cache_after def alter_configs(self, topics: List[Topic]): for topic in topics: - config_resource = ConfigResource(ConfigResource.Type.TOPIC, topic.name, topic.config) + altered_config = self._get_altered_config(topic) + config_resource = ConfigResource(ConfigResource.Type.TOPIC, topic.name, altered_config) future_list = self.cluster.confluent_client.alter_configs([config_resource]) ensure_kafka_future_done(next(islice(future_list.values(), 1))) + def _get_altered_config(self, topic) -> Dict[str, str]: + cluster_topic = self.get_cluster_topic(topic.name) + current_config = cluster_topic.config.items() + altered_config = {} + for name, value in current_config: + if name in topic.config: + altered_config[name] = topic.config[name] + continue + altered_config[name] = value + return altered_config + @invalidate_cache_after def delete_topic(self, topic: Topic): future = self.cluster.confluent_client.delete_topics([topic.name])[topic.name] @@ -128,7 +140,7 @@ def get_offsets_closest_to_timestamp( topic_partition.partition: topic_partition.offset for topic_partition in topic_partitions_with_new_offsets } - def update_from_cluster(self, topic: Topic): + def update_from_cluster(self, topic: Topic) -> Topic: """Takes a topic and, based on its name, updates all attributes from the cluster""" confluent_topic: ConfluentTopic = self._get_client_topic(topic.name, ClientTypes.Confluent) From 7567821c020ac2717e05f8f6bbaceea7564554c9 Mon Sep 17 00:00:00 2001 From: Kevin Kreitner Date: Tue, 4 Feb 2020 21:32:40 +0100 Subject: [PATCH 2/5] Add test for altering config --- esque/controller/topic_controller.py | 2 +- tests/integration/test_topic_controller.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/esque/controller/topic_controller.py b/esque/controller/topic_controller.py index 87ec8056..c3f411da 100644 --- a/esque/controller/topic_controller.py +++ b/esque/controller/topic_controller.py @@ -101,7 +101,7 @@ def alter_configs(self, topics: List[Topic]): future_list = self.cluster.confluent_client.alter_configs([config_resource]) ensure_kafka_future_done(next(islice(future_list.values(), 1))) - def _get_altered_config(self, topic) -> Dict[str, str]: + def _get_altered_config(self, topic: Topic) -> Dict[str, str]: cluster_topic = self.get_cluster_topic(topic.name) current_config = cluster_topic.config.items() altered_config = {} diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py index 34e17beb..8a5f40bb 100644 --- a/tests/integration/test_topic_controller.py +++ b/tests/integration/test_topic_controller.py @@ -54,6 +54,24 @@ def test_alter_topic_config_works(topic_controller: TopicController, topic_id: s final_config = after_changes_applied_topic.config assert final_config.get("cleanup.policy") == "compact" +@pytest.mark.integration +def test_alter_topic_config_only_change_mentioned_attributes(topic_controller: TopicController, topic_id: str): + initial_topic = Topic(topic_id, config={"cleanup.policy": "delete", "min.compaction.lag": "1000000"}) + + topic_controller.create_topics([initial_topic]) + topic_controller.update_from_cluster(initial_topic) + config = initial_topic.config + assert config.get("cleanup.policy") == "delete" + assert config.get("min.compaction.lag") == "1000000" + change_topic = Topic(topic_id, config={"cleanup.policy": "compact"}) + topic_controller.alter_configs([change_topic]) + topic_controller.update_from_cluster(change_topic) + after_changes_applied_topic = topic_controller.get_cluster_topic(topic_id) + + final_config = after_changes_applied_topic.config + assert final_config.get("cleanup.policy") == "compact" + assert final_config.get("min.compaction.lag") == "1000000" + @pytest.mark.integration def test_topic_listing_works(topic_controller: TopicController, topic: str): From 127da5ed8a85abd94600faa559c211a216d89b85 Mon Sep 17 00:00:00 2001 From: Kevin Kreitner Date: Tue, 4 Feb 2020 21:34:17 +0100 Subject: [PATCH 3/5] Update test method name --- tests/integration/test_topic_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py index 8a5f40bb..006aba2e 100644 --- a/tests/integration/test_topic_controller.py +++ b/tests/integration/test_topic_controller.py @@ -55,7 +55,7 @@ def test_alter_topic_config_works(topic_controller: TopicController, topic_id: s assert final_config.get("cleanup.policy") == "compact" @pytest.mark.integration -def test_alter_topic_config_only_change_mentioned_attributes(topic_controller: TopicController, topic_id: str): +def test_alter_topic_config_only_changes_mentioned_attributes(topic_controller: TopicController, topic_id: str): initial_topic = Topic(topic_id, config={"cleanup.policy": "delete", "min.compaction.lag": "1000000"}) topic_controller.create_topics([initial_topic]) From a52c5338db973352a9cfcc87e8d34c710f935099 Mon Sep 17 00:00:00 2001 From: Kevin Kreitner Date: Tue, 4 Feb 2020 21:37:17 +0100 Subject: [PATCH 4/5] Fix coding styles --- tests/integration/test_topic_controller.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py index 006aba2e..71ee5e30 100644 --- a/tests/integration/test_topic_controller.py +++ b/tests/integration/test_topic_controller.py @@ -54,6 +54,7 @@ def test_alter_topic_config_works(topic_controller: TopicController, topic_id: s final_config = after_changes_applied_topic.config assert final_config.get("cleanup.policy") == "compact" + @pytest.mark.integration def test_alter_topic_config_only_changes_mentioned_attributes(topic_controller: TopicController, topic_id: str): initial_topic = Topic(topic_id, config={"cleanup.policy": "delete", "min.compaction.lag": "1000000"}) From 67965a7ca329ebc32b8c5423b45b88ff5d1a5088 Mon Sep 17 00:00:00 2001 From: Swen Wenzel <5111028+swenzel@users.noreply.github.com> Date: Wed, 5 Feb 2020 08:04:05 +0100 Subject: [PATCH 5/5] Apply suggestions from code review fix config variable name --- tests/integration/test_topic_controller.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py index 71ee5e30..8a4d3a60 100644 --- a/tests/integration/test_topic_controller.py +++ b/tests/integration/test_topic_controller.py @@ -57,13 +57,13 @@ def test_alter_topic_config_works(topic_controller: TopicController, topic_id: s @pytest.mark.integration def test_alter_topic_config_only_changes_mentioned_attributes(topic_controller: TopicController, topic_id: str): - initial_topic = Topic(topic_id, config={"cleanup.policy": "delete", "min.compaction.lag": "1000000"}) + initial_topic = Topic(topic_id, config={"cleanup.policy": "delete", "min.compaction.lag.ms": "1000000"}) topic_controller.create_topics([initial_topic]) topic_controller.update_from_cluster(initial_topic) config = initial_topic.config assert config.get("cleanup.policy") == "delete" - assert config.get("min.compaction.lag") == "1000000" + assert config.get("min.compaction.lag.ms") == "1000000" change_topic = Topic(topic_id, config={"cleanup.policy": "compact"}) topic_controller.alter_configs([change_topic]) topic_controller.update_from_cluster(change_topic) @@ -71,7 +71,7 @@ def test_alter_topic_config_only_changes_mentioned_attributes(topic_controller: final_config = after_changes_applied_topic.config assert final_config.get("cleanup.policy") == "compact" - assert final_config.get("min.compaction.lag") == "1000000" + assert final_config.get("min.compaction.lag.ms") == "1000000" @pytest.mark.integration