Skip to content

Commit

Permalink
Add Elasticsearch healthcheck dag
Browse files Browse the repository at this point in the history
  • Loading branch information
sarayourfriend committed Feb 5, 2024
1 parent def36a7 commit 57915ca
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 1 deletion.
163 changes: 163 additions & 0 deletions catalog/dags/elasticsearch_cluster/healthcheck_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""
Monitor staging and production Elasticsearch cluster health endpoint.
Requests the cluster health and alerts under the following conditions:
- Red cluster health
- Unexpected number of nodes
- Unresponsive cluster
Additionally, the DAG will notify (rather than alert) when the cluster health is yellow.
Yellow cluster health may or may not be an issue, depending on whether it is expected,
and occurs whenever shards and replicas are being relocated (e.g., during reindexes).
It is worthwhile to notify in these cases, as an assurance, but we could choose to add
logic that ignores yellow cluster health during data refresh or other similar operations.
"""

import json
import logging
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
from elasticsearch import Elasticsearch

from common.constants import PRODUCTION, STAGING, Environment
from common.slack import send_alert, send_message
from elasticsearch_cluster.shared import get_es_host


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Template for the DAG ids; ``env`` is substituted per environment.
_DAG_ID = "{env}_cluster_healthcheck"

# Node counts the healthcheck compares the cluster's response against.
# The expected total is the sum of the two expected role counts.
EXPECTED_DATA_NODE_COUNT = 3
EXPECTED_MASTER_NODE_COUNT = 3
EXPECTED_NODE_COUNT = EXPECTED_DATA_NODE_COUNT + EXPECTED_MASTER_NODE_COUNT


def _alert_no_response(env: Environment):
    """
    Alert that the cluster did not respond to the healthcheck request.

    Sends the alert through ``send_alert`` under the DAG id for ``env`` and
    then records the failure at error level.

    :param env: environment whose cluster failed to respond
    """
    # NOTE(review): nothing in the visible portion of this file calls this
    # helper — confirm where the "unresponsive cluster" alert is wired up.
    alert_text = (
        f"Elasticsearch {env} cluster failed to respond to healthcheck request"
    )
    dag_id = _DAG_ID.format(env=env)
    send_alert(alert_text, dag_id)

    logger.error("Cluster failed to respond to healthcheck")


def _format_response_body(response_body: dict) -> str:
    """
    Render the healthcheck response as a fenced, pretty-printed JSON block.

    :param response_body: healthcheck response body to serialise
    :return: a heading line followed by the JSON (4-space indent) wrapped in
        a triple-backtick fence
    """
    pretty_body = json.dumps(response_body, indent=4)
    return f"""
Full healthcheck response body:
```
{pretty_body}
```
"""


def _alert_unexpected_status(env: Environment, response_body: dict):
    """
    Alert that the cluster reported an unexpected health status.

    The alert names the reported status and includes the full formatted
    response body; the raw body is also logged at error level.

    :param env: environment the healthcheck ran against
    :param response_body: healthcheck response; must contain ``"status"``
    """
    cluster_status = response_body["status"]
    formatted_body = _format_response_body(response_body)

    alert_text = f"""
Elasticsearch {env} cluster status is {cluster_status}.
{formatted_body}
"""
    send_alert(alert_text, _DAG_ID.format(env=env))
    logger.error(f"Unexpected cluster health status; {json.dumps(response_body)}")


def _alert_unexpected_node_count(env: Environment, response_body: dict):
    """
    Alert that the cluster's node count differs from the expected count.

    The alert reports total, master and data node counts next to their
    expected values and includes the full formatted response body; the raw
    body is also logged at error level.

    :param env: environment the healthcheck ran against
    :param response_body: healthcheck response; must contain
        ``"number_of_nodes"`` and ``"number_of_data_nodes"``
    """
    total_nodes = response_body["number_of_nodes"]
    data_nodes = response_body["number_of_data_nodes"]
    # The response is only read for total and data node counts, so the
    # master count is derived as the remainder.
    # NOTE(review): this treats every non-data node as a master node —
    # confirm that matches the cluster topology.
    master_nodes = total_nodes - data_nodes
    formatted_body = _format_response_body(response_body)

    alert_text = f"""
