Skip to content

Commit

Permalink
Add Slack Alert for non-dated provider DAGs with no data (#5041)
Browse files Browse the repository at this point in the history
* Raise an error if a non-dated provider ingests 0 records

* Update and add tests
  • Loading branch information
stacimc authored Oct 18, 2024
1 parent 7bf2859 commit 9c22fce
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 12 deletions.
41 changes: 38 additions & 3 deletions catalog/dags/common/loader/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def clean_duration(duration: float | list[float] | None) -> str | None:
def clean_record_counts(
record_counts_by_media_type: MediaTypeRecordMetrics | list[MediaTypeRecordMetrics],
media_types: Sequence[str],
) -> dict[str, RecordMetrics]:
) -> MediaTypeRecordMetrics:
# If a list of record_counts dicts is provided, sum all of the individual values
if isinstance(record_counts_by_media_type, list):
return {
Expand All @@ -89,19 +89,46 @@ def clean_record_counts(

def skip_report_completion(
duration: str | None,
record_counts_by_media_type: dict[str, RecordMetrics],
record_counts_by_media_type: MediaTypeRecordMetrics,
) -> bool:
"""
Detect if report_completion should be skipped by determining if there was
an upstream failure that prevented rows from being ingested; in this case, the
error will have already been reported to Slack instead.
"""
return (
# Duration must be provided and be a value greater than 1 second
duration is None or duration in ("inf", "less than 1 sec")
) and (
# Record counts by media type must be provided and at least one value must
# be truthy (i.e. not None)
# be truthy (i.e. not None). Note that if there was no error during
# ingestion, and the provider simply ingested 0 records, this
# condition will not be True.
not record_counts_by_media_type
or all([val is None for val in record_counts_by_media_type.values()])
)


def detect_missing_records(
    dated: bool,
    record_counts_by_media_type: "MediaTypeRecordMetrics",
) -> bool:
    """
    Detect when a DAG unexpectedly upserts 0 records, and may require further
    investigation. This is used to detect cases where a DAG may break silently,
    preventing records from being ingested but not causing any Airflow errors.

    Presently, this detects the simple case where a non-dated DAG (which should
    be ingesting _all_ records for its provider) upserts 0 records.

    :param dated: Whether the DAG ingests only a date-bounded window of data.
    :param record_counts_by_media_type:
        Mapping of media type to its RecordMetrics (values may be None).
    :return: True when the DAG is non-dated and no media type upserted records.
    """
    # Dated DAGs only ingest a window of data, so 0 records is plausible and
    # never flagged.
    if dated:
        return False
    # Flag only when *every* media type is missing an upserted count or
    # upserted nothing; a single upserted record means ingestion worked.
    return all(
        not count or not count.upserted
        for count in record_counts_by_media_type.values()
    )


def report_completion(
dag_id: str,
media_types: Sequence[str],
Expand Down Expand Up @@ -132,6 +159,8 @@ def report_completion(
within the catalog database.
- `date_range`: The range of time this ingestion covers. If the ingestion covers
the entire provided dataset, "all" is provided
If the DAG is a non-dated DAG and yet upserted 0 records, an error is raised.
"""
is_aggregate_duration = isinstance(duration, list)

Expand Down Expand Up @@ -188,5 +217,11 @@ def report_completion(
" pulls that may happen concurrently._"
)

# Raise an error to alert maintainers of a possible broken DAG, if non-dated
# and 0 records were upserted. We raise an error only after building the message,
# so that duplicate counts/etc are available in the logs.
if detect_missing_records(dated, record_counts_by_media_type):
raise ValueError("No records were ingested.")

send_message(message, dag_id=dag_id, username="Airflow DAG Load Data Complete")
return message
1 change: 1 addition & 0 deletions catalog/dags/providers/provider_dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ def create_report_load_completion(
task_id="report_load_completion",
python_callable=reporting.report_completion,
op_kwargs=op_kwargs,
retries=0,
trigger_rule=TriggerRule.ALL_DONE,
)

Expand Down
72 changes: 63 additions & 9 deletions catalog/tests/dags/common/loader/test_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
RecordMetrics,
clean_duration,
clean_record_counts,
detect_missing_records,
report_completion,
skip_report_completion,
)
Expand Down Expand Up @@ -38,6 +39,37 @@ def test_report_completion(should_send_message):
send_message_mock.called = should_send_message


# Shared RecordMetrics fixtures; detect_missing_records only reads them, so
# reuse across cases is safe.
_NO_UPSERTS = RecordMetrics(0, 0, 0, 0)
_SOME_UPSERTS = RecordMetrics(100, 0, 0, 0)
_NULL_UPSERTS = RecordMetrics(None, 0, 0, 0)


@pytest.mark.parametrize(
    "dated, record_counts_by_media_type, expected_result",
    [
        # Flagged: non-dated, and no media type had any records upserted
        (False, {"image": _NO_UPSERTS}, True),
        (False, {"image": _NO_UPSERTS, "audio": _NO_UPSERTS}, True),
        # Flagged: a media type's counts may be entirely null
        (False, {"image": None}, True),
        # Flagged: the upserted count itself may be null
        (False, {"image": _NULL_UPSERTS}, True),
        # Not flagged: at least one media type had records upserted
        (False, {"image": _SOME_UPSERTS}, False),
        (False, {"image": _NO_UPSERTS, "audio": _SOME_UPSERTS}, False),
        # Not flagged: dated DAGs are never reported, regardless of counts
        (True, {"image": _SOME_UPSERTS}, False),
        (True, {"image": None}, False),
        (True, {"image": _NULL_UPSERTS}, False),
    ],
)
def test_detect_missing_records(dated, record_counts_by_media_type, expected_result):
    actual = detect_missing_records(dated, record_counts_by_media_type)
    assert actual == expected_result


def _make_report_completion_contents_data(media_type: str):
return [
# Happy path
Expand Down Expand Up @@ -158,6 +190,9 @@ def test_report_completion_contents(
with mock.patch("common.loader.reporting.send_message"):
record_counts_by_media_type = {**audio_data, **image_data}
should_skip = skip_report_completion(None, record_counts_by_media_type)
should_alert_missing_records = detect_missing_records(
dated, record_counts_by_media_type
)
try:
message = report_completion(
"jamendo_workflow",
Expand All @@ -171,10 +206,22 @@ def test_report_completion_contents(
except AirflowSkipException:
assert should_skip, "AirflowSkipException raised unexpectedly"
return
except ValueError:
assert should_alert_missing_records, "ValueError raised unexpectedly"
return

# Assert that if we were supposed to skip or alert missing records, we did not
# get this far
assert not should_skip, "Completion was reported when it should have skipped."
assert (
not should_alert_missing_records
), "Completion was reported instead of alerting missing records."

for expected in [audio_expected, image_expected]:
assert (
expected in message
), "Completion message doesn't contain expected text"

# Split message into "sections"
parts = message.strip().split("\n")
# Get the date section
Expand Down Expand Up @@ -216,17 +263,24 @@ def test_report_completion_contents_with_lists(
record_counts_by_media_type = [
{**audio, **image} for audio, image in zip(audio_data, image_data)
]

message = report_completion(
"Jamendo",
["audio", "image"],
None,
record_counts_by_media_type,
dated,
date_range_start,
date_range_end,
should_alert_missing_records = detect_missing_records(
dated, clean_record_counts(record_counts_by_media_type, ["audio", "image"])
)

try:
message = report_completion(
"Jamendo",
["audio", "image"],
None,
record_counts_by_media_type,
dated,
date_range_start,
date_range_end,
)
except ValueError:
assert should_alert_missing_records, "ValueError raised unexpectedly"
return

for expected in [audio_expected, image_expected]:
assert (
expected in message
Expand Down

0 comments on commit 9c22fce

Please sign in to comment.