Skip to content

Commit

Permalink
Clean up and move user guide Python files (#1584)
Browse files Browse the repository at this point in the history
* example-ize advanced composition files

Signed-off-by: nikki everett <[email protected]>

* example-ize basics files

Signed-off-by: nikki everett <[email protected]>

* example-ize customizing dependencies files

Signed-off-by: nikki everett <[email protected]>

* example-ize data types and i/o files

Signed-off-by: nikki everett <[email protected]>

* example-ize development lifecycle files

Signed-off-by: nikki everett <[email protected]>

* example-ize extending files

Signed-off-by: nikki everett <[email protected]>

* example-ize testing files

Signed-off-by: nikki everett <[email protected]>

* example-ize productionizing files

Signed-off-by: nikki everett <[email protected]>

* example-ize raw containers file

Signed-off-by: nikki everett <[email protected]>

* move examples that should not be processed out of the way of the docs build

Signed-off-by: nikki everett <[email protected]>

* delete user guide files from examples since they've been moved to example_code

Signed-off-by: nikki everett <[email protected]>

* add README

Signed-off-by: nikki everett <[email protected]>

* linter's mad

Signed-off-by: nikki everett <[email protected]>

* update dataclass example

Signed-off-by: nikki everett <[email protected]>

* update test wf paths

Signed-off-by: nikki everett <[email protected]>

* fix formatting

Signed-off-by: nikki everett <[email protected]>

* ignore module level import not at top of file errors

Signed-off-by: nikki everett <[email protected]>

* rename directory

Signed-off-by: nikki everett <[email protected]>

---------

Signed-off-by: nikki everett <[email protected]>
  • Loading branch information
neverett authored Apr 15, 2024
1 parent e7d36be commit 93133f9
Show file tree
Hide file tree
Showing 138 changed files with 2,352 additions and 7,207 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[flake8]
max-line-length = 180
extend-ignore = E203, E266, E501, W503, E741
extend-ignore = E203, E266, E402, E501, W503, E741
exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,venv/*,src/*,.rst,build
max-complexity=16
3 changes: 3 additions & 0 deletions example_code/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Flytesnacks example code

A repository of runnable example code for Flyte.
File renamed without changes.
5 changes: 5 additions & 0 deletions example_code/advanced_composition/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Advanced Composition

These examples introduce the advanced features of the Flytekit Python SDK.
They cover more complex aspects of Flyte, including conditions, subworkflows,
dynamic workflows, map tasks, gate nodes and more.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from flytekit import task, workflow


@task
def t2():
print("Running t2")
return


@task
def t1():
print("Running t1")
return


@task
def t0():
print("Running t0")
return


# Chaining tasks
@workflow
def chain_tasks_wf():
t2_promise = t2()
t1_promise = t1()
t0_promise = t0()

t0_promise >> t1_promise
t1_promise >> t2_promise


# Chaining subworkflows
@workflow
def sub_workflow_1():
t1()


@workflow
def sub_workflow_0():
t0()


@workflow
def chain_workflows_wf():
sub_wf1 = sub_workflow_1()
sub_wf0 = sub_workflow_0()

sub_wf0 >> sub_wf1
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from flytekit import current_context, task, workflow
from flytekit.exceptions.user import FlyteRecoverableException

RETRIES = 3


# Define a task to iterate precisely `n_iterations`, checkpoint its state, and recover from simulated failures.
@task(retries=RETRIES)
def use_checkpoint(n_iterations: int) -> int:
cp = current_context().checkpoint
prev = cp.read()

start = 0
if prev:
start = int(prev.decode())

# Create a failure interval to simulate failures across 'n' iterations and then succeed after configured retries
failure_interval = n_iterations // RETRIES
index = 0
for index in range(start, n_iterations):
# Simulate a deterministic failure for demonstration. Showcasing how it eventually completes within the given retries
if index > start and index % failure_interval == 0:
raise FlyteRecoverableException(f"Failed at iteration {index}, failure_interval {failure_interval}.")
# Save progress state. It is also entirely possible to save state every few intervals
cp.write(f"{index + 1}".encode())
return index


# Create a workflow that invokes the task.
# The task will automatically undergo retries in the event of a FlyteRecoverableException.
@workflow
def checkpointing_example(n_iterations: int) -> int:
return use_checkpoint(n_iterations=n_iterations)


# The local checkpoint is not utilized here because retries are not supported.
if __name__ == "__main__":
try:
checkpointing_example(n_iterations=10)
except RuntimeError as e: # noqa : F841
# Since no retries are performed, an exception is expected when run locally
pass
Original file line number Diff line number Diff line change
@@ -1,32 +1,13 @@
# %% [markdown]
# (conditional)=
#
# # Conditional
#
# ```{eval-rst}
# .. tags:: Intermediate
# ```
#
# Flytekit elevates conditions to a first-class construct named `conditional`, providing a powerful mechanism for selectively
# executing branches in a workflow. Conditions leverage static or dynamic data generated by tasks or
# received as workflow inputs. While conditions are highly performant in their evaluation,
# it's important to note that they are restricted to specific binary and logical operators
# and are applicable only to primitive values.
#
# To begin, import the necessary libraries.
# %%
import random

from flytekit import conditional, task, workflow


# %% [markdown]
# ## Simple branch
# Simple branch
#
# In this example, we introduce two tasks, `calculate_circle_circumference` and
# `calculate_circle_area`. The workflow dynamically chooses between these tasks based on whether the input
# falls within the fraction range (0-1) or not.
# %%
@task
def calculate_circle_circumference(radius: float) -> float:
return 2 * 3.14 * radius # Task to calculate the circumference of a circle
Expand Down Expand Up @@ -56,13 +37,11 @@ def shape_properties(radius: float) -> float:
print(f"Area of circle (radius={radius_large}): {shape_properties(radius=radius_large)}")


# %% [markdown]
# ## Multiple branches
# Multiple branches
#
# We establish an `if` condition with multiple branches, which will result in a failure if none of the conditions is met.
# It's important to note that any `conditional` statement in Flyte is expected to be complete,
# meaning that all possible branches must be accounted for.
# %%
@workflow
def shape_properties_with_multiple_branches(radius: float) -> float:
return (
Expand All @@ -76,17 +55,14 @@ def shape_properties_with_multiple_branches(radius: float) -> float:
)


# %% [markdown]
# :::{note}
# Take note of the usage of bitwise operators (`&`). Due to Python's PEP-335,
# the logical `and`, `or` and `not` operators cannot be overloaded.
# Flytekit employs bitwise `&` and `|` as equivalents for logical `and` and `or` operators,
# a convention also observed in other libraries.
# :::
#
# ## Consuming the output of a conditional
# Consuming the output of a conditional
#
# Here, we write a task that consumes the output returned by a `conditional`.
# %%
@workflow
def shape_properties_accept_conditional_output(radius: float) -> float:
result = (
Expand All @@ -105,13 +81,11 @@ def shape_properties_accept_conditional_output(radius: float) -> float:
print(f"Circumference of circle x Area of circle (radius={radius_small}): {shape_properties(radius=5.0)}")


# %% [markdown]
# ## Using the output of a previous task in a conditional
# Using the output of a previous task in a conditional
#
# You can check if a boolean returned from the previous task is `True`,
# but unary operations are not supported directly. Instead, use the `is_true`,
# `is_false` and `is_none` methods on the result.
# %%
@task
def coin_toss(seed: int) -> bool:
"""
Expand Down Expand Up @@ -145,29 +119,13 @@ def boolean_wf(seed: int = 5) -> int:
return conditional("coin_toss").if_(result.is_true()).then(success()).else_().then(failed())


# %% [markdown]
# :::{note}
# *How do output values acquire these methods?* In a workflow, direct access to outputs is not permitted.
# Inputs and outputs are automatically encapsulated in a special object known as {py:class}`flytekit.extend.Promise`.
# :::
#
# ## Using boolean workflow inputs in a conditional
# You can directly pass a boolean to a workflow.
# %%
# Using boolean workflow inputs in a conditional
@workflow
def boolean_input_wf(boolean_input: bool) -> int:
return conditional("boolean_input_conditional").if_(boolean_input.is_true()).then(success()).else_().then(failed())


# %% [markdown]
# :::{note}
# Observe that the passed boolean possesses a method called `is_true`.
# This boolean resides within the workflow context and is encapsulated in a specialized Flytekit object.
# This special object enables it to exhibit additional behavior.
# :::
#
# You can run the workflows locally as follows:
# %%
# Run the workflow locally
if __name__ == "__main__":
print("Running boolean_wf a few times...")
for index in range(0, 5):
Expand All @@ -177,12 +135,10 @@ def boolean_input_wf(boolean_input: bool) -> int:
)


# %% [markdown]
# ## Nested conditionals
# Nested conditionals
#
# You can nest conditional sections arbitrarily inside other conditional sections.
# However, these nested sections can only be in the `then` part of a `conditional` block.
# %%
@workflow
def nested_conditions(radius: float) -> float:
return (
Expand All @@ -208,12 +164,7 @@ def nested_conditions(radius: float) -> float:
print(f"nested_conditions(0.4): {nested_conditions(radius=0.4)}")


# %% [markdown]
# ## Using the output of a task in a conditional
#
# Let's write a fun workflow that triggers the `calculate_circle_circumference` task in the event of a "heads" outcome,
# and alternatively, runs the `calculate_circle_area` task in the event of a "tail" outcome.
# %%
# Using the output of a task in a conditional
@workflow
def consume_task_output(radius: float, seed: int = 5) -> float:
is_heads = coin_toss(seed=seed)
Expand All @@ -226,9 +177,7 @@ def consume_task_output(radius: float, seed: int = 5) -> float:
)


# %% [markdown]
# You can run the workflow locally as follows:
# %%
# Run the workflow locally
if __name__ == "__main__":
default_seed_output = consume_task_output(radius=0.4)
print(
Expand All @@ -237,50 +186,3 @@ def consume_task_output(radius: float, seed: int = 5) -> float:

custom_seed_output = consume_task_output(radius=0.4, seed=7)
print(f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_area => {custom_seed_output}")

# %% [markdown]
# ## Run the example on the Flyte cluster
#
# To run the provided workflows on the Flyte cluster, use the following commands:
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# shape_properties --radius 3.0
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# shape_properties_with_multiple_branches --radius 11.0
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# shape_properties_accept_conditional_output --radius 0.5
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# boolean_wf
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# boolean_input_wf --boolean_input
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# nested_conditions --radius 0.7
# ```
#
# ```
# pyflyte run --remote \
# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditional.py \
# consume_task_output --radius 0.4 --seed 7
# ```
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
from functools import partial, wraps

from flytekit import task, workflow

# Create a logger to monitor the execution's progress.
logger = logging.getLogger(__file__)


# Using a single decorator
def log_io(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
logger.info(f"task {fn.__name__} called with args: {args}, kwargs: {kwargs}")
out = fn(*args, **kwargs)
logger.info(f"task {fn.__name__} output: {out}")
return out

return wrapper


# Create a task named `t1` that is decorated with `log_io`.
@task
@log_io
def t1(x: int) -> int:
return x + 1


# Stacking multiple decorators
def validate_output(fn=None, *, floor=0):
@wraps(fn)
def wrapper(*args, **kwargs):
out = fn(*args, **kwargs)
if out <= floor:
raise ValueError(f"output of task {fn.__name__} must be a positive number, found {out}")
return out

if fn is None:
return partial(validate_output, floor=floor)

return wrapper


# Define a function that uses both the logging and validator decorators
@task
@log_io
@validate_output(floor=10)
def t2(x: int) -> int:
return x + 10


# Compose a workflow that calls `t1` and `t2`
@workflow
def decorating_task_wf(x: int) -> int:
return t2(x=t1(x=x))


if __name__ == "__main__":
print(f"Running decorating_task_wf(x=10) {decorating_task_wf(x=10)}")
Loading

0 comments on commit 93133f9

Please sign in to comment.