Skip to content

Commit

Permalink
[DPE-3426] migrate from _cat to _cluster APIs (#390)
Browse files Browse the repository at this point in the history
## Issue
We are using some `_cat` API calls to fetch data around shard
information and node statuses. The `_cat` API is designed as a
human-readable interface.

## Solution
Move the `_cat` API calls to other API endpoints such as
`_cluster/state` and `_cluster/health` while keeping the same data
model.
  • Loading branch information
skourta authored Aug 16, 2024
1 parent f98c36b commit 54d47e2
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 11 deletions.
56 changes: 49 additions & 7 deletions lib/charms/opensearch/v0/helper_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,38 @@ def shards(
verbose: bool = False,
) -> List[Dict[str, str]]:
"""Get all shards of all indexes in the cluster."""
params = ""
if verbose:
params = "?v=true&h=index,shard,prirep,state,unassigned.reason&s=state"
return opensearch.request("GET", f"/_cat/shards{params}", host=host, alt_hosts=alt_hosts)
cluster_state = opensearch.request(
"GET", "_cluster/state/routing_table,metadata,nodes", host=host, alt_hosts=alt_hosts
)

nodes = cluster_state["nodes"]

shards_info = []
for index_name, index_data in cluster_state["routing_table"]["indices"].items():
for shard_num, shard_data in index_data["shards"].items():
for shard in shard_data:
node_data = nodes.get(shard["node"], {})
node_name = node_data.get("name", None)
node_ip = (
node_data["transport_address"].split(":")[0]
if "transport_address" in node_data
else None
)

shard_info = {
"index": index_name,
"shard": shard_num,
"prirep": shard["primary"] and "p" or "r",
"state": shard["state"],
"ip": node_ip,
"node": node_name,
}
if verbose:
shard_info["unassigned.reason"] = shard.get("unassigned_info", {}).get(
"reason", None
)
shards_info.append(shard_info)
return shards_info

@staticmethod
@retry(
Expand Down Expand Up @@ -222,10 +250,24 @@ def indices(
alt_hosts: Optional[List[str]] = None,
) -> Dict[str, Dict[str, str]]:
"""Get all shards of all indexes in the cluster."""
endpoint = "/_cat/indices?expand_wildcards=all"
# Get cluster state
cluster_state = opensearch.request(
"GET", "/_cluster/state/metadata", host=host, alt_hosts=alt_hosts
)
indices_state = cluster_state["metadata"]["indices"]

# Get cluster health
cluster_health = opensearch.request(
"GET", "/_cluster/health?level=indices", host=host, alt_hosts=alt_hosts
)
indices_health = cluster_health["indices"]

idx = {}
for index in opensearch.request("GET", endpoint, host=host, alt_hosts=alt_hosts):
idx[index["index"]] = {"health": index["health"], "status": index["status"]}
for index in indices_state.keys():
idx[index] = {
"health": indices_health[index]["status"],
"status": indices_state[index]["state"],
}
return idx

@staticmethod
Expand Down
17 changes: 13 additions & 4 deletions lib/charms/opensearch/v0/opensearch_base_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,11 +1183,20 @@ def _remove_data_role_from_dedicated_cm_if_needed( # noqa: C901
try:
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(0.5)):
with attempt:
resp = self.opensearch.request(
"GET", endpoint=f"/_cat/allocation/{self.unit_name}?format=json"
search_shards_info = self.opensearch.request(
"GET", "/*/_search_shards?expand_wildcards=all"
)
for entry in resp:
if entry.get("node") == self.unit_name and entry.get("shards") != 0:

# find the node id of the current unit
node_id = None
for node_id, node in search_shards_info["nodes"].items():
if node["name"] == self.unit_name:
break
assert node_id is not None # should never happen

# check if the node has any shards assigned to it
for shard_data in search_shards_info["shards"]:
if shard_data[0]["node"] == node_id:
raise Exception
return True
except RetryError:
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/lib/test_helper_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,25 @@ def test_get_cluster_settings(self, request_mock):
host=None,
alt_hosts=None,
)

@patch("charms.opensearch.v0.helper_cluster.ClusterState.shards")
def test_count_shards_by_state(self, shards):
"""Test the busy shards filtering."""
shards.return_value = [
{"index": "index1", "state": "STARTED", "node": "opensearch-0"},
{"index": "index1", "state": "INITIALIZING", "node": "opensearch-1"},
{"index": "index2", "state": "STARTED", "node": "opensearch-0"},
{"index": "index2", "state": "RELOCATING", "node": "opensearch-1"},
{"index": "index3", "state": "STARTED", "node": "opensearch-0"},
{"index": "index3", "state": "STARTED", "node": "opensearch-1"},
{"index": "index4", "state": "STARTED", "node": "opensearch-2"},
{"index": "index4", "state": "INITIALIZING", "node": "opensearch-2"},
]
self.assertDictEqual(
ClusterState.shards_by_state(self.opensearch),
{
"STARTED": 5,
"INITIALIZING": 2,
"RELOCATING": 1,
},
)

0 comments on commit 54d47e2

Please sign in to comment.