Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated notify_slack method #5136

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions catalog/dags/common/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

from airflow.decorators import task
from airflow.exceptions import AirflowNotFoundException
from airflow.models import Variable
from airflow.models import DAG, Variable
from airflow.providers.http.hooks.http import HttpHook
from requests import Response
from typing_extensions import NotRequired, TypedDict
Expand Down Expand Up @@ -421,13 +421,13 @@ def on_failure_callback(context: dict) -> None:
@task
def notify_slack(
text: str,
dag_id: str,
username: str = "Airflow Notification",
icon_emoji: str = ":airflow:",
dag: DAG | None = None,
) -> None:
send_message(
text,
username=username,
icon_emoji=icon_emoji,
dag_id=dag_id,
dag_id=dag.dag_id,
)
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def add_rekognition_labels():
notify_start = notify_slack.override(task_id=constants.NOTIFY_START_TASK_ID)(
text=f"Starting Rekognition label insertion\n"
f"{constants.TEMPLATE_SLACK_MESSAGE_CONFIG}",
dag_id=constants.DAG_ID,
username=constants.SLACK_USERNAME,
icon_emoji=constants.SLACK_ICON,
)
Expand All @@ -76,7 +75,6 @@ def add_rekognition_labels():
text="Resuming Rekognition label insertion " # noqa: UP031
"from position: `{{ var.value.%s }}`\n%s"
% (constants.CURRENT_POS_VAR_NAME, constants.TEMPLATE_SLACK_MESSAGE_CONFIG),
dag_id=constants.DAG_ID,
username=constants.SLACK_USERNAME,
icon_emoji=constants.SLACK_ICON,
)
Expand Down Expand Up @@ -119,7 +117,6 @@ def add_rekognition_labels():

notify_complete = notify_slack.override(task_id="notify_complete")(
text="Finished Rekognition label insertion and batched update :check_tick:",
dag_id=constants.DAG_ID,
username=constants.SLACK_USERNAME,
icon_emoji=constants.SLACK_ICON,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ def create_new_es_index_dag(dag_config: CreateNewIndex):
f"New index { index_name } was successfully created with alias"
"{{ params.target_alias }}."
),
dag_id=dag.dag_id,
username="Create New ES Index",
icon_emoji=":elasticsearch:",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ def create_proportional_by_source_staging_index():
trigger_rule=TriggerRule.NONE_FAILED
)(
text=f"Reindexing complete for {destination_index_name}.",
dag_id=DAG_ID,
username="Proportional by Source Staging Index Creation",
icon_emoji=":elasticsearch:",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def point_es_alias_dag(environment: str):
trigger_rule=TriggerRule.NONE_FAILED
)(
text="Alias {{ params.target_alias }} applied to index {{ params.target_index }}.",
dag_id=dag.dag_id,
username="Point Alias",
icon_emoji=":elasticsearch:",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ def recreate_full_staging_index():
f"{new_index_suffix}` aliased to `{target_alias}`."
),
username="Full Staging Index Creation",
dag_id=DAG_ID,
)

# Set up dependencies
Expand Down
2 changes: 1 addition & 1 deletion catalog/dags/maintenance/rotate_envfiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def notify_complete(deleted_envfiles: dict[str, list[str]]):
files = ", ".join(envfiles)
message += f"{env}: {files}\n"
notify_slack.function(
dag_id=DAG_ID, text=f"Deleted the following environment files:\n{message}"
text=f"Deleted the following environment files:\n{message}"
)


Expand Down
3 changes: 0 additions & 3 deletions catalog/dags/popularity/popularity_refresh_dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh):
" constants update_",
username=SLACK_USERNAME,
icon_emoji=SLACK_EMOJI,
dag_id=popularity_refresh.dag_id,
)

update_constants = (
Expand Down Expand Up @@ -139,7 +138,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh):
" popularity scores_",
username=SLACK_USERNAME,
icon_emoji=SLACK_EMOJI,
dag_id=popularity_refresh.dag_id,
)

# Once popularity constants have been calculated, establish the cutoff time
Expand Down Expand Up @@ -173,7 +171,6 @@ def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh):
),
username=SLACK_USERNAME,
icon_emoji=SLACK_EMOJI,
dag_id=popularity_refresh.dag_id,
)

# Set up task dependencies
Expand Down
Loading