diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py
index 13cba85f32e..bdfdad94032 100644
--- a/ocs_ci/ocs/bucket_utils.py
+++ b/ocs_ci/ocs/bucket_utils.py
@@ -2115,6 +2115,15 @@ def update_replication_policy(bucket_name, replication_policy_dict):
     ).patch(params=json.dumps(replication_policy_patch_dict), format_type="merge")
 
 
+def get_replication_policy(bucket_name):
+    """
+    Get the replication policy of a bucket from its OBC spec
+
+    Args:
+        bucket_name (str): Name of the bucket
+
+    Returns:
+        str: The replication policy as a JSON string
+
+    """
+    return OCP(
+        kind="obc",
+        namespace=config.ENV_DATA["cluster_namespace"],
+        resource_name=bucket_name,
+    ).get()["spec"]["additionalConfig"]["replicationPolicy"]
+
+
 def patch_replication_policy_to_bucketclass(
     bucketclass_name, rule_id, destination_bucket_name
 ):
@@ -2716,6 +2725,42 @@ def bulk_s3_put_bucket_lifecycle_config(mcg_obj, buckets, lifecycle_config):
     logger.info("Applied lifecyle rule on all the buckets")
 
 
+def upload_random_objects_to_source_and_wait_for_replication(
+    mcg_obj,
+    source_bucket,
+    target_bucket,
+    mockup_logger,
+    file_dir,
+    pattern="ObjKey-",
+    amount=1,
+    prefix=None,
+    timeout=600,
+):
+    """
+    Upload randomly generated objects to the source bucket and wait until
+    the replication completes
+
+    Args:
+        mcg_obj (MCG): An MCG object
+        source_bucket (Bucket): The source bucket
+        target_bucket (Bucket): The target bucket
+        mockup_logger (MockupBucketLogger): Mockup logger of the source bucket
+        file_dir (str): Directory on the awscli pod where the objects are generated
+        pattern (str): Object name pattern
+        amount (int): Number of objects to generate
+        prefix (str): Optional prefix under which the objects are uploaded
+        timeout (int): Time in seconds to wait for the replication to complete
+
+    """
+    logger.info(f"Randomly generating {amount} object/s")
+    obj_list = write_random_objects_in_pod(
+        io_pod=mockup_logger.awscli_pod,
+        file_dir=file_dir,
+        amount=amount,
+        pattern=pattern,
+    )
+
+    mockup_logger.upload_random_objects_and_log(
+        source_bucket.name, file_dir=file_dir, obj_list=obj_list, prefix=prefix
+    )
+    assert compare_bucket_object_list(
+        mcg_obj,
+        source_bucket.name,
+        target_bucket.name,
+        timeout=timeout,
+    ), f"Standard replication failed to complete in {timeout} seconds"
+
+
 def upload_test_objects_to_source_and_wait_for_replication(
     mcg_obj, source_bucket, target_bucket, mockup_logger, timeout
 ):
@@ -2945,7 +2990,9 @@ def put_bucket_versioning_via_awscli(
     )
 
 
-def upload_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key, amount=1, size="1M"):
+def upload_obj_versions(
+    mcg_obj, awscli_pod, bucket_name, obj_key, prefix=None, amount=1, size="1M"
+):
     """
     Upload multiple random data versions to a given object key and return
     their ETag values
@@ -2960,6 +3007,10 @@ def upload_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key, amount=1, siz
     Returns:
         list: List of ETag values of versions in latest to oldest order
     """
+    full_object_path = (
+        f"s3://{bucket_name}/{prefix}" if prefix else f"s3://{bucket_name}"
+    )
+
     file_dir = os.path.join("/tmp", str(uuid4()))
     awscli_pod.exec_cmd_on_pod(f"mkdir {file_dir}")
 
@@ -2974,7 +3025,7 @@ def upload_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key, amount=1, siz
         # the uploaded object's ETag in the response
         resp = awscli_pod.exec_cmd_on_pod(
             command=craft_s3_command(
-                f"cp {file_path} s3://{bucket_name}/{obj_key} --debug 2>&1",
+                f"cp {file_path} {full_object_path}/{obj_key} --debug 2>&1",
                 mcg_obj=mcg_obj,
             ),
             out_yaml_format=False,
@@ -3030,5 +3081,35 @@ def get_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key):
     # Remove quotes from the ETag values for easier usage
     for d in versions_dicts:
         d["ETag"] = d["ETag"].strip('"')
