Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5218] Implement restore flow #108

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ create-backup:

list-backups:
description: List database backups. S3 credentials are retrieved from a relation with the S3 integrator charm.

restore:
description: Restore a database backup. S3 credentials are retrieved from a relation with the S3 integrator charm.
params:
backup-id:
type: string
description: A backup-id to identify the backup to restore. Format of <%Y-%m-%dT%H:%M:%SZ>
16 changes: 15 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _on_install(self, event: InstallEvent) -> None:

self.unit.set_workload_version(self.workload.get_version())

def _on_cluster_relation_changed(self, event: EventBase) -> None:
def _on_cluster_relation_changed(self, event: EventBase) -> None: # noqa: C901
"""Generic handler for all 'something changed, update' events across all relations."""
# NOTE: k8s specific check, the container needs to be available before moving on
if not self.workload.container_can_connect:
Expand All @@ -201,6 +201,11 @@ def _on_cluster_relation_changed(self, event: EventBase) -> None:
self._set_status(Status.NO_PEER_RELATION)
return

if self.state.cluster.is_restore_in_progress:
# Ongoing backup restore, we can early return here since the
# chain of events is only relevant to the backup event handler
return

# don't want to prematurely set config using outdated/missing relation data
# also skip update-status overriding statuses during upgrades
if not self.upgrade_events.idle:
Expand Down Expand Up @@ -354,6 +359,7 @@ def init_server(self):
logger.debug("setting properties and jaas")
self.config_manager.set_zookeeper_properties()
self.config_manager.set_jaas_config()
self.config_manager.set_client_jaas_config()

# during pod-reschedules (e.g upgrades or otherwise) we lose all files
# need to manually add-back key/truststores
Expand Down Expand Up @@ -438,6 +444,14 @@ def update_quorum(self, event: EventBase) -> None:

self.update_client_data()

def disconnect_clients(self) -> None:
    """Blank the `endpoints` key of every client databag.

    Only the leader unit writes; any other unit returns without touching
    anything. Clearing this key acts as a logical disconnect for the
    related client applications.
    """
    if self.unit.is_leader():
        for related_client in self.state.clients:
            related_client.update({"endpoints": ""})

def update_client_data(self) -> None:
"""Writes necessary relation data to all related applications."""
if not self.unit.is_leader():
Expand Down
9 changes: 9 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ def stable(self) -> Status:
if self.stale_quorum:
return Status.STALE_QUORUM

if self.cluster.is_restore_in_progress:
return Status.ONGOING_RESTORE

if not self.all_servers_added:
return Status.NOT_ALL_ADDED

Expand All @@ -372,3 +375,9 @@ def ready(self) -> Status:
return Status.ALL_UNIFIED

return self.stable

@property
def is_next_restore_step_possible(self) -> bool:
    """Whether every server has caught up with the current restore instruction.

    Returns:
        True when the `restore_progress` of each entry in `self.servers` is
        the same object as the cluster-wide `restore_instruction` (also True
        when there are no servers); False as soon as one server differs.
    """
    expected_step = self.cluster.restore_instruction
    for server in self.servers:
        if server.restore_progress is not expected_step:
            return False
    return True
22 changes: 21 additions & 1 deletion src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ops.model import Application, Relation, Unit
from typing_extensions import deprecated, override

from core.stubs import S3ConnectionInfo
from core.stubs import RestoreStep, S3ConnectionInfo
from literals import CHARM_USERS, CLIENT_PORT, ELECTION_PORT, SECRETS_APP, SERVER_PORT

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -289,6 +289,21 @@ def s3_credentials(self) -> S3ConnectionInfo:
# This is checked in events.backup actions
return json.loads(self.relation_data.get("s3-credentials", "{}"))

@property
def id_to_restore(self) -> str:
    """Identifier of the backup being restored.

    Read from the `id-to-restore` key of the relation data; an empty string
    is returned when the key is absent.
    """
    backup_id = self.relation_data.get("id-to-restore", "")
    return backup_id

