diff --git a/catalog/dags/common/loader/reporting.py b/catalog/dags/common/loader/reporting.py index 5bd63b3abe1..c674d0bce11 100644 --- a/catalog/dags/common/loader/reporting.py +++ b/catalog/dags/common/loader/reporting.py @@ -74,7 +74,7 @@ def clean_duration(duration: float | list[float] | None) -> str | None: def clean_record_counts( record_counts_by_media_type: MediaTypeRecordMetrics | list[MediaTypeRecordMetrics], media_types: Sequence[str], -) -> dict[str, RecordMetrics]: +) -> MediaTypeRecordMetrics: # If a list of record_counts dicts is provided, sum all of the individual values if isinstance(record_counts_by_media_type, list): return { @@ -89,19 +89,46 @@ def clean_record_counts( def skip_report_completion( duration: str | None, - record_counts_by_media_type: dict[str, RecordMetrics], + record_counts_by_media_type: MediaTypeRecordMetrics, ) -> bool: + """ + Detect if report_completion should be skipped by determining if there was + an upstream failure that prevented rows from being ingested; in this case, the + error will have already been reported to Slack instead. + """ return ( # Duration must be provided and be a value greater than 1 second duration is None or duration in ("inf", "less than 1 sec") ) and ( # Record counts by media type must be provided and at least one value must - # be truthy (i.e. not None) + # be truthy (i.e. not None). Note that if there was no error during + # ingestion, and the provider simply ingested 0 records, this + # condition will not be True. not record_counts_by_media_type or all([val is None for val in record_counts_by_media_type.values()]) ) +def detect_missing_records( + dated: bool, + record_counts_by_media_type: MediaTypeRecordMetrics, +) -> bool: + """ + Detect when a DAG unexpectedly upserts 0 records, and may require further + investigation. This is used to detect cases where a DAG may break silently, + preventing records from being ingested but not causing any Airflow errors. + + Presently, this detects the simple case where a non-dated DAG (which should + be ingesting _all_ records for its provider) upserts 0 records. + """ + return not dated and all( + [ + not count or not count.upserted + for count in record_counts_by_media_type.values() + ] + ) + + def report_completion( dag_id: str, media_types: Sequence[str], @@ -132,6 +159,8 @@ def report_completion( within the catalog database. - `date_range`: The range of time this ingestion covers. If the ingestion covers the entire provided dataset, "all" is provided + + If the DAG is a non-dated DAG and yet upserted 0 records, an error is raised. """ is_aggregate_duration = isinstance(duration, list) @@ -188,5 +217,11 @@ def report_completion( " pulls that may happen concurrently._" ) + # Raise an error to alert maintainers of a possible broken DAG, if non-dated + # and 0 records were upserted. We raise an error only after building the message, + # so that duplicate counts/etc are available in the logs. + if detect_missing_records(dated, record_counts_by_media_type): + raise ValueError("No records were ingested.") + send_message(message, dag_id=dag_id, username="Airflow DAG Load Data Complete") return message diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index 3c6bbaedb8d..8a2cddf4ec2 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -365,6 +365,7 @@ def create_report_load_completion( task_id="report_load_completion", python_callable=reporting.report_completion, op_kwargs=op_kwargs, + retries=0, trigger_rule=TriggerRule.ALL_DONE, ) diff --git a/catalog/tests/dags/common/loader/test_reporting.py b/catalog/tests/dags/common/loader/test_reporting.py index cbe54f17a07..5f4e068617d 100644 --- a/catalog/tests/dags/common/loader/test_reporting.py +++ b/catalog/tests/dags/common/loader/test_reporting.py @@ -7,6 +7,7 @@ RecordMetrics, clean_duration, clean_record_counts, + detect_missing_records, report_completion, skip_report_completion, ) @@ -38,6 +39,37 @@ def test_report_completion(should_send_message): send_message_mock.called = should_send_message +@pytest.mark.parametrize( + "dated, record_counts_by_media_type, expected_result", + [ + # True if non-dated, and no media types have records upserted + (False, {"image": RecordMetrics(0, 0, 0, 0)}, True), + ( + False, + {"image": RecordMetrics(0, 0, 0, 0), "audio": RecordMetrics(0, 0, 0, 0)}, + True, + ), + # Handles null counts + (False, {"image": None}, True), + # Handles null upserted counts + (False, {"image": RecordMetrics(None, 0, 0, 0)}, True), + # False if _any_ media type had records upserted + (False, {"image": RecordMetrics(100, 0, 0, 0)}, False), + ( + False, + {"image": RecordMetrics(0, 0, 0, 0), "audio": RecordMetrics(100, 0, 0, 0)}, + False, + ), + # Always False if DAG is dated + (True, {"image": RecordMetrics(100, 0, 0, 0)}, False), + (True, {"image": None}, False), + (True, {"image": RecordMetrics(None, 0, 0, 0)}, False), + ], +) +def test_detect_missing_records(dated, record_counts_by_media_type, expected_result): + assert detect_missing_records(dated, record_counts_by_media_type) == expected_result + + def _make_report_completion_contents_data(media_type: str): return [ # Happy path @@ -158,6 +190,9 @@ def test_report_completion_contents( with mock.patch("common.loader.reporting.send_message"): record_counts_by_media_type = {**audio_data, **image_data} should_skip = skip_report_completion(None, record_counts_by_media_type) + should_alert_missing_records = detect_missing_records( + dated, record_counts_by_media_type + ) try: message = report_completion( "jamendo_workflow", @@ -171,10 +206,22 @@ def test_report_completion_contents( except AirflowSkipException: assert should_skip, "AirflowSkipException raised unexpectedly" return + except ValueError: + assert should_alert_missing_records, "ValueError raised unexpectedly" + return + + # Assert that if we were supposed to skip or alert missing records, we did not + # get this far + assert not should_skip, "Completion was reported when it should have skipped." + assert ( + not should_alert_missing_records + ), "Completion was reported instead of alerting missing records." + for expected in [audio_expected, image_expected]: assert ( expected in message ), "Completion message doesn't contain expected text" + # Split message into "sections" parts = message.strip().split("\n") # Get the date section @@ -216,17 +263,24 @@ def test_report_completion_contents_with_lists( record_counts_by_media_type = [ {**audio, **image} for audio, image in zip(audio_data, image_data) ] - - message = report_completion( - "Jamendo", - ["audio", "image"], - None, - record_counts_by_media_type, - dated, - date_range_start, - date_range_end, + should_alert_missing_records = detect_missing_records( + dated, clean_record_counts(record_counts_by_media_type, ["audio", "image"]) ) + try: + message = report_completion( + "Jamendo", + ["audio", "image"], + None, + record_counts_by_media_type, + dated, + date_range_start, + date_range_end, + ) + except ValueError: + assert should_alert_missing_records, "ValueError raised unexpectedly" + return + for expected in [audio_expected, image_expected]: assert ( expected in message