-    return versions_dicts
+
+    return versions_dicts
+
+
+def verify_deletion_marker(mcg_obj, awscli_pod, bucket_name, object_key):
+    """
+    Check whether a delete marker exists for the given object key
+
+    Args:
+        mcg_obj (MCG): MCG object
+        awscli_pod (Pod): Pod object where AWS CLI is installed
+        bucket_name (str): Name of the bucket
+        object_key (str): Object key
+
+    Returns:
+        bool: True if the latest version of the object is a delete marker,
+            False otherwise
+
+    """
+    resp = awscli_pod.exec_cmd_on_pod(
+        command=craft_s3_command(
f"list-object-versions --bucket {bucket_name} --prefix {object_key}", + mcg_obj=mcg_obj, + api=True, + ), + out_yaml_format=False, + ) + + if resp and "DeleteMarkers" in resp: + delete_markers = json.loads(resp).get("DeleteMarkers")[0] + logger.info(f"{bucket_name}:\n{delete_markers}") + if delete_markers.get("IsLatest"): + return True + return False diff --git a/ocs_ci/ocs/resources/mcg_replication_policy.py b/ocs_ci/ocs/resources/mcg_replication_policy.py index a28ece2b902..b70c0ecaeb0 100644 --- a/ocs_ci/ocs/resources/mcg_replication_policy.py +++ b/ocs_ci/ocs/resources/mcg_replication_policy.py @@ -38,15 +38,18 @@ def __init__( self, destination_bucket, sync_deletions=False, + sync_versions=False, prefix="", ): super().__init__(destination_bucket, prefix) self.sync_deletions = sync_deletions + self.sync_versions = sync_versions @abstractmethod def to_dict(self): dict = super().to_dict() dict["rules"][0]["sync_deletions"] = self.sync_deletions + dict["rules"][0]["sync_versions"] = self.sync_versions dict["log_replication_info"] = {} return dict @@ -65,8 +68,9 @@ def __init__( logs_bucket="", prefix="", logs_location_prefix="", + sync_versions=False, ): - super().__init__(destination_bucket, sync_deletions, prefix) + super().__init__(destination_bucket, sync_deletions, sync_versions, prefix) self.logs_bucket = logs_bucket self.logs_location_prefix = logs_location_prefix diff --git a/ocs_ci/ocs/resources/mockup_bucket_logger.py b/ocs_ci/ocs/resources/mockup_bucket_logger.py index 8c535493a23..d5f56af881d 100644 --- a/ocs_ci/ocs/resources/mockup_bucket_logger.py +++ b/ocs_ci/ocs/resources/mockup_bucket_logger.py @@ -101,7 +101,29 @@ def upload_arbitrary_object_and_log(self, bucket_name): self._upload_mockup_logs(bucket_name, [obj_name], "PUT") - def delete_objs_and_log(self, bucket_name, objs): + def upload_random_objects_and_log( + self, bucket_name, file_dir, obj_list, prefix=None + ): + """ + Uploads randomly generated objects to the bucket and upload a matching + mockup log + + """ + + logger.info( + f"Uploading randomly generated objects from {file_dir} to {bucket_name}" + ) + prefix = prefix if prefix else "" + sync_object_directory( + self.awscli_pod, + file_dir, + f"s3://{bucket_name}/{prefix}", + self.mcg_obj, + ) + + self._upload_mockup_logs(bucket_name=bucket_name, obj_list=obj_list, op="PUT") + + def delete_objs_and_log(self, bucket_name, objs, prefix=None): """ Delete list of objects from the MCG bucket and write matching mockup logs @@ -112,17 +134,20 @@ def delete_objs_and_log(self, bucket_name, objs): """ logger.info(f"Deleting the {objs} from the bucket") + prefix = prefix if prefix else "" obj_list = list_objects_from_bucket( self.awscli_pod, - f"s3://{bucket_name}", + f"s3://{bucket_name}/{prefix}/", s3_obj=self.mcg_obj, ) if set(objs).issubset(set(obj_list)): for i in range(len(objs)): s3cmd = craft_s3_command( - f"rm s3://{bucket_name}/{objs[i]}", self.mcg_obj + f"rm s3://{bucket_name}/{prefix}/{objs[i]}", self.mcg_obj ) self.awscli_pod.exec_cmd_on_pod(s3cmd) + if prefix: + objs = [f"{prefix}/{obj}" for obj in objs] self._upload_mockup_logs(bucket_name, objs, "DELETE") def delete_all_objects_and_log(self, bucket_name): diff --git a/tests/conftest.py b/tests/conftest.py index 9191cb64bbd..742ae1f8ddf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -46,6 +46,8 @@ from ocs_ci.ocs.bucket_utils import ( craft_s3_command, put_bucket_policy, + update_replication_policy, + put_bucket_versioning_via_awscli, ) from ocs_ci.ocs.constants import FUSION_CONF_DIR from 
 from ocs_ci.ocs.cnv.virtual_machine import VirtualMachine, VMCloner
