Skip to content

Commit

Permalink
[4.18] Test bucket replication with versioning - utility functions an…
Browse files Browse the repository at this point in the history
…d happy path validation (#10957)

Signed-off-by: Sagi Hirshfeld <[email protected]>
  • Loading branch information
sagihirshfeld authored Jan 6, 2025
1 parent e865d5e commit 24f84f9
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 0 deletions.
111 changes: 111 additions & 0 deletions ocs_ci/ocs/bucket_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2921,3 +2921,114 @@ def create_s3client_from_assume_role_creds(mcg_obj, assume_role_creds):
aws_session_token=assumed_session_token,
)
return assumed_s3_resource.meta.client


def put_bucket_versioning_via_awscli(
mcg_obj, awscli_pod, bucket_name, status="Enabled"
):
"""
Put bucket versioning using AWS CLI
Args:
mcg_obj (MCG): MCG object
awscli_pod (Pod): Pod object where AWS CLI is installed
bucket_name (str): Name of the bucket
status (str): Status of the versioning
"""
awscli_pod.exec_cmd_on_pod(
command=craft_s3_command(
f"put-bucket-versioning --bucket {bucket_name} --versioning-configuration Status={status}",
mcg_obj=mcg_obj,
api=True,
)
)


def upload_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key, amount=1, size="1M"):
"""
Upload multiple random data versions to a given object key and return their ETag values
Args:
mcg_obj (MCG): MCG object
awscli_pod (Pod): Pod object where AWS CLI is installed
bucket_name (str): Name of the bucket
obj_key (str): Object key
amount (int): Number of versions to create
size (str): Size of the object. I.E 1M
Returns:
list: List of ETag values of versions in latest to oldest order
"""
file_dir = os.path.join("/tmp", str(uuid4()))
awscli_pod.exec_cmd_on_pod(f"mkdir {file_dir}")

etags = []

for i in range(amount):
file_path = os.path.join(file_dir, f"{obj_key}_{i}")
awscli_pod.exec_cmd_on_pod(
command=f"dd if=/dev/urandom of={file_path} bs={size} count=1"
)
# Use debug and redirect it to stdout to get
# the uploaded object's ETag in the response
resp = awscli_pod.exec_cmd_on_pod(
command=craft_s3_command(
f"cp {file_path} s3://{bucket_name}/{obj_key} --debug 2>&1",
mcg_obj=mcg_obj,
),
out_yaml_format=False,
)

# Parse the ETag from the response
# Filter the line containing the JSON
line = next(filter(lambda line: "ETag" in line, resp.splitlines()))
json_start = line.index("{")
json_str = line[json_start:]

# Fix quotes to read as dict
json_str = json_str.replace("'", '"')
json_str = json_str.replace('""', '"')

# Convert to dict and extract the ETag
new_etag = json.loads(json_str).get("ETag")

# Later versions should precede the older ones
# this achieves the expected order of versions
# we'd get from list-object-versions
etags.insert(0, new_etag)

return etags


def get_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key):
"""
Get object versions using AWS CLI
Args:
mcg_obj (MCG): MCG object
awscli_pod (Pod): Pod object where AWS CLI is installed
bucket_name (str): Name of the bucket
obj_key (str): Object key
Returns:
list: List of dictionaries containing the versions data
"""
resp = awscli_pod.exec_cmd_on_pod(
command=craft_s3_command(
f"list-object-versions --bucket {bucket_name} --prefix {obj_key}",
mcg_obj=mcg_obj,
api=True,
),
out_yaml_format=False,
)

versions_dicts = []
if resp and "Versions" in resp:
versions_dicts = json.loads(resp).get("Versions")

# Remove quotes from the ETag values for easier usage
for d in versions_dicts:
d["ETag"] = d["ETag"].strip('"')

return versions_dicts
17 changes: 17 additions & 0 deletions ocs_ci/ocs/resources/mcg_replication_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,20 @@ def to_dict(self):
dict["log_replication_info"]["endpoint_type"] = "AZURE"

return dict


class ReplicationPolicyWithVersioning(McgReplicationPolicy):
"""
A class to handle the MCG bucket replication policy JSON structure with versioning.
"""

def __init__(self, target_bucket, sync_versions=True, prefix=""):
super().__init__(target_bucket, prefix)
self.sync_versions = sync_versions

def to_dict(self):
dict = super().to_dict()
dict["rules"][0]["sync_versions"] = self.sync_versions

return dict
144 changes: 144 additions & 0 deletions tests/functional/object/mcg/test_bucket_replication_with_versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import json
import logging
from uuid import uuid4

