diff --git a/catalog/dags/common/slack.py b/catalog/dags/common/slack.py index dfe9750b24a..0fc65c1d429 100644 --- a/catalog/dags/common/slack.py +++ b/catalog/dags/common/slack.py @@ -57,7 +57,7 @@ from airflow.decorators import task from airflow.exceptions import AirflowNotFoundException -from airflow.models import Variable +from airflow.models import DAG, Variable from airflow.providers.http.hooks.http import HttpHook from requests import Response from typing_extensions import NotRequired, TypedDict @@ -421,13 +421,13 @@ def on_failure_callback(context: dict) -> None: @task def notify_slack( text: str, - dag_id: str, username: str = "Airflow Notification", icon_emoji: str = ":airflow:", + dag: DAG | None = None, ) -> None: send_message( text, username=username, icon_emoji=icon_emoji, - dag_id=dag_id, + dag_id=dag.dag_id, ) diff --git a/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py b/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py index 46066e90cd6..88042f13c0d 100644 --- a/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py +++ b/catalog/dags/data_augmentation/rekognition/add_rekognition_labels_dag.py @@ -65,7 +65,6 @@ def add_rekognition_labels(): notify_start = notify_slack.override(task_id=constants.NOTIFY_START_TASK_ID)( text=f"Starting Rekognition label insertion\n" f"{constants.TEMPLATE_SLACK_MESSAGE_CONFIG}", - dag_id=constants.DAG_ID, username=constants.SLACK_USERNAME, icon_emoji=constants.SLACK_ICON, ) @@ -76,7 +75,6 @@ def add_rekognition_labels(): text="Resuming Rekognition label insertion " # noqa: UP031 "from position: `{{ var.value.%s }}`\n%s" % (constants.CURRENT_POS_VAR_NAME, constants.TEMPLATE_SLACK_MESSAGE_CONFIG), - dag_id=constants.DAG_ID, username=constants.SLACK_USERNAME, icon_emoji=constants.SLACK_ICON, ) @@ -119,7 +117,6 @@ def add_rekognition_labels(): notify_complete = notify_slack.override(task_id="notify_complete")( text="Finished Rekognition label insertion and batched update :check_tick:", - dag_id=constants.DAG_ID, username=constants.SLACK_USERNAME, icon_emoji=constants.SLACK_ICON, ) diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 2c1ee6117f5..5a04eb4f198 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -290,7 +290,6 @@ def create_new_es_index_dag(dag_config: CreateNewIndex): f"New index { index_name } was successfully created with alias" "{{ params.target_alias }}." ), - dag_id=dag.dag_id, username="Create New ES Index", icon_emoji=":elasticsearch:", ) diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index 243903b6eca..ed8589f6649 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -177,7 +177,6 @@ def create_proportional_by_source_staging_index(): trigger_rule=TriggerRule.NONE_FAILED )( text=f"Reindexing complete for {destination_index_name}.", - dag_id=DAG_ID, username="Proportional by Source Staging Index Creation", icon_emoji=":elasticsearch:", ) diff --git a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py index cccbf890a16..0da87cbea91 100644 --- a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py +++ b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py @@ -95,7 +95,6 @@ def point_es_alias_dag(environment: str): trigger_rule=TriggerRule.NONE_FAILED )( text="Alias {{ params.target_alias }} applied to index {{ params.target_index }}.", - dag_id=dag.dag_id, username="Point Alias", icon_emoji=":elasticsearch:", ) diff --git a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py index 3b16056941e..b3467366301 100644 --- a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py @@ -182,7 +182,6 @@ def recreate_full_staging_index(): f"{new_index_suffix}` aliased to `{target_alias}`." ), username="Full Staging Index Creation", - dag_id=DAG_ID, ) # Set up dependencies diff --git a/catalog/dags/maintenance/rotate_envfiles.py b/catalog/dags/maintenance/rotate_envfiles.py index 10c4233d912..5a4f20deda5 100644 --- a/catalog/dags/maintenance/rotate_envfiles.py +++ b/catalog/dags/maintenance/rotate_envfiles.py @@ -288,7 +288,7 @@ def notify_complete(deleted_envfiles: dict[str, list[str]]): files = ", ".join(envfiles) message += f"{env}: {files}\n" notify_slack.function( - dag_id=DAG_ID, text=f"Deleted the following environment files:\n{message}" + text=f"Deleted the following environment files:\n{message}" ) diff --git a/catalog/dags/popularity/popularity_refresh_dag_factory.py b/catalog/dags/popularity/popularity_refresh_dag_factory.py index bd364e5d7f7..d069e3ad97c 100644 --- a/catalog/dags/popularity/popularity_refresh_dag_factory.py +++ b/catalog/dags/popularity/popularity_refresh_dag_factory.py @@ -110,7 +110,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh): " constants update_", username=SLACK_USERNAME, icon_emoji=SLACK_EMOJI, - dag_id=popularity_refresh.dag_id, ) update_constants = ( @@ -139,7 +138,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh): " popularity scores_", username=SLACK_USERNAME, icon_emoji=SLACK_EMOJI, - dag_id=popularity_refresh.dag_id, ) # Once popularity constants have been calculated, establish the cutoff time @@ -173,7 +171,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh): ), username=SLACK_USERNAME, icon_emoji=SLACK_EMOJI, - dag_id=popularity_refresh.dag_id, ) # Set up task dependencies