Skip to content

Commit

Permalink
[DPE-4302] manual large deployments upgrade (#302)
Browse files Browse the repository at this point in the history
The goal of this PR is to test the manual steps required to upgrade a
large deployment.

Moves some generic opensearch_backups.py methods to the base class, so
that any class can correctly assess whether a backup/restore is in
progress.

---------

Co-authored-by: Mehdi Bendriss <[email protected]>
Co-authored-by: Carl Csaposs <[email protected]>
  • Loading branch information
3 people authored May 8, 2024
1 parent 77da032 commit 2af1298
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 155 deletions.
296 changes: 142 additions & 154 deletions lib/charms/opensearch/v0/opensearch_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,144 @@ def _on_s3_relation_action(self, event: EventBase) -> None:
logger.info("Deployment description not yet available, failing actions.")
event.fail("Failed: deployment description not yet available")

def _request(self, *args, **kwargs) -> dict[str, Any] | None:
"""Returns the output of OpenSearchDistribution.request() or throws an error.
Request method can return one of many: Union[Dict[str, any], List[any], int]
and raise multiple types of errors.
If int is returned, then throws an exception informing the HTTP request failed.
If the request fails, returns the error text or None if only status code is found.
Raises:
- ValueError
"""
if "retries" not in kwargs.keys():
kwargs["retries"] = 6
if "timeout" not in kwargs.keys():
kwargs["timeout"] = 10
# We are interested to see the entire response
kwargs["resp_status_code"] = False
try:
result = self.charm.opensearch.request(*args, **kwargs)
except OpenSearchHttpError as e:
return e.response_body if e.response_body else None
return result if isinstance(result, dict) else None

def _is_restore_in_progress(self) -> bool:
    """Check whether a snapshot restore is currently running.

    Queries ``GET /_recovery?human`` and inspects the shards listed under each
    top-level entry of the response.

    Returns:
        True if at least one shard has type ``SNAPSHOT`` and a stage other than
        ``DONE``. False otherwise, which includes: no restore was requested,
        the request produced no usable payload, or the payload does not hold
        per-index shard lists (e.g. an HTTP error body).
    """
    indices_status = self._request("GET", "/_recovery?human") or {}
    for info in indices_status.values():
        # _request() hands back the HTTP error body when the call fails. The
        # values of such a body are not per-index recovery records, so skip
        # anything that carries no shard list rather than raising
        # KeyError/TypeError. This matches the "no payload" case above, which
        # already results in False.
        if not isinstance(info, dict):
            continue
        # Now, check the status of each shard
        for shard in info.get("shards") or []:
            if shard["type"] == "SNAPSHOT" and shard["stage"] != "DONE":
                return True
    return False

def is_backup_in_progress(self) -> bool:
    """Tell whether a backup must be considered in progress.

    The answer is derived from _query_backup_status(). Two states count as
    "in progress":
        - SNAPSHOT_IN_PROGRESS
        - RESPONSE_FAILED_NETWORK
    """
    status = self._query_backup_status()
    # Either a backup is really running or the API could not be reached; in the
    # latter case take the "safe path" and report a backup as in progress.
    busy_states = [
        BackupServiceState.SNAPSHOT_IN_PROGRESS,
        BackupServiceState.RESPONSE_FAILED_NETWORK,
    ]
    return status in busy_states

def _query_backup_status(self, backup_id: Optional[str] = None) -> BackupServiceState:
    """Query the snapshot repository and translate the answer into a state.

    Requests every snapshot ("_all") of the S3 repository unless a backup_id is
    given, in which case only that snapshot (lower-cased id) is requested.
    The request is attempted up to 5 times with wait_fixed(5) between attempts.

    Returns:
        RESPONSE_FAILED_NETWORK when the attempts end in a RetryError, otherwise
        whatever get_service_status() makes of the response.
    """
    retry_policy = Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5))
    try:
        for attempt in retry_policy:
            with attempt:
                snapshot = backup_id.lower() if backup_id else "_all"
                target = f"_snapshot/{S3_REPOSITORY}/{snapshot}"
                output = self._request("GET", target)
                logger.debug(f"Backup status: {output}")
    except RetryError as e:
        logger.error(f"_request failed with: {e}")
        return BackupServiceState.RESPONSE_FAILED_NETWORK
    return self.get_service_status(output)

