diff --git a/src/dou_dag_generator.py b/src/dou_dag_generator.py index 842ffd2..7188e99 100755 --- a/src/dou_dag_generator.py +++ b/src/dou_dag_generator.py @@ -16,6 +16,7 @@ from dataclasses import asdict from datetime import datetime, timedelta from typing import Dict, List +import json import pandas as pd from airflow import DAG @@ -24,6 +25,7 @@ from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook from airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.providers.slack.notifications.slack import SlackNotifier sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) @@ -46,17 +48,38 @@ class DouDigestDagGenerator(): raise EnvironmentError("Environment variable RO_DOU__DAG_CONF_DIR not found!") YAMLS_DIR_LIST = [dag_confs for dag_confs in YAMLS_DIR.split(":")] + SLACK_CONN_ID = "slack_notify_rodou_dagrun" parser = YAMLParser searchers: Dict[str, BaseSearcher] - def __init__(self, on_retry_callback=None, on_failure_callback=None): + def __init__(self): self.searchers = { 'DOU': DOUSearcher(), 'QD': QDSearcher(), 'INLABS': INLABSSearcher(), } - self.on_retry_callback = on_retry_callback - self.on_failure_callback = on_failure_callback + try: + conn = BaseHook.get_connection(self.SLACK_CONN_ID) + description = json.loads(conn.description) + slack_notifier = SlackNotifier( + slack_conn_id=self.SLACK_CONN_ID, + text=( + ":bomb:" + "\n`DAG` {{ ti.dag_id }}" + "\n`State` {{ ti.state }}" + "\n`Task` {{ ti.task_id }}" + "\n`Execution` {{ ti.execution_date }}" + "\n`Log` {{ ti.log_url }}" + ), + channel=description["channel"], + ) + except Exception as e: + logging.info("Connection to DAG run notifier not configured: %s", str(e)) + slack_notifier = None + + self.on_failure_callback = slack_notifier + self.on_retry_callback = slack_notifier + @staticmethod def prepare_doc_md(specs: DAGConfig, config_file: str) -> str: