add indirect execution context access #14954
If we do this, I think it should be a getter of the class (e.g. `AssetExecutionContext.get()`) so that the type of the context is clear.
@@ -1300,15 +1305,24 @@ def typed_event_stream_error_message(self) -> Optional[str]:
    def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
        self._step_execution_context.set_requires_typed_event_stream(error_message=error_message)

    @staticmethod
    def get_current() -> Optional["OpExecutionContext"]:
should these raise or return None when there is no current context?
IMO we should raise
with time_execution_scope() as timer_result, enter_execution_context(
    step_context
) as compute_context:
This is hoisted up here to the top of the iterator call stack because context managers and active generators get goofy: if you raise an exception based on a yielded value in a frame above where the context manager is opened, the context manager does not get closed.
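The generator behavior described here can be reproduced in isolation. The sketch below uses only the standard library; `tracked_scope` and `produce` are hypothetical names, not code from this PR. It shows that when the consumer raises, the generator is merely suspended, so a context manager opened inside it stays open until the generator is closed.

```python
from contextlib import contextmanager

events = []


@contextmanager
def tracked_scope():
    events.append("enter")
    try:
        yield
    finally:
        events.append("exit")


def produce():
    # The context manager is opened inside the generator frame.
    with tracked_scope():
        yield 1
        yield 2


gen = produce()
try:
    for value in gen:
        # The consumer raises in its own frame, above the generator.
        raise ValueError(f"bad value {value}")
except ValueError:
    pass

# The generator is only suspended, so the scope has not exited yet.
state_after_error = list(events)

# Explicitly closing the generator runs the pending cleanup.
gen.close()
state_after_close = list(events)
```

Opening the context manager in the outermost frame, as the diff above does, means its exit no longer depends on when the generator happens to be closed or garbage collected.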
Interested in @slopp's feedback on naming, etc.
@@ -405,3 +405,18 @@ def the_op(context: int):
@asset
def the_asset(context: int):
    pass


def test_get_context():
might be worth testing thread behavior? I guess ContextVar should handle it fine
ya feel pretty good about thread & coroutine concurrency
https://docs.python.org/3/library/contextvars.html
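A quick standard-library check of the thread behavior, as a sketch rather than the test added in this PR. `current_name` and `worker` are hypothetical names. It only relies on the fact that a value set inside one thread is not visible in another thread's context; whether a new thread starts with a copy of its parent's values can vary by Python version and build, and is not assumed here.

```python
import threading
from contextvars import ContextVar

current_name: ContextVar[str] = ContextVar("current_name", default="unset")

seen = {}


def worker(name: str) -> None:
    current_name.set(name)
    # Each thread reads back only the value it set itself.
    seen[name] = current_name.get()


current_name.set("main")
threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Values set in the workers did not leak into the main thread.
main_value = current_name.get()
```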
Requesting changes based on the in-memory instantiation issue (comment inline).
asset_ctx = AssetExecutionContext(step_context)
op_ctx = OpExecutionContext(step_context)
We are going to switch this from an IS-A to a HAS-A relationship soon.
@alangenfeld It's subtle, but I think we should add a method on `AssetExecutionContext` now, called `get_op_execution_context`, and just use that for this instance. I'm a bit spooked by having two objects in memory for this. cc: @jamiedemaria
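To make the HAS-A suggestion concrete, here is a rough sketch with hypothetical stand-in classes (`OpContextSketch`, `AssetContextSketch`); the real Dagster classes carry far more state. The asset context owns a single op context and hands out that same instance, so only one op context object exists per step.

```python
class OpContextSketch:
    """Hypothetical stand-in for OpExecutionContext."""

    def __init__(self, step_context: object) -> None:
        self.step_context = step_context


class AssetContextSketch:
    """Hypothetical stand-in for AssetExecutionContext."""

    def __init__(self, step_context: object) -> None:
        # HAS-A: the asset context owns exactly one op context.
        self._op_context = OpContextSketch(step_context)

    def get_op_execution_context(self) -> OpContextSketch:
        # Always return the same instance instead of building a second one.
        return self._op_context


asset_ctx = AssetContextSketch(step_context=object())
first = asset_ctx.get_op_execution_context()
second = asset_ctx.get_op_execution_context()
```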
The thing that led me to this current approach was ensuring that the passed-down context was `==` to the result of `_ExecutionContext.get()`. It will require some shenanigans to maintain that equality if we do the proposed shift.
I'm not sure I'm quite following the issue/proposed solution here. can you add some elaboration?
I'm not sure I understand the use case enough to have an opinion on the name. I think the only thing that is potentially confusing as a user is whether this method has to be called, e.g.

@asset
def my_asset(context: AssetExecutionContext):
    context.get_current()  # what? is this necessary; why or why not?

Does re-using the standard Python
Otherwise fine with
@slopp the use case here is to avoid the need to pass contexts around entirely and instead provide a global accessor. You would be able to write code like the following:

@asset
def my_asset():
    context = AssetExecutionContext.get_current()

There are tradeoffs to this approach, but we have certainly gotten this feature request in the past.
asset_ctx = AssetExecutionContext(step_context)
asset_token = _current_asset_execution_context.set(asset_ctx)

try:
    if context_annotation is EmptyAnnotation:
        # if no type hint has been given, default to:
        # * AssetExecutionContext for sda steps not in graph-backed assets, and asset_checks
        # * OpExecutionContext for non sda steps
        # * OpExecutionContext for ops in graph-backed assets
        if is_asset_check:
            yield asset_ctx
        elif is_op_in_graph_asset or not is_sda_step:
            yield asset_ctx.get_op_execution_context()
        else:
            yield asset_ctx
    elif context_annotation is AssetExecutionContext:
        yield asset_ctx
    else:
        yield asset_ctx.get_op_execution_context()
finally:
    _current_asset_execution_context.reset(asset_token)
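The set/reset-with-token pattern in the snippet above can be tried in isolation. This is a simplified sketch using a hypothetical `enter_context` helper and a string payload instead of a real context object; it shows that `reset(token)` restores whatever value was active before the matching `set`, which is what makes nesting safe.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

_current: ContextVar[Optional[str]] = ContextVar("_current", default=None)


@contextmanager
def enter_context(value: str) -> Iterator[str]:
    token = _current.set(value)
    try:
        yield value
    finally:
        # reset() restores the value that was active before set().
        _current.reset(token)


with enter_context("outer"):
    with enter_context("inner"):
        inside_inner = _current.get()
    after_inner = _current.get()
after_outer = _current.get()
```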
👍🏻 Thanks, much more comfortable with this.
@@ -1300,15 +1306,34 @@ def typed_event_stream_error_message(self) -> Optional[str]:
    def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None) -> None:
        self._step_execution_context.set_requires_typed_event_stream(error_message=error_message)

    @staticmethod
    def get() -> "OpExecutionContext":
I updated these to just `get` now that they raise, and based on slopp's feedback.
Looks good on my end; will leave it to others with feedback to approve.
Thanks for clarifying. Yes, this new capability makes sense to me. I've seen a few users get very tripped up by the use of
Understanding the use case, I still don't have super strong opinions on the names.
Also would like @PedramNavid and @tacastillo's thoughts on this. It solves a problem that some users have but at the cost of yet-another-way of doing things.
From what I understand, @benpankow has an internal use case for this functionality (being able to swap in an insights-compatible resource without changing the API). In the current form of the PR the new static
If we feel like we are close to resolution I am fine getting the buy-in to mark this
Oh wow this is beautiful. I'm a much bigger fan of pulling the contexts into the scope. I used to like the magic arguments, but it's often a stumbling point for users. And I want to say we might've had an early churn because they've said something along the lines of:
There are handfuls of users that need this. I often get asked:
Once this isn't experimental, my vote would be to appoint this as the best practice and to move away from magic arguments unless needed (sounds like there's a use case for keeping them?) I'd say the benefits of it outweigh the overall cost of the addition of it.
Generally supportive! I assume same experience with regards to type-ahead/completions? So long as we maintain support for existing context magic for a while I think it's fine. How would this work with functions called from an asset? You would still need to pass context, right? Or would this work?

def my_wrapper_func(x):
    if AssetExecutionContext.some_method():
        do_something_with(x)

@asset
def my_asset():
    x = 123
    my_wrapper_func(x)
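On the helper-function question: assuming the accessor is backed by a `ContextVar` as the PR description states, a helper called from the asset body in the same thread should see the same context without having it passed in. The sketch below illustrates that with plain Python; `get_ctx`, `run_with_context`, and the dict-shaped context are hypothetical stand-ins, not Dagster APIs.

```python
from contextvars import ContextVar

_current_ctx: ContextVar[dict] = ContextVar("_current_ctx")


def get_ctx() -> dict:
    try:
        return _current_ctx.get()
    except LookupError:
        raise RuntimeError("no active context") from None


def my_wrapper_func(x: int) -> str:
    # No context argument: the helper looks it up itself.
    ctx = get_ctx()
    return f"{ctx['asset_key']}:{x}"


def run_with_context(fn, ctx: dict):
    # Stand-in for the framework setting the context around user code.
    token = _current_ctx.set(ctx)
    try:
        return fn()
    finally:
        _current_ctx.reset(token)


def my_asset() -> str:
    return my_wrapper_func(123)


result = run_with_context(my_asset, {"asset_key": "my_asset"})
```

Calling `my_wrapper_func` outside of `run_with_context` raises in this sketch, which mirrors the "raise when there is no current context" decision made earlier in the review.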
@PedramNavid @tacastillo The tradeoffs I'd like you to consider are 1) should we allow both methods and accept that there are "two ways of doing things", 2) should we drive heavily towards the global accessor and incur switching costs, and 3) what is the price we pay for having code depend on global state. @tacastillo in terms of "my vote would be to appoint this as the best practice and to move away from magic arguments unless needed (sounds like there's a use case for keeping them?)", consider the cases where we have resource initialization or sensors or anywhere else where there is a subtly different context. Should we have N global accessors and error when users call the wrong one? Or consolidate into a single
That all being said, as @alangenfeld said, this solves an immediate problem for Ben and does not make this public, so we can go ahead and land this. Whether or not to make this public seems like a separate discussion.
Adds a means to get the current op/asset execution context via a function call, set up using a `ContextVar`. Exposed as `OpExecutionContext.get()` and `AssetExecutionContext.get()`.

## How I Tested These Changes

Added a test.