def get_service_status(  # noqa: C901
    self, response: dict[str, Any] | None
) -> BackupServiceState:
    """Classify an OpenSearch response as a BackupServiceState.

    Based on:
    https://github.com/opensearch-project/OpenSearch/blob/
        ba78d93acf1da6dae16952d8978de87cb4df2c61/
        server/src/main/java/org/opensearch/OpenSearchServerException.java#L837
    https://github.com/opensearch-project/OpenSearch/blob/
        ba78d93acf1da6dae16952d8978de87cb4df2c61/
        plugins/repository-s3/src/yamlRestTest/resources/rest-api-spec/test/repository_s3/40_repository_ec2_credentials.yml

    Args:
        response: payload returned by the API (or an error body); may be None.

    Returns:
        - SNAPSHOT_FAILED_UNKNOWN for an empty/None response;
        - SUCCESS when the response carries no "error" key;
        - RESPONSE_FAILED_NETWORK when the error section is malformed (missing
          keys, empty ``root_cause`` list or unexpected value types);
        - the state matching the first root cause's type/reason when known;
        - otherwise the result of get_snapshot_status(response).
    """
    if not response:
        return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN

    # NOTE(review): a payload without an "error" key is reported as SUCCESS
    # without looking at snapshot states; get_snapshot_status() is only reached
    # for error types that are not handled below. Confirm this is intended.
    if "error" not in response:
        return BackupServiceState.SUCCESS

    try:
        root_cause = response["error"]["root_cause"][0]
        err_type = root_cause["type"]
        reason = root_cause["reason"]
    except (KeyError, IndexError, TypeError) as e:
        # KeyError: missing field; IndexError: empty root_cause list;
        # TypeError: "error"/"root_cause" is not the expected container type.
        logger.exception(e)
        logger.error("response contained unknown error code")
        return BackupServiceState.RESPONSE_FAILED_NETWORK

    # Check if we error'ed b/c s3 repo is not configured, hence we are still
    # waiting for the plugin to be configured
    if err_type == "repository_exception" and REPO_NOT_CREATED_ERR in reason:
        return BackupServiceState.REPO_NOT_CREATED
    if err_type == "repository_exception" and REPO_CREATING_ERR in reason:
        return BackupServiceState.REPO_CREATION_ERR
    if err_type == "repository_exception":
        return BackupServiceState.REPO_ERR_UNKNOWN
    if err_type == "repository_missing_exception":
        return BackupServiceState.REPO_MISSING
    if err_type == "repository_verification_exception" and REPO_NOT_ACCESS_ERR in reason:
        return BackupServiceState.REPO_S3_UNREACHABLE
    if err_type == "illegal_argument_exception":
        return BackupServiceState.ILLEGAL_ARGUMENT
    if err_type == "snapshot_missing_exception":
        return BackupServiceState.SNAPSHOT_MISSING
    if err_type == "snapshot_restore_exception" and RESTORE_OPEN_INDEX_WITH_SAME_NAME in reason:
        return BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED
    if err_type == "snapshot_restore_exception":
        return BackupServiceState.SNAPSHOT_RESTORE_ERROR
    # Unrecognized error type: fall back to scanning the payload for snapshot states.
    return self.get_snapshot_status(response)

def get_snapshot_status(self, response: Dict[str, Any] | None) -> BackupServiceState:
    """Derive the snapshot state from the textual form of the response.

    An empty/None response maps to SNAPSHOT_FAILED_UNKNOWN. Otherwise the
    stringified response is searched for known markers; SUCCESS is returned
    when none of them is present.
    """
    if not response:
        return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN

    text = str(response)
    # Markers are checked in this order and the first one found wins.
    markers = (
        ("IN_PROGRESS", BackupServiceState.SNAPSHOT_IN_PROGRESS),
        ("PARTIAL", BackupServiceState.SNAPSHOT_PARTIALLY_TAKEN),
        ("INCOMPATIBLE", BackupServiceState.SNAPSHOT_INCOMPATIBILITY),
        ("FAILED", BackupServiceState.SNAPSHOT_FAILED_UNKNOWN),
    )
    for marker, state in markers:
        if marker in text:
            return state
    return BackupServiceState.SUCCESS

def is_idle_or_not_set(self) -> bool:
    """Report whether the backup system is idle or not yet configured.

    "idle": configured, with neither a backup nor a restore in progress.
    "not_set": determined by the children classes.
    """
    # A running backup settles the answer; the restore check is only made
    # when no backup is in progress.
    if self.is_backup_in_progress():
        return False
    return not self._is_restore_in_progress()


class OpenSearchNonOrchestratorClusterBackup(OpenSearchBackupBase):
"""Simpler implementation of backup relation for non-orchestrator clusters.
Expand Down Expand Up @@ -311,14 +449,6 @@ def _on_s3_relation_action(self, event: EventBase) -> None:
"""Deployment description available, non-orchestrator, fail any actions."""
event.fail("Failed: execute the action on the orchestrator cluster instead.")

def is_idle_or_not_set(self) -> bool:
"""Checks if the backup system is idle or not yet configured.
"idle": configured but there are no backups nor restores in progress.
"not_set": set by the children classes
"""
return not (self.is_backup_in_progress() or self._is_restore_in_progress())

def _is_restore_in_progress(self) -> bool:
"""Checks if the restore is currently in progress.
Expand All @@ -344,25 +474,6 @@ def _is_restore_in_progress(self) -> bool:
return True
return False

