-
Notifications
You must be signed in to change notification settings - Fork 307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update eager task launching & monitoring #3042
Conversation
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
…hread fails Signed-off-by: Yee Hing Tong <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Yee Hing Tong <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3042 +/- ##
==========================================
- Coverage 82.79% 79.47% -3.33%
==========================================
Files 3 202 +199
Lines 186 21390 +21204
Branches 0 2756 +2756
==========================================
+ Hits 154 16999 +16845
- Misses 32 3616 +3584
- Partials 0 775 +775 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Code Review Agent Run Status
|
Code Review Agent Run #be129bActionable Suggestions - 7
Additional Suggestions - 3
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
wi1 = WorkItem(entity=t1, wf_exec=fwex, input_kwargs={}) | ||
wi2 = WorkItem(entity=t1, wf_exec=fwex, input_kwargs={}) | ||
wi2.uuid = wi1.uuid | ||
assert wi1 == wi2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test case test_work_item_hashing_equality()
manually sets the uuid
to test equality but doesn't verify hash equality. Consider adding an assertion to verify that hash(wi1) == hash(wi2)
since equal objects should have equal hashes.
Code suggestion
Check the AI-generated fix before applying
assert wi1 == wi2 | |
assert wi1 == wi2 | |
assert hash(wi1) == hash(wi2) |
Code Review Run #be129b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
wi1 = WorkItem(entity=t1, wf_exec=fwex, input_kwargs={}) | ||
wi2 = WorkItem(entity=t1, wf_exec=fwex, input_kwargs={}) | ||
wi2.uuid = wi1.uuid | ||
assert wi1 == wi2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding test cases to verify WorkItem
equality behavior when input_kwargs
or wf_exec
differ between instances. The current test only verifies equality for identical objects.
Code suggestion
Check the AI-generated fix before applying
assert wi1 == wi2 | |
assert wi1 == wi2 | |
# Test inequality cases | |
wi3 = WorkItem(entity=t1, wf_exec=fwex, input_kwargs={'param': 'value'}) | |
wi3.uuid = wi1.uuid | |
assert wi1 != wi3 # Different input_kwargs | |
wi4 = WorkItem(entity=t1, wf_exec=fwex, input_kwargs={}) | |
assert wi1 != wi4 # Different UUIDs |
Code Review Run #be129b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/worker_queue.py
Outdated
python_interface: typing.Optional[Interface] = None | ||
uuid: typing.Optional[uuid.UUID] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider initializing python_interface
and uuid
in __init__
or __post_init__
instead of using class-level defaults, since these are already being set in __post_init__
.
Code suggestion
Check the AI-generated fix before applying
python_interface: typing.Optional[Interface] = None | |
uuid: typing.Optional[uuid.UUID] = None | |
python_interface: typing.Optional[Interface] | |
uuid: typing.Optional[uuid.UUID] |
Code Review Run #be129b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
flytekit/core/worker_queue.py
Outdated
target=self._execute, daemon=True, name="controller-thread" | ||
) | ||
self.__runner_thread.start() | ||
atexit.register(self._close, event=self.stopping_condition, runner=self.__runner_thread) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using weakref.finalize()
instead of atexit.register()
for cleanup. atexit
handlers may not run if the program exits abnormally, while weakref.finalize()
provides more reliable cleanup.
Code suggestion
Check the AI-generated fix before applying
atexit.register(self._close, event=self.stopping_condition, runner=self.__runner_thread) | |
import weakref | |
weakref.finalize(self, self._close, event=self.stopping_condition, runner=self.__runner_thread) |
Code Review Run #be129b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick glance over
|
||
exc = EagerException(f"Error executing {work.entity.name} with error: {work.wf_exec.closure.error}") | ||
work.set_error(exc) | ||
return self.status == ItemStatus.SUCCESS or self.status == ItemStatus.FAILED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ready
seems like a weird name here. Should this be completed
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left over from when it was an asyncio.Future. Let me change it to is_in_terminal_state
flytekit/core/worker_queue.py
Outdated
elif update.wf_exec.closure.phase == WorkflowExecutionPhase.FAILED: | ||
update.status = ItemStatus.FAILED | ||
else: | ||
assert item.status == ItemStatus.RUNNING |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this end up being a more detailed error just in case this is not true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replacing with a debug log line, just to capture the other arm of the conditional is all.
Signed-off-by: Yee Hing Tong <[email protected]>
Code Review Agent Run #bfa426Actionable Suggestions - 1
Review Details
|
Signed-off-by: Yee Hing Tong <[email protected]>
Why are the changes needed?
This change simplifies the runner that kicks off executions for eager tasks by making its main executor function
async
, thus removing the need to handle an explicit look. Also the background functions that launch and monitor executions don't need to be async.What changes were proposed in this pull request?
worker_queue
file into one.add
function inController
which is called by the call handler in promise.py to be async. Because of this, the async call handler can now just await on this function.Controller
object that actually launch and monitor the executions to be sync instead of async.How was this patch tested?
Tested using local sandbox and running the internal hpo example.
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
This PR refactors the eager task execution system by simplifying async/sync interaction and streamlining the Controller class. Major enhancements include improved thread safety, error handling, and execution state management. Changes include variable renaming for clarity (wi to work_item), restructured Python interface handling, and enhanced state management through method renaming. Implementation includes comprehensive test coverage and improved logging with context manager implementation.Unit tests added: True
Estimated effort to review (1-5, lower is better): 4