Skip to content

Commit

Permalink
Serialize partitions subset on asset materialization planned event (first stab)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 30, 2023
1 parent 37c822b commit ef66c9f
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 12 deletions.
30 changes: 27 additions & 3 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
RawMetadataValue,
normalize_metadata,
)
from dagster._core.errors import HookExecutionError
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.errors import DagsterInvariantViolationError, HookExecutionError
from dagster._core.execution.context.system import IPlanContext, IStepContext, StepExecutionContext
from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle
from dagster._core.execution.plan.inputs import StepInputData
Expand Down Expand Up @@ -695,6 +696,12 @@ def partition(self) -> Optional[str]:
else:
return None

@property
def partitions_subset(self) -> Optional[PartitionsSubset]:
    """Subset of partitions targeted by this event, if any.

    Only ASSET_MATERIALIZATION_PLANNED events can carry a partitions
    subset; for every other event type this is None.
    """
    if self.event_type != DagsterEventType.ASSET_MATERIALIZATION_PLANNED:
        return None
    return self.asset_materialization_planned_data.partitions_subset

@property
def step_input_data(self) -> "StepInputData":
_assert_type("step_input_data", DagsterEventType.STEP_INPUT, self.event_type)
Expand Down Expand Up @@ -1504,14 +1511,31 @@ def __new__(
class AssetMaterializationPlannedData(
    NamedTuple(
        "_AssetMaterializationPlannedData",
        [
            ("asset_key", AssetKey),
            ("partition", Optional[str]),
            ("partitions_subset", Optional["PartitionsSubset"]),
        ],
    )
):
    """Event-specific data for an ASSET_MATERIALIZATION_PLANNED event.

    At most one of ``partition`` and ``partitions_subset`` may be supplied:
    ``partition`` marks a single partition as planned, while
    ``partitions_subset`` marks a whole set of partitions in one event
    (e.g. for single-run backfills).
    """

    def __new__(
        cls,
        asset_key: AssetKey,
        partition: Optional[str] = None,
        partitions_subset: Optional["PartitionsSubset"] = None,
    ):
        # Compare against None explicitly: a provided-but-falsy subset
        # (e.g. an empty PartitionsSubset) must still be rejected when it
        # is combined with a partition key, which a plain truthiness check
        # (`if partitions_subset and partition`) would silently allow.
        if partitions_subset is not None and partition is not None:
            raise DagsterInvariantViolationError(
                "Cannot provide both partition and partitions_subset"
            )

        return super(AssetMaterializationPlannedData, cls).__new__(
            cls,
            asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
            partition=check.opt_str_param(partition, "partition"),
            partitions_subset=check.opt_inst_param(
                partitions_subset, "partitions_subset", PartitionsSubset
            ),
        )


Expand Down
51 changes: 42 additions & 9 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,11 @@ def nux_enabled(self) -> bool:
else:
return nux_enabled_by_default

@property
def is_cloud_instance(self) -> bool:
    """Whether this instance should take cloud-only code paths.

    Used elsewhere in this file to gate emitting a single subset-targeted
    ASSET_MATERIALIZATION_PLANNED event for single-run backfills (that
    behavior is only supported on cloud).

    NOTE(review): currently hard-coded to True — per the TODO below this
    is a temporary value for this first-stab commit.
    """
    # TODO change this to False
    return True

# run monitoring

@property
Expand Down Expand Up @@ -1360,29 +1365,47 @@ def _log_asset_planned_events(
"a run with a partition range."
)

partitions_def = (
external_partitions_def_data.get_partitions_definition()
)
partition_keys = partitions_def.get_partition_keys_in_range(
PartitionKeyRange(partition_range_start, partition_range_end)
)
# For now, yielding materialization planned events for single run backfills
# is only supported on cloud
if self.is_cloud_instance:
partitions_def = (
external_partitions_def_data.get_partitions_definition()
)
partitions_subset = partitions_def.subset_with_partition_keys(
partitions_def.get_partition_keys_in_range(
PartitionKeyRange(
partition_range_start, partition_range_end
)
)
).to_serializable_subset()

event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
job_name=job_name,
message=(
f"{job_name} intends to materialize asset {asset_key.to_string()}"
),
event_specific_data=AssetMaterializationPlannedData(
asset_key, partitions_subset=partitions_subset
),
step_key=step.key,
)
self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG)
else:
partition = (
partition_tag
if check.not_none(output.properties).is_asset_partitioned
else None
)
partition_keys = [partition]