@property
def restore_instruction(self) -> RestoreStep:
    """Restore step the cluster is currently instructed to perform.

    Read from the `restore-instruction` key of the relation data. A missing
    key yields the empty string, which is then converted to a `RestoreStep`.
    """
    raw_step = self.relation_data.get("restore-instruction", "")
    return RestoreStep(raw_step)

@property
def is_restore_in_progress(self) -> bool:
    """Whether a restore is underway, i.e. `id_to_restore` is non-empty."""
    return True if self.id_to_restore else False


class ZKServer(RelationState):
"""State collection metadata for a charm unit."""
Expand Down Expand Up @@ -440,3 +455,8 @@ def sans(self) -> dict[str, list[str]]:
"sans_ip": [self.ip],
"sans_dns": [self.hostname, self.fqdn],
}

@property
def restore_progress(self) -> RestoreStep:
    """Most recent restore step this unit reports as completed.

    Read from the `restore-progress` key of the relation data. A missing key
    yields the empty string, which is then converted to a `RestoreStep`.
    """
    raw_step = self.relation_data.get("restore-progress", "")
    return RestoreStep(raw_step)
25 changes: 25 additions & 0 deletions src/core/stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# See LICENSE file for licensing details.

"""Types module."""
from enum import Enum
from typing import TypedDict

S3ConnectionInfo = TypedDict(
Expand All @@ -19,3 +20,27 @@


BackupMetadata = TypedDict("BackupMetadata", {"id": str, "log-sequence-number": int, "path": str})


class RestoreStep(str, Enum):
    """Steps of the backup restore flow, stored as their string values."""

    NOT_STARTED = ""
    STOP_WORKFLOW = "stop"
    RESTORE = "restore"
    RESTART = "restart"
    CLEAN = "clean"

    def next_step(self) -> "RestoreStep":
        """Return the step that follows this one in the restore flow.

        The flow runs NOT_STARTED -> STOP_WORKFLOW -> RESTORE -> RESTART ->
        CLEAN, and CLEAN wraps back around to NOT_STARTED.
        """
        # Built inside the method: a dict assigned in the Enum body would be
        # turned into an enum member itself.
        successors = {
            RestoreStep.NOT_STARTED: RestoreStep.STOP_WORKFLOW,
            RestoreStep.STOP_WORKFLOW: RestoreStep.RESTORE,
            RestoreStep.RESTORE: RestoreStep.RESTART,
            RestoreStep.RESTART: RestoreStep.CLEAN,
            RestoreStep.CLEAN: RestoreStep.NOT_STARTED,
        }
        return successors[self]
8 changes: 8 additions & 0 deletions src/core/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ def jaas(self) -> str:
"""
return f"{self.conf_path}/zookeeper-jaas.cfg"

@property
def client_jaas(self) -> str:
    """Path of the `client-jaas.cfg` file inside the configuration directory.

    The file contains internal user credentials used in SASL auth.
    """
    return "{}/client-jaas.cfg".format(self.conf_path)

@property
def jmx_prometheus_javaagent(self) -> str:
"""The JMX exporter JAR filepath.
Expand Down
141 changes: 135 additions & 6 deletions src/events/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
CredentialsGoneEvent,
S3Requirer,
)
from ops import ActionEvent
from ops import (
ActionEvent,
RelationEvent,
)
from ops.framework import Object
from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed

from core.stubs import S3ConnectionInfo
from core.stubs import RestoreStep, S3ConnectionInfo
from literals import S3_BACKUPS_PATH, S3_REL_NAME, Status
from managers.backup import BackupManager

Expand All @@ -41,7 +45,11 @@ def __init__(self, charm):

self.framework.observe(self.charm.on.create_backup_action, self._on_create_backup_action)
self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups_action)
# self.framework.observe(self.charm.on.restore_action, self._on_restore_action)
self.framework.observe(self.charm.on.restore_action, self._on_restore_action)

self.framework.observe(
getattr(self.charm.on, "cluster_relation_changed"), self._restore_event_dispatch
)

