diff --git a/ocs_ci/ocs/amq.py b/ocs_ci/ocs/amq.py index 155478fb178..2d44f04f9c7 100644 --- a/ocs_ci/ocs/amq.py +++ b/ocs_ci/ocs/amq.py @@ -79,7 +79,13 @@ def _clone_amq(self): """ try: log.info(f"cloning amq in {self.dir}") - git_clone_cmd = f"git clone {self.repo} " + + # Further setup in this class assumes the use of zookeeper, before + # the following change was made to strimzi-kafka-operator: + # https://github.com/strimzi/strimzi-kafka-operator/pull/10982 + last_compatible_branch = "release-0.44.x" + git_clone_cmd = f"git clone --branch {last_compatible_branch} {self.repo} " + run(git_clone_cmd, shell=True, cwd=self.dir, check=True) self.amq_dir = "strimzi-kafka-operator/packaging/install/cluster-operator/" self.amq_kafka_pers_yaml = ( diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py index 44e5b28bdba..13cba85f32e 100644 --- a/ocs_ci/ocs/bucket_utils.py +++ b/ocs_ci/ocs/bucket_utils.py @@ -2921,3 +2921,114 @@ def create_s3client_from_assume_role_creds(mcg_obj, assume_role_creds): aws_session_token=assumed_session_token, ) return assumed_s3_resource.meta.client + + +def put_bucket_versioning_via_awscli( + mcg_obj, awscli_pod, bucket_name, status="Enabled" +): + """ + Put bucket versioning using AWS CLI + + Args: + mcg_obj (MCG): MCG object + awscli_pod (Pod): Pod object where AWS CLI is installed + bucket_name (str): Name of the bucket + status (str): Status of the versioning + + """ + awscli_pod.exec_cmd_on_pod( + command=craft_s3_command( + f"put-bucket-versioning --bucket {bucket_name} --versioning-configuration Status={status}", + mcg_obj=mcg_obj, + api=True, + ) + ) + + +def upload_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key, amount=1, size="1M"): + """ + Upload multiple random data versions to a given object key and return their ETag values + + Args: + mcg_obj (MCG): MCG object + awscli_pod (Pod): Pod object where AWS CLI is installed + bucket_name (str): Name of the bucket + obj_key (str): Object key + amount (int): Number of versions to create + size (str): Size of the object. I.E 1M + + Returns: + list: List of ETag values of versions in latest to oldest order + """ + file_dir = os.path.join("/tmp", str(uuid4())) + awscli_pod.exec_cmd_on_pod(f"mkdir {file_dir}") + + etags = [] + + for i in range(amount): + file_path = os.path.join(file_dir, f"{obj_key}_{i}") + awscli_pod.exec_cmd_on_pod( + command=f"dd if=/dev/urandom of={file_path} bs={size} count=1" + ) + # Use debug and redirect it to stdout to get + # the uploaded object's ETag in the response + resp = awscli_pod.exec_cmd_on_pod( + command=craft_s3_command( + f"cp {file_path} s3://{bucket_name}/{obj_key} --debug 2>&1", + mcg_obj=mcg_obj, + ), + out_yaml_format=False, + ) + + # Parse the ETag from the response + # Filter the line containing the JSON + line = next(filter(lambda line: "ETag" in line, resp.splitlines())) + json_start = line.index("{") + json_str = line[json_start:] + + # Fix quotes to read as dict + json_str = json_str.replace("'", '"') + json_str = json_str.replace('""', '"') + + # Convert to dict and extract the ETag + new_etag = json.loads(json_str).get("ETag") + + # Later versions should precede the older ones + # this achieves the expected order of versions + # we'd get from list-object-versions + etags.insert(0, new_etag) + + return etags + + +def get_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key): + """ + Get object versions using AWS CLI + + Args: + mcg_obj (MCG): MCG object + awscli_pod (Pod): Pod object where AWS CLI is installed + bucket_name (str): Name of the bucket + obj_key (str): Object key + + Returns: + list: List of dictionaries containing the versions data + """ + resp = awscli_pod.exec_cmd_on_pod( + command=craft_s3_command( + f"list-object-versions --bucket {bucket_name} --prefix {obj_key}", + mcg_obj=mcg_obj, + api=True, + ), + out_yaml_format=False, + ) + + versions_dicts = [] + if resp and "Versions" in resp: + versions_dicts = json.loads(resp).get("Versions") + + # Remove quotes from the ETag values for easier usage + for d in versions_dicts: + d["ETag"] = d["ETag"].strip('"') + + return versions_dicts diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index 97c62d4db6e..348c33507ad 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -3117,3 +3117,11 @@ MACHINE_POOL_ACTIONS = [CREATE, EDIT, DELETE] # MDR multicluster roles MDR_ROLES = ["ActiveACM", "PassiveACM", "PrimaryODF", "SecondaryODF"] + +ENCRYPTION_DASHBOARD_CONTEXT_MAP = { + "Cluster-wide encryption": "cluster_wide_encryption", + "Storage class encryption": "storageclass_encryption", + "In-transit encryption": "intransit_encryption", + "Block storage": "block_storage", + "Object storage": "object_storage", +} diff --git a/ocs_ci/ocs/resources/mcg_replication_policy.py b/ocs_ci/ocs/resources/mcg_replication_policy.py index d18300a09dd..a28ece2b902 100644 --- a/ocs_ci/ocs/resources/mcg_replication_policy.py +++ b/ocs_ci/ocs/resources/mcg_replication_policy.py @@ -99,3 +99,20 @@ def to_dict(self): dict["log_replication_info"]["endpoint_type"] = "AZURE" return dict + + +class ReplicationPolicyWithVersioning(McgReplicationPolicy): + """ + A class to handle the MCG bucket replication policy JSON structure with versioning. + + """ + + def __init__(self, target_bucket, sync_versions=True, prefix=""): + super().__init__(target_bucket, prefix) + self.sync_versions = sync_versions + + def to_dict(self): + dict = super().to_dict() + dict["rules"][0]["sync_versions"] = self.sync_versions + + return dict diff --git a/ocs_ci/ocs/ui/helpers_ui.py b/ocs_ci/ocs/ui/helpers_ui.py index 60d1a50a7d9..ef68dc7589c 100644 --- a/ocs_ci/ocs/ui/helpers_ui.py +++ b/ocs_ci/ocs/ui/helpers_ui.py @@ -8,6 +8,7 @@ from ocs_ci.ocs.ui.base_ui import login_ui, close_browser from ocs_ci.ocs.ui.add_replace_device_ui import AddReplaceDeviceUI from ocs_ci.ocs.resources.storage_cluster import get_deviceset_count, get_osd_size +from ocs_ci.ocs.exceptions import ResourceNotFoundError logger = logging.getLogger(__name__) @@ -217,3 +218,27 @@ def is_ui_deployment(): return True return False + + +def extract_encryption_status(root_element, svg_path): + """Function to extract encryption status from an SVG element + + Args: + root_element (str): Dom root element + svg_element (str): svg element path + + Returns: + bool: if encryption status is enable for given element return True otherwise False. + + Raises: + ResourceNotFoundError: If given resource is not found. + """ + try: + svg_element = root_element.find_element(By.CSS_SELECTOR, svg_path) + if svg_element and svg_element.tag_name == "svg": + if svg_element.get_attribute("data-test") == "success-icon": + return True + else: + return False + except Exception as e: + raise ResourceNotFoundError(f"Given SVG element is not Found: {e}") diff --git a/ocs_ci/ocs/ui/page_objects/encryption_module.py b/ocs_ci/ocs/ui/page_objects/encryption_module.py new file mode 100644 index 00000000000..7f73c70f802 --- /dev/null +++ b/ocs_ci/ocs/ui/page_objects/encryption_module.py @@ -0,0 +1,138 @@ +from ocs_ci.ocs.ui.helpers_ui import logger, extract_encryption_status +from ocs_ci.ocs.constants import ENCRYPTION_DASHBOARD_CONTEXT_MAP +from ocs_ci.ocs.ui.page_objects.page_navigator import PageNavigator + + +class EncryptionModule(PageNavigator): + def _get_encryption_summary(self, context_key): + """ + Generic method to collect encryption summary based on the context. + + Args: + context_key (str): Key to determine the validation location. + + Returns: + dict: Encryption summary for the given context. + """ + encryption_summary = { + "object_storage": {"status": None, "kms": ""}, + "cluster_wide_encryption": {"status": None, "kms": ""}, + "storageclass_encryption": {"status": None, "kms": ""}, + "intransit_encryption": {"status": None}, + } + + logger.info(f"Getting Encryption Summary for context: {context_key}") + + # Open the encryption summary popup + self.do_click( + self.validation_loc["encryption_summary"][context_key]["enabled"], + enable_screenshot=True, + ) + + self.page_has_loaded( + module_loc=self.validation_loc["encryption_summary"][context_key][ + "encryption_content_data" + ] + ) + + # Get elements for text and root + encryption_content_location = self.validation_loc["encryption_summary"][ + context_key + ]["encryption_content_data"] + encryption_summary_text = self.get_element_text(encryption_content_location) + root_elements = self.get_elements(encryption_content_location) + + if not root_elements: + raise ValueError("Error getting root web element") + root_element = root_elements[0] + + # Process encryption summary text + current_context = None + for line in encryption_summary_text.split("\n"): + line = line.strip() + if line in ENCRYPTION_DASHBOARD_CONTEXT_MAP: + current_context = ENCRYPTION_DASHBOARD_CONTEXT_MAP[line] + continue + + if ( + current_context + in [ + "object_storage", + "cluster_wide_encryption", + "storageclass_encryption", + ] + and "External Key Management Service" in line + ): + encryption_summary[current_context]["kms"] = line.split(":")[-1].strip() + encryption_summary[current_context]["status"] = ( + extract_encryption_status( + root_element, + self._get_svg_selector(context_key, current_context), + ) + ) + elif current_context == "intransit_encryption": + encryption_summary[current_context]["status"] = ( + extract_encryption_status( + root_element, + self._get_svg_selector(context_key, current_context), + ) + ) + + logger.info(f"Encryption Summary for {context_key}: {encryption_summary}") + + # Close the popup + logger.info("Closing the popup") + self.do_click( + self.validation_loc["encryption_summary"][context_key]["close"], + enable_screenshot=True, + ) + + return encryption_summary + + def _get_svg_selector(self, context_key, current_context): + """ + Get the appropriate SVG selector for extracting encryption status. + + Args: + context_key (str): The context key. + current_context (str): The current encryption context. + + Returns: + str: SVG selector path. + """ + selectors = { + "object_storage": { + "object_storage": "div.pf-v5-l-flex:nth-child(1) > div:nth-child(2) > svg", + "intransit_encryption": "div.pf-v5-l-flex:nth-child(4) > div:nth-child(2) > svg", + }, + "file_and_block": { + "cluster_wide_encryption": ( + "div.pf-m-align-items-center:nth-child(1) > " + "div:nth-child(2) > svg:nth-child(1)" + ), + "storageclass_encryption": ( + "div.pf-v5-l-flex:nth-child(6) > " + "div:nth-child(2) > svg:nth-child(1)" + ), + "intransit_encryption": "div.pf-v5-l-flex:nth-child(10) > div:nth-child(2) > svg", + }, + } + return selectors.get(context_key, {}).get(current_context, "") + + def get_object_encryption_summary(self): + """ + Retrieve the encryption summary for the object details page. + + Returns: + dict: Encryption summary on object details page. + """ + return self._get_encryption_summary("object_storage") + + def get_block_file_encryption_summary(self): + """ + Retrieve the encryption summary for the block and file page. + + Returns: + dict: Encryption summary on block and file page. + """ + return self._get_encryption_summary("file_and_block") diff --git a/ocs_ci/ocs/ui/page_objects/storage_system_details.py b/ocs_ci/ocs/ui/page_objects/storage_system_details.py index 3922c295ee6..afde9efe945 100644 --- a/ocs_ci/ocs/ui/page_objects/storage_system_details.py +++ b/ocs_ci/ocs/ui/page_objects/storage_system_details.py @@ -2,11 +2,13 @@ from ocs_ci.ocs.ui.base_ui import logger, BaseUI from ocs_ci.ocs.ui.page_objects.storage_system_tab import StorageSystemTab from ocs_ci.utility import version +from ocs_ci.ocs.ui.page_objects.encryption_module import EncryptionModule -class StorageSystemDetails(StorageSystemTab): +class StorageSystemDetails(StorageSystemTab, EncryptionModule): def __init__(self): StorageSystemTab.__init__(self) + EncryptionModule.__init__(self) def nav_details_overview(self): logger.info("Click on Overview tab") @@ -33,6 +35,8 @@ def nav_details_object(self): else: self.do_click(self.validation_loc["object"], enable_screenshot=True) + return self + def nav_block_and_file(self): """ Accessible only at StorageSystems / StorageSystem details / Overview diff --git a/ocs_ci/ocs/ui/views.py b/ocs_ci/ocs/ui/views.py index 87aab0c4cc2..b2e4d85ab61 100644 --- a/ocs_ci/ocs/ui/views.py +++ b/ocs_ci/ocs/ui/views.py @@ -1794,6 +1794,39 @@ ), } +validation_4_18 = { + "encryption_summary": { + "file_and_block": { + "enabled": ( + "//button[@class='pf-v5-c-button pf-m-link pf-m-inline' and text()='Enabled']", + By.XPATH, + ), + "close": ( + "//button[@class='pf-v5-c-button pf-m-plain' and @aria-label='Close']", + By.XPATH, + ), + "encryption_content_data": ( + "//div[@class='pf-v5-c-popover__body']", + By.XPATH, + ), + }, + "object_storage": { + "enabled": ( + "//button[@class='pf-v5-c-button pf-m-link pf-m-inline' and text()='Enabled']", + By.XPATH, + ), + "close": ( + "//button[@class='pf-v5-c-button pf-m-plain' and @aria-label='Close']", + By.XPATH, + ), + "encryption_content_data": ( + "//div[@class='pf-v5-c-popover__content']", + By.XPATH, + ), + }, + } +} + topology = { "topology_graph": ("//*[@data-kind='graph']", By.XPATH), "node_label": ("//*[@class='pf-topology__node__label']", By.XPATH), @@ -2035,6 +2068,7 @@ **validation_4_13, **validation_4_14, **validation_4_17, + **validation_4_18, }, "block_pool": {**block_pool, **block_pool_4_12, **block_pool_4_13}, "storageclass": {**storageclass, **storageclass_4_9}, diff --git a/tests/functional/encryption/test_encryption_configuration_dashboard.py b/tests/functional/encryption/test_encryption_configuration_dashboard.py new file mode 100644 index 00000000000..db57840f4de --- /dev/null +++ b/tests/functional/encryption/test_encryption_configuration_dashboard.py @@ -0,0 +1,171 @@ +import pytest +import logging + +log = logging.getLogger(__name__) +from ocs_ci.ocs.ui.page_objects.page_navigator import PageNavigator +from ocs_ci.framework.pytest_customization.marks import green_squad, tier1 +from ocs_ci.ocs.ocp import OCP +from ocs_ci.framework import config +from ocs_ci.ocs import constants +from ocs_ci.ocs.resources.storage_cluster import StorageCluster +from ocs_ci.helpers.helpers import storagecluster_independent_check + + +@green_squad +@tier1 +class TestEncryptionConfigurationDashboard: + @pytest.fixture(autouse=True) + def encryption_status(self): + """ + Collect Encryption status from storagecluster and noobaa spec. + """ + # Retrieve encryption details + cluster_name = ( + constants.DEFAULT_CLUSTERNAME_EXTERNAL_MODE + if storagecluster_independent_check() + else constants.DEFAULT_CLUSTERNAME + ) + + sc_obj = StorageCluster( + resource_name=cluster_name, + namespace=config.ENV_DATA["cluster_namespace"], + ) + + self.enc_details = sc_obj.data["spec"].get("encryption", {}) + self.intransit_encryption_status = ( + sc_obj.data["spec"] + .get("network", {}) + .get("connections", {}) + .get("encryption", {}) + .get("enabled", False) + ) + log.info(f"Encryption details from storagecluster Spec: {self.enc_details}") + + noobaa_obj = OCP( + kind="noobaa", + namespace=config.ENV_DATA["cluster_namespace"], + resource_name="noobaa", + ) + + self.noobaa_kms = ( + noobaa_obj.data["spec"] + .get("security", {}) + .get("kms", {}) + .get("connectionDetails", {}) + .get("KMS_PROVIDER", None) # Provide a default value of None if not found + ) + log.info(f"Noobaa Spec has mentioned KMS: {self.noobaa_kms}") + + def validate_encryption( + self, context, actual_status, expected_status, error_message + ): + """Helper function to validate encryption details + + Args: + context (str): Encryption Type + actual_status (str): Encryption status in the spec file + expected_status (str): Encryption status shown on the dashboard. + error_message (str): Error message to display. + """ + assert actual_status == expected_status, error_message + log.info(f"{context} status is as expected: {actual_status}") + + @pytest.mark.polarion_id("OCS-6300") + def test_file_block_encryption_configuration_dashboard(self, setup_ui_class): + """Test the encryption configuration dashboard of File And Block details for correctness. + + Steps: + 1. Navigate to file and block details page + 2. Open encryption details. + 3. verify encryption data with the nooba and storagecluster spec. + """ + + # Navigate to the block and file page + block_and_file_page = ( + PageNavigator() + .nav_odf_default_page() + .nav_storage_systems_tab() + .nav_storagecluster_storagesystem_details() + .nav_block_and_file() + ) + + # Retrieve encryption summary from the dashboard + encryption_summary = block_and_file_page.get_block_file_encryption_summary() + + # Validate cluster-wide encryption + cluster_wide_details = self.enc_details.get("clusterWide", {}) + if isinstance(cluster_wide_details, dict): + self.validate_encryption( + "ClusterWide Encryption", + encryption_summary["cluster_wide_encryption"]["status"], + cluster_wide_details.get("status", False), + "ClusterWide Encryption is not showing correctly in the dashboard.", + ) + self.validate_encryption( + "ClusterWide KMS", + encryption_summary["cluster_wide_encryption"]["kms"], + cluster_wide_details.get("kms", {}).get("enable", False), + "KMS is not mentioned in the encryption summary.", + ) + else: + log.warning( + "ClusterWide Encryption details are not a dictionary, skipping checks." + ) + + # Validate storage class encryption + storage_class_details = self.enc_details.get("storageClass", {}) + if isinstance(storage_class_details, dict): + self.validate_encryption( + "StorageClass Encryption", + encryption_summary["storageclass_encryption"]["status"], + storage_class_details.get("status", False), + "StorageClass encryption is not showing correctly in the dashboard.", + ) + else: + log.warning("StorageClass details are not a dictionary, skipping checks.") + + # Validate in-transit encryption + self.validate_encryption( + "InTransit Encryption", + encryption_summary["intransit_encryption"]["status"], + self.intransit_encryption_status, + "InTransit Encryption status is incorrect in the dashboard.", + ) + + @pytest.mark.polarion_id("OCS-6301") + def test_object_storage_encryption_configuration_dashboard(self, setup_ui_class): + """Test the encryption configuration dashboard of Object details for correctness. + + Steps: + 1. Navigate to object details page + 2. Open encryption details. + 3. verify encryption data with the nooba and storagecluster spec. + """ + # Navigate to the Object Storage page + object_details_page = ( + PageNavigator() + .nav_odf_default_page() + .nav_storage_systems_tab() + .nav_storagecluster_storagesystem_details() + .nav_details_object() + ) + + encryption_summary = object_details_page.get_object_encryption_summary() + log.info(f"Encryption Summary from page : {encryption_summary}") + + # Validate Object Encryption Summary + assert encryption_summary["object_storage"][ + "status" + ], "Object encryption summary is wrong" + assert ( + encryption_summary["object_storage"]["kms"].upper() + == self.noobaa_kms.upper() + ), "KMS details is not correct" + + # Validate in-transit encryption + self.validate_encryption( + "InTransit Encryption", + encryption_summary["intransit_encryption"]["status"], + self.intransit_encryption_status, + "InTransit Encryption status is incorrect in the dashboard.", + ) diff --git a/tests/functional/object/mcg/test_bucket_replication_with_versioning.py b/tests/functional/object/mcg/test_bucket_replication_with_versioning.py new file mode 100644 index 00000000000..d08781c730f --- /dev/null +++ b/tests/functional/object/mcg/test_bucket_replication_with_versioning.py @@ -0,0 +1,144 @@ +import json +import logging +from uuid import uuid4 + +import pytest + +from ocs_ci.framework.pytest_customization.marks import ( + mcg, + polarion_id, + red_squad, + runs_on_provider, + tier1, +) +from ocs_ci.framework.testlib import MCGTest +from ocs_ci.ocs import constants +from ocs_ci.ocs.bucket_utils import ( + get_obj_versions, + put_bucket_versioning_via_awscli, + update_replication_policy, + upload_obj_versions, +) +from ocs_ci.ocs.exceptions import TimeoutExpiredError +from ocs_ci.ocs.resources.mcg_replication_policy import ReplicationPolicyWithVersioning +from ocs_ci.utility.utils import TimeoutSampler + +logger = logging.getLogger(__name__) + + +TIMEOUT = 300 + + +@mcg +@red_squad +@runs_on_provider +class TestReplicationWithVersioning(MCGTest): + """ + Test suite for MCG object replication policies + """ + + @pytest.fixture(autouse=True, scope="class") + def reduce_replication_delay_setup(self, add_env_vars_to_noobaa_core_class): + """ + Reduce the replication delay to one minute + + Args: + new_delay_in_miliseconds (function): A function to add env vars to the noobaa-core pod + """ + new_delay_in_miliseconds = 60 * 1000 + new_env_var_touples = [ + (constants.BUCKET_REPLICATOR_DELAY_PARAM, new_delay_in_miliseconds), + (constants.BUCKET_LOG_REPLICATOR_DELAY_PARAM, new_delay_in_miliseconds), + ] + add_env_vars_to_noobaa_core_class(new_env_var_touples) + + @pytest.fixture() + def buckets_with_versioning(self, bucket_factory, mcg_obj, awscli_pod_session): + """ + Prepare two buckets with versioning enabled + + Args: + bucket_factory: Fixture for creating new buckets + mcg_obj_session: The session-scoped MCG object + awscli_pod_session: The session-scoped AWSCLI pod + + Returns: + Tuple of two buckets: source and target + """ + # Using the OC interface allows patching a repli policy on the OBC + bucket_a = bucket_factory(1, "OC")[0] + bucket_b = bucket_factory(1, "OC")[0] + + put_bucket_versioning_via_awscli(mcg_obj, awscli_pod_session, bucket_a.name) + put_bucket_versioning_via_awscli(mcg_obj, awscli_pod_session, bucket_b.name) + + return bucket_a, bucket_b + + @tier1 + @polarion_id("OCS-6294") + def test_bucket_replication_with_versioning( + self, + awscli_pod, + mcg_obj, + buckets_with_versioning, + ): + """ + 1. Create two buckets and enable versioning on both + 2. Set a bucket replication policy with versioning enabled on the source bucket + 3. Write some versions to the source bucket + 4. Verify the versions were replicated to the target bucket in the same order + """ + obj_key = "test_obj_" + str(uuid4())[:4] + versions_amount = 5 + + # 1. Create two buckets and enable versioning on both + source_bucket, target_bucket = buckets_with_versioning + + # 2. Set a bucket replication policy with versioning enabled on the source bucket + replication_policy = ReplicationPolicyWithVersioning( + target_bucket=target_bucket.name + ) + update_replication_policy(source_bucket.name, replication_policy.to_dict()) + + # 3. Write some versions to the source bucket + source_etags = upload_obj_versions( + mcg_obj, + awscli_pod, + source_bucket.name, + obj_key=obj_key, + amount=versions_amount, + ) + logger.info(f"Uploaded versions with etags: {source_etags}") + + # 4. Verify the versions were replicated to the target bucket in the same order + last_target_etags = None + try: + for target_versions in TimeoutSampler( + timeout=TIMEOUT, + sleep=30, + func=get_obj_versions, + mcg_obj=mcg_obj, + awscli_pod=awscli_pod, + bucket_name=target_bucket.name, + obj_key=obj_key, + ): + target_etags = [v["ETag"] for v in target_versions] + if source_etags == target_etags: + logger.info( + f"Source and target etags match: {source_etags} == {target_etags}" + ) + break + logger.warning( + f"Source and target etags do not match: {source_etags} != {target_etags}" + ) + last_target_etags = target_etags + except TimeoutExpiredError as e: + err_msg = ( + f"Source and target etags do not match after {TIMEOUT} seconds:\n" + f"Source etags:\n{json.dumps(source_etags, indent=2)}\n" + f"Target etags:\n{json.dumps(last_target_etags, indent=2)}" + ) + logger.error(err_msg) + raise TimeoutExpiredError(f"{str(e)}\n\n{err_msg}") + + logger.info("All versions were replicated successfully")