def is_backup_in_progress(self) -> bool:
"""Returns True if backup is in progress, False otherwise.
We filter the _query_backup_status() and seek for the following states:
- SNAPSHOT_IN_PROGRESS
"""
try:
output = self.charm.opensearch.request("GET", f"_snapshot/{S3_REPOSITORY}/_all")
# Simpler check, as we are not interested if a backup is in progress only
return BackupServiceState.SNAPSHOT_IN_PROGRESS in str(output)
except OpenSearchHttpError:
# Defaults to True if we have a failure, to avoid any actions due to
# intermittent connection issues.
logger.warning(
"is_backup_in_progress: failed to get snapshots status"
" - assuming backup is in progress"
)
return True


class OpenSearchBackup(OpenSearchBackupBase):
"""Implements backup relation and API management."""
Expand Down Expand Up @@ -542,21 +653,6 @@ def is_idle_or_not_set(self) -> bool:
BackupServiceState.REPO_MISSING,
] or not (self.is_backup_in_progress() or self._is_restore_in_progress())

def _is_restore_in_progress(self) -> bool:
"""Checks if the restore is currently in progress.
Two options:
1) no restore requested: return False
2) check for each index shard: for all type=SNAPSHOT and stage=DONE, return False.
"""
indices_status = self._request("GET", "/_recovery?human") or {}
for info in indices_status.values():
# Now, check the status of each shard
for shard in info["shards"]:
if shard["type"] == "SNAPSHOT" and shard["stage"] != "DONE":
return True
return False

def _is_restore_complete(self) -> bool:
"""Checks if the restore is finished.
Expand Down Expand Up @@ -717,34 +813,6 @@ def _list_backups(self) -> Dict[int, str]:
for snapshot in response.get("snapshots", [])
}

def is_backup_in_progress(self) -> bool:
"""Returns True if backup is in progress, False otherwise.
We filter the _query_backup_status() and seek for the following states:
- SNAPSHOT_IN_PROGRESS
"""
if self._query_backup_status() in [
BackupServiceState.SNAPSHOT_IN_PROGRESS,
BackupServiceState.RESPONSE_FAILED_NETWORK,
]:
# We have a backup in progress or we cannot reach the API
# taking the "safe path" of informing a backup is in progress
return True
return False

def _query_backup_status(self, backup_id: Optional[str] = None) -> BackupServiceState:
try:
for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)):
with attempt:
target = f"_snapshot/{S3_REPOSITORY}/"
target += f"{backup_id.lower()}" if backup_id else "_all"
output = self._request("GET", target)
logger.debug(f"Backup status: {output}")
except RetryError as e:
logger.error(f"_request failed with: {e}")
return BackupServiceState.RESPONSE_FAILED_NETWORK
return self.get_service_status(output)

def _on_s3_credentials_changed(self, event: EventBase) -> None: # noqa: C901
"""Calls the plugin manager config handler.
Expand Down Expand Up @@ -977,75 +1045,12 @@ def can_use_s3_repository(self) -> bool:
return False
return True

def _request(self, *args, **kwargs) -> dict[str, Any] | None:
"""Returns the output of OpenSearchDistribution.request() or throws an error.
Request method can return one of many: Union[Dict[str, any], List[any], int]
and raise multiple types of errors.
If int is returned, then throws an exception informing the HTTP request failed.
If the request fails, returns the error text or None if only status code is found.
Raises:
- ValueError
"""
if "retries" not in kwargs.keys():
kwargs["retries"] = 6
if "timeout" not in kwargs.keys():
kwargs["timeout"] = 10
# We are interested to see the entire response
kwargs["resp_status_code"] = False
try:
result = self.charm.opensearch.request(*args, **kwargs)
except OpenSearchHttpError as e:
return e.response_body if e.response_body else None
return result if isinstance(result, dict) else None

def get_service_status( # noqa: C901
self, response: dict[str, Any] | None
) -> BackupServiceState:
"""Returns the response status in a Enum.
Based on:
https://github.com/opensearch-project/OpenSearch/blob/
ba78d93acf1da6dae16952d8978de87cb4df2c61/
server/src/main/java/org/opensearch/OpenSearchServerException.java#L837
https://github.com/opensearch-project/OpenSearch/blob/
ba78d93acf1da6dae16952d8978de87cb4df2c61/
plugins/repository-s3/src/yamlRestTest/resources/rest-api-spec/test/repository_s3/40_repository_ec2_credentials.yml
"""
if not response:
return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN

