From aed1ca452db6cc2dc210299e90cf3673ac1df1db Mon Sep 17 00:00:00 2001 From: Ashir Amin Date: Thu, 16 Jan 2025 11:10:12 -0600 Subject: [PATCH] Updated notify_slack method (#5136) Co-authored-by: Dhruv Bhanushali --- catalog/dags/common/slack.py | 6 +++--- .../rekognition/add_rekognition_labels_dag.py | 3 --- .../create_new_es_index/create_new_es_index_dag.py | 1 - .../create_proportional_by_source_staging_index_dag.py | 1 - .../point_es_alias/point_es_alias_dag.py | 1 - .../recreate_full_staging_index_dag.py | 1 - catalog/dags/maintenance/rotate_envfiles.py | 2 +- catalog/dags/popularity/popularity_refresh_dag_factory.py | 3 --- 8 files changed, 4 insertions(+), 14 deletions(-) diff --git a/catalog/dags/common/slack.py b/catalog/dags/common/slack.py index 25092c49bac..90243b0d708 100644 --- a/catalog/dags/common/slack.py +++ b/catalog/dags/common/slack.py @@ -57,7 +57,7 @@ from airflow.decorators import task from airflow.exceptions import AirflowNotFoundException -from airflow.models import Variable +from airflow.models import DAG, Variable from airflow.providers.http.hooks.http import HttpHook from requests import Response from typing_extensions import NotRequired, TypedDict @@ -421,13 +421,13 @@ def on_failure_callback(context: dict) -> None: @task def notify_slack( text: str, - dag_id: str, username: str = "Airflow Notification", icon_emoji: str = ":airflow:", + dag: DAG | None = None, ) -> None: send_message( text, username=username, icon_emoji=icon_emoji, - dag_id=dag_id, + dag_id=dag.dag_id, ) diff --git a/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py b/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py index 031da933e2c..722841157e5 100644 --- a/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py +++ b/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py @@ -65,7 +65,6 @@ def add_rekognition_labels(): notify_start = notify_slack.override(task_id=constants.NOTIFY_START_TASK_ID)( text=f"Starting Rekognition label insertion\n" f"{constants.TEMPLATE_SLACK_MESSAGE_CONFIG}", - dag_id=constants.DAG_ID, username=constants.SLACK_USERNAME, icon_emoji=constants.SLACK_ICON, ) @@ -74,7 +73,6 @@ def add_rekognition_labels(): text="Resuming Rekognition label insertion " # noqa: UP031 "from position: `{{ var.value.%s }}`\n%s" % (constants.CURRENT_POS_VAR_NAME, constants.TEMPLATE_SLACK_MESSAGE_CONFIG), - dag_id=constants.DAG_ID, username=constants.SLACK_USERNAME, icon_emoji=constants.SLACK_ICON, ) @@ -117,7 +115,6 @@ def add_rekognition_labels(): notify_complete = notify_slack.override(task_id="notify_complete")( text="Finished Rekognition label insertion and batched update :check_tick:", - dag_id=constants.DAG_ID, username=constants.SLACK_USERNAME, icon_emoji=constants.SLACK_ICON, ) diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 55e0f775065..8c833e714f0 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -290,7 +290,6 @@ def create_new_es_index_dag(dag_config: CreateNewIndex): f"New index {index_name} was successfully created with alias" "{{ params.target_alias }}." ), - dag_id=dag.dag_id, username="Create New ES Index", icon_emoji=":elasticsearch:", ) diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index 243903b6eca..ed8589f6649 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -177,7 +177,6 @@ def create_proportional_by_source_staging_index(): trigger_rule=TriggerRule.NONE_FAILED )( text=f"Reindexing complete for {destination_index_name}.", - dag_id=DAG_ID, username="Proportional by Source Staging Index Creation", icon_emoji=":elasticsearch:", ) diff --git a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py index cccbf890a16..0da87cbea91 100644 --- a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py +++ b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py @@ -95,7 +95,6 @@ def point_es_alias_dag(environment: str): trigger_rule=TriggerRule.NONE_FAILED )( text="Alias {{ params.target_alias }} applied to index {{ params.target_index }}.", - dag_id=dag.dag_id, username="Point Alias", icon_emoji=":elasticsearch:", ) diff --git a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py index 3b16056941e..b3467366301 100644 --- a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py @@ -182,7 +182,6 @@ def recreate_full_staging_index(): f"{new_index_suffix}` aliased to `{target_alias}`." ), username="Full Staging Index Creation", - dag_id=DAG_ID, ) # Set up dependencies diff --git a/catalog/dags/maintenance/rotate_envfiles.py b/catalog/dags/maintenance/rotate_envfiles.py index 10c4233d912..5a4f20deda5 100644 --- a/catalog/dags/maintenance/rotate_envfiles.py +++ b/catalog/dags/maintenance/rotate_envfiles.py @@ -288,7 +288,7 @@ def notify_complete(deleted_envfiles: dict[str, list[str]]): files = ", ".join(envfiles) message += f"{env}: {files}\n" notify_slack.function( - dag_id=DAG_ID, text=f"Deleted the following environment files:\n{message}" + text=f"Deleted the following environment files:\n{message}" ) diff --git a/catalog/dags/popularity/popularity_refresh_dag_factory.py b/catalog/dags/popularity/popularity_refresh_dag_factory.py index bd364e5d7f7..d069e3ad97c 100644 --- a/catalog/dags/popularity/popularity_refresh_dag_factory.py +++ b/catalog/dags/popularity/popularity_refresh_dag_factory.py @@ -110,7 +110,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh): " constants update_", username=SLACK_USERNAME, icon_emoji=SLACK_EMOJI, - dag_id=popularity_refresh.dag_id, ) update_constants = ( @@ -139,7 +138,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh): " popularity scores_", username=SLACK_USERNAME, icon_emoji=SLACK_EMOJI, - dag_id=popularity_refresh.dag_id, ) # Once popularity constants have been calculated, establish the cutoff time @@ -173,7 +171,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh): ), username=SLACK_USERNAME, icon_emoji=SLACK_EMOJI, - dag_id=popularity_refresh.dag_id, ) # Set up task dependencies