From 5ae150f6dce1b82c5536d5a964bbede1ab78b690 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Tue, 12 Nov 2024 16:42:56 -0500 Subject: [PATCH 1/6] Share Kubernetes Version Over Relation Data --- charms/worker/k8s/src/charm.py | 46 ++++++++++++++++++++++ charms/worker/k8s/src/token_distributor.py | 2 + charms/worker/k8s/tests/unit/test_base.py | 2 + 3 files changed, 50 insertions(+) diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 78b48996..cccbfb04 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -601,6 +601,50 @@ def _get_scrape_jobs(self): log.exception("Failed to get COS token.") return [] + @on_error( + ops.WaitingStatus("Sharing Cluster Version"), + AssertionError, + ) + def _update_kubernetes_version(self): + """Update the unit Kubernetes version in the cluster relation.""" + relation_name = "cluster" + if not (relation := self.model.get_relation(relation_name)): + assert False, "Missing cluster integration" # nosec + if version := snap_version("k8s"): + relation.data[self.unit]["version"] = version + + @on_error( + ops.WaitingStatus("Announcing Kubernetes version"), + AssertionError, + ) + def _announce_kubernetes_version(self): + """Announce the Kubernetes version to the cluster. + + This method ensures that the Kubernetes version is consistent across the cluster. 
+ """ + if not (peer := self.model.get_relation("cluster")): + assert False, "Missing cluster integration" # nosec + if not (worker := self.model.get_relation("k8s-cluster")): + assert False, "Missing cluster integration" # nosec + version = snap_version("k8s") + assert version, "k8s-snap is not installed" # nosec + + for unit in peer.units: + if unit.name == self.unit.name: + continue + if peer.data[unit].get("version") != version: + status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) + assert False, "Version mismatch with cluster nodes" # nosec + + for unit in worker.units: + if unit.name == self.unit.name: + continue + if worker.data[unit].get("version") != version: + status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) + assert False, "Version mismatch with cluster nodes" # nosec + + peer.data[self.app]["version"] = version + def _get_proxy_env(self) -> Dict[str, str]: """Retrieve the Juju model config proxy values. @@ -690,6 +734,7 @@ def _reconcile(self, event: ops.EventBase): self._install_snaps() self._apply_snap_requirements() self._check_k8sd_ready() + self._update_kubernetes_version() if self.lead_control_plane: self._k8s_info(event) self._bootstrap_k8s_snap() @@ -700,6 +745,7 @@ def _reconcile(self, event: ops.EventBase): self._apply_cos_requirements() self._revoke_cluster_tokens(event) self._ensure_cluster_config() + self._announce_kubernetes_version() self._join_cluster() self._config_containerd_registries() self._configure_cos_integration() diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index 170cf75b..367029aa 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -5,6 +5,7 @@ import contextlib import logging +import re from enum import Enum, auto from typing import Dict, Optional, Protocol, Union @@ -308,6 +309,7 @@ def active_nodes(self, relation: ops.Relation): return { self.charm.model.get_unit(str(u)): data 
for u, data in relation.data[self.charm.app].items() + if re.match(r"k8s(-worker)?/\d+", u) } def drop_node(self, relation: ops.Relation, unit: ops.Unit): diff --git a/charms/worker/k8s/tests/unit/test_base.py b/charms/worker/k8s/tests/unit/test_base.py index 0613b3ec..acff6d85 100644 --- a/charms/worker/k8s/tests/unit/test_base.py +++ b/charms/worker/k8s/tests/unit/test_base.py @@ -54,6 +54,7 @@ def mock_reconciler_handlers(harness): "_configure_cos_integration", "_update_status", "_apply_node_labels", + "_update_kubernetes_version", } if harness.charm.is_control_plane: handler_names |= { @@ -66,6 +67,7 @@ def mock_reconciler_handlers(harness): "_revoke_cluster_tokens", "_ensure_cluster_config", "_expose_ports", + "_announce_kubernetes_version", } handlers = [mock.patch(f"charm.K8sCharm.{name}") for name in handler_names] From 264243b7647e5c8606c8c2bd004429708403bf1b Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Tue, 12 Nov 2024 17:51:08 -0500 Subject: [PATCH 2/6] Compress Check Logic --- charms/worker/k8s/src/charm.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index cccbfb04..3f50fc76 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -622,28 +622,26 @@ def _announce_kubernetes_version(self): This method ensures that the Kubernetes version is consistent across the cluster. 
""" - if not (peer := self.model.get_relation("cluster")): - assert False, "Missing cluster integration" # nosec - if not (worker := self.model.get_relation("k8s-cluster")): - assert False, "Missing cluster integration" # nosec + peer = self.model.get_relation("cluster") + worker = self.model.get_relation("k8s-cluster") + if not all([peer, worker]): + assert False, "Missing cluster integration" + version = snap_version("k8s") assert version, "k8s-snap is not installed" # nosec - for unit in peer.units: - if unit.name == self.unit.name: - continue - if peer.data[unit].get("version") != version: - status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) - assert False, "Version mismatch with cluster nodes" # nosec - - for unit in worker.units: - if unit.name == self.unit.name: - continue - if worker.data[unit].get("version") != version: - status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) - assert False, "Version mismatch with cluster nodes" # nosec + for relation in (peer, worker): + units = (unit for unit in relation.units if unit.name != self.unit.name) + for unit in units: + unit_version = relation.data[unit].get("version") + if not unit_version: + assert False, f"Waiting for version from {unit.name}" + if unit_version != version: + status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) + assert False, "Version mismatch with cluster nodes" # nosec peer.data[self.app]["version"] = version + worker.data[self.app]["version"] = version def _get_proxy_env(self) -> Dict[str, str]: """Retrieve the Juju model config proxy values. 
From ce65a3f545882a30fd922481dda5097231766a2e Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 13 Nov 2024 08:21:02 -0500 Subject: [PATCH 3/6] Refactor Check Logic --- charms/worker/k8s/src/charm.py | 12 ++++-------- charms/worker/k8s/src/token_distributor.py | 4 ++++ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 3f50fc76..d6305487 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -607,8 +607,7 @@ def _get_scrape_jobs(self): ) def _update_kubernetes_version(self): """Update the unit Kubernetes version in the cluster relation.""" - relation_name = "cluster" - if not (relation := self.model.get_relation(relation_name)): + if not (relation := self.model.get_relation("cluster")): assert False, "Missing cluster integration" # nosec if version := snap_version("k8s"): relation.data[self.unit]["version"] = version @@ -624,24 +623,21 @@ def _announce_kubernetes_version(self): """ peer = self.model.get_relation("cluster") worker = self.model.get_relation("k8s-cluster") - if not all([peer, worker]): - assert False, "Missing cluster integration" version = snap_version("k8s") assert version, "k8s-snap is not installed" # nosec for relation in (peer, worker): + assert relation, "Missing cluster integration" # nosec units = (unit for unit in relation.units if unit.name != self.unit.name) for unit in units: unit_version = relation.data[unit].get("version") - if not unit_version: - assert False, f"Waiting for version from {unit.name}" + assert unit_version, f"Waiting for version from {unit.name}" # nosec if unit_version != version: status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) assert False, "Version mismatch with cluster nodes" # nosec + relation.data[self.app]["version"] = version - peer.data[self.app]["version"] = version - worker.data[self.app]["version"] = version def _get_proxy_env(self) -> Dict[str, str]: """Retrieve the Juju 
model config proxy values. diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index 367029aa..c476ccb5 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -300,6 +300,10 @@ def _revoke_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> None: def active_nodes(self, relation: ops.Relation): """Get nodes from application databag for given relation. + This method filters out entries in the application databag that are not + related to the cluster units. It uses the regex pattern, which matches patterns + like k8s/0, k8s-worker/0, etc. + Args: relation (ops.Relation): Which relation (cluster or k8s-cluster) From eda5889e2015d8a9bb3eb4e12b59ec303ac02e82 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 13 Nov 2024 09:17:32 -0500 Subject: [PATCH 4/6] Fix Format --- charms/worker/k8s/src/charm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index d6305487..a4c8e26f 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -638,7 +638,6 @@ def _announce_kubernetes_version(self): assert False, "Version mismatch with cluster nodes" # nosec relation.data[self.app]["version"] = version - def _get_proxy_env(self) -> Dict[str, str]: """Retrieve the Juju model config proxy values. 
From 9d273a37cda0f3a710b6d8ba83496ca3939b8df2 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 13 Nov 2024 12:38:50 -0500 Subject: [PATCH 5/6] Address Code Review --- charms/worker/k8s/src/charm.py | 19 +++++++++---------- charms/worker/k8s/src/token_distributor.py | 4 +++- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index a4c8e26f..f8f126ba 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -603,18 +603,16 @@ def _get_scrape_jobs(self): @on_error( ops.WaitingStatus("Sharing Cluster Version"), - AssertionError, ) def _update_kubernetes_version(self): """Update the unit Kubernetes version in the cluster relation.""" - if not (relation := self.model.get_relation("cluster")): - assert False, "Missing cluster integration" # nosec + relation = self.model.get_relation("cluster") + assert relation, "Missing cluster integration" # nosec if version := snap_version("k8s"): relation.data[self.unit]["version"] = version @on_error( ops.WaitingStatus("Announcing Kubernetes version"), - AssertionError, ) def _announce_kubernetes_version(self): """Announce the Kubernetes version to the cluster. 
@@ -624,19 +622,20 @@ def _announce_kubernetes_version(self): peer = self.model.get_relation("cluster") worker = self.model.get_relation("k8s-cluster") - version = snap_version("k8s") - assert version, "k8s-snap is not installed" # nosec + local_version = snap_version("k8s") + assert local_version, "k8s-snap is not installed" # nosec for relation in (peer, worker): - assert relation, "Missing cluster integration" # nosec + if not relation: + continue units = (unit for unit in relation.units if unit.name != self.unit.name) for unit in units: unit_version = relation.data[unit].get("version") assert unit_version, f"Waiting for version from {unit.name}" # nosec - if unit_version != version: + if unit_version != local_version: status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) - assert False, "Version mismatch with cluster nodes" # nosec - relation.data[self.app]["version"] = version + assert unit_version==local_version, "Version mismatch with cluster nodes" # nosec + relation.data[self.app]["version"] = local_version def _get_proxy_env(self) -> Dict[str, str]: """Retrieve the Juju model config proxy values. diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index c476ccb5..d507d606 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -23,6 +23,8 @@ SECRET_ID = "{0}-secret-id" # nosec +UNIT_RE = re.compile(r"k8s(-worker)?/\d+") + class K8sCharm(Protocol): """Typing for the K8sCharm. 
@@ -313,7 +315,7 @@ def active_nodes(self, relation: ops.Relation): return { self.charm.model.get_unit(str(u)): data for u, data in relation.data[self.charm.app].items() - if re.match(r"k8s(-worker)?/\d+", u) + if UNIT_RE.match(u) } def drop_node(self, relation: ops.Relation, unit: ops.Unit): From f413261afe1431fd8ded1bbf5a57c1a9f8d611b4 Mon Sep 17 00:00:00 2001 From: Mateo Florido Date: Wed, 13 Nov 2024 14:36:13 -0500 Subject: [PATCH 6/6] Change AssertionError to ReconcilerError --- charms/worker/k8s/src/charm.py | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index f8f126ba..8d28884a 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -33,7 +33,7 @@ import ops import reschedule import yaml -from charms.contextual_status import WaitingStatus, on_error +from charms.contextual_status import ReconcilerError, WaitingStatus, on_error from charms.grafana_agent.v0.cos_agent import COSAgentProvider from charms.interface_external_cloud_provider import ExternalCloudProvider from charms.k8s.v0.k8sd_api_manager import ( @@ -601,40 +601,46 @@ def _get_scrape_jobs(self): log.exception("Failed to get COS token.") return [] - @on_error( - ops.WaitingStatus("Sharing Cluster Version"), - ) + @on_error(ops.WaitingStatus("Sharing Cluster Version")) def _update_kubernetes_version(self): - """Update the unit Kubernetes version in the cluster relation.""" + """Update the unit Kubernetes version in the cluster relation. + + Raises: + ReconcilerError: If the cluster integration is missing. 
+ """ relation = self.model.get_relation("cluster") - assert relation, "Missing cluster integration" # nosec + if not relation: + raise ReconcilerError("Missing cluster integration") if version := snap_version("k8s"): relation.data[self.unit]["version"] = version - @on_error( - ops.WaitingStatus("Announcing Kubernetes version"), - ) + @on_error(ops.WaitingStatus("Announcing Kubernetes version")) def _announce_kubernetes_version(self): """Announce the Kubernetes version to the cluster. This method ensures that the Kubernetes version is consistent across the cluster. + + Raises: + ReconcilerError: If the k8s snap is not installed, the version is missing, + or the version does not match the local version. """ + if not (local_version := snap_version("k8s")): + raise ReconcilerError("k8s-snap is not installed") + peer = self.model.get_relation("cluster") worker = self.model.get_relation("k8s-cluster") - local_version = snap_version("k8s") - assert local_version, "k8s-snap is not installed" # nosec - for relation in (peer, worker): if not relation: continue units = (unit for unit in relation.units if unit.name != self.unit.name) for unit in units: unit_version = relation.data[unit].get("version") - assert unit_version, f"Waiting for version from {unit.name}" # nosec + if not unit_version: + raise ReconcilerError(f"Waiting for version from {unit.name}") if unit_version != local_version: status.add(ops.BlockedStatus(f"Version mismatch with {unit.name}")) - assert unit_version==local_version, "Version mismatch with cluster nodes" # nosec + raise ReconcilerError(f"Version mismatch with {unit.name}") relation.data[self.app]["version"] = local_version def _get_proxy_env(self) -> Dict[str, str]: