Skip to content

Commit

Permalink
add slack notifier callback
Browse files Browse the repository at this point in the history
  • Loading branch information
vitorbellini committed Apr 10, 2024
1 parent 784e8f6 commit de80178
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions src/dou_dag_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dataclasses import asdict
from datetime import datetime, timedelta
from typing import Dict, List
import json

import pandas as pd
from airflow import DAG
Expand All @@ -24,6 +25,7 @@
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.notifications.slack import SlackNotifier


sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
Expand All @@ -46,17 +48,38 @@ class DouDigestDagGenerator():
raise EnvironmentError("Environment variable RO_DOU__DAG_CONF_DIR not found!")

YAMLS_DIR_LIST = [dag_confs for dag_confs in YAMLS_DIR.split(":")]
SLACK_CONN_ID = "slack_notify_rodou_dagrun"
parser = YAMLParser
searchers: Dict[str, BaseSearcher]

def __init__(self, on_retry_callback=None, on_failure_callback=None):
def __init__(self):
self.searchers = {
'DOU': DOUSearcher(),
'QD': QDSearcher(),
'INLABS': INLABSSearcher(),
}
self.on_retry_callback = on_retry_callback
self.on_failure_callback = on_failure_callback
try:
conn = BaseHook.get_connection(self.SLACK_CONN_ID)
description = json.loads(conn.description)
slack_notifier = SlackNotifier(
slack_conn_id=self.SLACK_CONN_ID,
text=(
":bomb:"
"\n`DAG` {{ ti.dag_id }}"
"\n`State` {{ ti.state }}"
"\n`Task` {{ ti.task_id }}"
"\n`Execution` {{ ti.execution_date }}"
"\n`Log` {{ ti.log_url }}"
),
channel=description["channel"],
)
except Exception as e:
logging.info("Connection to DAG run notifier not configured: %s", str(e))
slack_notifier = None

self.on_failure_callback = slack_notifier
self.on_retry_callback = slack_notifier


@staticmethod
def prepare_doc_md(specs: DAGConfig, config_file: str) -> str:
Expand Down

0 comments on commit de80178

Please sign in to comment.