From fcad99d47242c4b3eb90572d4b8ac3d7b977524f Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Mon, 18 Mar 2024 10:48:21 -0700 Subject: [PATCH] Use DAG_DEFAULT_ARGS for all DAGs (#3928) * Add a test for checking that DAG default args is present * Add default DAG args to DAGs that were missing them --- .../elasticsearch_cluster/healthcheck_dag.py | 3 ++- catalog/dags/maintenance/rotate_db_snapshots.py | 3 ++- catalog/tests/dags/test_dag_parsing.py | 17 +++++++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/catalog/dags/elasticsearch_cluster/healthcheck_dag.py b/catalog/dags/elasticsearch_cluster/healthcheck_dag.py index 8ebecd4ed18..e4455dd7729 100644 --- a/catalog/dags/elasticsearch_cluster/healthcheck_dag.py +++ b/catalog/dags/elasticsearch_cluster/healthcheck_dag.py @@ -25,7 +25,7 @@ from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook from elasticsearch import Elasticsearch -from common.constants import ENVIRONMENTS, PRODUCTION, Environment +from common.constants import DAG_DEFAULT_ARGS, ENVIRONMENTS, PRODUCTION, Environment from common.elasticsearch import get_es_host from common.sensors.utils import is_concurrent_with_any from common.slack import send_alert, send_message @@ -161,6 +161,7 @@ def notify(env: str, message_type_and_string: tuple[MessageType, str]): "max_active_runs": 1, "doc_md": __doc__, "tags": ["elasticsearch", "monitoring"], + "default_args": DAG_DEFAULT_ARGS, } diff --git a/catalog/dags/maintenance/rotate_db_snapshots.py b/catalog/dags/maintenance/rotate_db_snapshots.py index 19fa84ba9b3..4c76c4a5aec 100644 --- a/catalog/dags/maintenance/rotate_db_snapshots.py +++ b/catalog/dags/maintenance/rotate_db_snapshots.py @@ -23,7 +23,7 @@ from airflow.providers.amazon.aws.operators.rds import RdsCreateDbSnapshotOperator from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor -from common.constants import AWS_RDS_CONN_ID +from common.constants import AWS_RDS_CONN_ID, DAG_DEFAULT_ARGS logger = logging.getLogger(__name__) @@ -86,6 +86,7 @@ def delete_previous_snapshots(db_identifier: str, snapshots_to_retain: int): catchup=False, # Use the docstring at the top of the file as md docs in the UI doc_md=__doc__, + default_args=DAG_DEFAULT_ARGS, render_template_as_native_obj=True, ) def rotate_db_snapshots(): diff --git a/catalog/tests/dags/test_dag_parsing.py b/catalog/tests/dags/test_dag_parsing.py index d7232795b04..ca2500aa8bc 100644 --- a/catalog/tests/dags/test_dag_parsing.py +++ b/catalog/tests/dags/test_dag_parsing.py @@ -71,3 +71,20 @@ def test_dags_loads_correct_number_with_no_errors(relative_path, tmpdir): dag_bag.process_file(str(DAG_FOLDER / relative_path)) assert len(dag_bag.import_errors) == 0, "Errors found during DAG import" assert len(dag_bag.dags) == expected_count, "An unexpected # of DAGs was found" + + +def test_dag_uses_default_args(): + # Attempt to load all DAGs in the DAG_FOLDER and check if they use the + # DAG_DEFAULT_ARGS settings + dagbag = DagBag(dag_folder=DAG_FOLDER, include_examples=False) + + failures = [] + for dag_id, dag in dagbag.dags.items(): + # An easy proxy for this is checking if DAGs have an on_failure_callback + on_failure_callback = dag.default_args.get("on_failure_callback") + if on_failure_callback is None: + failures.append(dag_id) + + assert ( + not failures + ), f"The following DAGs do not have DAG_DEFAULT_ARGS defined: {failures}"