for partition_key in partition_keys:
event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
job_name=job_name,
message=(
f"{job_name} intends to materialize asset {asset_key.to_string()}"
),
event_specific_data=AssetMaterializationPlannedData(
asset_key, partition=partition_key
asset_key, partition=partition
),
step_key=step.key,
)
Expand Down Expand Up @@ -1957,6 +1980,16 @@ def get_event_records(
Returns:
List[EventLogRecord]: List of event log records stored in the event log storage.
"""
from dagster._core.events import DagsterEventType

if self.is_cloud_instance:
if (
event_records_filter.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED
and event_records_filter.asset_partitions
):
# TODO: Raise warning that events targeting multiple partitions will not be returned
pass

return self._event_storage.get_event_records(event_records_filter, limit, ascending)

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
from dagster._core.definitions.dependency import NodeHandle
from dagster._core.definitions.job_base import InMemoryJob
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionKey
from dagster._core.definitions.partition import PartitionKeyRange
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.event_api import EventLogCursor, EventRecordsResult
Expand All @@ -66,6 +68,9 @@
from dagster._core.execution.plan.handle import StepHandle
from dagster._core.execution.plan.objects import StepFailureData, StepSuccessData
from dagster._core.execution.stats import StepEventStatus
from dagster._core.host_representation.external_data import (
external_partitions_definition_from_def,
)
from dagster._core.host_representation.origin import (
ExternalJobOrigin,
ExternalRepositoryOrigin,
Expand Down Expand Up @@ -2560,6 +2565,55 @@ def _assert_matches_not_including_event_id(result, expected):
{},
)

def test_store_asset_materialization_planned_event_with_partitions_subset(
    self, storage, instance
) -> None:
    """A planned event carrying a partitions subset round-trips through
    event storage: no single partition key is recorded, and the stored
    event preserves the serialized subset.
    """
    asset_key = AssetKey(["a"])
    run_id = make_new_run_id()

    partitions_def = DailyPartitionsDefinition("2023-01-01")
    keys_in_range = partitions_def.get_partition_keys_in_range(
        PartitionKeyRange("2023-01-05", "2023-01-10")
    )
    external_partitions_def = external_partitions_definition_from_def(partitions_def)
    partitions_subset = (
        external_partitions_def.get_partitions_definition()
        .subset_with_partition_keys(keys_in_range)
        .to_serializable_subset()
    )

    # TODO make this test only run for cloud storage
    # TODO add test that asserts exception is raised when partition key and subset provided
    with create_and_delete_test_runs(instance, [run_id]):
        planned_event = DagsterEvent(
            DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
            "nonce",
            event_specific_data=AssetMaterializationPlannedData(
                asset_key, partitions_subset=partitions_subset
            ),
        )
        storage.store_event(
            EventLogEntry(
                error_info=None,
                level="debug",
                user_message="",
                run_id=run_id,
                timestamp=time.time(),
                dagster_event=planned_event,
            )
        )

        records = storage.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
                asset_key=asset_key,
            )
        )
        assert len(records) == 1
        [record] = records
        # Subset-targeted events do not populate a single partition key.
        assert record.partition_key is None
        assert record.event_log_entry.dagster_event.partitions_subset == partitions_subset

def test_get_latest_asset_partition_materialization_attempts_without_materializations_event_ids(
self, storage, instance
):
Expand Down

0 comments on commit ef66c9f

Please sign in to comment.