[DPE-3905] - Large deployment relations - Integration tests (#281)
## Issue

This PR implements [DPE-3905](https://warthogs.atlassian.net/browse/DPE-3905). Namely, it adds:

- Integration tests for large deployment relations
- Various fixes


[DPE-3905]:
https://warthogs.atlassian.net/browse/DPE-3905?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: phvalguima <[email protected]>
Mehdi-Bendriss and phvalguima authored May 4, 2024
1 parent 6f1322a commit 093a086
Showing 5 changed files with 293 additions and 19 deletions.
2 changes: 1 addition & 1 deletion lib/charms/opensearch/v0/opensearch_backups.py
@@ -251,7 +251,7 @@ def _on_peer_relation_changed(self, event) -> None:
event.defer()
return

if not (data := event.relation.data.get(event.app)):
if not (data := event.relation.data.get(event.app)) or not data.get("data"):
return
data = PeerClusterRelData.from_str(data["data"])
s3_credentials = data.credentials.s3
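For context, the change above guards against a relation databag that exists but does not yet carry the serialized peer-cluster payload. A minimal sketch of that pattern, using a plain dict as a hypothetical stand-in for the Juju relation databag (not the charm's real types):

```python
from typing import Optional


def extract_payload(app_databag: Optional[dict]) -> Optional[str]:
    # Bail out when the databag is missing entirely, or present but still
    # lacking the "data" key that holds the serialized peer-cluster payload.
    if not app_databag or not app_databag.get("data"):
        return None
    return app_databag["data"]  # safe to deserialize from here on


assert extract_payload(None) is None                  # no databag yet
assert extract_payload({}) is None                    # databag exists, "data" not written
assert extract_payload({"data": "{...}"}) == "{...}"  # payload present
```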
14 changes: 11 additions & 3 deletions lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py
@@ -160,7 +160,9 @@ def _on_peer_cluster_relation_changed(self, event: RelationChangedEvent):
return

# get list of relations with this orchestrator
target_relation_ids = [rel.id for rel in self.charm.model.relations[self.relation_name]]
target_relation_ids = [
rel.id for rel in self.charm.model.relations[self.relation_name] if len(rel.units) > 0
]

# fetch emitting app planned units and broadcast
self._put_planned_units(
@@ -200,7 +202,11 @@ def _on_peer_cluster_relation_departed(self, event: RelationDepartedEvent) -> None:
return

# we need to update the fleet planned units
target_relation_ids = [rel.id for rel in self.charm.model.relations[self.relation_name]]
target_relation_ids = [
rel.id
for rel in self.charm.model.relations[self.relation_name]
if rel.id != event.relation.id and len(rel.units) > 0
]
self._put_planned_units(event.app.name, 0, target_relation_ids)

def refresh_relation_data(self, event: EventBase) -> None:
@@ -209,7 +215,9 @@ def refresh_relation_data(self, event: EventBase) -> None:
return

# all relations with the current orchestrator
all_relation_ids = [rel.id for rel in self.charm.model.relations[self.relation_name]]
all_relation_ids = [
rel.id for rel in self.charm.model.relations[self.relation_name] if len(rel.units) > 0
]

# get deployment descriptor of current app
deployment_desc = self.charm.opensearch_peer_cm.deployment_desc()
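All three hunks above apply the same selection rule when collecting target relation ids: broadcast only to relations that still have remote units joined, and on departure also skip the relation that is going away. A self-contained sketch of that logic, with a made-up `FakeRelation` dataclass standing in for ops' `Relation` object:

```python
from dataclasses import dataclass, field


@dataclass
class FakeRelation:
    # Made-up stand-in for ops.model.Relation: only the fields used by the filter.
    id: int
    units: set = field(default_factory=set)


relations = [
    FakeRelation(0, {"data/0", "data/1"}),  # joined relation with remote units
    FakeRelation(1),                        # relation created, no remote units yet
    FakeRelation(2, {"failover/0"}),        # relation that is about to depart
]
departing_relation_id = 2

# relation-changed / refresh_relation_data: only relations with units are targets
target_relation_ids = [rel.id for rel in relations if len(rel.units) > 0]
assert target_relation_ids == [0, 2]

# relation-departed: additionally exclude the departing relation itself
remaining_ids = [
    rel.id
    for rel in relations
    if rel.id != departing_relation_id and len(rel.units) > 0
]
assert remaining_ids == [0]
```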
3 changes: 2 additions & 1 deletion tests/integration/ha/helpers.py
@@ -200,12 +200,13 @@ async def get_number_of_shards_by_node(ops_test: OpsTest, unit_ip: str) -> Dict[
wait=wait_fixed(wait=15) + wait_random(0, 5),
stop=stop_after_attempt(25),
)
async def all_nodes(ops_test: OpsTest, unit_ip: str) -> List[Node]:
async def all_nodes(ops_test: OpsTest, unit_ip: str, app: str = APP_NAME) -> List[Node]:
"""Fetch all cluster nodes."""
response = await http_request(
ops_test,
"GET",
f"https://{unit_ip}:9200/_nodes",
app=app,
)
nodes = response.get("nodes", {})

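The `all_nodes` helper now takes an `app` argument so the `_nodes` query can be issued against a chosen application when the model hosts several OpenSearch apps. A hypothetical usage sketch (not part of this commit), reusing the repository's existing helpers:

```python
import pytest
from pytest_operator.plugin import OpsTest

from ..helpers import get_leader_unit_ip
from .helpers import all_nodes


@pytest.mark.group(1)
async def test_count_nodes_of_one_app(ops_test: OpsTest) -> None:
    # Query the cluster through a unit of a specific application, e.g. the
    # "opensearch-main" app used by the new test, instead of the default APP_NAME.
    leader_unit_ip = await get_leader_unit_ip(ops_test, app="opensearch-main")
    nodes = await all_nodes(ops_test, leader_unit_ip, app="opensearch-main")
    assert len(nodes) > 0
```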
237 changes: 237 additions & 0 deletions tests/integration/ha/test_large_deployments_relations.py
@@ -0,0 +1,237 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import asyncio
import logging
import time

import pytest
from charms.opensearch.v0.constants_charm import PClusterNoRelation, TLSRelationMissing
from pytest_operator.plugin import OpsTest

from ..helpers import MODEL_CONFIG, SERIES, get_leader_unit_ip
from ..helpers_deployments import wait_until
from ..tls.test_tls import TLS_CERTIFICATES_APP_NAME
from .continuous_writes import ContinuousWrites
from .helpers import all_nodes
from .test_horizontal_scaling import IDLE_PERIOD

logger = logging.getLogger(__name__)

REL_ORCHESTRATOR = "peer-cluster-orchestrator"
REL_PEER = "peer-cluster"

MAIN_APP = "opensearch-main"
FAILOVER_APP = "opensearch-failover"
DATA_APP = "opensearch-data"
INVALID_APP = "opensearch-invalid"

CLUSTER_NAME = "log-app"
INVALID_CLUSTER_NAME = "timeseries"

APP_UNITS = {MAIN_APP: 3, FAILOVER_APP: 3, DATA_APP: 2, INVALID_APP: 1}


@pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "xlarge"])
@pytest.mark.group(1)
@pytest.mark.abort_on_fail
@pytest.mark.skip_if_deployed
async def test_build_and_deploy(ops_test: OpsTest) -> None:
"""Build and deploy one unit of OpenSearch."""
# it is possible for users to provide their own cluster for HA testing.
# Hence, check if there is a pre-existing cluster.
my_charm = await ops_test.build_charm(".")
await ops_test.model.set_config(MODEL_CONFIG)

# Deploy TLS Certificates operator.
config = {"ca-common-name": "CN_CA"}
await asyncio.gather(
ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=config),
ops_test.model.deploy(
my_charm,
application_name=MAIN_APP,
num_units=3,
series=SERIES,
config={"cluster_name": CLUSTER_NAME},
),
ops_test.model.deploy(
my_charm,
application_name=FAILOVER_APP,
num_units=3,
series=SERIES,
config={"cluster_name": CLUSTER_NAME, "init_hold": True},
),
ops_test.model.deploy(
my_charm,
application_name=DATA_APP,
num_units=2,
series=SERIES,
config={"cluster_name": CLUSTER_NAME, "init_hold": True, "roles": "data.hot,ml"},
),
ops_test.model.deploy(
my_charm,
application_name=INVALID_APP,
num_units=1,
series=SERIES,
config={"cluster_name": INVALID_CLUSTER_NAME, "init_hold": True, "roles": "data.cold"},
),
)

# wait until the TLS operator is ready
await wait_until(
ops_test,
apps=[TLS_CERTIFICATES_APP_NAME],
apps_statuses=["active"],
units_statuses=["active"],
wait_for_exact_units={TLS_CERTIFICATES_APP_NAME: 1},
idle_period=IDLE_PERIOD,
)

# confirm all apps are blocked because no TLS relation is established yet
await wait_until(
ops_test,
apps=list(APP_UNITS.keys()),
apps_full_statuses={
MAIN_APP: {"blocked": [TLSRelationMissing]},
FAILOVER_APP: {"blocked": [PClusterNoRelation]},
DATA_APP: {"blocked": [PClusterNoRelation]},
INVALID_APP: {"blocked": [PClusterNoRelation]},
},
units_full_statuses={
MAIN_APP: {"units": {"blocked": [TLSRelationMissing]}},
FAILOVER_APP: {"units": {"active": []}},
DATA_APP: {"units": {"active": []}},
INVALID_APP: {"units": {"active": []}},
},
wait_for_exact_units={app: units for app, units in APP_UNITS.items()},
idle_period=IDLE_PERIOD,
)


@pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "xlarge"])
@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_invalid_conditions(ops_test: OpsTest) -> None:
"""Check invalid conditions under different states."""
# integrate an app with the main-orchestrator when TLS is not related to the provider
await ops_test.model.integrate(f"{FAILOVER_APP}:{REL_PEER}", f"{MAIN_APP}:{REL_ORCHESTRATOR}")
await wait_until(
ops_test,
apps=[MAIN_APP, FAILOVER_APP],
apps_full_statuses={
MAIN_APP: {"blocked": [TLSRelationMissing]},
FAILOVER_APP: {
"waiting": ["TLS not fully configured in related 'main-orchestrator'."]
},
},
units_full_statuses={
MAIN_APP: {"units": {"blocked": [TLSRelationMissing]}},
FAILOVER_APP: {"units": {"active": []}},
},
wait_for_exact_units={
MAIN_APP: APP_UNITS[MAIN_APP],
FAILOVER_APP: APP_UNITS[FAILOVER_APP],
},
idle_period=IDLE_PERIOD,
)

# integrate TLS to all applications
for app in [MAIN_APP, FAILOVER_APP, DATA_APP, INVALID_APP]:
await ops_test.model.integrate(app, TLS_CERTIFICATES_APP_NAME)

await wait_until(
ops_test,
apps=[MAIN_APP, FAILOVER_APP, DATA_APP, INVALID_APP],
apps_full_statuses={
MAIN_APP: {"active": []},
FAILOVER_APP: {"active": []},
DATA_APP: {"blocked": [PClusterNoRelation]},
INVALID_APP: {"blocked": [PClusterNoRelation]},
},
units_statuses=["active"],
wait_for_exact_units={app: units for app, units in APP_UNITS.items()},
idle_period=IDLE_PERIOD,
)

c_writes = ContinuousWrites(ops_test, app=MAIN_APP)
await c_writes.start()
time.sleep(120)
await c_writes.stop()

# fetch nodes, we should have 6 nodes (main + failover orchestrators)
leader_unit_ip = await get_leader_unit_ip(ops_test, app=MAIN_APP)
nodes = await all_nodes(ops_test, leader_unit_ip, app=MAIN_APP)
assert len(nodes) == 6, f"Wrong node count. Expecting 6 online nodes, found: {len(nodes)}."

# integrate cluster with different name
await ops_test.model.integrate(f"{INVALID_APP}:{REL_PEER}", f"{MAIN_APP}:{REL_ORCHESTRATOR}")
await wait_until(
ops_test,
apps=[MAIN_APP, INVALID_APP],
apps_full_statuses={
MAIN_APP: {"active": []},
INVALID_APP: {
"blocked": ["Cannot relate 2 clusters with different 'cluster_name' values."]
},
},
units_statuses=["active"],
wait_for_exact_units={MAIN_APP: APP_UNITS[MAIN_APP], INVALID_APP: APP_UNITS[INVALID_APP]},
idle_period=IDLE_PERIOD,
)

# remove the app with the invalid cluster name
await ops_test.model.remove_application(
INVALID_APP, block_until_done=True, force=True, destroy_storage=True, no_wait=True
)


@pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "xlarge"])
@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_large_deployment_fully_formed(
ops_test: OpsTest, c_writes: ContinuousWrites, c_writes_runner
) -> None:
"""Test that under optimal conditions all the nodes form the same big cluster."""
await ops_test.model.integrate(f"{DATA_APP}:{REL_PEER}", f"{MAIN_APP}:{REL_ORCHESTRATOR}")
await ops_test.model.integrate(f"{DATA_APP}:{REL_PEER}", f"{FAILOVER_APP}:{REL_ORCHESTRATOR}")

await wait_until(
ops_test,
apps=[MAIN_APP, FAILOVER_APP, DATA_APP],
apps_statuses=["active"],
units_statuses=["active"],
wait_for_exact_units={
app: units for app, units in APP_UNITS.items() if app != INVALID_APP
},
idle_period=IDLE_PERIOD,
)

# fetch nodes, we should have 8 nodes (main + failover orchestrators + data)
leader_unit_ip = await get_leader_unit_ip(ops_test, app=MAIN_APP)
nodes = await all_nodes(ops_test, leader_unit_ip, app=MAIN_APP)
assert len(nodes) == 8, f"Wrong node count. Expecting 8 online nodes, found: {len(nodes)}."

# check the roles
auto_gen_roles = ["cluster_manager", "coordinating_only", "data", "ingest", "ml"]
data_roles = ["data", "ml"]
for app, node_count in [(MAIN_APP, 3), (FAILOVER_APP, 3), (DATA_APP, 2)]:
current_app_nodes = [node for node in nodes if node.app_name == app]
assert (
len(current_app_nodes) == node_count
), f"Wrong count for {app}:{len(current_app_nodes)} - expected:{node_count}"

roles = current_app_nodes[0].roles
temperature = current_app_nodes[0].temperature
if app in [MAIN_APP, FAILOVER_APP]:
assert sorted(roles) == sorted(
auto_gen_roles
), f"Wrong roles for {app}:{roles} - expected:{auto_gen_roles}"
assert temperature is None, f"Wrong temperature for {app}:{roles} - expected:None"
else:
assert sorted(roles) == sorted(
data_roles
), f"Wrong roles for {app}:{roles} - expected:{data_roles}"
assert (
temperature == "hot"
), f"Wrong temperature for {app}:{temperature} - expected:hot"
@@ -15,8 +15,8 @@
SERIES,
check_cluster_formation_successful,
cluster_health,
get_application_unit_ids,
get_application_unit_names,
get_application_unit_status,
get_leader_unit_ip,
)
from ..helpers_deployments import wait_until
@@ -109,17 +109,12 @@ async def test_set_roles_manually(
assert sorted(node.roles) == ["cluster_manager", "data"], "roles unchanged"
assert node.temperature == "cold", "Temperature unchanged."

# scale up cluster by 1 unit, this should break the quorum and put the charm in a blocked state
# scale up cluster by 1 unit, this should give the new node the same roles
await ops_test.model.applications[app].add_unit(count=1)
# TODO: this should go once we fully trust that quorum is automatically established
await wait_until(
ops_test,
apps=[app],
apps_full_statuses={
app: {
"blocked": [PClusterWrongNodesCountForQuorum],
"active": [],
},
},
units_full_statuses={
app: {
"units": {
@@ -131,10 +126,43 @@
wait_for_exact_units=len(nodes) + 1,
idle_period=IDLE_PERIOD,
)
# new_unit_id = max(
# [int(unit.name.split("/")[1]) for unit in ops_test.model.applications[app].units]
# )
new_nodes = await all_nodes(ops_test, leader_unit_ip)
assert len(new_nodes) == len(nodes) + 1

# remove new unit
last_unit_id = sorted(get_application_unit_ids(ops_test, app))[-1]
await ops_test.model.applications[app].destroy_unit(f"{app}/{last_unit_id}")


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_switch_back_to_auto_generated_roles(
ops_test: OpsTest, c_writes: ContinuousWrites, c_writes_runner
) -> None:
"""Check roles changes in all nodes."""
app = (await app_name(ops_test)) or APP_NAME

leader_unit_ip = await get_leader_unit_ip(ops_test, app=app)
nodes = await all_nodes(ops_test, leader_unit_ip)

await ops_test.model.applications[app].set_config({"roles": ""})
await wait_until(
ops_test,
apps=[app],
apps_statuses=["active"],
units_statuses=["active"],
wait_for_exact_units=len(nodes),
idle_period=IDLE_PERIOD,
)

app_unit_status = await get_application_unit_status(ops_test, app=app)
assert any(unit.value == "active" for unit in app_unit_status.values())
# assert app_unit_status[new_unit_id].message == PClusterWrongNodesCountForQuorum
# check that nodes' roles have indeed changed
nodes = await all_nodes(ops_test, leader_unit_ip)
for node in nodes:
assert sorted(node.roles) == [
"cluster_manager",
"coordinating_only",
"data",
"ingest",
"ml",
]
assert node.temperature is None, "Node temperature was erroneously set."
