From 2af1298cd9b978e840a8aeab554f8bab29170f29 Mon Sep 17 00:00:00 2001
From: phvalguima
Date: Wed, 8 May 2024 20:37:35 +0200
Subject: [PATCH] [DPE-4302] manual large deployments upgrade (#302)

The goal of this PR is to test the manual steps needed to run a
large-deployment upgrade.

It also moves some generic opensearch_backups.py methods to the base class,
so any subclass can correctly assess whether a backup or restore is in
progress.

---------

Co-authored-by: Mehdi Bendriss
Co-authored-by: Carl Csaposs
---
 .../opensearch/v0/opensearch_backups.py       | 296 +++++++++---------
 .../opensearch/v0/opensearch_base_charm.py    |   3 +-
 src/machine_upgrade.py                        |   2 +
 .../test_manual_large_deployment_upgrades.py  | 220 +++++++++++++
 4 files changed, 366 insertions(+), 155 deletions(-)
 create mode 100644 tests/integration/test_manual_large_deployment_upgrades.py

diff --git a/lib/charms/opensearch/v0/opensearch_backups.py b/lib/charms/opensearch/v0/opensearch_backups.py
index 4da8ea29d..da5972e16 100644
--- a/lib/charms/opensearch/v0/opensearch_backups.py
+++ b/lib/charms/opensearch/v0/opensearch_backups.py
@@ -225,6 +225,144 @@ def _on_s3_relation_action(self, event: EventBase) -> None:
         logger.info("Deployment description not yet available, failing actions.")
         event.fail("Failed: deployment description not yet available")
 
+    def _request(self, *args, **kwargs) -> dict[str, Any] | None:
+        """Returns the output of OpenSearchDistribution.request() or throws an error.
+
+        The request method can return one of Union[Dict[str, any], List[any], int]
+        and raise multiple types of errors.
+
+        If an int is returned, throws an exception informing that the HTTP request failed.
+        If the request fails, returns the error text, or None if only a status code is found.
+
+        Raises:
+            - ValueError
+        """
+        if "retries" not in kwargs.keys():
+            kwargs["retries"] = 6
+        if "timeout" not in kwargs.keys():
+            kwargs["timeout"] = 10
+        # We are interested in seeing the entire response
+        kwargs["resp_status_code"] = False
+        try:
+            result = self.charm.opensearch.request(*args, **kwargs)
+        except OpenSearchHttpError as e:
+            return e.response_body if e.response_body else None
+        return result if isinstance(result, dict) else None
+
+    def _is_restore_in_progress(self) -> bool:
+        """Checks if a restore is currently in progress.
+
+        Two options:
+        1) no restore requested: return False
+        2) check each index shard: if all type=SNAPSHOT shards are in stage=DONE, return False.
+        """
+        indices_status = self._request("GET", "/_recovery?human") or {}
+        for info in indices_status.values():
+            # Now, check the status of each shard
+            for shard in info["shards"]:
+                if shard["type"] == "SNAPSHOT" and shard["stage"] != "DONE":
+                    return True
+        return False
+
+    def is_backup_in_progress(self) -> bool:
+        """Returns True if a backup is in progress, False otherwise.
+
+        We filter the output of _query_backup_status() and look for the following states:
+        - SNAPSHOT_IN_PROGRESS
+        """
+        if self._query_backup_status() in [
+            BackupServiceState.SNAPSHOT_IN_PROGRESS,
+            BackupServiceState.RESPONSE_FAILED_NETWORK,
+        ]:
+            # We have a backup in progress, or we cannot reach the API:
+            # take the "safe path" and report a backup as in progress
+            return True
+        return False
+
+    def _query_backup_status(self, backup_id: Optional[str] = None) -> BackupServiceState:
+        try:
+            for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)):
+                with attempt:
+                    target = f"_snapshot/{S3_REPOSITORY}/"
+                    target += f"{backup_id.lower()}" if backup_id else "_all"
+                    output = self._request("GET", target)
+                    logger.debug(f"Backup status: {output}")
+        except RetryError as e:
+            logger.error(f"_request failed with: {e}")
+            return BackupServiceState.RESPONSE_FAILED_NETWORK
+        return self.get_service_status(output)
+
+    def get_service_status(  # noqa: C901
+        self, response: dict[str, Any] | None
+    ) -> BackupServiceState:
+        """Returns the response status as an Enum.
+
+        Based on:
+        https://github.com/opensearch-project/OpenSearch/blob/
+        ba78d93acf1da6dae16952d8978de87cb4df2c61/
+        server/src/main/java/org/opensearch/OpenSearchServerException.java#L837
+        https://github.com/opensearch-project/OpenSearch/blob/
+        ba78d93acf1da6dae16952d8978de87cb4df2c61/
+        plugins/repository-s3/src/yamlRestTest/resources/rest-api-spec/test/repository_s3/40_repository_ec2_credentials.yml
+        """
+        if not response:
+            return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
+
+        try:
+            if "error" not in response:
+                return BackupServiceState.SUCCESS
+            type = response["error"]["root_cause"][0]["type"]
+            reason = response["error"]["root_cause"][0]["reason"]
+        except KeyError as e:
+            logger.exception(e)
+            logger.error("response contained unknown error code")
+            return BackupServiceState.RESPONSE_FAILED_NETWORK
+        # Check if we errored b/c the s3 repo is not configured, hence we are still
+        # waiting for the plugin to be configured
+        if type == "repository_exception" and REPO_NOT_CREATED_ERR in reason:
+            return BackupServiceState.REPO_NOT_CREATED
+        if type == "repository_exception" and REPO_CREATING_ERR in reason:
+            return BackupServiceState.REPO_CREATION_ERR
+        if type == "repository_exception":
+            return BackupServiceState.REPO_ERR_UNKNOWN
+        if type == "repository_missing_exception":
+            return BackupServiceState.REPO_MISSING
+        if type == "repository_verification_exception" and REPO_NOT_ACCESS_ERR in reason:
+            return BackupServiceState.REPO_S3_UNREACHABLE
+        if type == "illegal_argument_exception":
+            return BackupServiceState.ILLEGAL_ARGUMENT
+        if type == "snapshot_missing_exception":
+            return BackupServiceState.SNAPSHOT_MISSING
+        if type == "snapshot_restore_exception" and RESTORE_OPEN_INDEX_WITH_SAME_NAME in reason:
+            return BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED
+        if type == "snapshot_restore_exception":
+            return BackupServiceState.SNAPSHOT_RESTORE_ERROR
+        return self.get_snapshot_status(response)
+
+    def get_snapshot_status(self, response: Dict[str, Any] | None) -> BackupServiceState:
+        """Returns the snapshot status."""
+        if not response:
+            return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
+        # Now, check snapshot status:
+        r_str = str(response)
+        if "IN_PROGRESS" in r_str:
+            return BackupServiceState.SNAPSHOT_IN_PROGRESS
+        if "PARTIAL" in r_str:
+            return BackupServiceState.SNAPSHOT_PARTIALLY_TAKEN
+        if "INCOMPATIBLE" in r_str:
+            return BackupServiceState.SNAPSHOT_INCOMPATIBILITY
+        if "FAILED" in r_str:
+            return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
+        return BackupServiceState.SUCCESS
+
+    def is_idle_or_not_set(self) -> bool:
+        """Checks if the backup system is idle or not yet configured.
+
+        "idle": configured, but no backup or restore is in progress.
+        "not_set": set by the child classes.
+        """
+        return not (self.is_backup_in_progress() or self._is_restore_in_progress())
+
 
 class OpenSearchNonOrchestratorClusterBackup(OpenSearchBackupBase):
     """Simpler implementation of backup relation for non-orchestrator clusters.
@@ -311,14 +449,6 @@ def _on_s3_relation_action(self, event: EventBase) -> None:
         """Deployment description available, non-orchestrator, fail any actions."""
         event.fail("Failed: execute the action on the orchestrator cluster instead.")
 
-    def is_idle_or_not_set(self) -> bool:
-        """Checks if the backup system is idle or not yet configured.
-
-        "idle": configured but there are no backups nor restores in progress.
-        "not_set": set by the children classes
-        """
-        return not (self.is_backup_in_progress() or self._is_restore_in_progress())
-
     def _is_restore_in_progress(self) -> bool:
         """Checks if the restore is currently in progress.
 
         Two options:
         1) no restore requested: return False
         2) check for each index shard: for all type=SNAPSHOT and stage=DONE, return False.
@@ -344,25 +474,6 @@ def _is_restore_in_progress(self) -> bool:
                     return True
         return False
 
-    def is_backup_in_progress(self) -> bool:
-        """Returns True if backup is in progress, False otherwise.
-
-        We filter the _query_backup_status() and seek for the following states:
-        - SNAPSHOT_IN_PROGRESS
-        """
-        try:
-            output = self.charm.opensearch.request("GET", f"_snapshot/{S3_REPOSITORY}/_all")
-            # Simpler check, as we are not interested if a backup is in progress only
-            return BackupServiceState.SNAPSHOT_IN_PROGRESS in str(output)
-        except OpenSearchHttpError:
-            # Defaults to True if we have a failure, to avoid any actions due to
-            # intermittent connection issues.
-            logger.warning(
-                "is_backup_in_progress: failed to get snapshots status"
-                " - assuming backup is in progress"
-            )
-            return True
-
 
 class OpenSearchBackup(OpenSearchBackupBase):
     """Implements backup relation and API management."""
@@ -542,21 +653,6 @@ def is_idle_or_not_set(self) -> bool:
             BackupServiceState.REPO_MISSING,
         ] or not (self.is_backup_in_progress() or self._is_restore_in_progress())
 
-    def _is_restore_in_progress(self) -> bool:
-        """Checks if the restore is currently in progress.
-
-        Two options:
-        1) no restore requested: return False
-        2) check for each index shard: for all type=SNAPSHOT and stage=DONE, return False.
-        """
-        indices_status = self._request("GET", "/_recovery?human") or {}
-        for info in indices_status.values():
-            # Now, check the status of each shard
-            for shard in info["shards"]:
-                if shard["type"] == "SNAPSHOT" and shard["stage"] != "DONE":
-                    return True
-        return False
-
     def _is_restore_complete(self) -> bool:
         """Checks if the restore is finished.
 
@@ -717,34 +813,6 @@ def _list_backups(self) -> Dict[int, str]:
             for snapshot in response.get("snapshots", [])
         }
 
-    def is_backup_in_progress(self) -> bool:
-        """Returns True if backup is in progress, False otherwise.
-
-        We filter the _query_backup_status() and seek for the following states:
-        - SNAPSHOT_IN_PROGRESS
-        """
-        if self._query_backup_status() in [
-            BackupServiceState.SNAPSHOT_IN_PROGRESS,
-            BackupServiceState.RESPONSE_FAILED_NETWORK,
-        ]:
-            # We have a backup in progress or we cannot reach the API
-            # taking the "safe path" of informing a backup is in progress
-            return True
-        return False
-
-    def _query_backup_status(self, backup_id: Optional[str] = None) -> BackupServiceState:
-        try:
-            for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)):
-                with attempt:
-                    target = f"_snapshot/{S3_REPOSITORY}/"
-                    target += f"{backup_id.lower()}" if backup_id else "_all"
-                    output = self._request("GET", target)
-                    logger.debug(f"Backup status: {output}")
-        except RetryError as e:
-            logger.error(f"_request failed with: {e}")
-            return BackupServiceState.RESPONSE_FAILED_NETWORK
-        return self.get_service_status(output)
-
     def _on_s3_credentials_changed(self, event: EventBase) -> None:  # noqa: C901
         """Calls the plugin manager config handler.
 
@@ -977,75 +1045,12 @@ def can_use_s3_repository(self) -> bool:
             return False
         return True
 
-    def _request(self, *args, **kwargs) -> dict[str, Any] | None:
-        """Returns the output of OpenSearchDistribution.request() or throws an error.
-
-        Request method can return one of many: Union[Dict[str, any], List[any], int]
-        and raise multiple types of errors.
-
-        If int is returned, then throws an exception informing the HTTP request failed.
-        If the request fails, returns the error text or None if only status code is found.
-
-        Raises:
-            - ValueError
-        """
-        if "retries" not in kwargs.keys():
-            kwargs["retries"] = 6
-        if "timeout" not in kwargs.keys():
-            kwargs["timeout"] = 10
-        # We are interested to see the entire response
-        kwargs["resp_status_code"] = False
-        try:
-            result = self.charm.opensearch.request(*args, **kwargs)
-        except OpenSearchHttpError as e:
-            return e.response_body if e.response_body else None
-        return result if isinstance(result, dict) else None
-
     def get_service_status(  # noqa: C901
         self, response: dict[str, Any] | None
     ) -> BackupServiceState:
-        """Returns the response status in a Enum.
-
-        Based on:
-        https://github.com/opensearch-project/OpenSearch/blob/
-        ba78d93acf1da6dae16952d8978de87cb4df2c61/
-        server/src/main/java/org/opensearch/OpenSearchServerException.java#L837
-        https://github.com/opensearch-project/OpenSearch/blob/
-        ba78d93acf1da6dae16952d8978de87cb4df2c61/
-        plugins/repository-s3/src/yamlRestTest/resources/rest-api-spec/test/repository_s3/40_repository_ec2_credentials.yml
-        """
-        if not response:
-            return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
-
-        try:
-            if "error" not in response:
-                return BackupServiceState.SUCCESS
-            type = response["error"]["root_cause"][0]["type"]
-            reason = response["error"]["root_cause"][0]["reason"]
-        except KeyError as e:
-            logger.exception(e)
-            logger.error("response contained unknown error code")
-            return BackupServiceState.RESPONSE_FAILED_NETWORK
-        # Check if we error'ed b/c s3 repo is not configured, hence we are still
-        # waiting for the plugin to be configured
-        if type == "repository_exception" and REPO_NOT_CREATED_ERR in reason:
-            return BackupServiceState.REPO_NOT_CREATED
-        if type == "repository_exception" and REPO_CREATING_ERR in reason:
-            return BackupServiceState.REPO_CREATION_ERR
-        if type == "repository_exception":
-            return BackupServiceState.REPO_ERR_UNKNOWN
-        if type == "repository_missing_exception":
-            return BackupServiceState.REPO_MISSING
-        if type == "repository_verification_exception" and REPO_NOT_ACCESS_ERR in reason:
-            return BackupServiceState.REPO_S3_UNREACHABLE
-        if type == "illegal_argument_exception":
-            return BackupServiceState.ILLEGAL_ARGUMENT
-        if type == "snapshot_missing_exception":
-            return BackupServiceState.SNAPSHOT_MISSING
-        if type == "snapshot_restore_exception" and RESTORE_OPEN_INDEX_WITH_SAME_NAME in reason:
-            return BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED
-        if type == "snapshot_restore_exception":
-            return BackupServiceState.SNAPSHOT_RESTORE_ERROR
+        """Returns the response status as an Enum."""
+        if (status := super().get_service_status(response)) == BackupServiceState.SUCCESS:
+            return BackupServiceState.SUCCESS
         if (
             "bucket" in self.s3_client.get_s3_connection_info()
             and S3_REPOSITORY in response
             and "settings" in response[S3_REPOSITORY]
             and self.s3_client.get_s3_connection_info()["bucket"]
             == response[S3_REPOSITORY]["settings"]["bucket"]
         ):
             return BackupServiceState.REPO_NOT_CREATED_ALREADY_EXISTS
-        # Ensure this is not containing any information about snapshots, return SUCCESS
-        return self.get_snapshot_status(response)
-
-    def get_snapshot_status(self, response: Dict[str, Any] | None) -> BackupServiceState:
-        """Returns the snapshot status."""
-        if not response:
-            return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
-        # Now, check snapshot status:
-        r_str = str(response)
-        if "IN_PROGRESS" in r_str:
-            return BackupServiceState.SNAPSHOT_IN_PROGRESS
-        if "PARTIAL" in r_str:
-            return BackupServiceState.SNAPSHOT_PARTIALLY_TAKEN
-        if "INCOMPATIBLE" in r_str:
-            return BackupServiceState.SNAPSHOT_INCOMPATIBILITY
-        if "FAILED" in r_str:
-            return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
-        return BackupServiceState.SUCCESS
+        return status
 
 
 def backup(charm: CharmBase) -> OpenSearchBackupBase:
diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py
index b915a4a67..990032d4d 100644
--- a/lib/charms/opensearch/v0/opensearch_base_charm.py
+++ b/lib/charms/opensearch/v0/opensearch_base_charm.py
@@ -982,7 +982,7 @@ def _post_start_init(self, event: _StartOpenSearch):  # noqa: C901
         self.status.clear(WaitingToStart)
 
         if event.after_upgrade:
-            health = self.health.apply(wait_for_green_first=True, app=False)
+            health = self.health.get(local_app_only=False, wait_for_green_first=True)
             self.health.apply_for_unit_during_upgrade(health)
 
             # Cluster is considered healthy if green or yellow
@@ -1029,6 +1029,7 @@ def _post_start_init(self, event: _StartOpenSearch):  # noqa: C901
                 self.health.apply()
 
         self._upgrade.unit_state = upgrade.UnitState.HEALTHY
+        logger.debug("Set upgrade unit state to healthy")
         self._reconcile_upgrade()
 
         # update the peer cluster rel data with new IP in case of main cluster manager
diff --git a/src/machine_upgrade.py b/src/machine_upgrade.py
index 06218bdef..56f115457 100644
--- a/src/machine_upgrade.py
+++ b/src/machine_upgrade.py
@@ -179,7 +179,9 @@ def authorized(self) -> bool:
                 or state is not upgrade.UnitState.HEALTHY
             ):
                 # Waiting for higher number units to upgrade
+                logger.debug(f"Upgrade not authorized. Waiting for {unit.name=} to upgrade")
                 return False
+        logger.debug(f"Upgrade not authorized. Waiting for {unit.name=} to upgrade")
         return False
 
     def upgrade_unit(self, *, snap: OpenSearchSnap) -> None:
diff --git a/tests/integration/test_manual_large_deployment_upgrades.py b/tests/integration/test_manual_large_deployment_upgrades.py
new file mode 100644
index 000000000..399ac64a5
--- /dev/null
+++ b/tests/integration/test_manual_large_deployment_upgrades.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+import asyncio
+import logging
+
+import pytest
+from pytest_operator.plugin import OpsTest
+
+from .ha.continuous_writes import ContinuousWrites
+from .ha.helpers import app_name, assert_continuous_writes_consistency
+from .ha.test_horizontal_scaling import IDLE_PERIOD
+from .helpers import APP_NAME, MODEL_CONFIG, SERIES, run_action
+from .helpers_deployments import get_application_units, wait_until
+from .tls.test_tls import TLS_CERTIFICATES_APP_NAME
+
+logger = logging.getLogger(__name__)
+
+
+OPENSEARCH_ORIGINAL_CHARM_NAME = "pguimaraes-opensearch-upgrade-test"
+OPENSEARCH_INITIAL_CHANNEL = "latest/edge"
+OPENSEARCH_MAIN_APP_NAME = "main"
+OPENSEARCH_FAILOVER_APP_NAME = "failover"
+
+
+charm = None  # locally built charm; cached to avoid rebuilding between tests
+
+
+WORKLOAD = {
+    APP_NAME: 3,
+    OPENSEARCH_FAILOVER_APP_NAME: 2,
+    OPENSEARCH_MAIN_APP_NAME: 1,
+}
+
+
+@pytest.fixture()
+async def c_writes(ops_test: OpsTest):
+    """Creates instance of the ContinuousWrites."""
+    app = (await app_name(ops_test)) or APP_NAME
+    return ContinuousWrites(ops_test, app)
+
+
+@pytest.fixture()
+async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites):
+    """Starts continuous write operations and clears writes at the end of the test."""
+    await c_writes.start()
+    yield
+    await c_writes.clear()
+    logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n")
+
+
+@pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "xlarge"])
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+@pytest.mark.skip_if_deployed
+async def test_large_deployment_deploy_original_charm(ops_test: OpsTest) -> None:
+    """Deploy the charms for the large-deployment upgrade tests."""
+    await ops_test.model.set_config(MODEL_CONFIG)
+    # Deploy TLS Certificates operator.
+    tls_config = {"ca-common-name": "CN_CA"}
+
+    main_orchestrator_conf = {
+        "cluster_name": "backup-test",
+        "init_hold": False,
+        "roles": "cluster_manager",
+    }
+    failover_orchestrator_conf = {
+        "cluster_name": "backup-test",
+        "init_hold": True,
+        "roles": "cluster_manager",
+    }
+    data_hot_conf = {"cluster_name": "backup-test", "init_hold": True, "roles": "data.hot"}
+
+    await asyncio.gather(
+        ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=tls_config),
+        ops_test.model.deploy(
+            OPENSEARCH_ORIGINAL_CHARM_NAME,
+            application_name=OPENSEARCH_MAIN_APP_NAME,
+            num_units=WORKLOAD[OPENSEARCH_MAIN_APP_NAME],
+            series=SERIES,
+            channel=OPENSEARCH_INITIAL_CHANNEL,
+            config=main_orchestrator_conf,
+        ),
+        ops_test.model.deploy(
+            OPENSEARCH_ORIGINAL_CHARM_NAME,
+            application_name=OPENSEARCH_FAILOVER_APP_NAME,
+            num_units=WORKLOAD[OPENSEARCH_FAILOVER_APP_NAME],
+            series=SERIES,
+            channel=OPENSEARCH_INITIAL_CHANNEL,
+            config=failover_orchestrator_conf,
+        ),
+        ops_test.model.deploy(
+            OPENSEARCH_ORIGINAL_CHARM_NAME,
+            application_name=APP_NAME,
+            num_units=WORKLOAD[APP_NAME],
+            series=SERIES,
+            channel=OPENSEARCH_INITIAL_CHANNEL,
+            config=data_hot_conf,
+        ),
+    )
+
+    # Large deployment setup
+    await ops_test.model.integrate("main:peer-cluster-orchestrator", "failover:peer-cluster")
+    await ops_test.model.integrate("main:peer-cluster-orchestrator", f"{APP_NAME}:peer-cluster")
+    await ops_test.model.integrate(
+        "failover:peer-cluster-orchestrator", f"{APP_NAME}:peer-cluster"
+    )
+
+    # TLS setup
+    await ops_test.model.integrate("main", TLS_CERTIFICATES_APP_NAME)
+    await ops_test.model.integrate("failover", TLS_CERTIFICATES_APP_NAME)
+    await ops_test.model.integrate(APP_NAME, TLS_CERTIFICATES_APP_NAME)
+
+    # All applications should be active
+    await wait_until(
+        ops_test,
+        apps=[
+            TLS_CERTIFICATES_APP_NAME,
+            OPENSEARCH_MAIN_APP_NAME,
+            OPENSEARCH_FAILOVER_APP_NAME,
+            APP_NAME,
+        ],
+        apps_statuses=["active"],
+        units_statuses=["active"],
+        wait_for_exact_units={
+            TLS_CERTIFICATES_APP_NAME: 1,
+            OPENSEARCH_MAIN_APP_NAME: WORKLOAD[OPENSEARCH_MAIN_APP_NAME],
+            OPENSEARCH_FAILOVER_APP_NAME: WORKLOAD[OPENSEARCH_FAILOVER_APP_NAME],
+            APP_NAME: WORKLOAD[APP_NAME],
+        },
+        idle_period=IDLE_PERIOD,
+        timeout=3600,
+    )
+
+
+@pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "xlarge"])
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_manually_upgrade_to_local(
+    ops_test: OpsTest, c_writes: ContinuousWrites, c_writes_runner
+) -> None:
+    """Test upgrade from upstream to the currently built local version."""
+    units = await get_application_units(ops_test, OPENSEARCH_MAIN_APP_NAME)
+    leader_id = [u.id for u in units if u.is_leader][0]
+
+    action = await run_action(
+        ops_test,
+        leader_id,
+        "pre-upgrade-check",
+        app=OPENSEARCH_MAIN_APP_NAME,
+    )
+    assert action.status == "completed"
+
+    logger.info("Build charm locally")
+    global charm
+    if not charm:
+        charm = await ops_test.build_charm(".")
+
+    async with ops_test.fast_forward():
+        for app, unit_count in WORKLOAD.items():
+            application = ops_test.model.applications[app]
+            units = await get_application_units(ops_test, app)
+            leader_id = [u.id for u in units if u.is_leader][0]
+
+            logger.info(f"Refresh app {app}, leader {leader_id}")
+
+            await application.refresh(path=charm)
+            logger.info("Refresh is over, waiting for the charm to settle")
+
+            if unit_count == 1:
+                # Single-unit app: the upgrade completes with the refresh; wait for idle and continue
+                await wait_until(
+                    ops_test,
+                    apps=[app],
+                    apps_statuses=["active"],
+                    units_statuses=["active"],
+                    idle_period=IDLE_PERIOD,
+                    timeout=3600,
+                )
+                logger.info(f"Upgrade of app {app} finished")
+                continue
+
+            await wait_until(
+                ops_test,
+                apps=[app],
+                apps_statuses=["blocked"],
+                units_statuses=["active"],
+                wait_for_exact_units={
+                    app: unit_count,
+                },
+                idle_period=120,
+                timeout=3600,
+            )
+            # Resume the upgrade
+            action = await run_action(
+                ops_test,
+                leader_id,
+                "resume-upgrade",
+                app=app,
+            )
+            assert action.status == "completed"
+            logger.info(f"resume-upgrade: {action}")
+
+            await wait_until(
+                ops_test,
+                apps=[app],
+                apps_statuses=["active"],
+                units_statuses=["active"],
+                idle_period=IDLE_PERIOD,
+                timeout=3600,
+            )
+            logger.info(f"Upgrade of app {app} finished")
+
+    # continuous writes checks
+    await assert_continuous_writes_consistency(
+        ops_test,
+        c_writes,
+        [APP_NAME, OPENSEARCH_MAIN_APP_NAME],
+    )
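
Note: with these checks consolidated in OpenSearchBackupBase, any backup
subclass (or charm code holding one) can ask whether it is safe to start
disruptive work such as an upgrade step. A minimal usage sketch follows;
the subclass and event handler names are invented for illustration and are
not part of this patch:

    from charms.opensearch.v0.opensearch_backups import OpenSearchBackupBase

    class SketchClusterBackup(OpenSearchBackupBase):  # hypothetical subclass
        def _on_disruptive_event(self, event) -> None:
            # is_idle_or_not_set() is False while a backup or restore runs; a
            # failed status query maps to RESPONSE_FAILED_NETWORK, which
            # is_backup_in_progress() conservatively reports as "in progress".
            if not self.is_idle_or_not_set():
                event.defer()  # retry on a later hook
                return
            # ... safe to proceed with the disruptive operation ...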