Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Dec 28, 2023
1 parent d10e60e commit aa88d4d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 85 deletions.
18 changes: 18 additions & 0 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,24 @@ def capture_logs(
),
)

@staticmethod
def build_asset_materialization_planned_event(
job_name: str,
step_key: str,
asset_materialization_planned_data: "AssetMaterializationPlannedData",
) -> "DagsterEvent":
"""Constructs an asset materialization planned event, to be logged by the caller."""
event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
job_name=job_name,
message=(
f"{job_name} intends to materialize asset {asset_materialization_planned_data.asset_key.to_string()}"
),
event_specific_data=asset_materialization_planned_data,
step_key=step_key,
)
return event


def get_step_output_event(
events: Sequence[DagsterEvent], step_key: str, output_name: Optional[str] = "result"
Expand Down
62 changes: 20 additions & 42 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,10 +848,6 @@ def nux_enabled(self) -> bool:
else:
return nux_enabled_by_default

@property
def is_cloud_instance(self) -> bool:
return False

# run monitoring

@property
Expand Down Expand Up @@ -1332,7 +1328,6 @@ def _log_materialization_planned_event_for_asset(
from dagster._core.events import (
AssetMaterializationPlannedData,
DagsterEvent,
DagsterEventType,
)

partition_tag = dagster_run.tags.get(PARTITION_NAME_TAG)
Expand Down Expand Up @@ -1370,40 +1365,24 @@ def _log_materialization_planned_event_for_asset(
"Creating a run targeting a partition range is not supported for jobs partitioned with function-based dynamic partitions"
)

# For now, yielding materialization planned events for single run backfills
# is only supported on cloud
if self.is_cloud_instance and check.not_none(output.properties).is_asset_partitioned:
if check.not_none(output.properties).is_asset_partitioned:
partitions_subset = job_partitions_def.subset_with_partition_keys(
job_partitions_def.get_partition_keys_in_range(
PartitionKeyRange(partition_range_start, partition_range_end)
)
).to_serializable_subset()

if partitions_subset:
event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
job_name=job_name,
message=(f"{job_name} intends to materialize asset {asset_key.to_string()}"),
event_specific_data=AssetMaterializationPlannedData(
asset_key, partitions_subset=partitions_subset
),
step_key=step.key,
)
self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG)

else:
partition = (
partition_tag if check.not_none(output.properties).is_asset_partitioned else None
)

event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
job_name=job_name,
message=(f"{job_name} intends to materialize asset {asset_key.to_string()}"),
event_specific_data=AssetMaterializationPlannedData(asset_key, partition=partition),
step_key=step.key,
)
self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG)
partition = (
partition_tag if check.not_none(output.properties).is_asset_partitioned else None
)
materialization_planned = DagsterEvent.build_asset_materialization_planned_event(
job_name,
step.key,
AssetMaterializationPlannedData(
asset_key, partition=partition, partitions_subset=partitions_subset
),
)
self.report_dagster_event(materialization_planned, dagster_run.run_id, logging.DEBUG)

def _log_asset_planned_events(
self,
Expand Down Expand Up @@ -1991,15 +1970,14 @@ def get_event_records(
"""
from dagster._core.events import DagsterEventType

if self.is_cloud_instance:
if (
event_records_filter.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED
and event_records_filter.asset_partitions
):
warnings.warn(
"Asset materialization planned events with partitions subsets will not be "
"returned when the event records filter contains the asset_partitions argument"
)
if (
event_records_filter.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED
and event_records_filter.asset_partitions
):
warnings.warn(
"Asset materialization planned events with partitions subsets will not be "
"returned when the event records filter contains the asset_partitions argument"
)

return self._event_storage.get_event_records(event_records_filter, limit, ascending)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
AssetKey,
AssetOut,
DagsterEventType,
DagsterInstance,
EventRecordsFilter,
Output,
asset,
Expand Down Expand Up @@ -181,14 +180,6 @@ def my_asset():
return 0

with instance_for_test() as instance:

class FakeCloudInstance(DagsterInstance):
@property
def is_cloud_instance(self) -> bool:
return True

instance.__class__ = FakeCloudInstance

materialize_to_memory(
[my_asset],
instance=instance,
Expand All @@ -207,32 +198,6 @@ def is_cloud_instance(self) -> bool:
)


def test_subset_on_asset_materialization_planned_event_for_single_run_backfill_not_allowed():
partitions_def = StaticPartitionsDefinition(["a", "b", "c"])

@asset(partitions_def=partitions_def)
def my_asset():
return 0

with instance_for_test() as instance:
materialize_to_memory(
[my_asset],
instance=instance,
tags={ASSET_PARTITION_RANGE_START_TAG: "a", ASSET_PARTITION_RANGE_END_TAG: "b"},
)

records = instance.get_event_records(
EventRecordsFilter(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
AssetKey("my_asset"),
)
)
assert len(records) == 1
record = records[0]
assert record.event_log_entry.dagster_event.event_specific_data.partitions_subset is None
assert record.event_log_entry.dagster_event.event_specific_data.partition is None


def test_single_run_backfill_with_unpartitioned_and_partitioned_mix():
partitions_def = StaticPartitionsDefinition(["a", "b", "c"])

Expand All @@ -245,14 +210,6 @@ def unpartitioned():
return 0

with instance_for_test() as instance:

class FakeCloudInstance(DagsterInstance):
@property
def is_cloud_instance(self) -> bool:
return True

instance.__class__ = FakeCloudInstance

materialize_to_memory(
[partitioned, unpartitioned],
instance=instance,
Expand Down

0 comments on commit aa88d4d

Please sign in to comment.