Skip to content

Commit

Permalink
cloud integration is dependant on the remote cluster-tag
Browse files Browse the repository at this point in the history
  • Loading branch information
addyess committed Nov 18, 2024
1 parent fdc41e4 commit 4733368
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 82 deletions.
10 changes: 10 additions & 0 deletions charms/worker/k8s/lib/charms/k8s/v0/k8sd_api_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,16 @@ def request_auth_token(self, username: str, groups: List[str]) -> SecretStr:
auth_response = self._send_request(endpoint, "POST", AuthTokenResponse, body)
return auth_response.metadata.token

def revoke_auth_token(self, token: str) -> None:
"""Revoke a Kubernetes authentication token.
Args:
token (str): The authentication token.
"""
endpoint = "/1.0/kubernetes/auth/tokens"
body = {"token": token}
self._send_request(endpoint, "DELETE", EmptyResponse, body)

def get_kubeconfig(self, server: Optional[str]) -> str:
"""Request a Kubernetes admin config.
Expand Down
51 changes: 33 additions & 18 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,29 +697,45 @@ def _get_proxy_env(self) -> Dict[str, str]:
InvalidResponseError,
K8sdConnectionError,
)
def _join_cluster(self):
"""Retrieve the join token from secret databag and join the cluster."""
def _join_cluster(self, event: ops.EventBase):
"""Retrieve the join token from secret databag and join the cluster.
Args:
event (ops.EventBase): event triggering the join
"""
if not (relation := self.model.get_relation("cluster")):
status.add(ops.BlockedStatus("Missing cluster integration"))
assert False, "Missing cluster integration" # nosec
raise ReconcilerError("Missing cluster integration")

if self.get_cluster_name():
if local_cluster := self.get_cluster_name():
self.cloud_integration.integrate(local_cluster, event)
return

status.add(ops.MaintenanceStatus("Joining cluster"))
with self.collector.recover_token(relation) as token:
binding = self.model.get_binding(relation.name)
address = binding and binding.network.ingress_address
node_name = self.get_node_name()
cluster_addr = f"{address}:{K8SD_PORT}"
log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_addr)
request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token)
if self.is_control_plane:
request.config = ControlPlaneNodeJoinConfig()
request.config.extra_sans = [_get_public_address()]

self.api_manager.join_cluster(request)
log.info("Joined %s(%s)", self.unit, node_name)
remote_cluster = self.collector.cluster_name(relation, False) if relation else ""
self.cloud_integration.integrate(remote_cluster, event)
self._join_with_token(relation, token)

def _join_with_token(self, relation: ops.Relation, token: str):
"""Join the cluster with the given token.
Args:
relation (ops.Relation): The relation to use for the token.
token (str): The token to use for joining the cluster.
"""
binding = self.model.get_binding(relation.name)
address = binding and binding.network.ingress_address
node_name = self.get_node_name()
cluster_addr = f"{address}:{K8SD_PORT}"
log.info("Joining %s(%s) to %s...", self.unit, node_name, cluster_addr)
request = JoinClusterRequest(name=node_name, address=cluster_addr, token=token)
if self.is_control_plane:
request.config = ControlPlaneNodeJoinConfig()
request.config.extra_sans = [_get_public_address()]

self.api_manager.join_cluster(request)
log.info("Joined %s(%s)", self.unit, node_name)

@on_error(WaitingStatus("Awaiting cluster removal"))
def _death_handler(self, event: ops.EventBase):
Expand Down Expand Up @@ -771,15 +787,14 @@ def _reconcile(self, event: ops.EventBase):
self._revoke_cluster_tokens(event)
self._ensure_cluster_config()
self._announce_kubernetes_version()
self._join_cluster()
self._join_cluster(event)
self._config_containerd_registries()
self._configure_cos_integration()
self._update_status()
self._apply_node_labels()
if self.is_control_plane:
self._copy_internal_kubeconfig()
self._expose_ports()
self.cloud_integration.integrate(event)

def _update_status(self):
"""Check k8s snap status."""
Expand Down
112 changes: 71 additions & 41 deletions charms/worker/k8s/src/cloud_integration.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Cloud Integration for Charmed Kubernetes Control Plane."""
"""Cloud Integration for Canonical k8s Operator."""