import pytest

from ocs_ci.framework.pytest_customization.marks import (
mcg,
polarion_id,
red_squad,
runs_on_provider,
tier1,
)
from ocs_ci.framework.testlib import MCGTest
from ocs_ci.ocs import constants
from ocs_ci.ocs.bucket_utils import (
get_obj_versions,
put_bucket_versioning_via_awscli,
update_replication_policy,
upload_obj_versions,
)
from ocs_ci.ocs.exceptions import TimeoutExpiredError
from ocs_ci.ocs.resources.mcg_replication_policy import ReplicationPolicyWithVersioning
from ocs_ci.utility.utils import TimeoutSampler

logger = logging.getLogger(__name__)


TIMEOUT = 300


@mcg
@red_squad
@runs_on_provider
class TestReplicationWithVersioning(MCGTest):
"""
Test suite for MCG object replication policies
"""

@pytest.fixture(autouse=True, scope="class")
def reduce_replication_delay_setup(self, add_env_vars_to_noobaa_core_class):
"""
Reduce the replication delay to one minute
Args:
new_delay_in_miliseconds (function): A function to add env vars to the noobaa-core pod
"""
new_delay_in_miliseconds = 60 * 1000
new_env_var_touples = [
(constants.BUCKET_REPLICATOR_DELAY_PARAM, new_delay_in_miliseconds),
(constants.BUCKET_LOG_REPLICATOR_DELAY_PARAM, new_delay_in_miliseconds),
]
add_env_vars_to_noobaa_core_class(new_env_var_touples)

@pytest.fixture()
def buckets_with_versioning(self, bucket_factory, mcg_obj, awscli_pod_session):
"""
Prepare two buckets with versioning enabled
Args:
bucket_factory: Fixture for creating new buckets
mcg_obj_session: The session-scoped MCG object
awscli_pod_session: The session-scoped AWSCLI pod
Returns:
Tuple of two buckets: source and target
"""
# Using the OC interface allows patching a repli policy on the OBC
bucket_a = bucket_factory(1, "OC")[0]
bucket_b = bucket_factory(1, "OC")[0]

put_bucket_versioning_via_awscli(mcg_obj, awscli_pod_session, bucket_a.name)
put_bucket_versioning_via_awscli(mcg_obj, awscli_pod_session, bucket_b.name)

return bucket_a, bucket_b

@tier1
@polarion_id("OCS-6294")
def test_bucket_replication_with_versioning(
self,
awscli_pod,
mcg_obj,
buckets_with_versioning,
):
"""
1. Create two buckets and enable versioning on both
2. Set a bucket replication policy with versioning enabled on the source bucket
3. Write some versions to the source bucket
4. Verify the versions were replicated to the target bucket in the same order
"""
obj_key = "test_obj_" + str(uuid4())[:4]
versions_amount = 5

# 1. Create two buckets and enable versioning on both
source_bucket, target_bucket = buckets_with_versioning

# 2. Set a bucket replication policy with versioning enabled on the source bucket
replication_policy = ReplicationPolicyWithVersioning(
target_bucket=target_bucket.name
)
update_replication_policy(source_bucket.name, replication_policy.to_dict())

# 3. Write some versions to the source bucket
source_etags = upload_obj_versions(
mcg_obj,
awscli_pod,
source_bucket.name,
obj_key=obj_key,
amount=versions_amount,
)
logger.info(f"Uploaded versions with etags: {source_etags}")

# 4. Verify the versions were replicated to the target bucket in the same order
last_target_etags = None
try:
for target_versions in TimeoutSampler(
timeout=TIMEOUT,
sleep=30,
func=get_obj_versions,
mcg_obj=mcg_obj,
awscli_pod=awscli_pod,
bucket_name=target_bucket.name,
obj_key=obj_key,
):
target_etags = [v["ETag"] for v in target_versions]
if source_etags == target_etags:
logger.info(
f"Source and target etags match: {source_etags} == {target_etags}"
)
break
logger.warning(
f"Source and target etags do not match: {source_etags} != {target_etags}"
)
last_target_etags = target_etags
except TimeoutExpiredError as e:
err_msg = (
f"Source and target etags do not match after {TIMEOUT} seconds:\n"
f"Source etags:\n{json.dumps(source_etags, indent=2)}\n"
f"Target etags:\n{json.dumps(last_target_etags, indent=2)}"
)
logger.error(err_msg)
raise TimeoutExpiredError(f"{str(e)}\n\n{err_msg}")

logger.info("All versions were replicated successfully")

0 comments on commit 24f84f9

Please sign in to comment.