def _on_s3_credentials_changed(self, event: CredentialsChangedEvent):
if not self.charm.unit.is_leader():
Expand Down Expand Up @@ -137,6 +145,127 @@ def _on_list_backups_action(self, event: ActionEvent):
event.log(output)
event.set_results({"backups": json.dumps(backups_metadata)})

def _on_restore_action(self, _):
# TODO
pass
def _on_restore_action(self, event: ActionEvent):
    """Restore a snapshot referenced by its id.

    Steps:
        - stop client traffic
        - stop all units
        - backup local state so that we can rollback if anything goes wrong (manual op)
        - wipe data folders
        - get snapshot from object storage, save in data folder
        - restart units
        - cleanup leftover files
        - notify clients

    This handler itself only validates the request, records the backup id and
    the initial restore instruction in the cluster state, and calls
    `disconnect_clients()`.

    Args:
        event: the action event; its `backup-id` param names the backup to restore.
    """
    id_to_restore = event.params.get("backup-id", "")

    # (predicate, message) pairs. Predicates are lambdas so they are evaluated
    # lazily, in order, and the first one that holds fails the action.
    failure_conditions = [
        (
            lambda: not self.charm.unit.is_leader(),
            "Action must be ran on the application leader",
        ),
        (
            lambda: not self.charm.state.cluster.s3_credentials,
            "Cluster needs an access to an object storage to make a backup",
        ),
        (
            lambda: not id_to_restore,
            "No backup id to restore provided",
        ),
        (
            lambda: not self.backup_manager.is_snapshot_in_bucket(id_to_restore),
            "Backup id not found in storage object",
        ),
        (
            lambda: bool(self.charm.state.cluster.is_restore_in_progress),
            "A snapshot restore is currently ongoing",
        ),
    ]

    for check, msg in failure_conditions:
        if check():
            # Log through the module logger, like the rest of this module,
            # instead of the root logger.
            logger.error(msg)
            event.set_results({"error": msg})
            event.fail(msg)
            return

    # Setting `id-to-restore` is what marks a restore as in progress; the
    # instruction starts at NOT_STARTED.
    self.charm.state.cluster.update(
        {
            "id-to-restore": id_to_restore,
            "restore-instruction": RestoreStep.NOT_STARTED.value,
        }
    )
    self.charm.disconnect_clients()

    event.log(f"Beginning restore flow for snapshot {id_to_restore}")

def _restore_event_dispatch(self, event: RelationEvent):
"""Dispatch restore event to the proper method.

Compares the cluster-wide restore instruction with this unit's own restore
progress and calls the handler for the step the unit still has to perform.
"""
# No restore is in progress when the cluster has no backup id to restore.
# NOTE(review): block nesting is not visible in this flattened view; confirm in the
# real file whether the status reset below belongs to the inner progress check.
if not self.charm.state.cluster.is_restore_in_progress:
if self.charm.state.unit_server.restore_progress is not RestoreStep.NOT_STARTED:
self.charm.state.unit_server.update(
{"restore-progress": RestoreStep.NOT_STARTED.value}
)
self.charm._set_status(self.charm.state.ready)
return

# The leader first gets a chance to advance the cluster-wide instruction.
if self.charm.unit.is_leader():
self._maybe_progress_step()

# Subject is the pair (cluster instruction, this unit's progress). A handler runs
# only when the unit's progress is exactly one step behind the instruction.
match self.charm.state.cluster.restore_instruction, self.charm.state.unit_server.restore_progress:
case RestoreStep.STOP_WORKFLOW, RestoreStep.NOT_STARTED:
self._stop_workflow()
case RestoreStep.RESTORE, RestoreStep.STOP_WORKFLOW:
self._download_and_restore()
case RestoreStep.RESTART, RestoreStep.RESTORE:
self._restart_workflow()
case RestoreStep.CLEAN, RestoreStep.RESTART:
self._cleaning()
# Any other combination: nothing to do for this unit on this event.
case _:
pass

def _maybe_progress_step(self):
"""Check that all units are done with the current instruction and move to the next if applicable.

Writes the next `restore-instruction` to the cluster state. When the current
instruction is CLEAN, the payload also blanks `id-to-restore`.
"""
current_instruction = self.charm.state.cluster.restore_instruction
next_instruction = current_instruction.next_step()