import logging
from typing import Mapping, Optional, Union
Expand All @@ -21,7 +21,7 @@


class CloudIntegration:
"""Utility class that handles the integration with clouds for Charmed Kubernetes.
"""Utility class that handles the integration with clouds for Canonical k8s.
This class provides methods to configure instance tags and roles for control-plane
units
Expand Down Expand Up @@ -61,11 +61,73 @@ def cloud(self) -> Optional[CloudSpecificIntegration]:
return None
return cloud

def _integrate_aws(self, cloud: AWSIntegrationRequires, cluster_tag: str):
"""Integrate with AWS cloud.
Args:
cloud (AWSIntegrationRequires): AWS cloud integration.
cluster_tag (str): Tag to identify the cluster.
"""
aws_cluster_tag = {f"kubernetes.io/cluster/{cluster_tag}": "owned"}
if self.is_control_plane:
# wokeignore:rule=master
cloud.tag_instance({**aws_cluster_tag, "k8s.io/role/master": "true"})
cloud.tag_instance_security_group(aws_cluster_tag)
cloud.tag_instance_subnet(aws_cluster_tag)
cloud.enable_object_storage_management(["kubernetes-*"])
cloud.enable_load_balancer_management()

# Necessary for cloud-provider-aws
cloud.enable_autoscaling_readonly()
cloud.enable_instance_modification()
cloud.enable_region_readonly()
else:
cloud.tag_instance(aws_cluster_tag)
cloud.tag_instance_security_group(aws_cluster_tag)
cloud.tag_instance_subnet(aws_cluster_tag)
cloud.enable_object_storage_management(["kubernetes-*"])

def _integrate_gcp(self, cloud: GCPIntegrationRequires, cluster_tag: str):
"""Integrate with GCP cloud.
Args:
cloud (GCPIntegrationRequires): GCP cloud integration.
cluster_tag (str): Tag to identify the cluster.
"""
gcp_cluster_tag = {"k8s-io-cluster-name": cluster_tag}
if self.is_control_plane:
# wokeignore:rule=master
cloud.tag_instance({**gcp_cluster_tag, "k8s-io-role-master": "master"})
cloud.enable_object_storage_management()
cloud.enable_security_management()
else:
cloud.tag_instance(gcp_cluster_tag)
cloud.enable_object_storage_management()

def _integrate_azure(self, cloud: AzureIntegrationRequires, cluster_tag: str):
"""Integrate with Azure cloud.
Args:
cloud (AzureIntegrationRequires): Azure cloud integration.
cluster_tag (str): Tag to identify the cluster.
"""
azure_cluster_tag = {"k8s-io-cluster-name": cluster_tag}
if self.is_control_plane:
# wokeignore:rule=master
cloud.tag_instance({**azure_cluster_tag, "k8s-io-role-master": "master"})
cloud.enable_object_storage_management()
cloud.enable_security_management()
cloud.enable_loadbalancer_management()
else:
cloud.tag_instance(azure_cluster_tag)
cloud.enable_object_storage_management()

