From 05a8770f6a08bbdd872ceb9d37dceba16c8a8a6a Mon Sep 17 00:00:00 2001 From: prha <1040172+prha@users.noreply.github.com> Date: Fri, 26 Apr 2024 11:33:50 -0700 Subject: [PATCH] make run id uuid in asset check tests (#21453) ## Summary & Motivation We want stricter enforcement to save disk storing run ids. ## How I Tested These Changes BK --- .../storage_tests/utils/event_log_storage.py | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index 8a773e6bbfaeb..4da7a771812b6 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -4971,6 +4971,7 @@ def test_asset_checks( if self.can_wipe(): storage.wipe() + run_id_1, run_id_2, run_id_3 = [make_new_run_id() for _ in range(3)] check_key_1 = AssetCheckKey(AssetKey(["my_asset"]), "my_check") check_key_2 = AssetCheckKey(AssetKey(["my_asset"]), "my_check_2") @@ -4979,7 +4980,7 @@ def test_asset_checks( error_info=None, user_message="", level="debug", - run_id="foo", + run_id=run_id_1, timestamp=time.time(), dagster_event=DagsterEvent( DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value, @@ -4994,14 +4995,14 @@ def test_asset_checks( checks = storage.get_asset_check_execution_history(check_key_1, limit=10) assert len(checks) == 1 assert checks[0].status == AssetCheckExecutionRecordStatus.PLANNED - assert checks[0].run_id == "foo" + assert checks[0].run_id == run_id_1 assert checks[0].event assert checks[0].event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2]) assert len(latest_checks) == 1 assert latest_checks[check_key_1].status == AssetCheckExecutionRecordStatus.PLANNED - assert latest_checks[check_key_1].run_id == "foo" + assert latest_checks[check_key_1].run_id == run_id_1 # update the planned check storage.store_event( @@ -5009,7 +5010,7 @@ def test_asset_checks( error_info=None, user_message="", level="debug", - run_id="foo", + run_id=run_id_1, timestamp=time.time(), dagster_event=DagsterEvent( DagsterEventType.ASSET_CHECK_EVALUATION.value, @@ -5052,7 +5053,7 @@ def test_asset_checks( error_info=None, user_message="", level="debug", - run_id="foobar", + run_id=run_id_2, timestamp=time.time(), dagster_event=DagsterEvent( DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value, @@ -5067,31 +5068,31 @@ def test_asset_checks( checks = storage.get_asset_check_execution_history(check_key_1, limit=10) assert len(checks) == 2 assert checks[0].status == AssetCheckExecutionRecordStatus.PLANNED - assert checks[0].run_id == "foobar" + assert checks[0].run_id == run_id_2 assert checks[1].status == AssetCheckExecutionRecordStatus.SUCCEEDED - assert checks[1].run_id == "foo" + assert checks[1].run_id == run_id_1 checks = storage.get_asset_check_execution_history(check_key_1, limit=1) assert len(checks) == 1 - assert checks[0].run_id == "foobar" + assert checks[0].run_id == run_id_2 checks = storage.get_asset_check_execution_history( check_key_1, limit=1, cursor=checks[0].id ) assert len(checks) == 1 - assert checks[0].run_id == "foo" + assert checks[0].run_id == run_id_1 latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2]) assert len(latest_checks) == 1 assert latest_checks[check_key_1].status == AssetCheckExecutionRecordStatus.PLANNED - assert latest_checks[check_key_1].run_id == "foobar" + assert latest_checks[check_key_1].run_id == run_id_2 storage.store_event( EventLogEntry( error_info=None, user_message="", level="debug", - run_id="fizbuz", + run_id=run_id_3, timestamp=time.time(), dagster_event=DagsterEvent( DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value, @@ -5106,21 +5107,22 @@ def test_asset_checks( latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2]) assert len(latest_checks) == 2 assert latest_checks[check_key_1].status == AssetCheckExecutionRecordStatus.PLANNED - assert latest_checks[check_key_1].run_id == "foobar" + assert latest_checks[check_key_1].run_id == run_id_2 assert latest_checks[check_key_2].status == AssetCheckExecutionRecordStatus.PLANNED - assert latest_checks[check_key_2].run_id == "fizbuz" + assert latest_checks[check_key_2].run_id == run_id_3 def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage): if self.can_wipe(): storage.wipe() + run_id = make_new_run_id() for _ in range(2): storage.store_event( EventLogEntry( error_info=None, user_message="", level="debug", - run_id="fizbuz", + run_id=run_id, timestamp=time.time(), dagster_event=DagsterEvent( DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value, @@ -5141,7 +5143,7 @@ def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage): error_info=None, user_message="", level="debug", - run_id="fizbuz", + run_id=run_id, timestamp=time.time(), dagster_event=DagsterEvent( DagsterEventType.ASSET_CHECK_EVALUATION.value, @@ -5152,7 +5154,7 @@ def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage): passed=True, metadata={}, target_materialization_data=AssetCheckEvaluationTargetMaterializationData( - storage_id=42, run_id="fizbuz", timestamp=3.3 + storage_id=42, run_id=run_id, timestamp=3.3 ), severity=AssetCheckSeverity.ERROR, ), @@ -5161,12 +5163,13 @@ def test_duplicate_asset_check_planned_events(self, storage: EventLogStorage): ) def test_asset_check_evaluation_without_planned_event(self, storage: EventLogStorage): + run_id = make_new_run_id() storage.store_event( EventLogEntry( error_info=None, user_message="", level="debug", - run_id="fizbuz", + run_id=run_id, timestamp=time.time(), dagster_event=DagsterEvent( DagsterEventType.ASSET_CHECK_EVALUATION.value, @@ -5177,7 +5180,7 @@ def test_asset_check_evaluation_without_planned_event(self, storage: EventLogSto passed=True, metadata={}, target_materialization_data=AssetCheckEvaluationTargetMaterializationData( - storage_id=42, run_id="fizbuz", timestamp=3.3 + storage_id=42, run_id=run_id, timestamp=3.3 ), severity=AssetCheckSeverity.ERROR, ),