try:
if "error" not in response:
return BackupServiceState.SUCCESS
type = response["error"]["root_cause"][0]["type"]
reason = response["error"]["root_cause"][0]["reason"]
except KeyError as e:
logger.exception(e)
logger.error("response contained unknown error code")
return BackupServiceState.RESPONSE_FAILED_NETWORK
# Check if we error'ed b/c s3 repo is not configured, hence we are still
# waiting for the plugin to be configured
if type == "repository_exception" and REPO_NOT_CREATED_ERR in reason:
return BackupServiceState.REPO_NOT_CREATED
if type == "repository_exception" and REPO_CREATING_ERR in reason:
return BackupServiceState.REPO_CREATION_ERR
if type == "repository_exception":
return BackupServiceState.REPO_ERR_UNKNOWN
if type == "repository_missing_exception":
return BackupServiceState.REPO_MISSING
if type == "repository_verification_exception" and REPO_NOT_ACCESS_ERR in reason:
return BackupServiceState.REPO_S3_UNREACHABLE
if type == "illegal_argument_exception":
return BackupServiceState.ILLEGAL_ARGUMENT
if type == "snapshot_missing_exception":
return BackupServiceState.SNAPSHOT_MISSING
if type == "snapshot_restore_exception" and RESTORE_OPEN_INDEX_WITH_SAME_NAME in reason:
return BackupServiceState.SNAPSHOT_RESTORE_ERROR_INDEX_NOT_CLOSED
if type == "snapshot_restore_exception":
return BackupServiceState.SNAPSHOT_RESTORE_ERROR
"""Returns the response status in a Enum."""
if (status := super().get_service_status(response)) == BackupServiceState.SUCCESS:
return BackupServiceState.SUCCESS
if (
"bucket" in self.s3_client.get_s3_connection_info()
and S3_REPOSITORY in response
Expand All @@ -1054,24 +1059,7 @@ def get_service_status( # noqa: C901
== response[S3_REPOSITORY]["settings"]["bucket"]
):
return BackupServiceState.REPO_NOT_CREATED_ALREADY_EXISTS
# Ensure this is not containing any information about snapshots, return SUCCESS
return self.get_snapshot_status(response)

def get_snapshot_status(self, response: Dict[str, Any] | None) -> BackupServiceState:
"""Returns the snapshot status."""
if not response:
return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
# Now, check snapshot status:
r_str = str(response)
if "IN_PROGRESS" in r_str:
return BackupServiceState.SNAPSHOT_IN_PROGRESS
if "PARTIAL" in r_str:
return BackupServiceState.SNAPSHOT_PARTIALLY_TAKEN
if "INCOMPATIBLE" in r_str:
return BackupServiceState.SNAPSHOT_INCOMPATIBILITY
if "FAILED" in r_str:
return BackupServiceState.SNAPSHOT_FAILED_UNKNOWN
return BackupServiceState.SUCCESS
return status


def backup(charm: CharmBase) -> OpenSearchBackupBase:
Expand Down
3 changes: 2 additions & 1 deletion lib/charms/opensearch/v0/opensearch_base_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ def _post_start_init(self, event: _StartOpenSearch): # noqa: C901
self.status.clear(WaitingToStart)

if event.after_upgrade:
health = self.health.apply(wait_for_green_first=True, app=False)
health = self.health.get(local_app_only=False, wait_for_green_first=True)
self.health.apply_for_unit_during_upgrade(health)

# Cluster is considered healthy if green or yellow
Expand Down Expand Up @@ -1029,6 +1029,7 @@ def _post_start_init(self, event: _StartOpenSearch): # noqa: C901
self.health.apply()

self._upgrade.unit_state = upgrade.UnitState.HEALTHY
logger.debug("Set upgrade unit state to healthy")
self._reconcile_upgrade()

# update the peer cluster rel data with new IP in case of main cluster manager
Expand Down
2 changes: 2 additions & 0 deletions src/machine_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ def authorized(self) -> bool:
or state is not upgrade.UnitState.HEALTHY
):
# Waiting for higher number units to upgrade
logger.debug(f"Upgrade not authorized. Waiting for {unit.name=} to upgrade")
return False
logger.debug(f"Upgrade not authorized. Waiting for {unit.name=} to upgrade")
return False

def upgrade_unit(self, *, snap: OpenSearchSnap) -> None:
Expand Down
Loading

0 comments on commit 2af1298

Please sign in to comment.