Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary sync async compat code from flowspy #14766

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from prefect._internal.compatibility.deprecated import (
deprecated_parameter,
)
from prefect._internal.concurrency.api import create_call, from_async
from prefect.blocks.core import Block
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import DeploymentScheduleCreate
Expand Down Expand Up @@ -86,7 +85,6 @@
from prefect.types.entrypoint import EntrypointType
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import (
run_sync_in_worker_thread,
sync_compatible,
)
from prefect.utilities.callables import (
Expand Down Expand Up @@ -1041,9 +1039,7 @@ def my_flow(name: str = "world"):
await storage.pull_code()

full_entrypoint = str(storage.destination / entrypoint)
flow: Flow = await from_async.wait_for_call_in_new_thread(
create_call(load_flow_from_entrypoint, full_entrypoint)
)
flow: Flow = load_flow_from_entrypoint(full_entrypoint)
flow._storage = storage
flow._entrypoint = entrypoint

Expand Down Expand Up @@ -1887,10 +1883,8 @@ async def load_flow_from_flow_run(
run_logger.debug(
f"Importing flow code from module path {deployment.entrypoint}"
)
flow = await run_sync_in_worker_thread(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just remembered that I added some logic to run_sync_in_worker_thread to handle @sync_compatible functions at the module level (e.g. Block.load). We will likely need a new solution for that with this run_sync_in_worker_thread call removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, good test!! I'll poke at this and see if I can simplify the current code and if not, I'll just close this PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to close for now, the refactors I've tried feel weird.

load_flow_from_entrypoint,
deployment.entrypoint,
use_placeholder_flow=use_placeholder_flow,
flow = load_flow_from_entrypoint(
deployment.entrypoint, use_placeholder_flow=use_placeholder_flow
)
return flow

Expand Down Expand Up @@ -1932,10 +1926,8 @@ async def load_flow_from_flow_run(
import_path = relative_path_to_current_platform(deployment.entrypoint)
run_logger.debug(f"Importing flow code from '{import_path}'")

flow = await run_sync_in_worker_thread(
load_flow_from_entrypoint,
str(import_path),
use_placeholder_flow=use_placeholder_flow,
flow = load_flow_from_entrypoint(
str(import_path), use_placeholder_flow=use_placeholder_flow
)

return flow
Expand Down
Loading