Skip to content

Commit

Permalink
Use DAG_DEFAULT_ARGS for all DAGs (#3928)
Browse files Browse the repository at this point in the history
* Add a test for checking that DAG default args is present

* Add default DAG args to DAGs that were missing them
  • Loading branch information
AetherUnbound authored Mar 18, 2024
1 parent 9917be0 commit fcad99d
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
3 changes: 2 additions & 1 deletion catalog/dags/elasticsearch_cluster/healthcheck_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
from elasticsearch import Elasticsearch

from common.constants import ENVIRONMENTS, PRODUCTION, Environment
from common.constants import DAG_DEFAULT_ARGS, ENVIRONMENTS, PRODUCTION, Environment
from common.elasticsearch import get_es_host
from common.sensors.utils import is_concurrent_with_any
from common.slack import send_alert, send_message
Expand Down Expand Up @@ -161,6 +161,7 @@ def notify(env: str, message_type_and_string: tuple[MessageType, str]):
"max_active_runs": 1,
"doc_md": __doc__,
"tags": ["elasticsearch", "monitoring"],
"default_args": DAG_DEFAULT_ARGS,
}


Expand Down
3 changes: 2 additions & 1 deletion catalog/dags/maintenance/rotate_db_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from airflow.providers.amazon.aws.operators.rds import RdsCreateDbSnapshotOperator
from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor

from common.constants import AWS_RDS_CONN_ID
from common.constants import AWS_RDS_CONN_ID, DAG_DEFAULT_ARGS


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,6 +86,7 @@ def delete_previous_snapshots(db_identifier: str, snapshots_to_retain: int):
catchup=False,
# Use the docstring at the top of the file as md docs in the UI
doc_md=__doc__,
default_args=DAG_DEFAULT_ARGS,
render_template_as_native_obj=True,
)
def rotate_db_snapshots():
Expand Down
17 changes: 17 additions & 0 deletions catalog/tests/dags/test_dag_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,20 @@ def test_dags_loads_correct_number_with_no_errors(relative_path, tmpdir):
dag_bag.process_file(str(DAG_FOLDER / relative_path))
assert len(dag_bag.import_errors) == 0, "Errors found during DAG import"
assert len(dag_bag.dags) == expected_count, "An unexpected # of DAGs was found"


def test_dag_uses_default_args():
# Attempt to load all DAGs in the DAG_FOLDER and check if they use the
# DAG_DEFAULT_ARGS settings
dagbag = DagBag(dag_folder=DAG_FOLDER, include_examples=False)

failures = []
for dag_id, dag in dagbag.dags.items():
# An easy proxy for this is checking if DAGs have an on_failure_callback
on_failure_callback = dag.default_args.get("on_failure_callback")
if on_failure_callback is None:
failures.append(dag_id)

assert (
not failures
), f"The following DAGs do not have DAG_DEFAULT_ARGS defined: {failures}"

0 comments on commit fcad99d

Please sign in to comment.