Elasticsearch {env} cluster node count is **{total_nodes}**.
Expected {EXPECTED_NODE_COUNT} total nodes.
Master nodes: **{master_nodes}** of expected {EXPECTED_MASTER_NODE_COUNT}
Data nodes: **{data_nodes}** of expected {EXPECTED_DATA_NODE_COUNT}
This is a critical status change, **investigate ASAP**.
If this is expected (e.g., during controlled node or cluster changes), acknowledge immediately with explanation.
{formatted_body}
"""
    send_alert(alert_text, _DAG_ID.format(env=env))
    logger.error(f"Unexpected node count; {json.dumps(response_body)}")


def _notify_yellow_cluster_health(env: Environment, response_body: dict):
    """
    Notify (rather than alert) that the cluster health is yellow.

    Uses ``send_message`` instead of ``send_alert`` and logs at info level,
    since yellow health is not necessarily a problem.

    :param env: environment the healthcheck ran against
    :param response_body: healthcheck response to include in the message
    """
    formatted_body = _format_response_body(response_body)

    notification_text = f"""
Elasticsearch {env} cluster health is **yellow**.
This does not mean something is necessarily wrong, but if this is not expected (e.g., data refresh) then investigate cluster health now.
{formatted_body}
"""
    send_message(notification_text, _DAG_ID.format(env=env))
    logger.info(f"Cluster health was yellow; {json.dumps(response_body)}")


# Task: request the cluster health from the Elasticsearch host at ``es_host``
# and return the body of the response.
@task
def ping_healthcheck(es_host: str):
    hook = ElasticsearchPythonHook(hosts=[es_host])
    # ``get_conn`` is accessed as an attribute, not called.
    client: Elasticsearch = hook.get_conn

    health = client.cluster.health()
    return health.body


# Task: inspect a healthcheck response and dispatch at most one message.
# Checks run in priority order: red status, then unexpected node count, then
# yellow status. If none match, the response is only logged.
@task
def notify(env: Environment, response_body: dict):
    status = response_body["status"]

    if status == "red":
        handler = _alert_unexpected_status
    elif response_body["number_of_nodes"] != EXPECTED_NODE_COUNT:
        handler = _alert_unexpected_node_count
    elif status == "yellow":
        handler = _notify_yellow_cluster_health
    else:
        logger.info(f"Cluster health was green; {json.dumps(response_body)}")
        return None

    return handler(env, response_body)


def _cluster_healthcheck_dag(env: Environment):
    """
    Wire up the healthcheck tasks for one environment.

    Resolves the Elasticsearch host for ``env``, pings its health endpoint,
    and passes the response to ``notify``; the three steps are chained in
    that order.

    :param env: environment to build the healthcheck tasks for
    """
    es_host = get_es_host(env)
    healthcheck_response = ping_healthcheck(es_host)
    notification = notify(env, healthcheck_response)

    es_host >> healthcheck_response >> notification


# Cron expression: every 15 minutes.
_EVERY_15_MINUTES = "*/15 * * * *"

# Keyword arguments passed to ``@dag`` for both environment DAGs.
_SHARED_DAG_ARGS = {
    "schedule": _EVERY_15_MINUTES,
    "start_date": datetime(2024, 2, 4),
    "catchup": False,
    "max_active_runs": 1,
    # The module docstring doubles as the DAG documentation.
    "doc_md": __doc__,
    "tags": ["elasticsearch", "monitoring"],
}


# Healthcheck DAG for the staging cluster; the DAG id is built from the
# shared ``_DAG_ID`` template and the shared DAG arguments are applied.
@dag(dag_id=_DAG_ID.format(env=STAGING), **_SHARED_DAG_ARGS)
def staging_cluster_healthcheck():
    # All task wiring lives in the shared builder.
    _cluster_healthcheck_dag(STAGING)


# Healthcheck DAG for the production cluster; the DAG id is built from the
# shared ``_DAG_ID`` template and the shared DAG arguments are applied.
@dag(dag_id=_DAG_ID.format(env=PRODUCTION), **_SHARED_DAG_ARGS)
def production_cluster_healthcheck():
    # All task wiring lives in the shared builder.
    _cluster_healthcheck_dag(PRODUCTION)


# Call each decorated DAG function at import time, once per environment.
staging_cluster_healthcheck()
production_cluster_healthcheck()
4 changes: 3 additions & 1 deletion catalog/tests/dags/test_dag_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest
from airflow.models import DagBag

from common.constants import MEDIA_TYPES
from common.constants import ENVIRONMENTS, MEDIA_TYPES
from providers.provider_reingestion_workflows import (
PROVIDER_REINGESTION_WORKFLOWS as REINGESTION_WORKFLOW_CONFIGS,
)
Expand All @@ -25,6 +25,7 @@
"data_refresh/dag_factory.py",
"data_refresh/create_filtered_index_dag.py",
"elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py",
"elasticsearch_cluster/healthcheck_dag.py",
"oauth2/authorize_dag.py",
"oauth2/token_refresh_dag.py",
"database/delete_records/delete_records_dag.py",
Expand All @@ -41,6 +42,7 @@
"popularity/popularity_refresh_dag_factory.py": len(MEDIA_TYPES),
"data_refresh/dag_factory.py": len(MEDIA_TYPES),
"data_refresh/create_filtered_index_dag.py": len(MEDIA_TYPES),
"elasticsearch_cluster/healthcheck_dag.py": len(ENVIRONMENTS),
}


Expand Down
38 changes: 38 additions & 0 deletions documentation/catalog/reference/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ The following are DAGs grouped by their primary tag:
| ------------------------------------------------------------------- | ----------------- |
| [`create_new_production_es_index`](#create_new_production_es_index) | `None` |
| [`create_new_staging_es_index`](#create_new_staging_es_index) | `None` |
| [`production_cluster_healthcheck`](#production_cluster_healthcheck) | `*/15 * * * *` |
| [`staging_cluster_healthcheck`](#staging_cluster_healthcheck) | `*/15 * * * *` |

### Maintenance

Expand Down Expand Up @@ -163,6 +165,7 @@ The following is documentation associated with each DAG (where available):
1. [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow)
1. [`phylopic_workflow`](#phylopic_workflow)
1. [`pr_review_reminders`](#pr_review_reminders)
1. [`production_cluster_healthcheck`](#production_cluster_healthcheck)
1. [`rawpixel_workflow`](#rawpixel_workflow)
1. [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation)
1. [`recreate_full_staging_index`](#recreate_full_staging_index)
Expand All @@ -172,6 +175,7 @@ The following is documentation associated with each DAG (where available):
1. [`science_museum_workflow`](#science_museum_workflow)
1. [`smithsonian_workflow`](#smithsonian_workflow)
1. [`smk_workflow`](#smk_workflow)
1. [`staging_cluster_healthcheck`](#staging_cluster_healthcheck)
1. [`staging_database_restore`](#staging_database_restore)
1. [`stocksnap_workflow`](#stocksnap_workflow)
1. [`wikimedia_commons_workflow`](#wikimedia_commons_workflow)
Expand Down Expand Up @@ -1032,6 +1036,23 @@ Unfortunately the DAG does not know when someone is on vacation. It is up to the
author of the PR to re-assign review if one of the randomly selected reviewers
is unavailable for the time period during which the PR should be reviewed.

### `production_cluster_healthcheck`

Monitor staging and production Elasticsearch cluster health endpoint.

Requests the cluster health and alerts under the following conditions:

- Red cluster health
- Unexpected number of nodes
- Unresponsive cluster

Additionally, the DAG will notify (rather than alert) when the cluster health is
yellow. Yellow cluster health may or may not be an issue, depending on whether
it is expected, and occurs whenever shards and replicas are being relocated
(e.g., during reindexes). It is worthwhile to notify in these cases, as an
assurance, but we could choose to add logic that ignores yellow cluster health
during data refresh or other similar operations.

### `rawpixel_workflow`

Content Provider: Rawpixel
Expand Down Expand Up @@ -1175,6 +1196,23 @@ Output: TSV file containing the media metadata.

Notes: https://www.smk.dk/en/article/smk-api/

### `staging_cluster_healthcheck`

Monitor staging and production Elasticsearch cluster health endpoint.

Requests the cluster health and alerts under the following conditions:

- Red cluster health
- Unexpected number of nodes
- Unresponsive cluster

Additionally, the DAG will notify (rather than alert) when the cluster health is
yellow. Yellow cluster health may or may not be an issue, depending on whether
it is expected, and occurs whenever shards and replicas are being relocated
(e.g., during reindexes). It is worthwhile to notify in these cases, as an
assurance, but we could choose to add logic that ignores yellow cluster health
during data refresh or other similar operations.

### `staging_database_restore`

#### Update the staging database
Expand Down

0 comments on commit 57915ca

Please sign in to comment.