From 144ae0490de66e79c85c0c1d6656b5f82a34861e Mon Sep 17 00:00:00 2001 From: Adam Dyess Date: Mon, 18 Nov 2024 16:38:42 -0600 Subject: [PATCH] cloud integration is dependent on the remote cluster-tag --- .../k8s/lib/charms/k8s/v0/k8sd_api_manager.py | 10 ++ charms/worker/k8s/src/charm.py | 53 ++++++--- charms/worker/k8s/src/cloud_integration.py | 112 +++++++++++------- charms/worker/k8s/src/kube_control.py | 6 +- charms/worker/k8s/src/token_distributor.py | 51 ++++++-- .../k8s/tests/unit/test_cloud_integration.py | 15 +-- pyproject.toml | 2 + 7 files changed, 166 insertions(+), 83 deletions(-) diff --git a/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py b/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py index ae623a40..543e8216 100644 --- a/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py +++ b/charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py @@ -834,6 +834,16 @@ def request_auth_token(self, username: str, groups: List[str]) -> SecretStr: auth_response = self._send_request(endpoint, "POST", AuthTokenResponse, body) return auth_response.metadata.token + def revoke_auth_token(self, token: str) -> None: + """Revoke a Kubernetes authentication token. + + Args: + token (str): The authentication token. + """ + endpoint = "/1.0/kubernetes/auth/tokens" + body = {"token": token} + self._send_request(endpoint, "DELETE", EmptyResponse, body) + def get_kubeconfig(self, server: Optional[str]) -> str: """Request a Kubernetes admin config. 
diff --git a/charms/worker/k8s/src/charm.py b/charms/worker/k8s/src/charm.py index 37dc2de0..78190174 100755 --- a/charms/worker/k8s/src/charm.py +++ b/charms/worker/k8s/src/charm.py @@ -58,8 +58,8 @@ from cloud_integration import CloudIntegration from cos_integration import COSIntegration from inspector import ClusterInspector -from literals import DEPENDENCIES from kube_control import configure as configure_kube_control +from literals import DEPENDENCIES from ops.interface_kube_control import KubeControlProvides from snap import management as snap_management from snap import version as snap_version @@ -709,29 +709,45 @@ def _get_proxy_env(self) -> Dict[str, str]: InvalidResponseError, K8sdConnectionError, ) - def _join_cluster(self): - """Retrieve the join token from secret databag and join the cluster.""" + def _join_cluster(self, event: ops.EventBase): + """Retrieve the join token from secret databag and join the cluster. + + Args: + event (ops.EventBase): event triggering the join + """ if not (relation := self.model.get_relation("cluster")): status.add(ops.BlockedStatus("Missing cluster integration")) - assert False, "Missing cluster integration" # nosec + raise ReconcilerError("Missing cluster integration") - if self.get_cluster_name(): + if local_cluster := self.get_cluster_name(): + self.cloud_integration.integrate(local_cluster, event) return status.add(ops.MaintenanceStatus("Joining cluster")) with self.collector.recover_token(relation) as token: - binding = self.model.get_binding(relation.name) - address = binding and binding.network.ingress_address - node_name = self.get_node_name() - cluster_addr = f"{address}:{K8SD_PORT}" - log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_addr) - request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token) - if self.is_control_plane: - request.config = ControlPlaneNodeJoinConfig() - request.config.extra_sans = [_get_public_address()] - - self.api_manager.join_cluster(request) - 
log.info("Joined %s(%s)", self.unit, node_name) + remote_cluster = self.collector.cluster_name(relation, False) if relation else "" + self.cloud_integration.integrate(remote_cluster, event) + self._join_with_token(relation, token) + + def _join_with_token(self, relation: ops.Relation, token: str): + """Join the cluster with the given token. + + Args: + relation (ops.Relation): The relation to use for the token. + token (str): The token to use for joining the cluster. + """ + binding = self.model.get_binding(relation.name) + address = binding and binding.network.ingress_address + node_name = self.get_node_name() + cluster_addr = f"{address}:{K8SD_PORT}" + log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_addr) + request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token) + if self.is_control_plane: + request.config = ControlPlaneNodeJoinConfig() + request.config.extra_sans = [_get_public_address()] + + self.api_manager.join_cluster(request) + log.info("Joined %s(%s)", self.unit, node_name) @on_error(WaitingStatus("Awaiting cluster removal")) def _death_handler(self, event: ops.EventBase): @@ -783,7 +799,7 @@ def _reconcile(self, event: ops.EventBase): self._revoke_cluster_tokens(event) self._ensure_cluster_config() self._announce_kubernetes_version() - self._join_cluster() + self._join_cluster(event) self._config_containerd_registries() self._configure_cos_integration() self._update_status() @@ -791,7 +807,6 @@ def _reconcile(self, event: ops.EventBase): if self.is_control_plane: self._copy_internal_kubeconfig() self._expose_ports() - self.cloud_integration.integrate(event) def _update_status(self): """Check k8s snap status.""" diff --git a/charms/worker/k8s/src/cloud_integration.py b/charms/worker/k8s/src/cloud_integration.py index 9697a634..87b1a5e4 100644 --- a/charms/worker/k8s/src/cloud_integration.py +++ b/charms/worker/k8s/src/cloud_integration.py @@ -1,7 +1,7 @@ # Copyright 2024 Canonical Ltd. 
# See LICENSE file for licensing details. -"""Cloud Integration for Charmed Kubernetes Control Plane.""" +"""Cloud Integration for Canonical k8s Operator.""" import logging from typing import Mapping, Optional, Union @@ -21,7 +21,7 @@ class CloudIntegration: - """Utility class that handles the integration with clouds for Charmed Kubernetes. + """Utility class that handles the integration with clouds for Canonical k8s. This class provides methods to configure instance tags and roles for control-plane units @@ -61,11 +61,73 @@ def cloud(self) -> Optional[CloudSpecificIntegration]: return None return cloud + def _integrate_aws(self, cloud: AWSIntegrationRequires, cluster_tag: str): + """Integrate with AWS cloud. + + Args: + cloud (AWSIntegrationRequires): AWS cloud integration. + cluster_tag (str): Tag to identify the cluster. + """ + aws_cluster_tag = {f"kubernetes.io/cluster/{cluster_tag}": "owned"} + if self.is_control_plane: + # wokeignore:rule=master + cloud.tag_instance({**aws_cluster_tag, "k8s.io/role/master": "true"}) + cloud.tag_instance_security_group(aws_cluster_tag) + cloud.tag_instance_subnet(aws_cluster_tag) + cloud.enable_object_storage_management(["kubernetes-*"]) + cloud.enable_load_balancer_management() + + # Necessary for cloud-provider-aws + cloud.enable_autoscaling_readonly() + cloud.enable_instance_modification() + cloud.enable_region_readonly() + else: + cloud.tag_instance(aws_cluster_tag) + cloud.tag_instance_security_group(aws_cluster_tag) + cloud.tag_instance_subnet(aws_cluster_tag) + cloud.enable_object_storage_management(["kubernetes-*"]) + + def _integrate_gcp(self, cloud: GCPIntegrationRequires, cluster_tag: str): + """Integrate with GCP cloud. + + Args: + cloud (GCPIntegrationRequires): GCP cloud integration. + cluster_tag (str): Tag to identify the cluster. 
+ """ + gcp_cluster_tag = {"k8s-io-cluster-name": cluster_tag} + if self.is_control_plane: + # wokeignore:rule=master + cloud.tag_instance({**gcp_cluster_tag, "k8s-io-role-master": "master"}) + cloud.enable_object_storage_management() + cloud.enable_security_management() + else: + cloud.tag_instance(gcp_cluster_tag) + cloud.enable_object_storage_management() + + def _integrate_azure(self, cloud: AzureIntegrationRequires, cluster_tag: str): + """Integrate with Azure cloud. + + Args: + cloud (AzureIntegrationRequires): Azure cloud integration. + cluster_tag (str): Tag to identify the cluster. + """ + azure_cluster_tag = {"k8s-io-cluster-name": cluster_tag} + if self.is_control_plane: + # wokeignore:rule=master + cloud.tag_instance({**azure_cluster_tag, "k8s-io-role-master": "master"}) + cloud.enable_object_storage_management() + cloud.enable_security_management() + cloud.enable_loadbalancer_management() + else: + cloud.tag_instance(azure_cluster_tag) + cloud.enable_object_storage_management() + @status.on_error(ops.WaitingStatus("Waiting for cloud-integration")) - def integrate(self, event: ops.EventBase): + def integrate(self, cluster_tag: str, event: ops.EventBase): """Request tags and permissions for a control-plane node. Args: + cluster_tag (str): Tag to identify the integrating cluster. 
event (ops.EventBase): Event that triggered the integration Raises: @@ -74,50 +136,18 @@ def integrate(self, event: ops.EventBase): if not (cloud := self.cloud): return + if not cluster_tag: + raise ValueError("Cluster-tag is required for cloud integration") + cloud_name = self.charm.get_cloud_name() - cluster_tag = self.charm.get_cluster_name() status.add(ops.MaintenanceStatus(f"Integrate with {cloud_name}")) if isinstance(cloud, AWSIntegrationRequires): - aws_cluster_tag = {f"kubernetes.io/cluster/{cluster_tag}": "owned"} - if self.is_control_plane: - # wokeignore:rule=master - cloud.tag_instance({**aws_cluster_tag, "k8s.io/role/master": "true"}) - cloud.tag_instance_security_group(aws_cluster_tag) - cloud.tag_instance_subnet(aws_cluster_tag) - cloud.enable_object_storage_management(["kubernetes-*"]) - cloud.enable_load_balancer_management() - - # Necessary for cloud-provider-aws - cloud.enable_autoscaling_readonly() - cloud.enable_instance_modification() - cloud.enable_region_readonly() - else: - cloud.tag_instance(aws_cluster_tag) - cloud.tag_instance_security_group(aws_cluster_tag) - cloud.tag_instance_subnet(aws_cluster_tag) - cloud.enable_object_storage_management(["kubernetes-*"]) + self._integrate_aws(cloud, cluster_tag) elif isinstance(cloud, GCPIntegrationRequires): - gcp_cluster_tag = {"k8s-io-cluster-name": cluster_tag} - if self.is_control_plane: - # wokeignore:rule=master - cloud.tag_instance({**gcp_cluster_tag, "k8s-io-role-master": "master"}) - cloud.enable_object_storage_management() - cloud.enable_security_management() - else: - cloud.tag_instance(gcp_cluster_tag) - cloud.enable_object_storage_management() + self._integrate_gcp(cloud, cluster_tag) elif isinstance(cloud, AzureIntegrationRequires): - azure_cluster_tag = {"k8s-io-cluster-name": cluster_tag} - if self.is_control_plane: - # wokeignore:rule=master - cloud.tag_instance({**azure_cluster_tag, "k8s-io-role-master": "master"}) - cloud.enable_object_storage_management() - 
cloud.enable_security_management() - cloud.enable_loadbalancer_management() - else: - cloud.tag_instance(azure_cluster_tag) - cloud.enable_object_storage_management() + self._integrate_azure(cloud, cluster_tag) cloud.enable_instance_inspection() cloud.enable_dns_management() if self.is_control_plane: diff --git a/charms/worker/k8s/src/kube_control.py b/charms/worker/k8s/src/kube_control.py index 7745e3d2..facb5796 100644 --- a/charms/worker/k8s/src/kube_control.py +++ b/charms/worker/k8s/src/kube_control.py @@ -67,6 +67,6 @@ def configure(charm: K8sCharmProtocol): proxy_token=str(), ) - for user, _ in charm.kube_control.closed_auth_creds(): - log.info("TODO: Revoke auth-token for '%s'", user) - # charm.api_manager.remove_auth_token(cred.client_token.get_secret_value()) + for user, cred in charm.kube_control.closed_auth_creds(): + log.info("Revoke auth-token for '%s'", user) + charm.api_manager.revoke_auth_token(cred.load_client_token(charm.model, user)) diff --git a/charms/worker/k8s/src/token_distributor.py b/charms/worker/k8s/src/token_distributor.py index 2284d19d..c3d65c41 100644 --- a/charms/worker/k8s/src/token_distributor.py +++ b/charms/worker/k8s/src/token_distributor.py @@ -7,7 +7,7 @@ import logging import re from enum import Enum, auto -from typing import Dict, Optional, Union +from typing import Dict, Generator, Optional, Union import charms.contextual_status as status import ops @@ -87,12 +87,13 @@ def create(self, name: str, token_type: ClusterTokenType) -> SecretStr: worker = token_type == ClusterTokenType.WORKER return self.api_manager.create_join_token(name, worker=worker) - def revoke(self, name: str, ignore_errors: bool): + def revoke(self, name: str, _secret: Optional[ops.Secret], ignore_errors: bool): """Remove a cluster token. Args: - name (str): The name of the node. - ignore_errors (bool): Whether or not errors can be ignored + name (str): The name of the node. 
+ _secret (Optional[ops.Secret]): The secret to revoke + ignore_errors (bool): Whether or not errors can be ignored Raises: K8sdConnectionError: reraises cluster token revoke failures @@ -100,7 +101,7 @@ def revoke(self, name: str, ignore_errors: bool): try: self.api_manager.remove_node(name) except (K8sdConnectionError, InvalidResponseError) as e: - if ignore_errors or e.code == ErrorCodes.STATUS_NODE_UNAVAILABLE: + if ignore_errors or getattr(e, "code") == ErrorCodes.STATUS_NODE_UNAVAILABLE: # Let's just ignore some of these expected errors: # "Remote end closed connection without response" # "Failed to check if node is control-plane" @@ -146,13 +147,32 @@ def create(self, name: str, token_type: ClusterTokenType) -> SecretStr: username=f"system:cos:{name}", groups=["system:cos"] ) - def revoke(self, name: str, ignore_errors: bool): + def revoke(self, _name: str, secret: Optional[ops.Secret], ignore_errors: bool): """Remove a COS token intentionally left unimplemented. Args: - name (str): The name of the node. - ignore_errors (bool): Whether or not errors can be ignored + _name (str): The name of the node. 
+ secret (Optional[ops.Secret]): The secret to revoke + ignore_errors (bool): Whether or not errors can be ignored + + Raises: + K8sdConnectionError: reraises cluster token revoke failures """ + # pylint: disable=unused-argument + if not secret: + return + content = secret.get_content(refresh=True) + try: + self.api_manager.revoke_auth_token(content["token"]) + except (K8sdConnectionError, InvalidResponseError) as e: + if ignore_errors or getattr(e, "code") == ErrorCodes.STATUS_NODE_UNAVAILABLE: + # Let's just ignore some of these expected errors: + # "Remote end closed connection without response" + # "Failed to check if node is control-plane" + # Removing a node that doesn't exist + log.warning("Revoke_Auth_Token %s: but with an expected error: %s", _name, e) + else: + raise class TokenCollector: @@ -207,7 +227,7 @@ def cluster_name(self, relation: ops.Relation, local: bool) -> str: return cluster_name or "" @contextlib.contextmanager - def recover_token(self, relation: ops.Relation): + def recover_token(self, relation: ops.Relation) -> Generator[str, None, None]: """Request, recover token, and acknowledge token once used. Args: @@ -258,7 +278,7 @@ def __init__(self, charm: K8sCharmProtocol, node_name: str, api_manager: K8sdAPI TokenStrategy.COS: CosTokenManager(api_manager), } - def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[str]: + def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[ops.Secret]: """Lookup juju secret offered to a unit on this relation. 
Args: @@ -266,9 +286,12 @@ def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[s unit (ops.Unit): The unit the secret is intended for Returns: - secret_id (None | str) if on the relation + secret_id (None | ops.Secret) if on the relation """ - return relation.data[self.charm.unit].get(SECRET_ID.format(unit.name)) + secret_id = SECRET_ID.format(unit.name) + if juju_secret := relation.data[self.charm.unit].get(secret_id): + return self.charm.model.get_secret(id=juju_secret) + return None def _revoke_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> None: """Revoke and remove juju secret offered to a unit on this relation. @@ -489,7 +512,9 @@ def revoke_tokens( ignore_errors |= state == "pending" # on pending tokens # if cluster doesn't match ignore_errors |= self.charm.get_cluster_name() != joined_cluster(relation, unit) - self.token_strategies[token_strategy].revoke(node, ignore_errors) + self.token_strategies[token_strategy].revoke( + node, self._get_juju_secret(relation, unit), ignore_errors + ) self.drop_node(relation, unit) self._revoke_juju_secret(relation, unit) diff --git a/charms/worker/k8s/tests/unit/test_cloud_integration.py b/charms/worker/k8s/tests/unit/test_cloud_integration.py index 435c23f6..a8e53ec2 100644 --- a/charms/worker/k8s/tests/unit/test_cloud_integration.py +++ b/charms/worker/k8s/tests/unit/test_cloud_integration.py @@ -13,6 +13,8 @@ from ops.interface_azure.requires import AzureIntegrationRequires from ops.interface_gcp.requires import GCPIntegrationRequires +TEST_CLUSTER_NAME = "my-cluster" + @pytest.fixture(autouse=True) def vendor_name(): @@ -38,9 +40,8 @@ def harness(request): harness.begin() harness.charm.is_worker = request.param == "worker" with mock.patch.object(harness.charm, "get_cloud_name"): - with mock.patch.object(harness.charm, "get_cluster_name", return_value="my-cluster"): - with mock.patch.object(harness.charm.reconciler, "reconcile"): - yield harness + with 
mock.patch.object(harness.charm.reconciler, "reconcile"): + yield harness harness.cleanup() @@ -86,7 +87,7 @@ def test_cloud_aws(harness): mock_cloud = mock_property() mock_cloud.evaluate_relation.return_value = None event = mock.MagicMock() - harness.charm.cloud_integration.integrate(event) + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) if harness.charm.is_worker: mock_cloud.tag_instance.assert_called_once_with( {"kubernetes.io/cluster/my-cluster": "owned"} @@ -139,7 +140,7 @@ def test_cloud_gce(harness): mock_cloud = mock_property() mock_cloud.evaluate_relation.return_value = None event = mock.MagicMock() - harness.charm.cloud_integration.integrate(event) + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) if harness.charm.is_worker: mock_cloud.tag_instance.assert_called_once_with({"k8s-io-cluster-name": "my-cluster"}) @@ -179,7 +180,7 @@ def test_cloud_azure(harness): mock_cloud = mock_property() mock_cloud.evaluate_relation.return_value = None event = mock.MagicMock() - harness.charm.cloud_integration.integrate(event) + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) if harness.charm.is_worker: mock_cloud.tag_instance.assert_called_once_with({"k8s-io-cluster-name": "my-cluster"}) else: @@ -218,5 +219,5 @@ def test_cloud_unknown(harness): return_value=None, ) as mock_property: event = mock.MagicMock() - harness.charm.cloud_integration.integrate(event) + harness.charm.cloud_integration.integrate(TEST_CLUSTER_NAME, event) assert mock_property.called diff --git a/pyproject.toml b/pyproject.toml index 391e6a69..363ba3ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,8 @@ plugins = "pydantic.mypy" disable = "wrong-import-order,redefined-outer-name,too-many-instance-attributes,too-few-public-methods,no-self-argument,fixme,protected-access" # Ignore Pydantic check: https://github.com/pydantic/pydantic/issues/1961 extension-pkg-whitelist = "pydantic" # wokeignore:rule=whitelist +# Modules can be 
bigger than 1000 lines +max-module-lines = 1500 [tool.pylint.typecheck] # Ignore typechecking on pylxd manager classes