Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update syntax for eager #1774

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
uv venv $GITHUB_WORKSPACE/.venv
source $GITHUB_WORKSPACE/.venv/bin/activate
if [ -f requirements.in ]; then uv pip install -r requirements.in; fi
uv pip install "flytekit>=1.12.2" "numpy<2.0.0"
uv pip install "flytekit>=1.15.0a1" "numpy<2.0.0"
pip freeze
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
Expand Down
24 changes: 13 additions & 11 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile dev-requirements.in
#
# This file was autogenerated by uv via the following command:
# uv pip compile dev-requirements.in -o dev-requirements.txt
adlfs==2024.7.0
# via flytekit
aiobotocore==2.15.1
Expand All @@ -20,6 +16,8 @@ aioitertools==0.12.0
# via aiobotocore
aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
# via pydantic
attrs==24.2.0
# via
# aiohttp
Expand Down Expand Up @@ -72,7 +70,7 @@ cryptography==43.0.1
# azure-storage-blob
# msal
# pyjwt
dask[array,dataframe]==2024.9.1
dask==2024.9.1
# via
# -r dev-requirements.in
# dask-expr
Expand Down Expand Up @@ -272,16 +270,18 @@ pyasn1-modules==0.4.1
# via google-auth
pycparser==2.22
# via cffi
pydantic==2.10.3
# via -r dev-requirements.in
pydantic-core==2.27.1
# via pydantic
pyflakes==3.2.0
# via autoflake
pygments==2.18.0
# via
# flytekit
# rich
pyjwt[crypto]==2.9.0
# via
# msal
# pyjwt
pyjwt==2.9.0
# via msal
pytest==8.3.3
# via -r dev-requirements.in
python-dateutil==2.9.0.post0
Expand Down Expand Up @@ -346,6 +346,8 @@ typing-extensions==4.12.2
# flytekit
# mashumaro
# mypy
# pydantic
# pydantic-core
# rich-click
# typing-inspect
typing-inspect==0.9.0
Expand Down
19 changes: 6 additions & 13 deletions docs-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile docs-requirements.in
#
# This file was autogenerated by uv via the following command:
# uv pip compile docs-requirements.in -o docs-requirements.txt
accessible-pygments==0.0.5
# via pydata-sphinx-theme
adlfs==2024.7.0
Expand Down Expand Up @@ -403,10 +399,8 @@ pygments==2.18.0
# sphinx
# sphinx-prompt
# sphinx-tabs
pyjwt[crypto]==2.9.0
# via
# msal
# pyjwt
pyjwt==2.9.0
# via msal
python-dateutil==2.9.0.post0
# via
# botocore
Expand Down Expand Up @@ -475,6 +469,8 @@ scikit-learn==1.5.2
# via -r docs-requirements.in
scipy==1.14.1
# via scikit-learn
setuptools==75.6.0
# via torch
six==1.16.0
# via
# asttokens
Expand Down Expand Up @@ -609,6 +605,3 @@ yarl==1.13.1
# via aiohttp
zipp==3.20.2
# via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# setuptools
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from flytekit import task, workflow
from flytekit.experimental import eager
from flytekit import eager, task, workflow


# Example 1
Expand All @@ -15,21 +14,21 @@ def double(x: int) -> int:

@eager
async def simple_eager_workflow(x: int) -> int:
out = await add_one(x=x)
out = add_one(x=x)
if out < 0:
return -1
return await double(x=out)
return double(x=out)


# Example 2
@eager
async def another_eager_workflow(x: int) -> int:
out = await add_one(x=x)
out = add_one(x=x)

# out is a Python integer
out = out - 1

return await double(x=out)
return double(x=out)


# Example 3
Expand All @@ -40,14 +39,14 @@ def gt_100(x: int) -> bool:

@eager
async def eager_workflow_with_conditionals(x: int) -> int:
out = await add_one(x=x)
out = add_one(x=x)

if out < 0:
return -1
elif await gt_100(x=out):
elif gt_100(x=out):
return 100
else:
out = await double(x=out)
out = double(x=out)

assert out >= -1
return out
Expand All @@ -58,15 +57,20 @@ async def eager_workflow_with_conditionals(x: int) -> int:
import asyncio


@task
async def add_one_async(x: int) -> int:
return x + 1


@eager
async def eager_workflow_with_for_loop(x: int) -> int:
outputs = []

for i in range(x):
outputs.append(add_one(x=i))
outputs.append(add_one_async(x=i))

outputs = await asyncio.gather(*outputs)
return await double(x=sum(outputs))
return double(x=sum(outputs))


# Example 5
Expand All @@ -79,27 +83,27 @@ def subworkflow(x: int) -> int:

@eager
async def eager_workflow_with_static_subworkflow(x: int) -> int:
out = await subworkflow(x=x)
out = subworkflow(x=x)
assert out == (x + 1) * 2
return out


# Example 6
# Eager subworkflows
# Nested eager tasks
@eager
async def eager_subworkflow(x: int) -> int:
return await add_one(x=x)
return add_one(x=x)


@eager
async def nested_eager_workflow(x: int) -> int:
out = await eager_subworkflow(x=x)
return await double(x=out)
return double(x=out)


# Example 7
# Catching exceptions
from flytekit.experimental import EagerException
from flytekit.exceptions.eager import EagerException


@task
Expand All @@ -123,23 +127,3 @@ async def eager_workflow_with_exception(x: int) -> int:
if __name__ == "__main__":
result = asyncio.run(simple_eager_workflow(x=5))
print(f"Result: {result}") # "Result: 12"


# Sandbox Flyte cluster execution
# See docs for full steps
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote


@eager(
remote=FlyteRemote(
config=Config.for_sandbox(),
default_project="flytesnacks",
default_domain="development",
)
)
async def eager_workflow_sandbox(x: int) -> int:
out = await add_one(x=x)
if out < 0:
return -1
return await double(x=out)
33 changes: 0 additions & 33 deletions examples/data_types_and_io/data_types_and_io/pickle_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,6 @@ def superhero_wf(name: str = "Thor", power: str = "Flight") -> str:
return greet_superhero(superhero=superhero)


# Batch size
# By default, if the list subtype is unrecognized, a single pickle file is generated.
# To optimize serialization and deserialization performance for scenarios involving a large number of items
# or significant list elements, you can specify a batch size.
# This feature allows for the processing of each batch as a separate pickle file.
# The following example demonstrates how to set the batch size.
from typing import Iterator

from flytekit.types.pickle.pickle import BatchSize
from typing_extensions import Annotated


@task
def welcome_superheroes(names: list[str], powers: list[str]) -> Annotated[list[Superhero], BatchSize(3)]:
return [Superhero(name, power) for name, power in zip(names, powers)]


@task
def greet_superheroes(superheroes: list[Superhero]) -> Iterator[str]:
for superhero in superheroes:
yield f"👋 Hello {superhero.name}! Your superpower is {superhero.power}."


@workflow
def superheroes_wf(
names: list[str] = ["Thor", "Spiderman", "Hulk"],
powers: list[str] = ["Flight", "Surface clinger", "Shapeshifting"],
) -> Iterator[str]:
superheroes = welcome_superheroes(names=names, powers=powers)
return greet_superheroes(superheroes=superheroes)


# Run the workflow locally
if __name__ == "__main__":
print(f"Superhero wf: {superhero_wf()}")
print(f"Superhero(es) wf: {superheroes_wf()}")
Loading