Log materialization planned event for run with partition range #18305
Conversation
Current dependencies on/for this PR:
This stack of pull requests is managed by Graphite.
Force-pushed 2ed5c9f to 37c822b
Force-pushed 37c822b to 315cc58
Force-pushed 315cc58 to fbd8901
Force-pushed ee96526 to c12261a
Force-pushed 037fc5a to 3d65c2b
Must be merged with internal PR: https://github.com/dagster-io/internal/pull/7615
@@ -1303,8 +1319,12 @@ def _ensure_persisted_execution_plan_snapshot(
return execution_plan_snapshot_id

def _log_asset_planned_events(
self, dagster_run: DagsterRun, execution_plan_snapshot: "ExecutionPlanSnapshot"
self,
_log_asset_planned_events is a pretty long function already and it's getting significantly longer here. Is there a way to break it up a little?
yup, split the materialization planned events logic out into a different function
Consider putting the event generation on a staticmethod on DagsterEvent?
yep, added
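For context, a minimal sketch of the kind of split discussed here: pull the planned-event construction out of _log_asset_planned_events into a dedicated builder so the logging loop stays small. The names and types below are hypothetical stand-ins for illustration only; the real code builds DagsterEvent objects.

from typing import NamedTuple, Optional, Sequence


class PlannedMaterialization(NamedTuple):
    # Hypothetical stand-in for the event payload; the real code builds a DagsterEvent.
    asset_key: str
    partition: Optional[str]


def build_planned_materializations(
    asset_keys: Sequence[str], partition: Optional[str]
) -> Sequence[PlannedMaterialization]:
    # One planned event per asset targeted by the run.
    return [PlannedMaterialization(key, partition) for key in asset_keys]


def log_asset_planned_events(asset_keys: Sequence[str], partition: Optional[str]) -> None:
    # The caller only iterates and reports; construction lives in the builder above.
    for event in build_planned_materializations(asset_keys, partition):
        print(f"planned: {event.asset_key} partition={event.partition}")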
@@ -1900,6 +1908,24 @@ def external_schedule_data_from_def(schedule_def: ScheduleDefinition) -> Externa
)


def can_build_external_partitions_definition_from_def(partitions_def: PartitionsDefinition):
return type annotation plz
job_def.partitions_def
)
if job_def.partitions_def
and can_build_external_partitions_definition_from_def(job_def.partitions_def)
Without this check, we would just error, right? What's wrong with that? Might be better to find out early than discover a problem later along the way?
This check guards against the backcompat dynamic partitions case (a function that returns partition keys)
Without it we raise an error, but what we want is to not yield the asset materialization planned events w/ a subset
This check guards against the backcompat dynamic partitions case (a function that returns partition keys)
Is it currently possible to launch a range backfill using function-based dynamic partitions? I would be comfortable saying that's not allowed.
It's not possible from the UI but I don't think we gate it if you were to yield a run request with tags or something like that.
Agree that we can make it not allowed, I don't think anyone is doing that anyway
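A sketch of the kind of guard being discussed, assuming the backcompat case is a DynamicPartitionsDefinition built from a partition-key function (which has no registered name and therefore can't be represented as external partitions data). The body is illustrative, not necessarily the PR's exact implementation, and it also picks up the return type annotation requested above.

from dagster import DynamicPartitionsDefinition, PartitionsDefinition


def can_build_external_partitions_definition_from_def(
    partitions_def: PartitionsDefinition,
) -> bool:
    # Legacy dynamic partitions defs are backed by a partition-key function and have no
    # name registered with the instance, so they can't be serialized into external data.
    if isinstance(partitions_def, DynamicPartitionsDefinition) and partitions_def.name is None:
        return False
    return True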
and check.not_none(output.properties).is_asset_partitioned
):
partitions_def = (
external_partitions_def_data.get_partitions_definition()
This might not be super cheap, right? For static partitions it's O(# partitions), and for time window partitions it involves some pendulum stuff. Thoughts on caching the PartitionsDefinition in pipeline_and_execution_plan_cache instead of the ExternalPartitionsDefinitionData?
Another benefit of this would be spreading ExternalPartitionsDefinitionData to fewer places – longer term it could be nice to get rid of that and just use @whitelist_for_serdes on PartitionsDefinition instead.
Thoughts on caching the PartitionsDefinition in pipeline_and_execution_plan_cache instead of the ExternalPartitionsDefinitionData?
You mean passing around the PartitionsDefinition instead of the external partitions definition everywhere, right? I'm good with this; it probably saves us some partitions def roundtrips in the asset backfill case.
You mean passing around the PartitionsDefinition instead of the external partitions definition everywhere, right?
Exactly
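As a rough illustration of the roundtrip cost mentioned above: for a static partitions def, get_partitions_definition() on the external data effectively rebuilds the definition from its serialized keys, which is O(# partitions), so caching the PartitionsDefinition itself avoids repeating that work. A sketch using only public APIs (the external data classes themselves are dagster internals and are omitted here):

from dagster import StaticPartitionsDefinition

serialized_keys = [f"p{i}" for i in range(10_000)]  # roughly what the external data carries

# Roughly what get_partitions_definition() does for the static case:
rebuilt = StaticPartitionsDefinition(serialized_keys)
print(len(rebuilt.get_partition_keys()))  # 10000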
Wondering if we need to worry about it being unclear that backcompat dynamic partitions defs won't be passed into create_run...
Could name the new param assets_partitions_def? What do you think?
Could name the new param assets_partitions_def? What do you think?
That's a good idea
ok, I've gone ahead with the rename, though I changed it to asset_job_partitions_def since I think that is a better description
Force-pushed 8214fd2 to a7211b9
Force-pushed caf5788 to 47d613a
Force-pushed caddca9 to e3a0ab0
@@ -110,6 +112,7 @@ def create_valid_pipeline_run(
status=DagsterRunStatus.NOT_STARTED,
external_job_origin=external_pipeline.get_external_origin(),
job_code_origin=external_pipeline.get_python_origin(),
asset_job_partitions_def=code_location.get_asset_job_partitions_def(external_pipeline),
What's the size of the partitions def typically? This can still be quite large if it's a static partitions def but small in all other cases?
For a static partitions def, would we just want the serialization of the partition keys? Maybe micro-optimization...
If the point of partition ranges is just to help with write throughput, then should there be some threshold of how large the ranges are?
This can still be quite large if it's a static partitions def but small in all other cases?
Yes, this is true...
Are we worried about performance if the size of the partitions def is large? Between serializing a large execution plan versus having a large in-memory partitions def being passed around, I lean toward having the in-memory object
If the point of partition ranges is just to help with write throughput, then should there be some threshold of how large the ranges are?
I think it's to minimize the # of created runs (maybe the same as what you mean by write throughput).
I think it's sensible to limit the size of a range, though the range should be large enough to target all partitions of a reasonably-sized partitions def.
Maybe we need to first decide a limit on the size of partitions defs to enforce this?
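A sketch of the kind of guardrail floated here, using public partitions APIs; the threshold value and helper name are hypothetical, not an existing dagster setting:

from dagster import PartitionKeyRange, PartitionsDefinition, StaticPartitionsDefinition

MAX_RANGE_PARTITION_KEYS = 25_000  # hypothetical limit


def validate_partition_range_size(
    partitions_def: PartitionsDefinition, start: str, end: str
) -> None:
    # Count the keys covered by the range and reject oversized single-run backfills.
    num_keys = len(partitions_def.get_partition_keys_in_range(PartitionKeyRange(start, end)))
    if num_keys > MAX_RANGE_PARTITION_KEYS:
        raise ValueError(
            f"Partition range {start}..{end} covers {num_keys} partitions, "
            f"exceeding the limit of {MAX_RANGE_PARTITION_KEYS}."
        )


validate_partition_range_size(StaticPartitionsDefinition(["a", "b", "c"]), "a", "c")  # ok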
# For now, yielding materialization planned events for single run backfills
# is only supported on cloud
if self.is_cloud_instance and check.not_none(output.properties).is_asset_partitioned:
Is the flag necessary? Could we always populate the subset but just update the partitions table in the storage layer?
We subclass DagsterInstance in cloud, and use it as a place to override default behavior. Adding an is_cloud_instance flag in OSS feels off to me and could promote more confusing intertwining of logic.
This is a good call, I believe OSS functions the same regardless of whether partition_key=None vs partitions_subset is populated.
@@ -166,3 +171,103 @@ def my_other_asset(my_asset):
)
)
assert record.event_log_entry.dagster_event.event_specific_data.partition is None


def test_subset_on_asset_materialization_planned_event_for_single_run_backfill_allowed():
Can you add tests flexing how some of the asset partition reads are affected? get_materialized_partitions, get_latest_storage_id_by_partition, get_latest_tags_by_partition, get_latest_asset_partition_materialization_attempts_without_materializations
I've added tests for these methods, except get_latest_tags_by_partition; that method filters by event type and materialization planned events can't contain tags right now.
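For reference, a minimal sketch of the kind of read-path coverage being asked for, using public APIs. The actual tests added in the PR exercise partition-range runs; get_materialized_partitions being available on the instance is assumed here, as named in the review.

from dagster import AssetKey, DagsterInstance, StaticPartitionsDefinition, asset, materialize

partitions_def = StaticPartitionsDefinition(["a", "b"])


@asset(partitions_def=partitions_def)
def my_partitioned_asset() -> int:
    return 1


def test_materialized_partitions_read_path() -> None:
    instance = DagsterInstance.ephemeral()
    assert materialize([my_partitioned_asset], partition_key="a", instance=instance).success
    # Only the materialized partition should be reported by the read path.
    assert instance.get_materialized_partitions(AssetKey("my_partitioned_asset")) == {"a"}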
Force-pushed 21ca2e7 to 8bffa6b
Force-pushed e3a0ab0 to 2ddbb10
This looks good to me
)

partition_tag = dagster_run.tags.get(PARTITION_NAME_TAG)
partition_range_start, partition_range_end = (
Not blocking feedback, but pushing this down into a utility, or method on DagsterRun, could help minimize the number of places in the codebase we need to parse these tags.
That's a good call, though would prefer to split this out into a separate PR to fully refactor out existing references to these tags
That makes sense
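A sketch of the kind of helper suggested above; the function itself is hypothetical, while the tag constants are dagster's existing asset partition range tags (module path assumed from dagster internals):

from typing import Optional, Tuple

from dagster import DagsterRun
from dagster._core.storage.tags import (
    ASSET_PARTITION_RANGE_END_TAG,
    ASSET_PARTITION_RANGE_START_TAG,
)


def get_partition_range(run: DagsterRun) -> Optional[Tuple[str, str]]:
    # Return (start, end) if the run targets a partition range, else None.
    start = run.tags.get(ASSET_PARTITION_RANGE_START_TAG)
    end = run.tags.get(ASSET_PARTITION_RANGE_END_TAG)
    if start is None or end is None:
        return None
    return (start, end)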
Force-pushed 8bffa6b to 9007970
Force-pushed 2ddbb10 to a28343f
Force-pushed a28343f to 18d362a
Force-pushed 18d362a to 24c9557
This PR enables the UI to correctly display failed/in-progress status for single-run backfills in cloud.
This includes two major changes:
1. Log a materialization planned event containing a partitions subset for single-run backfills upon run creation
If all of the required conditions apply, we log a partitions subset planned event for each asset. Otherwise, we fall back to the status quo behavior: partition=None. We still log a planned event in this case because in certain places (i.e. asset backfills) we query planned events to see which assets are planned to execute.

2. Threads an assets_job_partitions_def through the run creation callsites

We need access to the partitions definition of the assets/job in order to build the target partitions subset from the partition range. The alternative would be to serialize a list of targeted partition keys on the execution plan snapshot for partition-ranged runs, which bloats the snapshot.
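To make "build the target partitions subset from the partition range" concrete, here is a small sketch using dagster's public partitions APIs; it is illustrative only, and the PR's actual code path goes through the run's range tags and instance internals rather than hard-coded values:

from dagster import PartitionKeyRange, StaticPartitionsDefinition

# The asset job's partitions definition that this PR threads into run creation.
partitions_def = StaticPartitionsDefinition(["2023-01-01", "2023-01-02", "2023-01-03"])

# The range comes from the run's partition range tags.
keys = partitions_def.get_partition_keys_in_range(
    PartitionKeyRange(start="2023-01-01", end="2023-01-02")
)
subset = partitions_def.empty_subset().with_partition_keys(keys)
print(sorted(subset.get_partition_keys()))  # ['2023-01-01', '2023-01-02']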
There are two different ways runs are created from jobs:
1. Through a code location, which holds the external repository data
2. Directly from the job def, e.g. execute_in_process, and the execution plan is created from there

In the first case, the code location holds the external repository data in memory. We can then fetch the external partitions definition of the job, and thread this into create_run. We could also fetch the partitions definition per-asset from the code location, but that feels extraneous given that we know (1) the partitions definition on the job and (2) whether each asset is partitioned or not.

In the second case, we don't have access to the code location, so instead this PR constructs the external partitions definition data from the job def and threads that into create_run.

Testing
Tested locally on cloud and OSS.
Corresponding internal PR: https://github.com/dagster-io/internal/pull/7615