@status.on_error(ops.WaitingStatus("Waiting for cloud-integration"))
def integrate(self, event: ops.EventBase):
def integrate(self, cluster_tag: str, event: ops.EventBase):
"""Request tags and permissions for a control-plane node.
Args:
cluster_tag (str): Tag to identify the integrating cluster.
event (ops.EventBase): Event that triggered the integration
Raises:
Expand All @@ -74,50 +136,18 @@ def integrate(self, event: ops.EventBase):
if not (cloud := self.cloud):
return

if not cluster_tag:
raise ValueError("Cluster-tag is required for cloud integration")

cloud_name = self.charm.get_cloud_name()
cluster_tag = self.charm.get_cluster_name()

status.add(ops.MaintenanceStatus(f"Integrate with {cloud_name}"))
if isinstance(cloud, AWSIntegrationRequires):
aws_cluster_tag = {f"kubernetes.io/cluster/{cluster_tag}": "owned"}
if self.is_control_plane:
# wokeignore:rule=master
cloud.tag_instance({**aws_cluster_tag, "k8s.io/role/master": "true"})
cloud.tag_instance_security_group(aws_cluster_tag)
cloud.tag_instance_subnet(aws_cluster_tag)
cloud.enable_object_storage_management(["kubernetes-*"])
cloud.enable_load_balancer_management()

# Necessary for cloud-provider-aws
cloud.enable_autoscaling_readonly()
cloud.enable_instance_modification()
cloud.enable_region_readonly()
else:
cloud.tag_instance(aws_cluster_tag)
cloud.tag_instance_security_group(aws_cluster_tag)
cloud.tag_instance_subnet(aws_cluster_tag)
cloud.enable_object_storage_management(["kubernetes-*"])
self._integrate_aws(cloud, cluster_tag)
elif isinstance(cloud, GCPIntegrationRequires):
gcp_cluster_tag = {"k8s-io-cluster-name": cluster_tag}
if self.is_control_plane:
# wokeignore:rule=master
cloud.tag_instance({**gcp_cluster_tag, "k8s-io-role-master": "master"})
cloud.enable_object_storage_management()
cloud.enable_security_management()
else:
cloud.tag_instance(gcp_cluster_tag)
cloud.enable_object_storage_management()
self._integrate_gcp(cloud, cluster_tag)
elif isinstance(cloud, AzureIntegrationRequires):
azure_cluster_tag = {"k8s-io-cluster-name": cluster_tag}
if self.is_control_plane:
# wokeignore:rule=master
cloud.tag_instance({**azure_cluster_tag, "k8s-io-role-master": "master"})
cloud.enable_object_storage_management()
cloud.enable_security_management()
cloud.enable_loadbalancer_management()
else:
cloud.tag_instance(azure_cluster_tag)
cloud.enable_object_storage_management()
self._integrate_azure(cloud, cluster_tag)
cloud.enable_instance_inspection()
cloud.enable_dns_management()
if self.is_control_plane:
Expand Down
6 changes: 3 additions & 3 deletions charms/worker/k8s/src/kube_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ def configure(charm: K8sCharmProtocol):
proxy_token=str(),
)

for user, _ in charm.kube_control.closed_auth_creds():
log.info("TODO: Revoke auth-token for '%s'", user)
# charm.api_manager.remove_auth_token(cred.client_token.get_secret_value())
for user, cred in charm.kube_control.closed_auth_creds():
log.info("Revoke auth-token for '%s'", user)
charm.api_manager.revoke_auth_token(cred.load_client_token(charm.model, user))
51 changes: 38 additions & 13 deletions charms/worker/k8s/src/token_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
import re
from enum import Enum, auto
from typing import Dict, Optional, Union
from typing import Dict, Generator, Optional, Union

import charms.contextual_status as status
import ops
Expand Down Expand Up @@ -87,20 +87,21 @@ def create(self, name: str, token_type: ClusterTokenType) -> SecretStr:
worker = token_type == ClusterTokenType.WORKER
return self.api_manager.create_join_token(name, worker=worker)

def revoke(self, name: str, ignore_errors: bool):
def revoke(self, name: str, _secret: Optional[ops.Secret], ignore_errors: bool):
"""Remove a cluster token.
Args:
name (str): The name of the node.
ignore_errors (bool): Whether or not errors can be ignored
name (str): The name of the node.
_secret (Optional[ops.Secret]): The secret to revoke
ignore_errors (bool): Whether or not errors can be ignored
Raises:
K8sdConnectionError: reraises cluster token revoke failures
"""
try:
self.api_manager.remove_node(name)
except (K8sdConnectionError, InvalidResponseError) as e:
if ignore_errors or e.code == ErrorCodes.STATUS_NODE_UNAVAILABLE:
if ignore_errors or getattr(e, "code") == ErrorCodes.STATUS_NODE_UNAVAILABLE:
# Let's just ignore some of these expected errors:
# "Remote end closed connection without response"
# "Failed to check if node is control-plane"
Expand Down Expand Up @@ -146,13 +147,32 @@ def create(self, name: str, token_type: ClusterTokenType) -> SecretStr:
username=f"system:cos:{name}", groups=["system:cos"]
)

def revoke(self, name: str, ignore_errors: bool):
def revoke(self, _name: str, secret: Optional[ops.Secret], ignore_errors: bool):
"""Remove a COS token intentionally left unimplemented.
Args:
name (str): The name of the node.
ignore_errors (bool): Whether or not errors can be ignored
_name (str): The name of the node.
secret (Optional[ops.Secret]): The secret to revoke
ignore_errors (bool): Whether or not errors can be ignored
Raises:
K8sdConnectionError: reraises cluster token revoke failures
"""
# pylint: disable=unused-argument
if not secret:
return
content = secret.get_content(refresh=True)
try:
self.api_manager.revoke_auth_token(content["token"])
except (K8sdConnectionError, InvalidResponseError) as e:
if ignore_errors or getattr(e, "code") == ErrorCodes.STATUS_NODE_UNAVAILABLE:
# Let's just ignore some of these expected errors:
# "Remote end closed connection without response"
# "Failed to check if node is control-plane"
# Removing a node that doesn't exist
log.warning("Revoke_Auth_Token %s: but with an expected error: %s", _name, e)
else:
raise


class TokenCollector:
Expand Down Expand Up @@ -207,7 +227,7 @@ def cluster_name(self, relation: ops.Relation, local: bool) -> str:
return cluster_name or ""

@contextlib.contextmanager
def recover_token(self, relation: ops.Relation):
def recover_token(self, relation: ops.Relation) -> Generator[str, None, None]:
"""Request, recover token, and acknowledge token once used.
Args:
Expand Down Expand Up @@ -258,17 +278,20 @@ def __init__(self, charm: K8sCharmProtocol, node_name: str, api_manager: K8sdAPI
TokenStrategy.COS: CosTokenManager(api_manager),
}