@@ -8138,6 +8140,21 @@ def factory(interval):
     return factory
 
 
+@pytest.fixture()
+def reduce_replication_delay(add_env_vars_to_noobaa_core_class):
+    """
+    Reduce the replication delay of the noobaa-core pod via its environment variables
+
+    """
+
+    def factory(interval=1):
+        """
+        Args:
+            interval (int): Replication delay in minutes
+
+        """
+        new_delay_in_milliseconds = interval * 60 * 1000
+        new_env_var_tuples = [
+            (constants.BUCKET_REPLICATOR_DELAY_PARAM, new_delay_in_milliseconds),
+            (constants.BUCKET_LOG_REPLICATOR_DELAY_PARAM, new_delay_in_milliseconds),
+        ]
+        add_env_vars_to_noobaa_core_class(new_env_var_tuples)
+
+    return factory
+
+
 @pytest.fixture()
 def reset_conn_score():
     """
@@ -8465,21 +8482,34 @@ def aws_log_based_replication_setup(
     """
     A fixture to set up standard log-based replication with deletion sync.
 
-    Args:
-        awscli_pod_session(Pod): A pod running the AWS CLI
-        mcg_obj_session(MCG): An MCG object
-        bucket_factory: A bucket factory fixture
-
-    Returns:
-        MockupBucketLogger: A MockupBucketLogger object
-        Bucket: The source bucket
-        Bucket: The target bucket
-
     """
     reduce_replication_delay_setup()
 
-    def factory(bucketclass_dict=None):
+    def factory(
+        bucketclass_dict=None,
+        prefix_source="",
+        prefix_target="",
+        bidirectional=False,
+        deletion_sync=True,
+        enable_versioning=False,
+    ):
+        """
+        Set up log-based replication between a source and a target bucket.
+
+        Args:
+            bucketclass_dict (dict): Dictionary representing bucketclass parameters
+            prefix_source (str): Replication prefix for the source bucket's policy
+            prefix_target (str): Replication prefix for the target bucket's policy
+            bidirectional (bool): True to also set up replication from the target
+                bucket back to the source bucket, otherwise False
+            deletion_sync (bool): True to enable deletion sync, otherwise False
+            enable_versioning (bool): True to enable object versioning on both buckets
+
+        Returns:
+            MockupBucketLogger: The mockup logger of the source bucket
+            MockupBucketLogger: The mockup logger of the target bucket, or None
+                when bidirectional is False
+            Bucket: The source bucket
+            Bucket: The target bucket
+
+        """
         log.info("Starting log-based replication setup")
         if bucketclass_dict is None:
             bucketclass_dict = {
@@ -8492,27 +8522,60 @@ def factory(bucketclass_dict=None):
                 },
             }
         target_bucket = bucket_factory(bucketclass=bucketclass_dict)[0]
+        if enable_versioning:
+            put_bucket_versioning_via_awscli(
+                mcg_obj_session, awscli_pod_session, target_bucket.name
+            )
 
-        mockup_logger = MockupBucketLogger(
+        mockup_logger_source = MockupBucketLogger(
             awscli_pod=awscli_pod_session,
             mcg_obj=mcg_obj_session,
             bucket_factory=bucket_factory,
             platform=constants.AWS_PLATFORM,
             region=constants.DEFAULT_AWS_REGION,
         )
-        replication_policy = AwsLogBasedReplicationPolicy(
+        replication_policy_source = AwsLogBasedReplicationPolicy(
             destination_bucket=target_bucket.name,
-            sync_deletions=True,
-            logs_bucket=mockup_logger.logs_bucket_uls_name,
+            sync_deletions=deletion_sync,
+            logs_bucket=mockup_logger_source.logs_bucket_uls_name,
+            prefix=prefix_source,
+            sync_versions=True,
        )
 
         source_bucket = bucket_factory(
-            1, bucketclass=bucketclass_dict, replication_policy=replication_policy
+            1,
+            bucketclass=bucketclass_dict,
+            replication_policy=replication_policy_source,
         )[0]
+        if enable_versioning:
+            put_bucket_versioning_via_awscli(
+                mcg_obj_session, awscli_pod_session, source_bucket.name
+            )
+
+        mockup_logger_target = None
+        if bidirectional:
+            mockup_logger_target = MockupBucketLogger(
+                awscli_pod=awscli_pod_session,
+                mcg_obj=mcg_obj_session,
+                bucket_factory=bucket_factory,
+                platform=constants.AWS_PLATFORM,
+                region=constants.DEFAULT_AWS_REGION,
+            )
+
+            replication_policy_target = AwsLogBasedReplicationPolicy(
+                destination_bucket=source_bucket.name,
+                sync_deletions=deletion_sync,
+                logs_bucket=mockup_logger_target.logs_bucket_uls_name,
+                prefix=prefix_target,
+                sync_versions=True,
+            )
+            update_replication_policy(
+                target_bucket.name, replication_policy_target.to_dict()
+            )
 
         log.info("log-based replication setup complete")
 
-        return mockup_logger, source_bucket, target_bucket
+        return mockup_logger_source, mockup_logger_target, source_bucket, target_bucket
 
     return factory
 
diff --git a/tests/cross_functional/conftest.py b/tests/cross_functional/conftest.py
index 56bb6b2c63a..10ae80b93f5 100644
--- a/tests/cross_functional/conftest.py
+++ b/tests/cross_functional/conftest.py
@@ -362,6 +362,15 @@ def factory(noobaa_pvc_obj):
         )
         return restore_pvc_objs, snap_obj
 
+    def teardown():
+        """
+        Teardown code to delete the restored PVC objects
+
+        """
+        for pvc_obj in restore_pvc_objs:
+            pvc_obj.delete()
+
+    request.addfinalizer(teardown)
     return factory
 
diff --git a/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py b/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py
index 89dfe0aa7f8..100c07e9051 100644
--- a/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py
+++ b/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py
@@ -1,3 +1,4 @@
+import json
 import logging
 
 import pytest
@@ -29,6 +30,12 @@
     write_random_test_objects_to_bucket,
     upload_test_objects_to_source_and_wait_for_replication,
     update_replication_policy,
+    put_bucket_versioning_via_awscli,
+    get_obj_versions,
+    upload_random_objects_to_source_and_wait_for_replication,
+    get_replication_policy,
+    s3_put_bucket_versioning,
+    verify_deletion_marker,
 )
 from ocs_ci.ocs import ocp
 from ocs_ci.ocs.resources.pvc import get_pvc_objs
@@ -41,9 +48,16 @@
     get_noobaa_core_pod,
     get_noobaa_pods,
     wait_for_noobaa_pods_running,
+    get_pod_node,
 )
+
 from ocs_ci.utility.retry import retry
-from ocs_ci.ocs.exceptions import CommandFailed, ResourceWrongStatusException
+from ocs_ci.ocs.exceptions import (
+    CommandFailed,
+    ResourceWrongStatusException,
+    TimeoutExpiredError,
+)
+from ocs_ci.utility.utils import TimeoutSampler
 
 logger = logging.getLogger(__name__)
 
@@ -312,7 +326,9 @@ def test_log_based_replication_with_disruptions(
             skip_any_features=["nsfs", "rgw kafka", "caching"],
         )
 
-        mockup_logger, source_bucket, target_bucket = aws_log_based_replication_setup()
+        mockup_logger, _, source_bucket, target_bucket = (
+            aws_log_based_replication_setup()
+        )
 
         # upload test objects to the bucket and verify replication
         upload_test_objects_to_source_and_wait_for_replication(
@@ -404,3 +420,347 @@ def test_log_based_replication_with_disruptions(
             object_amount=5,
         )
         logger.info("No issues seen with the MCG bg feature validation")
+
+
+class TestMCGReplicationWithVersioningSystemTest:
+    """
+    Test MCG bucket replication with object versioning and deletion sync,
+    combined with disruptive operations
+
+    """
+
+    @retry(Exception, tries=5, delay=30)
+    def upload_objects_with_retry(
+        self,
+        mcg_obj_session,
+        source_bucket,
+        target_bucket,
+        mockup_logger,
+        file_dir,
+        pattern,
+        prefix,
+    ):
+        """
+        Upload a single object to the source bucket and wait for its
+        replication to the target bucket, retrying on failure
+
+        """
+        upload_random_objects_to_source_and_wait_for_replication(
+            mcg_obj_session,
+            source_bucket,
+            target_bucket,
+            mockup_logger,
+            file_dir,
+            pattern=pattern,
+            amount=1,
+            prefix=prefix,
+            timeout=300,
+        )
+
+    def test_bucket_replication_with_versioning_system_test(
+        self,
+        awscli_pod_session,
+        mcg_obj_session,
+        bucket_factory,
+        reduce_replication_delay,
+        nodes,
+        noobaa_db_backup,
+        noobaa_db_recovery_from_backup,
+        aws_log_based_replication_setup,
+        test_directory_setup,
+    ):
+        """
+        Verify bi-directional, prefix-based bucket replication with version
+        sync and deletion sync while noobaa components are disrupted
+
+        """
+        prefix_1 = "site_1"
+        prefix_2 = "site_2"
+        object_key = "ObjectKey-"
+
+        # Reduce the replication delay to 1 minute
+        logger.info("Reduce the bucket replication delay")
+        reduce_replication_delay()
+
+        # Set up two buckets with bi-directional replication
+        # and deletion sync enabled
+        bucketclass_dict = {
+            "interface": "OC",
+            "backingstore_dict": {"aws": [(1, "eu-central-1")]},
+        }
+        mockup_logger_source, mockup_logger_target, bucket_1, bucket_2 = (
+            aws_log_based_replication_setup(
+                bucketclass_dict=bucketclass_dict,
+                bidirectional=True,
+                prefix_source=prefix_1,
+                prefix_target=prefix_2,
+            )
+        )
+
+        # Upload an object and verify that bucket replication works
+        logger.info(f"Uploading object {object_key} to the bucket {bucket_1.name}")
+        self.upload_objects_with_retry(
+            mcg_obj_session,
+            bucket_1,
+            bucket_2,
+            mockup_logger_source,
+            test_directory_setup.origin_dir,
+            pattern=object_key,
+            prefix=prefix_1,
+        )
+
+        # Enable object versioning on both the buckets
+        s3_put_bucket_versioning(mcg_obj_session, bucket_1.name)
+        s3_put_bucket_versioning(mcg_obj_session, bucket_2.name)
+
+        put_bucket_versioning_via_awscli(
+            mcg_obj_session, awscli_pod_session, bucket_1.name
+        )
+        put_bucket_versioning_via_awscli(
+            mcg_obj_session, awscli_pod_session, bucket_2.name
+        )
+        logger.info("Enabled object versioning for both the buckets")
+
+        # Enable version sync in the replication policy of both buckets
+        replication_1 = json.loads(get_replication_policy(bucket_name=bucket_2.name))
+        replication_2 = json.loads(get_replication_policy(bucket_name=bucket_1.name))
+        replication_1["rules"][0]["sync_versions"] = True
+        replication_2["rules"][0]["sync_versions"] = True
+
+        update_replication_policy(bucket_2.name, replication_1)
+        update_replication_policy(bucket_1.name, replication_2)
+        logger.info(
+            "Enabled sync versions in the replication policy for both the buckets"
+        )
+
+        # Sample whether the object versions in both buckets under the
+        # given prefix match
+        def sample_if_versions_match(bucket_1, bucket_2, prefix):
+            def verify_object_version_etags(bucket_1, bucket_2, prefix):
+                bucket_1_etags = [
+                    v["ETag"]
+                    for v in get_obj_versions(
+                        mcg_obj_session,
+                        awscli_pod_session,
+                        bucket_1.name,
+                        f"{prefix}/{object_key}0",
+                    )
+                ]
+                bucket_2_etags = [
+                    v["ETag"]
+                    for v in get_obj_versions(
+                        mcg_obj_session,
+                        awscli_pod_session,
+                        bucket_2.name,
+                        f"{prefix}/{object_key}0",
+                    )
+                ]
+                logger.info(
+                    f"\n{bucket_1.name} ETags: {bucket_1_etags}"
+                    f"\n{bucket_2.name} ETags: {bucket_2_etags}"
+                )
+                return bucket_1_etags == bucket_2_etags
+
+            try:
+                for verified in TimeoutSampler(
+                    timeout=300,
+                    sleep=30,
+                    func=verify_object_version_etags,
+                    bucket_1=bucket_1,
+                    bucket_2=bucket_2,
+                    prefix=prefix,
+                ):
+                    if verified:
+                        return True
+            except TimeoutExpiredError:
+                logger.error(
+                    "\nETags of the two buckets don't match even after the timeout, "
+                    "hence they don't have the same versions"
+                )
+                return False
+
+        # Upload the object with new version data from the second bucket
+        self.upload_objects_with_retry(
+            mcg_obj_session,
+            bucket_2,
+            bucket_1,
+            mockup_logger_target,
+            test_directory_setup.origin_dir,
+            pattern=object_key,
+            prefix=prefix_2,
+        )
+        logger.info(
+            f"Updated object {object_key} with new version data in bucket {bucket_2.name}"
+        )
+
+        assert sample_if_versions_match(
+            bucket_1, bucket_2, prefix_2
+        ), f"Source and target buckets don't have matching versions for the object {object_key}"
+        logger.info(
+            f"Replication works from {bucket_2.name} to {bucket_1.name} and has all the versions of object {object_key}"
+        )
+
+        # Perform disruptive operations, object uploads and version
+        # verifications in parallel
+        with ThreadPoolExecutor(max_workers=1) as executor:
+
+            # Update the object previously uploaded to the first bucket and
+            # then shut down the noobaa pod nodes
+            noobaa_pods = get_noobaa_pods(
+                namespace=constants.OPENSHIFT_STORAGE_NAMESPACE
+            )
+            nodes_to_shutdown = [get_pod_node(pod_obj) for pod_obj in noobaa_pods]
+            logger.info(
+                f"Updating object {object_key} with new version data in bucket {bucket_1.name}"
+            )
+            future = executor.submit(
+                self.upload_objects_with_retry,
+                mcg_obj_session,
+                bucket_1,
+                bucket_2,
+                mockup_logger_source,
+                test_directory_setup.origin_dir,
+                pattern=object_key,
+                prefix=prefix_1,
+            )
+
+            nodes.stop_nodes(list(set(nodes_to_shutdown)))
+            logger.info(f"Stopped these noobaa pod nodes {nodes_to_shutdown}")
+
+            # Wait for the upload to finish
+            future.result()
+
+            assert sample_if_versions_match(bucket_1, bucket_2, prefix_1), (
+                f"Source and target buckets don't have matching"
+                f" versions for the object {object_key}"
+            )
+            logger.info(
+                f"Replication works from {bucket_1.name} to {bucket_2.name} and"
+                f" has all the versions of object {object_key}"
+            )
+
+            logger.info("Starting nodes now...")
+            nodes.start_nodes(nodes=nodes_to_shutdown)
+            wait_for_noobaa_pods_running()
+
+            # Update the object previously uploaded to the second bucket and
+            # then restart the noobaa pods
+            logger.info(
+                f"Updating object {object_key} with new version data in bucket {bucket_2.name}"
+            )
+            future = executor.submit(
+                self.upload_objects_with_retry,
+                mcg_obj_session,
+                bucket_2,
+                bucket_1,
+                mockup_logger_target,
+                test_directory_setup.origin_dir,
+                pattern=object_key,
+                prefix=prefix_2,
+            )
+            for pod_obj in noobaa_pods:
+                pod_obj.delete(force=True)
+                logger.info(f"Deleted noobaa pod {pod_obj.name}")
+            logger.info("Restarted all Noobaa pods")
+
+            # Wait for the upload to finish
+            future.result()
+
+            assert sample_if_versions_match(bucket_1, bucket_2, prefix_2), (
+                f"Source and target buckets don't have matching "
+                f"versions for the object {object_key}"
+            )
+            logger.info(
+                f"Replication works from {bucket_2.name} to {bucket_1.name} "
+                f"and has all the versions of object {object_key}"
+            )
+
+        # Sample whether the object's delete markers are synced
+        # between the buckets
+        def sample_if_delete_marker_exists(bucket_1, bucket_2, prefix):
+            def verify_delete_markers(bucket_1, bucket_2, prefix):
+                return verify_deletion_marker(
+                    mcg_obj_session,
+                    awscli_pod_session,
+                    bucket_1.name,
+                    object_key=f"{prefix}/{object_key}0",
+                ) == verify_deletion_marker(
+                    mcg_obj_session,
+                    awscli_pod_session,
+                    bucket_2.name,
+                    object_key=f"{prefix}/{object_key}0",
+                )
+
+            try:
+                for verified in TimeoutSampler(
+                    timeout=300,
+                    sleep=30,
+                    func=verify_delete_markers,
+                    bucket_1=bucket_1,
+                    bucket_2=bucket_2,
+                    prefix=prefix,
+                ):
+                    if verified:
+                        return True
+            except TimeoutExpiredError:
+                logger.error(
+                    f"\nDelete markers don't exist for the object {object_key} "
+                    f"in both the buckets"
+                )
+                return False
+
+        # Take the noobaa db backup and then perform the object deletion
+        # with deletion sync disabled
+ logger.info("Taking backup of noobaa db") + noobaa_pvc_obj = get_pvc_objs(pvc_names=[constants.NOOBAA_DB_PVC_NAME]) + _, snap_obj = noobaa_db_backup(noobaa_pvc_obj) + + logger.info("Disabling deletion sync for the second bucket") + replication_2["rules"][0]["sync_deletions"] = False + update_replication_policy(bucket_1.name, replication_1) + + logger.info(f"Deleting the object {object_key} from the bucket {bucket_1.name}") + mockup_logger_source.delete_objs_and_log( + bucket_1.name, [f"{object_key}0"], prefix=prefix_1 + ) + # s3_delete_object(mcg_obj_session, bucket_1.name, f"{prefix_1}/{object_key}0") + + assert not compare_bucket_object_list( + mcg_obj_session, + bucket_1.name, + bucket_2.name, + timeout=300, + ), "Deletion sync worked even when deletion sync was disabled" + logger.info("Deletion sync didn't seem to work as expected") + + assert not sample_if_delete_marker_exists( + bucket_1, bucket_2, prefix_1 + ), "DeleteMarkers are synced when it was not suppose to sync" + logger.info( + "DeleteMarkers are not synced between the buckets after " + "object deleted as expected" + ) + + # Recover the noobaa db from the backup and perform + # object deletion and verify deletion sync works + logger.info("Recovering noobaa db from backup") + noobaa_db_recovery_from_backup(snap_obj, noobaa_pvc_obj, noobaa_pods) + wait_for_noobaa_pods_running(timeout=420) + + logger.info(f"Deleting the object {object_key} from the bucket {bucket_2.name}") + # s3_delete_object(mcg_obj_session, bucket_2.name, f"{prefix_2}/{object_key}0") + logger.info(f"Deleting the object {object_key} from the bucket {bucket_1.name}") + mockup_logger_source.delete_objs_and_log( + bucket_2.name, [f"{object_key}0"], prefix=prefix_2 + ) + + assert compare_bucket_object_list( + mcg_obj_session, + bucket_1.name, + bucket_2.name, + timeout=600, + ), ( + "Deletion sync didnt work while it suppose to work, " + "even after noobaa db recovery" + ) + logger.info("Deletion sync worked as expected") + + assert sample_if_versions_match(bucket_1, bucket_2, prefix_2), ( + f"Source bucket and target buckets dont have matching " + f"versions for the object {object_key}" + ) + logger.info( + f"Replication works from {bucket_1.name} to {bucket_2.name} and " + f"has all the versions of object {object_key}" + ) + + assert sample_if_delete_marker_exists( + bucket_1, bucket_2, prefix_1 + ), "DeleteMarkers are not synced between the buckets after object is deleted" + logger.info( + "DeleteMarkers are synced between the buckets after object deleted as expected" + )