From 24c95574b14ac5fe913299abe0860ab9ff7e0487 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Sat, 9 Dec 2023 11:06:09 +0800 Subject: [PATCH] add more tests [INTERNAL_BRANCH=11-29-claire/store-partitions-subset-on-planned-event] --- .../host_representation/code_location.py | 2 +- .../storage_tests/utils/event_log_storage.py | 131 ++++++++++++++++++ 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/host_representation/code_location.py b/python_modules/dagster/dagster/_core/host_representation/code_location.py index 5de10e09923a1..c97a6726124ba 100644 --- a/python_modules/dagster/dagster/_core/host_representation/code_location.py +++ b/python_modules/dagster/dagster/_core/host_representation/code_location.py @@ -38,8 +38,8 @@ ExternalPartitionNamesData, ExternalScheduleExecutionErrorData, ExternalSensorExecutionErrorData, - external_repository_data_from_def, external_partition_set_name_for_job_name, + external_repository_data_from_def, ) from dagster._core.host_representation.grpc_server_registry import GrpcServerRegistry from dagster._core.host_representation.handle import JobHandle, RepositoryHandle diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index 43acc1644e6df..957acdf62b9e1 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -452,6 +452,9 @@ def can_watch(self): # Whether the storage is allowed to watch the event log return True + def has_asset_partitions_table(self) -> bool: + return False + def test_event_log_storage_store_events_and_wipe(self, test_run_id, storage): assert len(storage.get_logs_for_run(test_run_id)) == 0 storage.store_event( @@ -2677,6 +2680,134 @@ def test_store_asset_materialization_planned_event_with_partitions_subset( assert record.partition_key is None assert record.event_log_entry.dagster_event.partitions_subset == partitions_subset + def test_partitions_methods_on_materialization_planned_event_with_partitions_subset( + self, storage, instance + ) -> None: + a = AssetKey(["a"]) + gen_events_run_id = make_new_run_id() + subset_event_run_id = make_new_run_id() + + @op + def gen_op(): + yield AssetMaterialization(asset_key=a, partition="2023-01-05") + yield Output(1) + + partitions_def = DailyPartitionsDefinition("2023-01-01") + partitions_subset = ( + external_partitions_definition_from_def(partitions_def) + .get_partitions_definition() + .subset_with_partition_keys( + partitions_def.get_partition_keys_in_range( + PartitionKeyRange("2023-01-05", "2023-01-06") + ) + ) + .to_serializable_subset() + ) + + with instance_for_test() as created_instance: + if not storage.has_instance: + storage.register_instance(created_instance) + + with create_and_delete_test_runs(instance, [gen_events_run_id, subset_event_run_id]): + events_one, _ = _synthesize_events( + lambda: gen_op(), instance=created_instance, run_id=gen_events_run_id + ) + for event in events_one: + storage.store_event(event) + + storage.store_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=subset_event_run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, + "nonce", + event_specific_data=AssetMaterializationPlannedData( + a, partition="2023-01-05" + ), + ), + ) + ) + storage.store_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=subset_event_run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value, + "nonce", + event_specific_data=AssetMaterializationPlannedData( + a, partitions_subset=partitions_subset + ), + ), + ) + ) + + records = storage.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED, + asset_key=a, + ), + ascending=True, + ) + assert len(records) == 2 + single_partition_event = records[0] + assert ( + single_partition_event.event_log_entry.dagster_event.partition == "2023-01-05" + ) + + partitions_subset_event = records[1] + event_id = partitions_subset_event.storage_id + assert partitions_subset_event.partition_key is None + assert ( + partitions_subset_event.event_log_entry.dagster_event.partitions_subset + == partitions_subset + ) + + assert storage.get_materialized_partitions(a) == {"2023-01-05"} + planned_but_no_materialization_partitions = storage.get_latest_asset_partition_materialization_attempts_without_materializations( + a + ) + if self.has_asset_partitions_table(): + # When an asset partitions table is present we can fetch planned but not + # materialized partitions when planned events target a partitions subset + assert len(planned_but_no_materialization_partitions) == 2 + assert planned_but_no_materialization_partitions.keys() == { + "2023-01-05", + "2023-01-06", + } + assert planned_but_no_materialization_partitions["2023-01-05"] == ( + subset_event_run_id, + event_id, + ) + assert planned_but_no_materialization_partitions["2023-01-06"] == ( + subset_event_run_id, + event_id, + ) + + else: + # When an asset partitions table is not present we can only fetch planned but + # not materialized partitions when planned events target a single partition + assert planned_but_no_materialization_partitions.keys() == {"2023-01-05"} + assert planned_but_no_materialization_partitions["2023-01-05"] == ( + subset_event_run_id, + single_partition_event.storage_id, + ) + + # When asset partitions table is not present get_latest_storage_id_by_partition + # only returns storage IDs for single-partition events + latest_storage_id_by_partition = storage.get_latest_storage_id_by_partition( + a, DagsterEventType.ASSET_MATERIALIZATION_PLANNED + ) + assert latest_storage_id_by_partition == { + "2023-01-05": single_partition_event.storage_id + } + def test_get_latest_asset_partition_materialization_attempts_without_materializations_event_ids( self, storage, instance ):