def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[str]:
def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[ops.Secret]:
"""Lookup juju secret offered to a unit on this relation.
Args:
relation (ops.Relation): Which relation (cluster or k8s-cluster)
unit (ops.Unit): The unit the secret is intended for
Returns:
secret_id (None | str) if on the relation
secret_id (None | ops.Secret) if on the relation
"""
return relation.data[self.charm.unit].get(SECRET_ID.format(unit.name))
secret_id = SECRET_ID.format(unit.name)
if juju_secret := relation.data[self.charm.unit].get(secret_id):
return self.charm.model.get_secret(id=juju_secret)
return None

def _revoke_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> None:
"""Revoke and remove juju secret offered to a unit on this relation.
Expand Down Expand Up @@ -489,7 +512,9 @@ def revoke_tokens(
ignore_errors |= state == "pending" # on pending tokens
# if cluster doesn't match
ignore_errors |= self.charm.get_cluster_name() != joined_cluster(relation, unit)
self.token_strategies[token_strategy].revoke(node, ignore_errors)
self.token_strategies[token_strategy].revoke(
node, self._get_juju_secret(relation, unit), ignore_errors
)
self.drop_node(relation, unit)
self._revoke_juju_secret(relation, unit)

Expand Down
Loading

0 comments on commit 4733368

Please sign in to comment.