Skip to content

Commit

Permalink
add more tests [INTERNAL_BRANCH=11-29-claire/store-partitions-subset-…
Browse files (browse the repository at this point in the history)
…on-planned-event]
  • Loading branch information
clairelin135 committed Dec 12, 2023
1 parent 3d9b9d4 commit a28343f
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
ExternalPartitionNamesData,
ExternalScheduleExecutionErrorData,
ExternalSensorExecutionErrorData,
external_repository_data_from_def,
external_partition_set_name_for_job_name,
external_repository_data_from_def,
)
from dagster._core.host_representation.grpc_server_registry import GrpcServerRegistry
from dagster._core.host_representation.handle import JobHandle, RepositoryHandle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,9 @@ def can_watch(self):
# Whether the storage is allowed to watch the event log
return True

def has_asset_partitions_table(self) -> bool:
    """Report whether the storage under test has an asset partitions table.

    This implementation always returns ``False``. The partitions-subset test in
    this suite branches on the returned value to choose which assertions apply.
    """
    return False

def test_event_log_storage_store_events_and_wipe(self, test_run_id, storage):
assert len(storage.get_logs_for_run(test_run_id)) == 0
storage.store_event(
Expand Down Expand Up @@ -2677,6 +2680,134 @@ def test_store_asset_materialization_planned_event_with_partitions_subset(
assert record.partition_key is None
assert record.event_log_entry.dagster_event.partitions_subset == partitions_subset

def test_partitions_methods_on_materialization_planned_event_with_partitions_subset(
    self, storage, instance
) -> None:
    """Check partition queries after a planned event that targets a partitions subset.

    The test stores one materialization of partition "2023-01-05" in a first run,
    then two ASSET_MATERIALIZATION_PLANNED events for the same asset in a second
    run: one for the single partition "2023-01-05" and one carrying a serialized
    subset built from the key range "2023-01-05".."2023-01-06". Which results are
    expected depends on ``self.has_asset_partitions_table()``.
    """
    a = AssetKey(["a"])
    gen_events_run_id = make_new_run_id()
    subset_event_run_id = make_new_run_id()

    @op
    def gen_op():
        yield AssetMaterialization(asset_key=a, partition="2023-01-05")
        yield Output(1)

    partitions_def = DailyPartitionsDefinition("2023-01-01")
    partitions_subset = (
        external_partitions_definition_from_def(partitions_def)
        .get_partitions_definition()
        .subset_with_partition_keys(
            partitions_def.get_partition_keys_in_range(
                PartitionKeyRange("2023-01-05", "2023-01-06")
            )
        )
        .to_serializable_subset()
    )

    def planned_event_entry(planned_data):
        # The two planned events differ only in their event-specific data, so the
        # shared EventLogEntry scaffolding is built in one place.
        return EventLogEntry(
            error_info=None,
            level="debug",
            user_message="",
            run_id=subset_event_run_id,
            timestamp=time.time(),
            dagster_event=DagsterEvent(
                DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
                "nonce",
                event_specific_data=planned_data,
            ),
        )

    with instance_for_test() as created_instance:
        if not storage.has_instance:
            storage.register_instance(created_instance)

        with create_and_delete_test_runs(instance, [gen_events_run_id, subset_event_run_id]):
            # First run: a real materialization of partition 2023-01-05.
            events_one, _ = _synthesize_events(
                lambda: gen_op(), instance=created_instance, run_id=gen_events_run_id
            )
            for event in events_one:
                storage.store_event(event)

            # Second run: a single-partition planned event, then a subset planned event.
            storage.store_event(
                planned_event_entry(AssetMaterializationPlannedData(a, partition="2023-01-05"))
            )
            storage.store_event(
                planned_event_entry(
                    AssetMaterializationPlannedData(a, partitions_subset=partitions_subset)
                )
            )

            records = storage.get_event_records(
                EventRecordsFilter(
                    event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
                    asset_key=a,
                ),
                ascending=True,
            )
            assert len(records) == 2
            single_partition_event = records[0]
            assert (
                single_partition_event.event_log_entry.dagster_event.partition == "2023-01-05"
            )

            partitions_subset_event = records[1]
            event_id = partitions_subset_event.storage_id
            assert partitions_subset_event.partition_key is None
            assert (
                partitions_subset_event.event_log_entry.dagster_event.partitions_subset
                == partitions_subset
            )

            assert storage.get_materialized_partitions(a) == {"2023-01-05"}
            planned_but_no_materialization_partitions = storage.get_latest_asset_partition_materialization_attempts_without_materializations(
                a
            )
            if self.has_asset_partitions_table():
                # When an asset partitions table is present we can fetch planned but not
                # materialized partitions when planned events target a partitions subset
                assert len(planned_but_no_materialization_partitions) == 2
                assert planned_but_no_materialization_partitions.keys() == {
                    "2023-01-05",
                    "2023-01-06",
                }
                assert planned_but_no_materialization_partitions["2023-01-05"] == (
                    subset_event_run_id,
                    event_id,
                )
                assert planned_but_no_materialization_partitions["2023-01-06"] == (
                    subset_event_run_id,
                    event_id,
                )

            else:
                # When an asset partitions table is not present we can only fetch planned but
                # not materialized partitions when planned events target a single partition
                assert planned_but_no_materialization_partitions.keys() == {"2023-01-05"}
                assert planned_but_no_materialization_partitions["2023-01-05"] == (
                    subset_event_run_id,
                    single_partition_event.storage_id,
                )

                # When asset partitions table is not present get_latest_storage_id_by_partition
                # only returns storage IDs for single-partition events
                # NOTE(review): nesting reconstructed from a flattened view; this check is
                # assumed to belong to the else branch, as its comment suggests - confirm.
                latest_storage_id_by_partition = storage.get_latest_storage_id_by_partition(
                    a, DagsterEventType.ASSET_MATERIALIZATION_PLANNED
                )
                assert latest_storage_id_by_partition == {
                    "2023-01-05": single_partition_event.storage_id
                }

def test_get_latest_asset_partition_materialization_attempts_without_materializations_event_ids(
self, storage, instance
):
Expand Down

0 comments on commit a28343f

Please sign in to comment.