
Commit 77da032

Fix flakiness of Plugins tests + backup tests wrong target application (#300)

This PR fixes:
- Flakiness of the plugins integration tests
- Backup tests in large deployments not passing the right application name

---------

Co-authored-by: Mehdi-Bendriss <[email protected]>
phvalguima and Mehdi-Bendriss authored May 8, 2024
1 parent 6670d19 commit 77da032
Showing 8 changed files with 120 additions and 105 deletions.
34 changes: 19 additions & 15 deletions tests/integration/ha/helpers.py
@@ -46,12 +46,13 @@
 class Shard:
     """Class for holding a shard."""
 
-    def __init__(self, index: str, num: int, is_prim: bool, node_id: str, unit_id: int):
+    def __init__(self, index: str, num: int, is_prim: bool, node_id: str, unit_id: int, app: str):
         self.index = index
         self.num = num
         self.is_prim = is_prim
         self.node_id = node_id
         self.unit_id = unit_id
+        self.app = app
 
 
 async def app_name(ops_test: OpsTest) -> Optional[str]:
@@ -155,14 +156,15 @@ async def get_shards_by_index(ops_test: OpsTest, unit_ip: str, index_name: str)
     result = []
     for shards_collection in response["shards"]:
         for shard in shards_collection:
-            unit_id = int(nodes[shard["node"]]["name"].split("-")[1])
+            node_name_split = nodes[shard["node"]]["name"].split("-")
             result.append(
                 Shard(
                     index=index_name,
                     num=shard["shard"],
                     is_prim=shard["primary"],
                     node_id=shard["node"],
-                    unit_id=unit_id,
+                    unit_id=int(node_name_split[-1]),
+                    app="-".join(node_name_split[:-1]),
                 )
             )
 
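A note on the parsing change above: Juju node names follow the "<app>-<unit_id>" convention, and the application name may itself contain hyphens, so only the last "-"-separated segment is the unit id. The old code took segment [1], which breaks for multi-segment app names. A minimal sketch of the fixed parsing (node names below are illustrative):

```python
def parse_node_name(name: str) -> tuple[str, int]:
    """Split a Juju node name into (application, unit id)."""
    parts = name.split("-")
    # Unit id is always the last segment; the app name is everything before it.
    return "-".join(parts[:-1]), int(parts[-1])

assert parse_node_name("opensearch-0") == ("opensearch", 0)
assert parse_node_name("opensearch-main-2") == ("opensearch-main", 2)
# Old behavior: int("opensearch-main-2".split("-")[1]) -> ValueError on "main"
```
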
@@ -236,32 +238,34 @@ async def assert_continuous_writes_increasing(
 
 
 async def assert_continuous_writes_consistency(
-    ops_test: OpsTest, c_writes: ContinuousWrites, app: str
+    ops_test: OpsTest, c_writes: ContinuousWrites, apps: List[str]
 ) -> None:
     """Continuous writes checks."""
     result = await c_writes.stop()
     logger.info(f"Continuous writes result: {result}")
     assert result.max_stored_id == result.count - 1
     assert result.max_stored_id == result.last_expected_id
 
-    # investigate the data in each shard, primaries and their respective replicas
-    units_ips = await get_application_unit_ids_ips(ops_test, app)
-    shards = await get_shards_by_index(
-        ops_test, list(units_ips.values())[0], ContinuousWrites.INDEX_NAME
-    )
+    unit_ip = await get_leader_unit_ip(ops_test, apps[0])
+
+    # fetch unit ips by unit id by application
+    apps_units_ips = {app: await get_application_unit_ids_ips(ops_test, app) for app in apps}
+
+    # investigate the data in each shard, primaries and their respective replicas
+    shards = await get_shards_by_index(ops_test, unit_ip, ContinuousWrites.INDEX_NAME)
     shards_by_id = {}
     for shard in shards:
         shards_by_id.setdefault(shard.num, []).append(shard)
 
-    # count data on each shard. For the continuous writes index, we have 2 primary shards
-    # and replica shards of each on all the nodes. In other words: prim1 and its replicas
-    # will have a different "num" than prim2 and its replicas.
+    # count data on each shard. For the **balanced** continuous writes index, we have 2
+    # primary shards and replica shards of each on all the nodes. In other words: prim1 and
+    # its replicas will have a different "num" than prim2 and its replicas.
     count_from_shards = 0
     for shard_num, shards_list in shards_by_id.items():
         count_by_shard = [
             await c_writes.count(
-                units_ips[shard.unit_id], preference=f"_shards:{shard_num}|_only_local"
+                unit_ip=apps_units_ips[shard.app][shard.unit_id],
+                preference=f"_shards:{shard_num}|_only_local",
             )
             for shard in shards_list
         ]
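With the signature widened from `app: str` to `apps: List[str]`, large deployments can pass every application holding shards, and `apps_units_ips[shard.app][shard.unit_id]` resolves an IP regardless of which app a shard landed on. A hedged usage sketch (the orchestrator app name "main" follows the backup tests later in this diff):

```python
# Small deployment: one app owns every shard copy.
await assert_continuous_writes_consistency(ops_test, c_writes, [app])

# Large deployment (illustrative): shards spread across the orchestrator app
# "main" and the data app APP_NAME; both must be listed so each shard's node
# name can be mapped back to a reachable unit IP.
await assert_continuous_writes_consistency(ops_test, c_writes, ["main", APP_NAME])
```
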
@@ -532,7 +536,7 @@ async def assert_start_and_check_continuous_writes(
     time.sleep(10)
     # Ensure we have writes happening and the index is consistent at the end
     await assert_continuous_writes_increasing(writer)
-    await assert_continuous_writes_consistency(ops_test, writer, app)
+    await assert_continuous_writes_consistency(ops_test, writer, [app])
     # Clear the writer manually, as we are not using the conftest c_writes_runner to do so
     await writer.clear()
 
@@ -571,7 +575,7 @@ async def list_backups(ops_test: OpsTest, leader_id: int, app: str = APP_NAME) -
 
 
 async def assert_restore_indices_and_compare_consistency(
-    ops_test: OpsTest, app: str, leader_id: int, unit_ip: str, backup_id: int
+    ops_test: OpsTest, app: str, leader_id: int, unit_ip: str, backup_id: str
 ) -> None:
     """Ensures that continuous writes index has at least the value below."""
     original_count = await index_docs_count(ops_test, app, unit_ip, ContinuousWrites.INDEX_NAME)
122 changes: 60 additions & 62 deletions tests/integration/ha/test_backups.py
@@ -60,6 +60,47 @@
 
 logger = logging.getLogger(__name__)
 
+DEPLOY_CLOUD_GROUP_MARKS = [
+    (
+        pytest.param(
+            cloud_name,
+            deploy_type,
+            id=f"{cloud_name}-{deploy_type}",
+            marks=pytest.mark.group(f"{cloud_name}-{deploy_type}"),
+        )
+    )
+    for cloud_name in ["microceph", "aws"]
+    for deploy_type in ["large", "small"]
+]
+
+
+DEPLOY_SMALL_ONLY_CLOUD_GROUP_MARKS = [
+    (
+        pytest.param(
+            cloud_name,
+            deploy_type,
+            id=f"{cloud_name}-{deploy_type}",
+            marks=pytest.mark.group(f"{cloud_name}-{deploy_type}"),
+        )
+    )
+    for cloud_name in ["microceph", "aws"]
+    for deploy_type in ["small"]
+]
+
+
+DEPLOY_LARGE_ONLY_CLOUD_GROUP_MARKS = [
+    (
+        pytest.param(
+            cloud_name,
+            deploy_type,
+            id=f"{cloud_name}-{deploy_type}",
+            marks=pytest.mark.group(f"{cloud_name}-{deploy_type}"),
+        )
+    )
+    for cloud_name in ["microceph", "aws"]
+    for deploy_type in ["large"]
+]
+
 S3_INTEGRATOR = "s3-integrator"
 S3_INTEGRATOR_CHANNEL = "latest/edge"
 TIMEOUT = 10 * 60
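For context, a hedged sketch of how these parameter lists drive the grouped tests below (`pytest.mark.group` is the repository's test-grouping marker; the test body is illustrative):

```python
import pytest

# Each pytest.param carries its own group marker, so CI can select a single
# combination (e.g. "aws-large") while the test IDs stay readable.
@pytest.mark.parametrize("cloud_name,deploy_type", DEPLOY_LARGE_ONLY_CLOUD_GROUP_MARKS)
async def test_example(ops_test, cloud_name, deploy_type) -> None:
    assert cloud_name in ("microceph", "aws")
    assert deploy_type == "large"  # this list only yields large deployments
```
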
@@ -189,53 +230,6 @@ async def _configure_s3(
     )
 
 
-S3_INTEGRATOR = "s3-integrator"
-S3_INTEGRATOR_CHANNEL = "latest/edge"
-TIMEOUT = 10 * 60
-
-
-DEPLOY_CLOUD_GROUP_MARKS = [
-    (
-        pytest.param(
-            cloud_name,
-            deploy_type,
-            id=f"{cloud_name}-{deploy_type}",
-            marks=pytest.mark.group(f"{cloud_name}-{deploy_type}"),
-        )
-    )
-    for cloud_name in ["microceph", "aws"]
-    for deploy_type in ["large", "small"]
-]
-
-
-DEPLOY_SMALL_ONLY_CLOUD_GROUP_MARKS = [
-    (
-        pytest.param(
-            cloud_name,
-            deploy_type,
-            id=f"{cloud_name}-{deploy_type}",
-            marks=pytest.mark.group(f"{cloud_name}-{deploy_type}"),
-        )
-    )
-    for cloud_name in ["microceph", "aws"]
-    for deploy_type in ["small"]
-]
-
-
-DEPLOY_LARGE_ONLY_CLOUD_GROUP_MARKS = [
-    (
-        pytest.param(
-            cloud_name,
-            deploy_type,
-            id=f"{cloud_name}-{deploy_type}",
-            marks=pytest.mark.group(f"{cloud_name}-{deploy_type}"),
-        )
-    )
-    for cloud_name in ["microceph", "aws"]
-    for deploy_type in ["large"]
-]
-
-
 @pytest.mark.parametrize("cloud_name,deploy_type", DEPLOY_SMALL_ONLY_CLOUD_GROUP_MARKS)
 @pytest.mark.abort_on_fail
 @pytest.mark.skip_if_deployed
@@ -453,8 +447,9 @@ async def test_create_backup_and_restore(
 ) -> None:
     """Runs the backup process whilst writing to the cluster into 'noisy-index'."""
     app = (await app_name(ops_test) or APP_NAME) if deploy_type == "small" else "main"
-    leader_id = await get_leader_unit_id(ops_test)
-    unit_ip = await get_leader_unit_ip(ops_test)
+    apps = [app] if deploy_type == "small" else [app, APP_NAME]
+    leader_id = await get_leader_unit_id(ops_test, app=app)
+    unit_ip = await get_leader_unit_ip(ops_test, app=app)
     config = cloud_configs[cloud_name]
 
     logger.info(f"Syncing credentials for {cloud_name}")
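A note on the explicit `app=` arguments above: judging by `list_backups(ops_test, leader_id, app: str = APP_NAME)` in helpers.py, these helpers default to `APP_NAME`, so in a large deployment the un-parametrized calls resolved the leader of the wrong application. A sketch under that assumption:

```python
deploy_type = "large"  # illustrative
app = "main" if deploy_type == "large" else APP_NAME
# Old (buggy for large deployments): targets APP_NAME's leader by default.
#   leader_id = await get_leader_unit_id(ops_test)
# Fixed: targets the leader of the application actually under test.
leader_id = await get_leader_unit_id(ops_test, app=app)
```
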
@@ -475,7 +470,7 @@
     )
     # continuous writes checks
     await assert_continuous_writes_increasing(c_writes)
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, apps)
     await assert_restore_indices_and_compare_consistency(
         ops_test, app, leader_id, unit_ip, backup_id
     )
@@ -501,8 +496,10 @@ async def test_remove_and_readd_s3_relation(
 ) -> None:
     """Removes and re-adds the s3-credentials relation to test backup and restore."""
     app = (await app_name(ops_test) or APP_NAME) if deploy_type == "small" else "main"
-    leader_id: str = await get_leader_unit_id(ops_test)
-    unit_ip: str = await get_leader_unit_ip(ops_test)
+    apps = [app] if deploy_type == "small" else [app, APP_NAME]
+
+    leader_id: int = await get_leader_unit_id(ops_test, app=app)
+    unit_ip: str = await get_leader_unit_ip(ops_test, app=app)
     config: Dict[str, str] = cloud_configs[cloud_name]
 
     logger.info("Remove s3-credentials relation")
@@ -545,7 +542,7 @@
 
     # continuous writes checks
     await assert_continuous_writes_increasing(c_writes)
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, apps)
     await assert_restore_indices_and_compare_consistency(
         ops_test, app, leader_id, unit_ip, backup_id
     )
@@ -611,13 +608,13 @@ async def test_restore_to_new_cluster(
     # Credentials are set per test scenario
     await ops_test.model.integrate(app, S3_INTEGRATOR)
 
-    leader_id = await get_leader_unit_id(ops_test)
-    unit_ip = await get_leader_unit_ip(ops_test)
+    leader_id = await get_leader_unit_id(ops_test, app=app)
+    unit_ip = await get_leader_unit_ip(ops_test, app=app)
     config: Dict[str, str] = cloud_configs[cloud_name]
 
     logger.info(f"Syncing credentials for {cloud_name}")
     await _configure_s3(ops_test, config, cloud_credentials[cloud_name], app)
-    backups = await list_backups(ops_test, leader_id)
+    backups = await list_backups(ops_test, leader_id, app=app)
 
     global cwrites_backup_doc_count
     # We are expecting 2x backups available
@@ -660,7 +657,7 @@
 
     # continuous writes checks
     await assert_continuous_writes_increasing(writer)
-    await assert_continuous_writes_consistency(ops_test, writer, app)
+    await assert_continuous_writes_consistency(ops_test, writer, [app])
     # This assert assures we have taken a new backup, after the last restore from the original
     # cluster. That means the index is writable.
     await assert_restore_indices_and_compare_consistency(
@@ -721,7 +718,8 @@ async def test_repo_missing_message(ops_test: OpsTest) -> None:
     We use the message format to monitor the cluster status. We need to know if this
     message pattern changed between releases of OpenSearch.
     """
-    unit_ip = await get_leader_unit_ip(ops_test)
+    app: str = (await app_name(ops_test)) or APP_NAME
+    unit_ip = await get_leader_unit_ip(ops_test, app=app)
     resp = await http_request(
         ops_test, "GET", f"https://{unit_ip}:9200/_snapshot/{S3_REPOSITORY}", json_resp=True
     )
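For reference, a hedged sketch of the assertion this test builds toward, assuming OpenSearch's standard error payload for an unregistered snapshot repository (upstream behavior, not shown in this diff):

```python
# A GET on a missing repository returns an error body whose root-cause type is
# "repository_missing_exception"; the charm keys its status message off this
# pattern, so the test pins the format down across OpenSearch releases.
assert resp["status"] == 404
assert resp["error"]["root_cause"][0]["type"] == "repository_missing_exception"
```
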
@@ -734,8 +732,8 @@
 @pytest.mark.abort_on_fail
 async def test_wrong_s3_credentials(ops_test: OpsTest) -> None:
     """Check the repo is misconfigured."""
-    unit_ip = await get_leader_unit_ip(ops_test)
     app = (await app_name(ops_test)) or APP_NAME
+    unit_ip = await get_leader_unit_ip(ops_test, app=app)
 
     config = {
         "endpoint": "http://localhost",
@@ -789,9 +787,9 @@ async def test_change_config_and_backup_restore(
     force_clear_cwrites_index,
 ) -> None:
     """Run for each cloud and update the cluster config."""
-    unit_ip: str = await get_leader_unit_ip(ops_test)
     app: str = (await app_name(ops_test)) or APP_NAME
-    leader_id: str = await get_leader_unit_id(ops_test)
+    unit_ip: str = await get_leader_unit_ip(ops_test, app=app)
+    leader_id: int = await get_leader_unit_id(ops_test, app=app)
 
     initial_count: int = 0
     for cloud_name in cloud_configs.keys():
@@ -828,7 +826,7 @@
 
     # continuous writes checks
     await assert_continuous_writes_increasing(writer)
-    await assert_continuous_writes_consistency(ops_test, writer, app)
+    await assert_continuous_writes_consistency(ops_test, writer, [app])
     await assert_restore_indices_and_compare_consistency(
         ops_test, app, leader_id, unit_ip, backup_id
     )
18 changes: 9 additions & 9 deletions tests/integration/ha/test_ha.py
@@ -113,7 +113,7 @@ async def test_replication_across_members(
     await delete_index(ops_test, app, leader_unit_ip, index_name)
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -175,7 +175,7 @@ async def test_kill_db_process_node_with_primary_shard(
     )
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -227,7 +227,7 @@ async def test_kill_db_process_node_with_elected_cm(
     )
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -312,7 +312,7 @@ async def test_freeze_db_process_node_with_primary_shard(
     )
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -386,7 +386,7 @@ async def test_freeze_db_process_node_with_elected_cm(
     )
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -437,7 +437,7 @@ async def test_restart_db_process_node_with_elected_cm(
         ops_test, leader_unit_ip, get_application_unit_names(ops_test, app=app)
     )
 
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -498,7 +498,7 @@ async def test_restart_db_process_node_with_primary_shard(
         ops_test, leader_unit_ip, get_application_unit_names(ops_test, app=app)
     )
 
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -550,7 +550,7 @@ async def test_full_cluster_crash(
     assert health_resp["status"] == "green", f"Cluster {health_resp['status']} - expected green."
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
 
 
 @pytest.mark.group(1)
@@ -603,4 +603,4 @@ async def test_full_cluster_restart(
     assert health_resp["status"] == "green", f"Cluster {health_resp['status']} - expected green."
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])
2 changes: 1 addition & 1 deletion tests/integration/ha/test_ha_multi_clusters.py
@@ -109,4 +109,4 @@ async def test_multi_clusters_db_isolation(
     await ops_test.model.remove_application(SECOND_APP_NAME)
 
     # continuous writes checks
-    await assert_continuous_writes_consistency(ops_test, c_writes, app)
+    await assert_continuous_writes_consistency(ops_test, c_writes, [app])