# NOTE(review): block nesting is not visible in this flattened view. The comment
# further down suggests the client/ACL refresh belongs to the CLEAN branch, and
# `payload` is only assigned under the first check, so the final update presumably
# sits under it too. Confirm against the real file.
if self.charm.state.is_next_restore_step_possible:
payload = {"restore-instruction": next_instruction.value}
if current_instruction is RestoreStep.CLEAN:
# NOTE(review): `to_restore` is not read anywhere in the visible code; confirm it is still needed.
payload = payload | {"id-to-restore": "", "to_restore": ""}
# Update ACLs for already related clients and trigger a relation-changed
# on their side to enable them to reconnect.
self.charm.update_client_data()
self.charm.quorum_manager.update_acls()

self.charm.state.cluster.update(payload)

def _stop_workflow(self) -> None:
    """Stop the workload and mark the STOP_WORKFLOW step as done for this unit.

    The unit status is set to ONGOING_RESTORE before the workload is stopped.
    """
    charm = self.charm
    charm._set_status(Status.ONGOING_RESTORE)
    logger.info("Restoring - stopping workflow")
    charm.workload.stop()
    progress = {"restore-progress": RestoreStep.STOP_WORKFLOW.value}
    charm.state.unit_server.update(progress)

def _download_and_restore(self) -> None:
    """Restore the requested snapshot on this unit and mark the RESTORE step as done.

    The backup id comes from the cluster state and is handed, together with
    the workload, to the backup manager.
    """
    logger.info("Restoring - restore snapshot")
    charm = self.charm
    snapshot_id = charm.state.cluster.id_to_restore
    self.backup_manager.restore_snapshot(snapshot_id, charm.workload)
    progress = {"restore-progress": RestoreStep.RESTORE.value}
    charm.state.unit_server.update(progress)

def _restart_workflow(self) -> None:
    """Restart the workload and mark the RESTART step as done for this unit."""
    logger.info("Restoring - restarting workflow")
    charm = self.charm
    charm.workload.restart()
    progress = {"restore-progress": RestoreStep.RESTART.value}
    charm.state.unit_server.update(progress)

@retry(
    wait=wait_fixed(5),
    stop=stop_after_attempt(3),
    retry=retry_if_result(lambda res: res is False),
)
def _cleaning(self) -> bool | None:
    """Remove leftover restore files once the workload is healthy again.

    Returns:
        False when the workload is not healthy, which makes the `retry`
        decorator call this method again (fixed 5 wait, at most 3 attempts);
        None once the cleanup ran and the CLEAN step was recorded.
    """
    # NOTE(review): behaviour once all attempts return False is decided by
    # tenacity (no `reraise`/`retry_error_callback` is configured here) - confirm
    # the caller copes with it.
    if self.charm.workload.healthy:
        logger.info("Restoring - cleaning files")
        self.backup_manager.cleanup_leftover_files(self.charm.workload)
        self.charm.state.unit_server.update({"restore-progress": RestoreStep.CLEAN.value})
        return None
    return False
2 changes: 1 addition & 1 deletion src/events/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _on_client_relation_updated(self, event: RelationEvent) -> None:
Future `client_relation_changed` events called on non-leader units checks passwords before
restarting.
"""
if not self.charm.unit.is_leader():
if not self.charm.unit.is_leader() or self.charm.state.cluster.is_restore_in_progress:
return

if not self.charm.state.stable:
Expand Down
1 change: 1 addition & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class Status(Enum):
BlockedStatus("invalid s3 configuration - missing mandatory parameters"), "ERROR"
)
BUCKET_NOT_CREATED = StatusLevel(BlockedStatus("cannot create s3 bucket"), "ERROR")
ONGOING_RESTORE = StatusLevel(MaintenanceStatus("restoring backup"), "INFO")


SECRETS_APP = ["sync-password", "super-password"]
Expand Down
Loading
Loading