Skip to content

Commit

Permalink
make run id uuid in asset check tests (#21453)
Browse files Browse the repository at this point in the history
## Summary & Motivation
We want stricter enforcement to save disk storing run ids.

## How I Tested These Changes
BK
  • Loading branch information
prha authored Apr 26, 2024
1 parent 264beb6 commit 05a8770
Showing 1 changed file with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4971,6 +4971,7 @@ def test_asset_checks(
if self.can_wipe():
storage.wipe()

run_id_1, run_id_2, run_id_3 = [make_new_run_id() for _ in range(3)]
check_key_1 = AssetCheckKey(AssetKey(["my_asset"]), "my_check")
check_key_2 = AssetCheckKey(AssetKey(["my_asset"]), "my_check_2")

Expand All @@ -4979,7 +4980,7 @@ def test_asset_checks(
error_info=None,
user_message="",
level="debug",
run_id="foo",
run_id=run_id_1,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
Expand All @@ -4994,22 +4995,22 @@ def test_asset_checks(
checks = storage.get_asset_check_execution_history(check_key_1, limit=10)
assert len(checks) == 1
assert checks[0].status == AssetCheckExecutionRecordStatus.PLANNED
assert checks[0].run_id == "foo"
assert checks[0].run_id == run_id_1
assert checks[0].event
assert checks[0].event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED

latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2])
assert len(latest_checks) == 1
assert latest_checks[check_key_1].status == AssetCheckExecutionRecordStatus.PLANNED
assert latest_checks[check_key_1].run_id == "foo"
assert latest_checks[check_key_1].run_id == run_id_1

# update the planned check
storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id="foo",
run_id=run_id_1,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_CHECK_EVALUATION.value,
Expand Down Expand Up @@ -5052,7 +5053,7 @@ def test_asset_checks(
error_info=None,
user_message="",
level="debug",
run_id="foobar",
run_id=run_id_2,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
Expand All @@ -5067,31 +5068,31 @@ def test_asset_checks(
checks = storage.get_asset_check_execution_history(check_key_1, limit=10)
assert len(checks) == 2
assert checks[0].status == AssetCheckExecutionRecordStatus.PLANNED
assert checks[0].run_id == "foobar"
assert checks[0].run_id == run_id_2
assert checks[1].status == AssetCheckExecutionRecordStatus.SUCCEEDED
assert checks[1].run_id == "foo"
assert checks[1].run_id == run_id_1

checks = storage.get_asset_check_execution_history(check_key_1, limit=1)
assert len(checks) == 1
assert checks[0].run_id == "foobar"
assert checks[0].run_id == run_id_2

checks = storage.get_asset_check_execution_history(
check_key_1, limit=1, cursor=checks[0].id
)
assert len(checks) == 1
assert checks[0].run_id == "foo"
assert checks[0].run_id == run_id_1

latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2])
assert len(latest_checks) == 1
assert latest_checks[check_key_1].status == AssetCheckExecutionRecordStatus.PLANNED
assert latest_checks[check_key_1].run_id == "foobar"
assert latest_checks[check_key_1].run_id == run_id_2

storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id="fizbuz",
run_id=run_id_3,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
Expand All @@ -5106,21 +5107,22 @@ def test_asset_checks(
latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2])
assert len(latest_checks) == 2
assert latest_checks[check_key_1].status == AssetCheckExecutionRecordStatus.PLANNED
assert latest_checks[check_key_1].run_id == "foobar"
assert latest_checks[check_key_1].run_id == run_id_2
assert latest_checks[check_key_2].status == AssetCheckExecutionRecordStatus.PLANNED
assert latest_checks[check_key_2].run_id == "fizbuz"
assert latest_checks[check_key_2].run_id == run_id_3

def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage):
if self.can_wipe():
storage.wipe()

run_id = make_new_run_id()
for _ in range(2):
storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id="fizbuz",
run_id=run_id,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
Expand All @@ -5141,7 +5143,7 @@ def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage):
error_info=None,
user_message="",
level="debug",
run_id="fizbuz",
run_id=run_id,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_CHECK_EVALUATION.value,
Expand All @@ -5152,7 +5154,7 @@ def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage):
passed=True,
metadata={},
target_materialization_data=AssetCheckEvaluationTargetMaterializationData(
storage_id=42, run_id="fizbuz", timestamp=3.3
storage_id=42, run_id=run_id, timestamp=3.3
),
severity=AssetCheckSeverity.ERROR,
),
Expand All @@ -5161,12 +5163,13 @@ def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage):
)

def test_asset_check_evaluation_without_planned_event(self, storage: EventLogStorage):
run_id = make_new_run_id()
storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id="fizbuz",
run_id=run_id,
timestamp=time.time(),
dagster_event=DagsterEvent(
DagsterEventType.ASSET_CHECK_EVALUATION.value,
Expand All @@ -5177,7 +5180,7 @@ def test_asset_check_evaluation_without_planned_event(self, storage: EventLogSto
passed=True,
metadata={},
target_materialization_data=AssetCheckEvaluationTargetMaterializationData(
storage_id=42, run_id="fizbuz", timestamp=3.3
storage_id=42, run_id=run_id, timestamp=3.3
),
severity=AssetCheckSeverity.ERROR,
),
Expand Down

0 comments on commit 05a8770

Please sign in to comment.