Skip to content

Commit

Permalink
Extract shared Elasticsearch cluster connection utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
sarayourfriend committed Feb 5, 2024
1 parent 8a939a4 commit def36a7
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
3 changes: 3 additions & 0 deletions catalog/dags/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
STAGING = "staging"
PRODUCTION = "production"

Environment = Literal["staging", "production"]
ENVIRONMENTS = [STAGING, PRODUCTION]

CONTACT_EMAIL = os.getenv("CONTACT_EMAIL")

DAG_DEFAULT_ARGS = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import timedelta

from airflow.decorators import task, task_group
from airflow.models.connection import Connection
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
from airflow.sensors.python import PythonSensor

Expand All @@ -21,12 +20,6 @@
GET_CURRENT_INDEX_CONFIG_TASK_NAME = "get_current_index_configuration"


@task
def get_es_host(environment: str):
conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}")
return conn.host


@task
def get_index_name(media_type: str, index_suffix: str):
return f"{media_type}-{index_suffix}".lower()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
CREATE_NEW_INDEX_CONFIGS,
CreateNewIndex,
)
from elasticsearch_cluster.shared import get_es_host


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -188,7 +189,7 @@ def create_new_es_index_dag(config: CreateNewIndex):
with dag:
prevent_concurrency = prevent_concurrency_with_dags(config.blocking_dags)

es_host = es.get_es_host(environment=config.environment)
es_host = get_es_host(environment=config.environment)

index_name = es.get_index_name(
media_type="{{ params.media_type }}",
Expand Down
11 changes: 11 additions & 0 deletions catalog/dags/elasticsearch_cluster/shared.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from airflow.decorators import task
from airflow.models.connection import Connection
from airflow.models.xcom_arg import XComArg

from common.constants import Environment


@task
def get_es_host(environment: Environment) -> XComArg:
conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}")
return conn.host

0 comments on commit def36a7

Please sign in to comment.