[asset backfill] Pause when code server is down #19494
Conversation
Current dependencies on/for this PR: This stack of pull requests is managed by Graphite.
I put a couple of thoughts inline but this makes sense to me, not sure if @sryza wants to make a pass too
The automatic test here looks good, I think doing a manual test here once would be a good idea as well to make sure this performs as expected
For example, starting a backfill (can add some sleeps strategically to make it longer) and shutting down the code server in the middle of the backfill, then starting it back up and verifying that it resumes as expected
@@ -155,6 +155,17 @@ def replace_requested_subset(self, requested_subset: AssetGraphSubset) -> "Asset
            backfill_start_time=self.backfill_start_time,
        )

    def replace_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBackfillData":
maybe with_latest_storage_id?
        return AssetBackfillData(
            target_subset=self.target_subset,
            latest_storage_id=latest_storage_id,
            requested_runs_for_target_roots=self.requested_runs_for_target_roots,
            materialized_subset=self.materialized_subset,
            failed_and_downstream_subset=self.failed_and_downstream_subset,
            requested_subset=self.requested_subset,
            backfill_start_time=self.backfill_start_time,
        )
return self._replace(latest_storage_id=latest_storage_id)?
ahh this is better
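The `_replace` suggestion works because `AssetBackfillData` is a NamedTuple. A minimal sketch of the idiom, using a hypothetical simplified stand-in class rather than the real Dagster one:

```python
from typing import NamedTuple, Optional


class BackfillData(NamedTuple):
    # Hypothetical, simplified stand-in for AssetBackfillData
    latest_storage_id: Optional[int]
    requested_partition_count: int


data = BackfillData(latest_storage_id=10, requested_partition_count=5)

# NamedTuple._replace returns a new instance with only the named field
# changed; every other field is carried over unchanged, and the original
# instance is untouched.
updated = data._replace(latest_storage_id=42)
```

This avoids spelling out every field by hand, which is exactly the duplication the hand-written `return AssetBackfillData(...)` version had.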
@@ -259,6 +263,11 @@ def submit_asset_run(
    return run_to_submit


class SubmitRunRequestChunkResult(NamedTuple):
    chunk_submitted_runs: Sequence[Tuple[RunRequest, DagsterRun]]
    code_unreachable_error_raised: bool
could this also be called "retryable_error_raised"?
yes, good call, will switch to this
# Code server became unavailable mid-backfill. Rewind the cursor back to the cursor
# from the previous iteration, to allow next iteration to reevaluate the same
# events.
do we need to do anything here to prevent the same backfill from iterating in a tight loop if this happens and it's always failing? Or does the fact that the backfill daemon is an IntervalDaemon keep that from happening?
just confirming - this does not stop other backfills from executing in parallel right?
Every 30s, the daemon searches for all in-progress backfills and loops through each one, executing an asset backfill iteration for each.
So if a backfill had a code location error, it would be re-evaluated on every daemon "loop". But on each loop, all the other backfills could still execute successfully.
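The daemon behavior described above can be pictured as follows. This is a hedged sketch, not the actual Dagster daemon code; `backfill_daemon_tick`, `execute_iteration`, and the use of `ConnectionError` as the retryable error are all hypothetical names for illustration:

```python
import logging


def backfill_daemon_tick(in_progress_backfills, execute_iteration, logger):
    # Hypothetical sketch: on each interval tick, every in-progress
    # backfill gets its own iteration. A code-location error in one
    # backfill is recorded and retried on the next tick; it does not
    # block the other backfills from executing in the same tick.
    results = {}
    for backfill_id, backfill in in_progress_backfills.items():
        try:
            results[backfill_id] = execute_iteration(backfill)
        except ConnectionError:
            logger.warning(
                "backfill %s hit a code location error; retrying next tick",
                backfill_id,
            )
            results[backfill_id] = None
    return results
```

Because the tick itself runs on an interval (every 30s in the description above), a persistently failing backfill is retried at most once per tick rather than in a tight loop.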
if submit_run_request_chunk_result is None:
    # allow the daemon to heartbeat
    yield None
    continue

code_unreachable_error_raised = (
    submit_run_request_chunk_result.code_unreachable_error_raised
this isn't necessarily related to this PR specifically, but I found the logic around this for loop pretty hard to wrap my head around - in particular the fact that it will only ever return None until it returns something, and then we can rely on that something being the last iteration (believe I have that right?) Without that understanding it seemed like things were going to just keep on going (within the same iteration) even after one of the runs failed to submit, but that is not the case - once the submissions either finish or raise this particular error, the iteration is over
One way to make that more clear would be to move most of the logic here outside of the for loop - to make it clear that once submit_run_request_chunk_result is not None, the for loop is guaranteed to be finished
> it will only ever return None until it returns something, then we can guarantee on that something being the last iteration (believe I have that right?)

This is a confusing loop... it returning something doesn't mean it is actually the last iteration, though.
The function will return a result for each run request chunk, yielding many `None`s between each run request chunk to allow the daemon to heartbeat.

> once the submissions either finish or raise this particular error, the iteration is over

This part is true though, enabled by the break statements.
Let me know if there are other ways you can imagine to make this more digestible and I can file a follow-up PR.
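The yield-`None`-to-heartbeat pattern being discussed can be sketched as a generator. This is a hypothetical simplification, not the actual Dagster implementation; `ChunkResult`, `submit_in_chunks`, `submit_one`, and `ConnectionError` as the retryable error are stand-in names:

```python
from typing import NamedTuple, Sequence


class ChunkResult(NamedTuple):
    # Hypothetical stand-in for SubmitRunRequestChunkResult
    submitted: Sequence[str]
    retryable_error_raised: bool


def submit_in_chunks(run_requests, chunk_size, submit_one):
    # Yields None between individual run submissions so the caller (the
    # daemon) can heartbeat, and yields a ChunkResult after each chunk.
    # A retryable error ends the whole iteration early via break, which
    # is what guarantees the outer loop sees at most one error result.
    for start in range(0, len(run_requests), chunk_size):
        chunk = run_requests[start:start + chunk_size]
        submitted, error_raised = [], False
        for request in chunk:
            try:
                submitted.append(submit_one(request))
            except ConnectionError:
                error_raised = True
                break
            yield None  # allow the daemon to heartbeat between runs
        yield ChunkResult(submitted, error_raised)
        if error_raised:
            break
```

A caller filters out the `None` heartbeat values and acts on each `ChunkResult`, stopping (and rewinding the cursor) when `retryable_error_raised` is set.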
backfill_data_with_submitted_runs = (
    backfill_data_with_submitted_runs.replace_latest_storage_id(
        previous_asset_backfill_data.latest_storage_id
    )
)
an alternative would be, rather than updating the storage id and then potentially un-updating it, to wait until here to update it in the 'expected' case where there is no error (and to skip the update if there was an error). That won't work if we use latest_storage_id before this point and expect it to be the new value, though
This is a good callout.
We're currently doing a weird thing where we call execute_asset_backfill_iteration_inner, which returns the expected resultant asset backfill data (including requested partitions and the latest storage id) after all runs are submitted. This function is used for testing to assert that a backfill iteration will result in X partitions being requested.
Then because we might have mid-iteration backfill cancellations and code location errors like this happening during run submission, we un-update the requested partitions and the cursor when unexpected things happen.
I think we should refactor execute_asset_backfill_iteration_inner to stop doing these un-updates. Instead we could handle observed changes (updated materialized/failed partitions) separately from expected state updates (partitions to request/next cursor), which I can do if I get the chance.
In the meantime, because there's already existing logic to do "un-updates", I'd prefer to keep the cursor "un-update" logic with the existing logic.
@@ -268,7 +277,9 @@ def submit_asset_runs_in_chunks(
    asset_graph: ExternalAssetGraph,
    debug_crash_flags: SingleInstigatorDebugCrashFlags,
    logger: logging.Logger,
) -> Iterator[Optional[Sequence[Tuple[RunRequest, DagsterRun]]]]:
    backfill_id: Optional[str] = None,
    catch_code_location_load_errors: bool = False,
i only see one callsite of this method - is there a reason it needs to be toggleable?
Yeah only one callsite, i'm ok with making it non-toggleable
Deploy preview for dagit-core-storybook ready! ✅ Built with commit aa69b7d.
From reading the description, this makes sense to me. I'm ✅ if @gibsondan is.
Yup, tested locally and backfills are functioning as expected
Fixes #19484.
Users were observing a backfill failure that occurred because the user code server was temporarily unreachable when creating runs. For resiliency, the ideal behavior in this case is to pause the backfill and then reevaluate it on the next iteration, to allow other backfills to be evaluated in the meantime.
This PR adds the following behavior whenever attempting to create a run results in a code location unloadable error:
Existing asset backfill idempotency logic guarantees that if a target partition is already requested, it will not be re-requested.
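That idempotency guarantee can be pictured as a subset check before requesting. This is a hypothetical sketch using a plain set in place of Dagster's AssetGraphSubset; `partitions_to_request` is an illustrative name, not the real API:

```python
def partitions_to_request(target_partitions, requested_subset):
    # Hypothetical sketch of the idempotency check: after a cursor
    # rewind, re-evaluating the same events only requests target
    # partitions that are not already in the requested subset, so no
    # run is submitted twice for the same partition.
    return [p for p in target_partitions if p not in requested_subset]
```

This is why rewinding the cursor after a code server outage is safe: the partitions whose runs were already submitted before the error are filtered out on the next iteration.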