diff --git a/docs/getting_started/tasks_and_workflows.md b/docs/getting_started/tasks_and_workflows.md index ef1bb3973..24c6a29c3 100644 --- a/docs/getting_started/tasks_and_workflows.md +++ b/docs/getting_started/tasks_and_workflows.md @@ -240,7 +240,7 @@ workflow_with_subworkflow(num_samples=10, seed=3) ``` ```{important} -Learn more about subworkflows in the {ref}`User Guide `. +Learn more about subworkflows in the {ref}`User Guide `. ``` ### Specifying Dependencies without Passing Data diff --git a/examples/advanced_composition/Dockerfile b/examples/advanced_composition/Dockerfile index c5e6701a7..616f39814 100644 --- a/examples/advanced_composition/Dockerfile +++ b/examples/advanced_composition/Dockerfile @@ -10,19 +10,14 @@ ENV LANG C.UTF-8 ENV LC_ALL C.UTF-8 ENV PYTHONPATH /root -# This is necessary for opencv to work -RUN apt-get update && apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg build-essential curl - -WORKDIR /root +RUN apt-get update && apt-get install -y build-essential curl # Virtual environment ENV VENV /opt/venv RUN python3 -m venv ${VENV} ENV PATH="${VENV}/bin:$PATH" -# Install Python dependencies -COPY requirements.txt /root -RUN pip install -r /root/requirements.txt +RUN pip install flytekit==1.10.2 # Copy the actual code COPY . /root diff --git a/examples/advanced_composition/README.md b/examples/advanced_composition/README.md index 9a523bf27..1e4c34202 100644 --- a/examples/advanced_composition/README.md +++ b/examples/advanced_composition/README.md @@ -2,17 +2,16 @@ # Advanced Composition -This section of the user guide introduces the advanced features of the flytekit Python SDK. +This section of the user guide introduces the advanced features of the Flytekit Python SDK. These examples cover more complex aspects of Flyte, including conditions, subworkflows, dynamic workflows, map tasks, gate nodes and more. ```{auto-examples-toc} -conditions +conditional chain_entities -subworkflows -dynamics +subworkflow +dynamic_workflow map_task -merge_sort eager_workflows decorating_tasks decorating_workflows diff --git a/examples/advanced_composition/advanced_composition/chain_entities.py b/examples/advanced_composition/advanced_composition/chain_entities.py index 30533bcdb..890c489a1 100644 --- a/examples/advanced_composition/advanced_composition/chain_entities.py +++ b/examples/advanced_composition/advanced_composition/chain_entities.py @@ -1,41 +1,40 @@ # %% [markdown] # (chain_flyte_entities)= # -# # Chain Flyte Entities +# # Chaining Flyte Entities # # ```{eval-rst} # .. tags:: Basic # ``` # -# flytekit provides a mechanism to chain Flyte entities using the `>>` operator. +# Flytekit offers a mechanism for chaining Flyte entities using the `>>` operator. +# This is particularly valuable when chaining tasks and subworkflows without the need for data flow between the entities. # # ## Tasks # -# Let's enforce an order for `t1()` to happen after `t0()`, and for `t2()` to happen after `t1()`. -# -# Import the necessary dependencies. +# Let's establish a sequence where `t1()` occurs after `t0()`, and `t2()` follows `t1()`. # %% from flytekit import task, workflow @task def t2(): - pass + print("Running t2") + return @task def t1(): - pass + print("Running t1") + return @task def t0(): - pass + print("Running t0") + return -# %% [markdown] -# We want to enforce an order here: `t0()` followed by `t1()` followed by `t2()`. -# %% @workflow def chain_tasks_wf(): t2_promise = t2() @@ -47,9 +46,10 @@ def chain_tasks_wf(): # %% [markdown] -# ## Chain SubWorkflows +# (chain_subworkflow)= +# ## Sub workflows # -# Similar to tasks, you can chain {ref}`subworkflows `. +# Just like tasks, you can chain {ref}`subworkflows `. # %% @workflow def sub_workflow_1(): @@ -61,9 +61,6 @@ def sub_workflow_0(): t0() -# %% [markdown] -# Use `>>` to chain the subworkflows. -# %% @workflow def chain_workflows_wf(): sub_wf1 = sub_workflow_1() @@ -73,10 +70,21 @@ def chain_workflows_wf(): # %% [markdown] -# Run the workflows locally. +# To run the provided workflows on the Flyte cluster, use the following commands: # -# %% -if __name__ == "__main__": - print(f"Running {__file__} main...") - print(f"Running chain_tasks_wf()... {chain_tasks_wf()}") - print(f"Running chain_workflows_wf()... {chain_workflows_wf()}") +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/chain_entities.py \ +# chain_tasks_wf +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/chain_entities.py \ +# chain_workflows_wf +# ``` +# +# :::{note} +# Chaining tasks and subworkflows is not supported in local environments. +# Follow the progress of this issue [here](https://github.com/flyteorg/flyte/issues/4080). +# ::: diff --git a/examples/advanced_composition/advanced_composition/checkpoint.py b/examples/advanced_composition/advanced_composition/checkpoint.py index 13f8ce561..be825249d 100644 --- a/examples/advanced_composition/advanced_composition/checkpoint.py +++ b/examples/advanced_composition/advanced_composition/checkpoint.py @@ -5,47 +5,48 @@ # .. tags:: MachineLearning, Intermediate # ``` # -# :::{note} -# This feature is available from Flytekit version 0.30.0b6+ and needs a Flyte backend version of at least 0.19.0+. -# ::: +# A checkpoint in Flyte serves to recover a task from a previous failure by preserving the task's state before the failure +# and resuming from the latest recorded state. # -# A checkpoint recovers a task from a previous failure by recording the state of a task before the failure and -# resuming from the latest recorded state. +# ## Why intra-task checkpoints? # -# ## Why Intra-task Checkpoints? +# The inherent design of Flyte, being a workflow engine, allows users to break down operations, programs or ideas +# into smaller tasks within workflows. In the event of a task failure, the workflow doesn't need to rerun the +# previously completed tasks. Instead, it can retry the specific task that encountered an issue. +# Once the problematic task succeeds, it won't be rerun. Consequently, the natural boundaries between tasks act as implicit checkpoints. # -# Flyte, at its core, is a workflow engine. Workflows provide a way to break up an operation/program/idea -# logically into smaller tasks. If a task fails, the workflow does not need to run the previously completed tasks. It can -# simply retry the task that failed. Eventually, when the task succeeds, it will not run again. Thus, task boundaries -# naturally serve as checkpoints. +# However, there are scenarios where breaking a task into smaller tasks is either challenging or undesirable due to the associated overhead. +# This is especially true when running a substantial computation in a tight loop. +# In such cases, users may consider splitting each loop iteration into individual tasks using dynamic workflows. +# Yet, the overhead of spawning new tasks, recording intermediate results, and reconstructing the state can incur additional expenses. # -# There are cases where it is not easy or desirable to break a task into smaller tasks, because running a task -# adds to the overhead. This is true when running a large computation in a tight-loop. In such cases, users can -# split each loop iteration into its own task using {ref}`dynamic workflows `, but the overhead of spawning new tasks, recording -# intermediate results, and reconstituting the state can be expensive. +# ### Use case: Model training # -# ### Model-training Use Case +# An exemplary scenario illustrating the utility of intra-task checkpointing is during model training. +# In situations where executing multiple epochs or iterations with the same dataset might be time-consuming, +# setting task boundaries can incur a high bootstrap time and be costly. # -# An example of this case is model training. Running multiple epochs or different iterations with the same -# dataset can take a long time, but the bootstrap time may be high and creating task boundaries can be expensive. +# Flyte addresses this challenge by providing a mechanism to checkpoint progress within a task execution, +# saving it as a file or set of files. In the event of a failure, the checkpoint file can be re-read to +# resume most of the state without rerunning the entire task. +# This feature opens up possibilities to leverage alternate, more cost-effective compute systems, +# such as [AWS spot instances](https://aws.amazon.com/ec2/spot/), +# [GCP pre-emptible instances](https://cloud.google.com/compute/docs/instances/preemptible) and others. # -# To tackle this, Flyte offers a way to checkpoint progress within a task execution as a file or a set of files. These -# checkpoints can be written synchronously or asynchronously. In case of failure, the checkpoint file can be re-read to resume -# most of the state without re-running the entire task. This opens up the opportunity to use alternate compute systems with -# lower guarantees like [AWS Spot Instances](https://aws.amazon.com/ec2/spot/), [GCP Pre-emptible Instances](https://cloud.google.com/compute/docs/instances/preemptible), etc. +# These instances offer great performance at significantly lower price points compared to their on-demand or reserved counterparts. +# This becomes feasible when tasks are constructed in a fault-tolerant manner. +# For tasks running within a short duration, e.g., less than 10 minutes, the likelihood of failure is negligible, +# and task-boundary-based recovery provides substantial fault tolerance for successful completion. # -# These instances offer great performance at much lower price-points as compared to their on-demand or reserved alternatives. -# This is possible if you construct the tasks in a fault-tolerant manner. In most cases, when the task runs for a short duration, -# e.g., less than 10 minutes, the potential of failure is insignificant and task-boundary-based recovery offers -# significant fault-tolerance to ensure successful completion. +# However, as the task execution time increases, the cost of re-running it also increases, +# reducing the chances of successful completion. This is precisely where Flyte's intra-task checkpointing proves to be highly beneficial. # -# But as the time for a task increases, the cost of re-running it increases, and reduces the chances of successful -# completion. This is where Flyte's intra-task checkpointing truly shines. +# Here's an example illustrating how to develop tasks that leverage intra-task checkpointing. +# It's important to note that Flyte currently offers the low-level API for checkpointing. +# Future integrations aim to incorporate higher-level checkpointing APIs from popular training frameworks +# like Keras, PyTorch, Scikit-learn, and big-data frameworks such as Spark and Flink, enhancing their fault-tolerance capabilities. # -# Let's look at an example of how to develop tasks which utilize intra-task checkpointing. It only provides the low-level API, though. We intend to integrate -# higher-level checkpointing APIs available in popular training frameworks like Keras, Pytorch, Scikit-learn, and -# big-data frameworks like Spark and Flink to supercharge their fault-tolerance. - +# To begin, import the necessary libraries and set the number of task retries to `3`. # %% from flytekit import current_context, task, workflow from flytekit.exceptions.user import FlyteRecoverableException @@ -54,50 +55,58 @@ # %% [markdown] -# This task shows how checkpoints can help resume execution in case of a failure. This is an example task and shows the API for -# the checkpointer. The checkpoint system exposes other APIs. For a detailed understanding, refer to the [checkpointer code](https://github.com/flyteorg/flytekit/blob/master/flytekit/core/checkpointer.py). -# -# The goal of this method is to loop for exactly n_iterations, checkpointing state and recovering from simualted failures. +# We define a task to iterate precisely `n_iterations`, checkpoint its state, and recover from simulated failures. # %% @task(retries=RETRIES) def use_checkpoint(n_iterations: int) -> int: cp = current_context().checkpoint prev = cp.read() + start = 0 if prev: start = int(prev.decode()) - # create a failure interval so we can create failures for across 'n' iterations and then succeed after - # configured retries + # Create a failure interval to simulate failures across 'n' iterations and then succeed after configured retries failure_interval = n_iterations // RETRIES - i = 0 - for i in range(start, n_iterations): - # simulate a deterministic failure, for demonstration. We want to show how it eventually completes within - # the given retries - if i > start and i % failure_interval == 0: - raise FlyteRecoverableException(f"Failed at iteration {i}, failure_interval {failure_interval}") - # save progress state. It is also entirely possible save state every few intervals. - cp.write(f"{i + 1}".encode()) - - return i + index = 0 + for index in range(start, n_iterations): + # Simulate a deterministic failure for demonstration. Showcasing how it eventually completes within the given retries + if index > start and index % failure_interval == 0: + raise FlyteRecoverableException(f"Failed at iteration {index}, failure_interval {failure_interval}.") + # Save progress state. It is also entirely possible to save state every few intervals + cp.write(f"{index + 1}".encode()) + return index # %% [markdown] -# The workflow here simply calls the task. The task itself -# will be retried for the {ref}`FlyteRecoverableException `. +# The checkpoint system offers additional APIs, documented in the code accessible at +# [checkpointer code](https://github.com/flyteorg/flytekit/blob/master/flytekit/core/checkpointer.py). +# +# Create a workflow that invokes the task. +# The task will automatically undergo retries in the event of a {ref}`FlyteRecoverableException `. # %% @workflow -def example(n_iterations: int) -> int: +def checkpointing_example(n_iterations: int) -> int: return use_checkpoint(n_iterations=n_iterations) # %% [markdown] -# The checkpoint is stored locally, but it is not used since retries are not supported. -# +# The local checkpoint is not utilized here because retries are not supported. # %% if __name__ == "__main__": try: - example(n_iterations=10) + checkpointing_example(n_iterations=10) except RuntimeError as e: # noqa : F841 - # no retries are performed, so an exception is expected when run locally. + # Since no retries are performed, an exception is expected when run locally pass + +# %% [markdown] +# ## Run the example on the Flyte cluster +# +# To run the provided workflow on the Flyte cluster, use the following command: +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/checkpoint.py \ +# checkpointing_example --n_iterations 10 +# ``` diff --git a/examples/advanced_composition/advanced_composition/conditional.py b/examples/advanced_composition/advanced_composition/conditional.py new file mode 100644 index 000000000..b0c685114 --- /dev/null +++ b/examples/advanced_composition/advanced_composition/conditional.py @@ -0,0 +1,286 @@ +# %% [markdown] +# (conditional)= +# +# # Conditional +# +# ```{eval-rst} +# .. tags:: Intermediate +# ``` +# +# Flytekit elevates conditions to a first-class construct named `conditional`, providing a powerful mechanism for selectively +# executing branches in a workflow. Conditions leverage static or dynamic data generated by tasks or +# received as workflow inputs. While conditions are highly performant in their evaluation, +# it's important to note that they are restricted to specific binary and logical operators +# and are applicable only to primitive values. +# +# To begin, import the necessary libraries. +# %% +import random + +from flytekit import conditional, task, workflow + + +# %% [markdown] +# ## Simple branch +# +# In this example, we introduce two tasks, `calculate_circle_circumference` and +# `calculate_circle_area`. The workflow dynamically chooses between these tasks based on whether the input +# falls within the fraction range (0-1) or not. +# %% +@task +def calculate_circle_circumference(radius: float) -> float: + return 2 * 3.14 * radius # Task to calculate the circumference of a circle + + +@task +def calculate_circle_area(radius: float) -> float: + return 3.14 * radius * radius # Task to calculate the area of a circle + + +@workflow +def shape_properties(radius: float) -> float: + return ( + conditional("shape_properties") + .if_((radius >= 0.1) & (radius < 1.0)) + .then(calculate_circle_circumference(radius=radius)) + .else_() + .then(calculate_circle_area(radius=radius)) + ) + + +if __name__ == "__main__": + radius_small = 0.5 + print(f"Circumference of circle (radius={radius_small}): {shape_properties(radius=radius_small)}") + + radius_large = 3.0 + print(f"Area of circle (radius={radius_large}): {shape_properties(radius=radius_large)}") + + +# %% [markdown] +# ## Multiple branches +# +# We establish an `if` condition with multiple branches, which will result in a failure if none of the conditions is met. +# It's important to note that any `conditional` statement in Flyte is expected to be complete, +# meaning that all possible branches must be accounted for. +# %% +@workflow +def shape_properties_with_multiple_branches(radius: float) -> float: + return ( + conditional("shape_properties_with_multiple_branches") + .if_((radius >= 0.1) & (radius < 1.0)) + .then(calculate_circle_circumference(radius=radius)) + .elif_((radius >= 1.0) & (radius <= 10.0)) + .then(calculate_circle_area(radius=radius)) + .else_() + .fail("The input must be within the range of 0 to 10.") + ) + + +# %% [markdown] +# :::{note} +# Take note of the usage of bitwise operators (`&`). Due to Python's PEP-335, +# the logical `and`, `or` and `not` operators cannot be overloaded. +# Flytekit employs bitwise `&` and `|` as equivalents for logical `and` and `or` operators, +# a convention also observed in other libraries. +# ::: +# +# ## Consuming the output of a conditional +# Here, we write a task that consumes the output returned by a `conditional`. +# %% +@workflow +def shape_properties_accept_conditional_output(radius: float) -> float: + result = ( + conditional("shape_properties_accept_conditional_output") + .if_((radius >= 0.1) & (radius < 1.0)) + .then(calculate_circle_circumference(radius=radius)) + .elif_((radius >= 1.0) & (radius <= 10.0)) + .then(calculate_circle_area(radius=radius)) + .else_() + .fail("The input must exist between 0 and 10.") + ) + return calculate_circle_area(radius=result) + + +if __name__ == "__main__": + print(f"Circumference of circle x Area of circle (radius={radius_small}): {shape_properties(radius=5.0)}") + + +# %% [markdown] +# ## Using the output of a previous task in a conditional +# +# You can check if a boolean returned from the previous task is `True`, +# but unary operations are not supported directly. Instead, use the `is_true`, +# `is_false` and `is_none` methods on the result. +# %% +@task +def coin_toss(seed: int) -> bool: + """ + Mimic a condition to verify the successful execution of an operation + """ + r = random.Random(seed) + if r.random() < 0.5: + return True + return False + + +@task +def failed() -> int: + """ + Mimic a task that handles failure + """ + return -1 + + +@task +def success() -> int: + """ + Mimic a task that handles success + """ + return 0 + + +@workflow +def boolean_wf(seed: int = 5) -> int: + result = coin_toss(seed=seed) + return conditional("coin_toss").if_(result.is_true()).then(success()).else_().then(failed()) + + +# %% [markdown] +# :::{note} +# *How do output values acquire these methods?* In a workflow, direct access to outputs is not permitted. +# Inputs and outputs are automatically encapsulated in a special object known as {py:class}`flytekit.extend.Promise`. +# ::: +# +# ## Using boolean workflow inputs in a conditional +# You can directly pass a boolean to a workflow. +# %% +@workflow +def boolean_input_wf(boolean_input: bool) -> int: + return conditional("boolean_input_conditional").if_(boolean_input.is_true()).then(success()).else_().then(failed()) + + +# %% [markdown] +# :::{note} +# Observe that the passed boolean possesses a method called `is_true`. +# This boolean resides within the workflow context and is encapsulated in a specialized Flytekit object. +# This special object enables it to exhibit additional behavior. +# ::: +# +# You can run the workflows locally as follows: +# %% +if __name__ == "__main__": + print("Running boolean_wf a few times...") + for index in range(0, 5): + print(f"The output generated by boolean_wf = {boolean_wf(seed=index)}") + print( + f"Boolean input: {True if index < 2 else False}; workflow output: {boolean_input_wf(boolean_input=True if index < 2 else False)}" + ) + + +# %% [markdown] +# ## Nested conditionals +# +# You can nest conditional sections arbitrarily inside other conditional sections. +# However, these nested sections can only be in the `then` part of a `conditional` block. +# %% +@workflow +def nested_conditions(radius: float) -> float: + return ( + conditional("nested_conditions") + .if_((radius >= 0.1) & (radius < 1.0)) + .then( + conditional("inner_nested_conditions") + .if_(radius < 0.5) + .then(calculate_circle_circumference(radius=radius)) + .elif_((radius >= 0.5) & (radius < 0.9)) + .then(calculate_circle_area(radius=radius)) + .else_() + .fail("0.9 is an outlier.") + ) + .elif_((radius >= 1.0) & (radius <= 10.0)) + .then(calculate_circle_area(radius=radius)) + .else_() + .fail("The input must be within the range of 0 to 10.") + ) + + +if __name__ == "__main__": + print(f"nested_conditions(0.4): {nested_conditions(radius=0.4)}") + + +# %% [markdown] +# ## Using the output of a task in a conditional +# +# Let's write a fun workflow that triggers the `calculate_circle_circumference` task in the event of a "heads" outcome, +# and alternatively, runs the `calculate_circle_area` task in the event of a "tail" outcome. +# %% +@workflow +def consume_task_output(radius: float, seed: int = 5) -> float: + is_heads = coin_toss(seed=seed) + return ( + conditional("double_or_square") + .if_(is_heads.is_true()) + .then(calculate_circle_circumference(radius=radius)) + .else_() + .then(calculate_circle_area(radius=radius)) + ) + + +# %% [markdown] +# You can run the workflow locally as follows: +# %% +if __name__ == "__main__": + default_seed_output = consume_task_output(radius=0.4) + print( + f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_circumference => {default_seed_output}" + ) + + custom_seed_output = consume_task_output(radius=0.4, seed=7) + print(f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_area => {custom_seed_output}") + +# %% [markdown] +# ## Run the example on the Flyte cluster +# +# To run the provided workflows on the Flyte cluster, use the following commands: +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditions.py \ +# shape_properties --radius 3.0 +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditions.py \ +# shape_properties_with_multiple_branches --radius 11.0 +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditions.py \ +# shape_properties_accept_conditional_output --radius 0.5 +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditions.py \ +# boolean_wf +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditions.py \ +# boolean_input_wf --boolean_input +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditions.py \ +# nested_conditions --radius 0.7 +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/conditions.py \ +# consume_task_output --radius 0.4 --seed 7 +# ``` diff --git a/examples/advanced_composition/advanced_composition/conditions.py b/examples/advanced_composition/advanced_composition/conditions.py deleted file mode 100644 index 07df3864d..000000000 --- a/examples/advanced_composition/advanced_composition/conditions.py +++ /dev/null @@ -1,285 +0,0 @@ -# %% [markdown] -# (conditional)= -# -# # Conditions -# -# ```{eval-rst} -# .. tags:: Intermediate -# ``` -# -# Flytekit supports conditions as a first class construct in the language. Conditions offer a way to selectively execute -# branches of a workflow based on static or dynamic data produced by other tasks or come in as workflow inputs. -# Conditions are very performant to be evaluated. However, they are limited to certain binary and logical operators and can -# only be performed on primitive values. - -# %% [markdown] -# Import the necessary modules. -# %% -import random - -from flytekit import conditional, task, workflow - - -# %% [markdown] -# ## Conditional with simple branch -# -# In this example, we define two tasks `square` and `double`. Depending on whether the workflow input is a -# fraction (0-1) or not, the respective task is executed. -# %% -@task -def square(n: float) -> float: - """ - Parameters: - n (float): name of the parameter for the task is derived from the name of the input variable, and - the type is automatically mapped to Types.Integer - Return: - float: The label for the output is automatically assigned and the type is deduced from the annotation - """ - return n * n - - -@task -def double(n: float) -> float: - """ - Parameters: - n (float): name of the parameter for the task is derived from the name of the input variable - and the type is mapped to ``Types.Integer`` - Return: - float: The label for the output is auto-assigned and the type is deduced from the annotation - """ - return 2 * n - - -@workflow -def multiplier(my_input: float) -> float: - return ( - conditional("fractions") - .if_((my_input >= 0.1) & (my_input <= 1.0)) - .then(double(n=my_input)) - .else_() - .then(square(n=my_input)) - ) - - -if __name__ == "__main__": - print(f"Output of multiplier(my_input=3.0): {multiplier(my_input=3.0)}") - print(f"Output of multiplier(my_input=0.5): {multiplier(my_input=0.5)}") - - -# %% [markdown] -# ## Conditional with multiple branches -# -# In this example, we define an `if` condition with multiple branches. It fails if none of the conditions is met. Flyte -# expects any `conditional()` statement to be **complete**. This means all possible branches should be handled. -# -# :::{note} -# Notice the use of bitwise (&). Python (PEP-335) doesn't allow overloading of the logical `and`, `or`, and `not` operators. Flytekit uses bitwise `&` and `|` as logical `and` and `or` operators. This is a common practice in other libraries too. -# ::: -# %% -@workflow -def multiplier_2(my_input: float) -> float: - return ( - conditional("fractions") - .if_((my_input > 0.1) & (my_input < 1.0)) - .then(double(n=my_input)) - .elif_((my_input > 1.0) & (my_input <= 10.0)) - .then(square(n=my_input)) - .else_() - .fail("The input must be between 0 and 10") - ) - - -if __name__ == "__main__": - print(f"Output of multiplier_2(my_input=10.0): {multiplier_2(my_input=10.0)}") - - -# %% [markdown] -# ## Consuming the output of a conditional -# -# In this example, we consume the output returned by the `conditional()` in the subsequent task. -# %% -@workflow -def multiplier_3(my_input: float) -> float: - result = ( - conditional("fractions") - .if_((my_input > 0.1) & (my_input < 1.0)) - .then(double(n=my_input)) - .elif_((my_input > 1.0) & (my_input < 10.0)) - .then(square(n=my_input)) - .else_() - .fail("The input must be between 0 and 10") - ) - - # the 'result' will either be the output of `double` or `square`. If none of the conditions is true, - # it gives a failure message. - return double(n=result) - - -if __name__ == "__main__": - print(f"Output of multiplier_3(my_input=5.0): {multiplier_3(my_input=5.0)}") - - -# %% [markdown] -# ## Using the output of a previous task in a conditional -# -# It is possible to test if a boolean returned from the previous task is True. But unary operations are not -# supported. Use the `is_true`, `is_false` or `is_` on the result instead. -# -# :::{note} -# How do output values get these methods? -# In a workflow, no output can be accessed directly. The inputs and outputs are auto-wrapped in a special object called {py:class}`flytekit.extend.Promise`. -# ::: -# -# In this example, we create a biased coin whose seed can be controlled. -# %% -@task -def coin_toss(seed: int) -> bool: - """ - Mimic some condition to check if the operation was successfully executed. - """ - r = random.Random(seed) - if r.random() < 0.5: - return True - return False - - -@task -def failed() -> int: - """ - Mimic a task that handles failure - """ - return -1 - - -@task -def success() -> int: - """ - Mimic a task that handles success - """ - return 0 - - -@workflow -def basic_boolean_wf(seed: int = 5) -> int: - result = coin_toss(seed=seed) - return conditional("test").if_(result.is_true()).then(success()).else_().then(failed()) - - -# %% [markdown] -# ## Using boolean workflow inputs in a conditional -# -# It is possible to pass a boolean directly to a workflow. -# -# :::{note} -# Note that the boolean passed has a method named `is_true`. This boolean is present within -# the workflow context and is wrapped in a Flytekit special object. This special object allows it to have the additional -# behavior. -# ::: -# %% -@workflow -def bool_input_wf(b: bool) -> int: - return conditional("test").if_(b.is_true()).then(success()).else_().then(failed()) - - -# %% [markdown] -# The workflow can be executed locally. -# %% -if __name__ == "__main__": - print("Running basic_boolean_wf a few times") - for i in range(0, 5): - print(f"Basic boolean wf output {basic_boolean_wf()}") - print(f"Boolean input {True if i < 2 else False}, workflow output {bool_input_wf(b=True if i < 2 else False)}") - - -# %% [markdown] -# ## Nested conditionals -# -# It is possible to arbitrarily nest conditional sections inside other conditional sections. The conditional sections can only be in the -# `then` part of the previous conditional block. -# This example shows how float comparisons can be used to create a multi-level nested workflow. -# %% -@workflow -def nested_conditions(my_input: float) -> float: - return ( - conditional("fractions") - .if_((my_input > 0.1) & (my_input < 1.0)) - .then( - conditional("inner_fractions") - .if_(my_input < 0.5) - .then(double(n=my_input)) - .elif_((my_input > 0.5) & (my_input < 0.7)) - .then(square(n=my_input)) - .else_() - .fail("Only <0.7 allowed") - ) - .elif_((my_input > 1.0) & (my_input < 10.0)) - .then(square(n=my_input)) - .else_() - .then(double(n=my_input)) - ) - - -# %% [markdown] -# The nested conditionals can be executed locally. -# %% -if __name__ == "__main__": - print(f"nested_conditions(0.4) -> {nested_conditions(my_input=0.4)}") - - -# %% [markdown] -# ## Using the output of a task in the conditional and consuming its output -# -# Finally, we specify an output of an upstream task in the conditional and -# then consume its output in a downstream task, `double`. -# Outputs are computed as a subset of outputs produced by `then` nodes. In this -# example, we call `square()` in one condition and `calc_sum()` in another. -# %% -@task -def calc_sum(a: float, b: float) -> float: - """ - returns the sum of a and b. - """ - return a + b - - -# %% [markdown] -# Altogether, the workflow that consumes outputs from conditionals can be constructed as shown. -# -# :::{tip} -# A useful mental model to consume outputs of conditions is to think of them as ternary operators in programming -# languages. The only difference is that they can be `n`-ary. In Python, this is equivalent to -# -# ```python -# x = 0 if m < 0 else 1 -# ``` -# ::: -# %% -@workflow -def consume_outputs(my_input: float, seed: int = 5) -> float: - is_heads = coin_toss(seed=seed) - res = ( - conditional("double_or_square") - .if_(is_heads.is_true()) - .then(square(n=my_input)) - .else_() - .then(calc_sum(a=my_input, b=my_input)) - ) - - # Regardless of the result, call ``double`` before - # the variable `res` is returned. In this case, ``res`` stores the value of the ``square`` or ``double`` of the variable `my_input` - return double(n=res) - - -# %% [markdown] -# The workflow can be executed locally. -# -# %% -if __name__ == "__main__": - print( - "consume_outputs(0.4) with default seed=5. This should " - f"return output of calc_sum => {consume_outputs(my_input=0.4)}" - ) - print( - "consume_outputs(0.4, seed=7), this should return " - f"output of square => {consume_outputs(my_input=0.4, seed=7)}" - ) diff --git a/examples/advanced_composition/advanced_composition/decorating_tasks.py b/examples/advanced_composition/advanced_composition/decorating_tasks.py index c57c1374a..39d018e08 100644 --- a/examples/advanced_composition/advanced_composition/decorating_tasks.py +++ b/examples/advanced_composition/advanced_composition/decorating_tasks.py @@ -96,15 +96,24 @@ def t2(x: int) -> int: # Finally, we compose a workflow that calls `t1` and `t2`. # %% @workflow -def wf(x: int) -> int: +def decorating_task_wf(x: int) -> int: return t2(x=t1(x=x)) if __name__ == "__main__": - print(f"Running wf(x=10) {wf(x=10)}") - + print(f"Running decorating_task_wf(x=10) {decorating_task_wf(x=10)}") # %% [markdown] +# ## Run the example on the Flyte cluster +# +# To run the provided workflow on the Flyte cluster, use the following command: +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/decorating_tasks.py \ +# decorating_task_wf --x 10 +# ``` +# # In this example, you learned how to modify the behavior of tasks via function decorators using the built-in # {py:func}`~functools.wraps` decorator pattern. To learn more about how to extend Flyte at a deeper level, for # example creating custom types, custom tasks or backend plugins, diff --git a/examples/advanced_composition/advanced_composition/decorating_workflows.py b/examples/advanced_composition/advanced_composition/decorating_workflows.py index 7c6442843..9958503c8 100644 --- a/examples/advanced_composition/advanced_composition/decorating_workflows.py +++ b/examples/advanced_composition/advanced_composition/decorating_workflows.py @@ -13,13 +13,14 @@ # tasks, we need to do a little extra work to make sure that the DAG underlying the workflow executes tasks in the # correct order. # -# ## Setup-Teardown Pattern +# ## Setup-teardown pattern # # The main use case of decorating `@workflow`-decorated functions is to establish a setup-teardown pattern to execute task # before and after your main workflow logic. This is useful when integrating with other external services # like [wandb](https://wandb.ai/site) or [clearml](https://clear.ml/), which enable you to track metrics of model # training runs. - +# +# To begin, import the necessary libraries. # %% from functools import partial, wraps from unittest.mock import MagicMock @@ -29,10 +30,9 @@ from flytekit.core.node_creation import create_node # %% [markdown] -# First, let's define the tasks that we want for setup and teardown. In this example, we'll use the +# Let's define the tasks we need for setup and teardown. In this example, we use the # {py:class}`unittest.mock.MagicMock` class to create a fake external service that we want to initialize at the # beginning of our workflow and finish at the end. - # %% external_service = MagicMock() @@ -54,11 +54,9 @@ def teardown(): # if you need to link Flyte with the external service so that you reference the same unique identifier in both the # external service and Flyte. # -# ## Workflow Decorator +# ## Workflow decorator # -# Next we create the decorator that we'll use to wrap our workflow function. - - +# We create a decorator that we want to use to wrap our workflow function. # %% def setup_teardown(fn=None, *, before, after): @wraps(fn) @@ -107,15 +105,12 @@ def wrapper(*args, **kwargs): # 3. When `fn` is called, under the hood Flytekit creates all the nodes associated with the workflow function body # 4. The code within the `if ctx.compilation_state is not None:` conditional is executed at compile time, which # is where we extract the first and last nodes associated with the workflow function body at index `1` and `-2`. -# 5. Finally, we use the `>>` right shift operator to ensure that `before_node` executes before the +# 5. The `>>` right shift operator ensures that `before_node` executes before the # first node and `after_node` executes after the last node of the main workflow function body. - -# %% [markdown] +# # ## Defining the DAG # -# Now let's define two tasks that will constitute the workflow - - +# We define two tasks that will constitute the workflow. # %% @task def t1(x: float) -> float: @@ -129,26 +124,28 @@ def t2(x: float) -> float: # %% [markdown] # And then create our decorated workflow: - - # %% @workflow @setup_teardown(before=setup, after=teardown) -def wf(x: float) -> float: +def decorating_workflow(x: float) -> float: return t2(x=t1(x=x)) if __name__ == "__main__": - print(wf(x=10.0)) + print(decorating_workflow(x=10.0)) # %% [markdown] -# In this example, you learned how to modify the behavior of a workflow by defining a `setup_teardown` decorator -# that can be applied to any workflow in your project. This is useful when integrating with other external services -# like [wandb](https://wandb.ai/site) or [clearml](https://clear.ml/), which enable you to track metrics of model -# training runs. +# ## Run the example on the Flyte cluster +# +# To run the provided workflow on the Flyte cluster, use the following command: +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/decorating_workflows.py \ +# decorating_workflow --x 10.0 +# ``` # # To define workflows imperatively, refer to {ref}`this example `, -# and to learn more about how to extend Flyte at a deeper level, for example creating custom types, custom tasks, or +# and to learn more about how to extend Flyte at a deeper level, for example creating custom types, custom tasks or # backend plugins, see {ref}`Extending Flyte `. -# diff --git a/examples/advanced_composition/advanced_composition/dynamic_workflow.py b/examples/advanced_composition/advanced_composition/dynamic_workflow.py new file mode 100644 index 000000000..6e5b5c0ad --- /dev/null +++ b/examples/advanced_composition/advanced_composition/dynamic_workflow.py @@ -0,0 +1,256 @@ +# %% [markdown] +# (dynamic_workflow)= +# +# # Dynamic Workflows +# +# ```{eval-rst} +# .. tags:: Intermediate +# ``` +# +# A workflow whose directed acyclic graph (DAG) is computed at run-time is a {py:func}`~flytekit.dynamic` workflow. +# The tasks in a dynamic workflow are executed at runtime using dynamic inputs. +# This type of workflow shares similarities with the {py:func}`~flytekit.workflow`, as it employs a Python-esque DSL +# to declare dependencies between the tasks or define new workflows. A key distinction lies in the dynamic workflow being assessed at runtime. +# This means that the inputs are initially materialized and forwarded to dynamic workflow, resembling the behavior of a task. +# However, the return value from a dynamic workflow is a {py:class}`~flytekit.extend.Promise` object, +# which can be materialized by the subsequent tasks. +# +# Think of a dynamic workflow as a combination of a task and a workflow. +# It is used to dynamically decide the parameters of a workflow at runtime. +# It is both compiled and executed at run-time. You can define a dynamic workflow using the `@dynamic` decorator. +# +# Within the `@dynamic` context, each invocation of a {py:func}`~flytekit.task` or a derivative of +# {py:class}`~flytekit.core.base_task.Task` class leads to deferred evaluation using a promise, +# rather than the immediate materialization of the actual value. While nesting other `@dynamic` and +# `@workflow` constructs within this task is possible, direct interaction with the outputs of a task/workflow is limited, +# as they are lazily evaluated. If interaction with the outputs is desired, it is recommended to separate the +# logic in a dynamic workflow and create a new task to read and resolve the outputs. +# +# Dynamic workflows become essential when you require: +# +# - Modifying the logic of the code at runtime +# - Changing or deciding on feature extraction parameters on-the-go +# - Building AutoML pipelines +# - Tuning hyperparameters during execution +# +# This example utilizes dynamic workflow to count the common characters between any two strings. +# +# To begin, we import the required libraries. +# %% +from flytekit import dynamic, task, workflow + + +# %% [markdown] +# We define a task that returns the index of a character, where A-Z/a-z is equivalent to 0-25. +# %% +@task +def return_index(character: str) -> int: + if character.islower(): + return ord(character) - ord("a") + else: + return ord(character) - ord("A") + + +# %% [markdown] +# We also create a task that prepares a list of 26 characters by populating the frequency of each character. +# %% +@task +def update_list(freq_list: list[int], list_index: int) -> list[int]: + freq_list[list_index] += 1 + return freq_list + + +# %% [markdown] +# We define a task to calculate the number of common characters between the two strings. +# %% +@task +def derive_count(freq1: list[int], freq2: list[int]) -> int: + count = 0 + for i in range(26): + count += min(freq1[i], freq2[i]) + return count + + +# %% [markdown] +# We define a dynamic workflow to accomplish the following: +# +# 1. Initialize an empty 26-character list to be passed to the `update_list` task +# 2. Iterate through each character of the first string (`s1`) and populate the frequency list +# 3. Iterate through each character of the second string (`s2`) and populate the frequency list +# 4. Determine the number of common characters by comparing the two frequency lists +# +# The looping process is contingent on the number of characters in both strings, which is unknown until runtime. +# %% +@dynamic +def count_characters(s1: str, s2: str) -> int: + # s1 and s2 should be accessible + + # Initialize empty lists with 26 slots each, corresponding to every alphabet (lower and upper case) + freq1 = [0] * 26 + freq2 = [0] * 26 + + # Loop through characters in s1 + for i in range(len(s1)): + # Calculate the index for the current character in the alphabet + index = return_index(character=s1[i]) + # Update the frequency list for s1 + freq1 = update_list(freq_list=freq1, list_index=index) + # index and freq1 are not accessible as they are promises + + # looping through the string s2 + for i in range(len(s2)): + # Calculate the index for the current character in the alphabet + index = return_index(character=s2[i]) + # Update the frequency list for s2 + freq2 = update_list(freq_list=freq2, list_index=index) + # index and freq2 are not accessible as they are promises + + # Count the common characters between s1 and s2 + return derive_count(freq1=freq1, freq2=freq2) + + +# %% [markdown] +# A dynamic workflow is modeled as a task in the backend, +# but the body of the function is executed to produce a workflow at run-time. +# In both dynamic and static workflows, the output of tasks are promise objects. +# +# Propeller executes the dynamic task within its Kubernetes pod, resulting in a compiled DAG, which is then accessible in the console. +# It utilizes the information acquired during the dynamic task's execution to schedule and execute each node within the dynamic task. +# Visualization of the dynamic workflow's graph in the UI becomes available only after the dynamic task has completed its execution. +# +# When a dynamic task is executed, it generates the entire workflow as its output, termed the *futures file*. +# This nomenclature reflects the anticipation that the workflow is yet to be executed, and all subsequent outputs are considered futures. +# +# :::{note} +# Local execution works when a `@dynamic` decorator is used because Flytekit treats it as a task that runs with native Python inputs. +# ::: +# +# Define a workflow that triggers the dynamic workflow. +# %% +@workflow +def dynamic_wf(s1: str, s2: str) -> int: + return count_characters(s1=s1, s2=s2) + + +# %% [markdown] +# You can run the workflow locally as follows: +# %% +if __name__ == "__main__": + print(dynamic_wf(s1="Pear", s2="Earth")) + + +# %% [markdown] +# ## Why use Dynamic Workflows? +# +# ### Flexibility +# +# Dynamic workflows streamline the process of building pipelines, offering the flexibility to design workflows +# according to the unique requirements of your project. This level of adaptability is not achievable with static workflows. +# +# ### Lower pressure on etcd +# +# The workflow Custom Resource Definition (CRD) and the states associated with static workflows are stored in etcd, +# the Kubernetes database. This database maintains Flyte workflow CRDs as key-value pairs, tracking the status of each node's execution. +# +# However, there is a limitation with etcd — a hard limit on data size, encompassing the workflow and node status sizes. +# Consequently, it's crucial to ensure that static workflows don't excessively consume memory. +# +# In contrast, dynamic workflows offload the workflow specification (including node/task definitions and connections) to the blobstore. +# Still, the statuses of nodes are stored in the workflow CRD within etcd. +# +# Dynamic workflows help alleviate some of the pressure on etcd storage space, providing a solution to mitigate storage constraints. +# +# ## Dynamic workflows vs. map tasks +# +# Dynamic tasks come with overhead for large fan-out tasks as they store metadata for the entire workflow. +# In contrast, {ref}`map tasks ` prove efficient for such extensive fan-out scenarios since they refrain from storing metadata, +# resulting in less noticeable overhead. +# +# (advanced_merge_sort)= +# ## Merge sort +# +# Merge sort is a perfect example to showcase how to seamlessly achieve recursion using dynamic workflows. +# Flyte imposes limitations on the depth of recursion to prevent misuse and potential impacts on the overall stability of the system. +# %% +from typing import Tuple + +from flytekit import conditional, dynamic, task, workflow + + +@task +def split(numbers: list[int]) -> Tuple[list[int], list[int], int, int]: + return ( + numbers[0 : int(len(numbers) / 2)], + numbers[int(len(numbers) / 2) :], + int(len(numbers) / 2), + int(len(numbers)) - int(len(numbers) / 2), + ) + + +@task +def merge(sorted_list1: list[int], sorted_list2: list[int]) -> list[int]: + result = [] + while len(sorted_list1) > 0 and len(sorted_list2) > 0: + # Compare the current element of the first array with the current element of the second array. + # If the element in the first array is smaller, append it to the result and increment the first array index. + # Otherwise, do the same with the second array. + if sorted_list1[0] < sorted_list2[0]: + result.append(sorted_list1.pop(0)) + else: + result.append(sorted_list2.pop(0)) + + # Extend the result with the remaining elements from both arrays + result.extend(sorted_list1) + result.extend(sorted_list2) + + return result + + +@task +def sort_locally(numbers: list[int]) -> list[int]: + return sorted(numbers) + + +@dynamic +def merge_sort_remotely(numbers: list[int], run_local_at_count: int) -> list[int]: + split1, split2, new_count1, new_count2 = split(numbers=numbers) + sorted1 = merge_sort(numbers=split1, numbers_count=new_count1, run_local_at_count=run_local_at_count) + sorted2 = merge_sort(numbers=split2, numbers_count=new_count2, run_local_at_count=run_local_at_count) + return merge(sorted_list1=sorted1, sorted_list2=sorted2) + + +@workflow +def merge_sort(numbers: list[int], numbers_count: int, run_local_at_count: int = 5) -> list[int]: + return ( + conditional("terminal_case") + .if_(numbers_count <= run_local_at_count) + .then(sort_locally(numbers=numbers)) + .else_() + .then(merge_sort_remotely(numbers=numbers, run_local_at_count=run_local_at_count)) + ) + + +# %% [markdown] +# By simply adding the `@dynamic` annotation, the `merge_sort_remotely` function transforms into a plan of execution, +# generating a Flyte workflow with four distinct nodes. These nodes run remotely on potentially different hosts, +# with Flyte ensuring proper data reference passing and maintaining execution order with maximum possible parallelism. +# +# `@dynamic` is essential in this context because the number of times `merge_sort` needs to be triggered is unknown at compile time. +# The dynamic workflow calls a static workflow, which subsequently calls the dynamic workflow again, +# creating a recursive and flexible execution structure. +# +# ## Run the example on the Flyte cluster +# +# To run the provided workflows on the Flyte cluster, you can use the following commands: +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/dynamics.py \ +# dynamic_wf --s1 "Pear" --s2 "Earth" +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/dynamics.py \ +# merge_sort --numbers '[1813, 3105, 3260, 2634, 383, 7037, 3291, 2403, 315, 7164]' --numbers_count 10 +# ``` diff --git a/examples/advanced_composition/advanced_composition/dynamics.py b/examples/advanced_composition/advanced_composition/dynamics.py deleted file mode 100644 index 2e5acf816..000000000 --- a/examples/advanced_composition/advanced_composition/dynamics.py +++ /dev/null @@ -1,216 +0,0 @@ -# %% [markdown] -# (dynamic_workflows)= -# -# # Dynamic Workflows -# -# ```{eval-rst} -# .. tags:: Intermediate -# ``` -# -# ```{image} https://img.shields.io/badge/Blog%20Post-Dynamic%20Workflows-blue?style=for-the-badge -# :alt: Dynamic Workflows Blog Post -# :target: https://flyte.org/blog/dynamic-workflows-in-flyte -# ``` -# -# A workflow is typically static when the directed acyclic graph's (DAG) structure is known at compile-time. -# However, in cases where a run-time parameter (for example, the output of an earlier task) determines the full DAG structure, you can use dynamic workflows by decorating a function with `@dynamic`. -# -# A dynamic workflow is similar to the {py:func}`~flytekit.workflow`, in that it represents a python-esque DSL to -# declare task interactions or new workflows. One significant difference between a regular workflow and dynamic (workflow) is that -# the latter is evaluated at runtime. This means the inputs are first materialized and sent to the actual function, -# as if it were a task. However, the return value from a dynamic workflow is a Promise object instead of an actual value, -# which is fulfilled by evaluating the various tasks invoked in the dynamic workflow. -# -# Think of a dynamic workflow as a parent graph node that spins off new child nodes which would represent a new child graph. -# At runtime, dynamic workflows receive input and create new workflows. These new workflows have graph nodes. -# -# Within the `@dynamic` context (function), every invocation of a {py:func}`~flytekit.task` or a derivative of -# {py:class}`~flytekit.core.base_task.Task` class will result in deferred evaluation using a promise, instead -# of the actual value being materialized. You can also nest other `@dynamic` and `@workflow` constructs within this -# task, but it is not possible to interact with the outputs of a `task/workflow` as they are lazily evaluated. -# If you want to interact with the outputs, break up the logic in dynamic and create a new task to read and resolve the outputs. -# -# Refer to {py:func}`~flytekit.dynamic` for documentation. -# -# Here's a code example that counts the common characters between any two strings. - -# %% [markdown] -# Let's first import all the required libraries. -# %% -import typing - -from flytekit import dynamic, task, workflow - - -# %% [markdown] -# Next, we write a task that returns the index of a character (A-Z/a-z is equivalent to 0 to 25). -# %% -@task -def return_index(character: str) -> int: - """ - Computes the character index (which needs to fit into the 26 characters list)""" - if character.islower(): - return ord(character) - ord("a") - else: - return ord(character) - ord("A") - - -# %% [markdown] -# We now write a task that prepares the 26-character list by populating the frequency of every character. -# %% -@task -def update_list(freq_list: typing.List[int], list_index: int) -> typing.List[int]: - """ - Notes the frequency of characters""" - freq_list[list_index] += 1 - return freq_list - - -# %% [markdown] -# Next we find the number of common characters between the two strings. -# %% -@task -def derive_count(freq1: typing.List[int], freq2: typing.List[int]) -> int: - """ - Derives the number of common characters""" - count = 0 - for i in range(26): - count += min(freq1[i], freq2[i]) - return count - - -# %% [markdown] -# In this step, we perform the following: -# -# 1. Initialize the empty 26-character list to be sent to the `update_list` task -# 2. Loop through every character of the first string (s1) and populate the frequency list -# 3. Loop through every character of the second string (s2) and populate the frequency list -# 4. Derive the number of common characters by comparing the two frequency lists -# -# The looping is dependent on the number of characters of both the strings which aren't known until the run time. If the `@task` decorator is used to encapsulate the calls mentioned above, the compilation will fail very early on due to the absence of the literal values. -# Therefore, `@dynamic` decorator has to be used. -# -# Dynamic workflow is effectively both a task and a workflow. The key thing to note is that the \_body of tasks is run at run time and the -# body of workflows is run at compile (aka registration) time. Essentially, this is what a dynamic workflow leverages -- it’s a workflow that is compiled at run time (the best of both worlds)! -# -# At execution (run) time, Flytekit runs the compilation step, and produces -# a `WorkflowTemplate` (from the dynamic workflow), which Flytekit then passes back to Flyte Propeller for further running, exactly how sub-workflows are handled. -# -# :::{note} -# The dynamic pattern isn't the most efficient method to iterate over a list. [Map tasks](https://github.com/flyteorg/flytekit/blob/8528268a29a07fe7e9ce9f7f08fea68c41b6a60b/flytekit/core/map_task.py/) -# might be more efficient in certain cases. But they only work for Python tasks (tasks decorated with the @task decorator) not SQL, Spark, and so on. -# ::: -# -# We now define a dynamic workflow that encapsulates the above mentioned points. -# %% -@dynamic -def count_characters(s1: str, s2: str) -> int: - """ - Calls the required tasks and returns the final result""" - - # s1 and s2 are accessible - - # initialize an empty list consisting of 26 empty slots corresponding to every alphabet (lower and upper case) - freq1 = [0] * 26 - freq2 = [0] * 26 - - # looping through the string s1 - for i in range(len(s1)): - - # index and freq1 are not accessible as they are promises - index = return_index(character=s1[i]) - freq1 = update_list(freq_list=freq1, list_index=index) - - # looping through the string s2 - for i in range(len(s2)): - - # index and freq2 are not accessible as they are promises - index = return_index(character=s2[i]) - freq2 = update_list(freq_list=freq2, list_index=index) - - # counting the common characters - return derive_count(freq1=freq1, freq2=freq2) - - -# %% [markdown] -# When tasks are called within any workflow, they return Promise objects. Likewise, in a dynamic workflow, the tasks' outputs are Promise objects that cannot be directly accessed (they shall be fulfilled by Flyte later). -# Because of this fact, operations on the `index` variable like `index + 1` are not valid. -# To manage this problem, the values need to be passed to the other tasks to unwrap them. -# -# :::{note} -# The local execution will work when a `@dynamic` decorator is used because Flytekit treats it like a `task` that will run with the Python native inputs. -# ::: -# -# Therefore, there are no Promise objects locally within the function decorated with `@dynamic` as it is treated as a `task`. - -# %% [markdown] -# Finally, we define a workflow that calls the dynamic workflow. -# %% -@workflow -def wf(s1: str, s2: str) -> int: - """ - Calls the dynamic workflow and returns the result""" - - # sending two strings to the workflow - return count_characters(s1=s1, s2=s2) - - -if __name__ == "__main__": - print(wf(s1="Pear", s2="Earth")) - - -# %% [markdown] -# ## Dynamic Workflows Under the Hood -# -# ### What Is a Dynamic Workflow? -# -# A workflow whose directed acyclic graph (DAG) is computed at run-time is a {ref}`dynamic workflow `. The tasks in a dynamic workflow are executed at runtime using dynamic inputs. -# -# Think of a dynamic workflow as a combination of a task and a workflow. It is used to dynamically decide the parameters of a workflow at runtime. It is both compiled and executed at run-time. You can define a dynamic workflow using the `@dynamic` decorator. -# -# ### Why Use Dynamic Workflows? -# -# #### Flexibility -# -# Dynamic workflows simplify your pipelines, providing the flexibility to design workflows based on your project’s requirements, which can’t be achieved using static workflows. -# -# #### Lower Pressure on etcd -# -# The workflow CRD and the states associated with static workflows are stored in etcd, which is the Kubernetes database. This database stores Flyte workflow CRD as key-value pairs and keeps track of the status of each node’s execution. -# A limitation of etcd is that there is a hard limit on the data size (data size refers to the aggregate of the size of the workflow and the status of the nodes). -# Due to this limitation, you need to ensure that your static workflows don’t consume too much memory. -# -# Dynamic workflows offload the workflow spec (node/task definitions and connections, etc) to the blobstore but the node statuses are stored in the FlyteWorkflow CRD (in etcd). -# Dynamic workflows alleviate a portion of etcd storage space thereby reducing pressure on etcd. -# -# ### How Is a Dynamic Workflow Executed? -# -# FlytePropeller executes the dynamic task in its k8s pod and results in a compiled Flyte DAG which is made available in the FlyteConsole. -# FlytePropeller uses the information obtained by executing the dynamic task to schedule and execute every node within the dynamic task. -# You can visualize the dynamic workflow’s graph in the UI only after the dynamic task has completed execution. -# -# When a dynamic task is executed, it generates the entire workflow as its output. This output is known as the **futures file**. -# It is named so because the workflow is yet to be executed and all the subsequent outputs are futures. -# -# ### How Does Flyte Handle Dynamic Workflows? -# -# A dynamic workflow is modeled as a task in the backend, but the body of the function is executed to produce a workflow at run-time. In both dynamic and static workflows, the output of tasks are Promise objects. -# -# :::{note} -# When a dynamic (or static) workflow calls a task, the workflow returns a {py:class}`Promise ` object. You can’t interact with this Promise object directly since it uses lazy evaluation (it defers the evaluation until absolutely needed). You can unwrap the Promise object by passing it to a task or a dynamic workflow. -# ::: -# -# {ref}`Here ` is an example of house price prediction using dynamic workflows. -# -# ### Where Are Dynamic Workflows Used? -# -# Dynamic workflow comes into the picture when you need to: -# -# 1. Modify the logic of the code at runtime -# 2. Change or decide on feature extraction parameters on-the-go -# 3. Build AutoML pipelines -# 4. Tune hyperparameters during execution -# -# ### Dynamic versus Map Tasks -# -# Dynamic tasks have overhead for large fan-out tasks because they store metadata for the entire workflow. In contrast, map tasks are efficient for these large fan-out tasks since they don’t store the metadata, as a consequence of which overhead is less apparent. diff --git a/examples/advanced_composition/advanced_composition/eager_workflows.py b/examples/advanced_composition/advanced_composition/eager_workflows.py index 35e38ed37..a0af45e92 100644 --- a/examples/advanced_composition/advanced_composition/eager_workflows.py +++ b/examples/advanced_composition/advanced_composition/eager_workflows.py @@ -21,7 +21,7 @@ # `pyflyte register`, or `pyflyte serialize`. This means that the workflow is static # and cannot change its shape at any point: all of the variables defined as an input # to the workflow or as an output of a task or subworkflow are promises. -# {ref}`Dynamic workflows `, on the other hand, are compiled +# {ref}`Dynamic workflows `, on the other hand, are compiled # at runtime so that they can materialize the inputs of the workflow as Python values # and use them to determine the shape of the execution graph. # @@ -29,7 +29,7 @@ # create extremely flexible workflows that give you run-time access to # intermediary task/subworkflow outputs. # -# ## Why Eager Workflows? +# ## Why eager workflows? # # Both static and dynamic workflows have a key limitation: while they provide # compile-time and run-time type safety, respectively, they both suffer from @@ -41,7 +41,6 @@ # the python constructs that you're familiar with via the `asyncio` API. To # understand what this looks like, let's define a very basic eager workflow # using the `@eager` decorator. - # %% from flytekit import task, workflow from flytekit.experimental import eager @@ -81,8 +80,7 @@ async def simple_eager_workflow(x: int) -> int: # Unlike in static and dynamic workflows, this variable is actually # the Python integer that is the result of `x + 1` and not a promise. # -# %% [markdown] -# ## How it Works +# ## How it works # # When you decorate a function with `@eager`, any function invoked within it # that's decorated with `@task`, `@workflow`, or `@eager` becomes @@ -110,7 +108,7 @@ async def simple_eager_workflow(x: int) -> int: # compile-time analysis and first-class data lineage tracking. # ``` # -# Similar to {ref}`dynamic workflows `, eager workflows are +# Similar to {ref}`dynamic workflows `, eager workflows are # actually tasks. The main difference is that, while dynamic workflows compile # a static workflow at runtime using materialized inputs, eager workflows do # not compile any workflow at all. Instead, they use the {py:class}`~flytekit.remote.remote.FlyteRemote` @@ -120,8 +118,7 @@ async def simple_eager_workflow(x: int) -> int: # Python object in the underlying runtime environment. We'll see how to configure # `@eager` functions to run on a remote Flyte cluster # {ref}`later in this guide `. - -# %% [markdown] +# # ## What can you do with eager workflows? # # In this section we'll cover a few of the use cases that you can accomplish @@ -134,6 +131,7 @@ async def simple_eager_workflow(x: int) -> int: # task and subworkflow outputs as Python values and do operations on them just # like you would in any other Python function. Let's look at another example: + # %% @eager async def another_eager_workflow(x: int) -> int: @@ -149,15 +147,12 @@ async def another_eager_workflow(x: int) -> int: # Since out is an actual Python integer and not a promise, we can do operations # on it at runtime, inside the eager workflow function body. This is not possible # with static or dynamic workflows. - - -# %% [markdown] -# ### Pythonic Conditionals +# +# ### Pythonic conditionals # # As you saw in the `simple_eager_workflow` workflow above, you can use regular # Python conditionals in your eager workflows. Let's look at a more complicated # example: - # %% @task def gt_100(x: int) -> bool: @@ -183,9 +178,7 @@ async def eager_workflow_with_conditionals(x: int) -> int: # In the above example, we're using the eager workflow's Python runtime # to check if `out` is negative, but we're also using the `gt_100` task in the # `elif` statement, which will be executed in a separate Flyte task. - - -# %% [markdown] +# # ### Loops # # You can also gather the outputs of multiple tasks or subworkflows into a list: @@ -209,7 +202,6 @@ async def eager_workflow_with_for_loop(x: int) -> int: # ### Static subworkflows # # You can also invoke static workflows from within an eager workflow: - # %% @workflow def subworkflow(x: int) -> int: @@ -228,7 +220,6 @@ async def eager_workflow_with_static_subworkflow(x: int) -> int: # ### Eager subworkflows # # You can have nest eager subworkflows inside a parent eager workflow: - # %% @eager async def eager_subworkflow(x: int) -> int: @@ -245,7 +236,6 @@ async def nested_eager_workflow(x: int) -> int: # ### Catching exceptions # # You can also catch exceptions in eager workflows through `EagerException`: - # %% from flytekit.experimental import EagerException @@ -274,15 +264,13 @@ async def eager_workflow_with_exception(x: int) -> int: # ```{note} # This is a current limitation in the `@eager` workflow implementation. # ```` - - -# %% [markdown] -# ## Executing Eager Workflows +# +# ## Executing eager workflows # # As with most Flyte constructs, you can execute eager workflows both locally # and remotely. # -# ### Local Execution +# ### Local execution # # You can execute eager workflows locally by simply calling them like a regular # `async` function: @@ -296,11 +284,10 @@ async def eager_workflow_with_exception(x: int) -> int: # This just uses the `asyncio.run` function to execute the eager workflow just # like any other Python async code. This is useful for local debugging as you're # developing your workflows and tasks. - -# %% [markdown] +# # (eager_workflows_remote)= # -# ### Remote Flyte Cluster Execution +# ### Remote Flyte cluster execution # # Under the hood, `@eager` workflows use the {py:class}`~flytekit.remote.remote.FlyteRemote` # object to kick off task, static workflow, and eager workflow executions. @@ -334,7 +321,7 @@ async def eager_workflow_with_exception(x: int) -> int: # cluster to authenticate via a client key. # %% [markdown] -# ### Sandbox Flyte Cluster Execution +# ### Sandbox Flyte cluster execution # # When using a sandbox cluster started with `flytectl demo start`, however, the # `client_secret_group` and `client_secret_key` are not required, since the @@ -367,9 +354,8 @@ async def eager_workflow_sandbox(x: int) -> int: # object. This means that you need to pre-register all Flyte entities that are # invoked inside of the eager workflow. # ``` - -# %% [markdown] -# ### Registering and Running +# +# ### Registering and running # # Assuming that your `flytekit` code is configured correctly, you will need to # register all of the task and subworkflows that are used with your eager @@ -399,9 +385,8 @@ async def eager_workflow_sandbox(x: int) -> int: # that `pyflyte run` has no way of knowing what tasks and subworkflows are # invoked inside of it. # ``` - -# %% [markdown] -# ## Eager Workflows on Flyte Console +# +# ## Eager workflows on Flyte console # # Since eager workflows are an experimental feature, there is currently no # first-class representation of them on Flyte Console, the UI for Flyte. @@ -431,15 +416,13 @@ async def eager_workflow_sandbox(x: int) -> int: # :alt: Eager Workflow Deck # :class: with-shadow # ::: - - -# %% [markdown] +# # ## Limitations # # As this feature is still experimental, there are a few limitations that you # need to keep in mind: # -# - You cannot invoke {ref}`dynamic workflows `, +# - You cannot invoke {ref}`dynamic workflows `, # {ref}`map tasks `, or {ref}`launch plans ` inside an # eager workflow. # - [Context managers](https://docs.python.org/3/library/contextlib.html) will @@ -459,9 +442,8 @@ async def eager_workflow_sandbox(x: int) -> int: # - Flyte console currently does not have a first-class way of viewing eager # workflows, but it can be accessed via the task list view and the execution # graph is viewable via Flyte Decks. - -# %% [markdown] -# ## Summary of Workflows +# +# ## Summary of workflows # # Eager workflows are a powerful new construct that trades-off compile-time type # safety for flexibility in the shape of the execution graph. The table below diff --git a/examples/advanced_composition/advanced_composition/map_task.py b/examples/advanced_composition/advanced_composition/map_task.py index 803f0b47f..86719a9ff 100644 --- a/examples/advanced_composition/advanced_composition/map_task.py +++ b/examples/advanced_composition/advanced_composition/map_task.py @@ -7,83 +7,85 @@ # .. tags:: Intermediate # ``` # -# ```{image} https://img.shields.io/badge/Blog%20Post-Map%20Tasks-blue?style=for-the-badge -# :alt: Map Task Blog Post -# :target: https://blog.flyte.org/map-tasks-in-flyte -# ``` +# Using a map task in Flyte allows for the execution of a pod task or a regular task across a series of inputs within a single workflow node. +# This capability eliminates the need to create individual nodes for each instance, leading to substantial performance improvements. # -# A map task allows you to execute a pod task or a regular task on a series of inputs within a single workflow node. -# This enables you to execute numerous instances of the task without having to create a node for each instance, resulting in significant performance improvements. +# Map tasks find utility in diverse scenarios, such as: # -# Map tasks find application in various scenarios, including: +# 1. Executing the same code logic on multiple inputs +# 2. Concurrent processing of multiple data batches +# 3. Hyperparameter optimization # -# - When multiple inputs require running through the same code logic. -# - Processing multiple data batches concurrently. -# - Conducting hyperparameter optimization. +# The following examples demonstrate how to use map tasks with both single and multiple inputs. # -# Now, let's delve into an example! - -# %% [markdown] -# First, import the libraries. +# To begin, import the required libraries. # %% -from typing import List - -from flytekit import Resources, map_task, task, workflow - +from flytekit import map_task, task, workflow # %% [markdown] -# Define a task to be used in the map task. -# -# :::{note} -# A map task can only accept one input and produce one output. -# ::: +# Here's a simple workflow that uses {py:func}`map_task `. # %% -@task -def a_mappable_task(a: int) -> str: - inc = a + 2 - stringified = str(inc) - return stringified +threshold = 11 -# %% [markdown] -# Also define a task to reduce the mapped output to a string. -# %% @task -def coalesce(b: List[str]) -> str: - coalesced = "".join(b) - return coalesced - +def detect_anomalies(data_point: int) -> bool: + return data_point > threshold -# %% [markdown] -# To repeat the execution of the `a_mappable_task` across a collection of inputs, use the {py:func}`~flytekit:flytekit.map_task` function from flytekit. -# In this example, the input `a` is of type `List[int]`. -# The `a_mappable_task` is executed for each element in the list. -# -# You can utilize the `with_overrides` function to set resources specifically for individual map tasks. -# This allows you to customize resource allocations such as memory usage. -# %% @workflow -def my_map_workflow(a: List[int]) -> str: - mapped_out = map_task(a_mappable_task)(a=a).with_overrides( - requests=Resources(mem="300Mi"), - limits=Resources(mem="500Mi"), - retries=1, - ) - coalesced = coalesce(b=mapped_out) - return coalesced +def map_workflow(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]: + # Use the map task to apply the anomaly detection function to each data point + return map_task(detect_anomalies)(data_point=data) -# %% [markdown] -# Finally, you can run the workflow locally. -# %% if __name__ == "__main__": - result = my_map_workflow(a=[1, 2, 3, 4, 5]) - print(f"{result}") + print(f"Anomalies Detected: {map_workflow()}") + # %% [markdown] +# To customize resource allocations, such as memory usage for individual map tasks, +# you can leverage `with_overrides`. Here's an example using the `detect_anomalies` map task within a workflow: +# +# ```python +# from flytekit import Resources +# +# +# @workflow +# def map_workflow_with_resource_overrides(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]: +# return map_task(detect_anomalies)(data_point=data).with_overrides(requests=Resources(mem="2Gi")) +# ``` +# +# You can use {py:class}`~flytekit.TaskMetadata` to set attributes such as `cache`, `cache_version`, `interruptible`, `retries` and `timeout`. +# ```python +# from flytekit import TaskMetadata +# +# +# @workflow +# def map_workflow_with_metadata(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]: +# return map_task(detect_anomalies, metadata=TaskMetadata(cache=True, cache_version="0.1", retries=1))( +# data_point=data +# ) +# ``` +# +# You can also configure `concurrency` and `min_success_ratio` for a map task: +# - `concurrency` limits the number of mapped tasks that can run in parallel to the specified batch size. +# If the input size exceeds the concurrency value, multiple batches will run serially until all inputs are processed. +# If left unspecified, it implies unbounded concurrency. +# - `min_success_ratio` determines the minimum fraction of total jobs that must complete successfully before terminating +# the map task and marking it as successful. +# +# ```python +# @workflow +# def map_workflow_with_additional_params(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]: +# return map_task(detect_anomalies, concurrency=1, min_success_ratio=0.75)(data_point=data) +# ``` +# +# A map task internally uses a compression algorithm (bitsets) to handle every Flyte workflow node’s metadata, +# which would have otherwise been in the order of 100s of bytes. +# # When defining a map task, avoid calling other tasks in it. Flyte -# can't accurately register tasks that call other tasks. While Flyte +# can't accurately register tasks that call other tasks. While Flyte # will correctly execute a task that calls other tasks, it will not be # able to give full performance advantages. This is # especially true for map tasks. @@ -104,18 +106,17 @@ def suboptimal_mappable_task(a: int) -> str: # %% [markdown] -# By default, the map task utilizes the Kubernetes Array plugin for execution. +# By default, the map task utilizes the Kubernetes array plugin for execution. # However, map tasks can also be run on alternate execution backends. -# For example, you can configure the map task to run on [AWS Batch](https://docs.flyte.org/en/latest/deployment/plugin_setup/aws/batch.html#deployment-plugin-setup-aws-array), +# For example, you can configure the map task to run on +# [AWS Batch](https://docs.flyte.org/en/latest/deployment/plugin_setup/aws/batch.html#deployment-plugin-setup-aws-array), # a provisioned service that offers scalability for handling large-scale tasks. - -# %% [markdown] -# ## Map a Task with Multiple Inputs +# +# ## Map a task with multiple inputs # # You might need to map a task with multiple inputs. # # For instance, consider a task that requires three inputs. - # %% @task def multi_input_task(quantity: int, price: float, shipping: float) -> float: @@ -123,23 +124,21 @@ def multi_input_task(quantity: int, price: float, shipping: float) -> float: # %% [markdown] -# In some cases, you may want to map this task with only the ``quantity`` input, while keeping the other inputs unchanged. +# You may want to map this task with only the ``quantity`` input, while keeping the other inputs unchanged. # Since a map task accepts only one input, you can achieve this by partially binding values to the map task. # This can be done using the {py:func}`functools.partial` function. - # %% import functools @workflow -def multiple_workflow(list_q: List[int] = [1, 2, 3, 4, 5], p: float = 6.0, s: float = 7.0) -> List[float]: +def multiple_inputs_map_workflow(list_q: list[int] = [1, 2, 3, 4, 5], p: float = 6.0, s: float = 7.0) -> list[float]: partial_task = functools.partial(multi_input_task, price=p, shipping=s) return map_task(partial_task)(quantity=list_q) # %% [markdown] # Another possibility is to bind the outputs of a task to partials. - # %% @task def get_price() -> float: @@ -147,7 +146,7 @@ def get_price() -> float: @workflow -def multiple_workflow_with_task_output(list_q: List[int] = [1, 2, 3, 4, 5], s: float = 6.0) -> List[float]: +def map_workflow_partial_with_task_output(list_q: list[int] = [1, 2, 3, 4, 5], s: float = 6.0) -> list[float]: p = get_price() partial_task = functools.partial(multi_input_task, price=p, shipping=s) return map_task(partial_task)(quantity=list_q) @@ -155,12 +154,11 @@ def multiple_workflow_with_task_output(list_q: List[int] = [1, 2, 3, 4, 5], s: f # %% [markdown] # You can also provide multiple lists as input to a ``map_task``. - # %% @workflow -def multiple_workflow_with_lists( - list_q: List[int] = [1, 2, 3, 4, 5], list_p: List[float] = [6.0, 9.0, 8.7, 6.5, 1.2], s: float = 6.0 -) -> List[float]: +def map_workflow_with_lists( + list_q: list[int] = [1, 2, 3, 4, 5], list_p: list[float] = [6.0, 9.0, 8.7, 6.5, 1.2], s: float = 6.0 +) -> list[float]: partial_task = functools.partial(multi_input_task, shipping=s) return map_task(partial_task)(quantity=list_q, price=list_p) @@ -169,3 +167,79 @@ def multiple_workflow_with_lists( # ```{note} # It is important to note that you cannot provide a list as an input to a partial task. # ``` +# +# ## Run the example on the Flyte cluster +# +# To run the provided workflows on the Flyte cluster, use the following commands: +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/map_task.py \ +# map_workflow +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/map_task.py \ +# map_workflow_with_additional_params +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/map_task.py \ +# multiple_inputs_map_workflow +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/map_task.py \ +# map_workflow_partial_with_task_output +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/map_task.py \ +# map_workflow_with_lists +# ``` +# +# ## ArrayNode +# +# :::{important} +# This feature is experimental and the API is subject to breaking changes. +# If you encounter any issues please consider submitting a +# [bug report](https://github.com/flyteorg/flyte/issues/new?assignees=&labels=bug%2Cuntriaged&projects=&template=bug_report.yaml&title=%5BBUG%5D+). +# ::: +# +# ArrayNode map tasks serve as a seamless substitution for regular map tasks, differing solely in the submodule +# utilized to import the `map_task` function. Specifically, you will need to import `map_task` from the experimental module as illustrated below: +# +# ```python +# from flytekit import task, workflow +# from flytekit.experimental import map_task +# +# @task +# def t(a: int) -> int: +# ... +# +# @workflow +# def array_node_wf(xs: list[int]) -> list[int]: +# return map_task(t)(a=xs) +# ``` +# +# Flyte introduces map task to enable parallelization of homogeneous operations, +# offering efficient evaluation and a user-friendly API. Because it’s implemented as a backend plugin, +# its evaluation is independent of core Flyte logic, which generates subtask executions that lack full Flyte functionality. +# ArrayNode tackles this issue by offering robust support for subtask executions. +# It also extends mapping capabilities across all plugins and Flyte node types. +# This enhancement will be a part of our move from the experimental phase to general availability. +# +# In contrast to map tasks, an ArrayNode provides the following enhancements: +# +# - **Wider mapping support**. ArrayNode extends mapping capabilities beyond Kubernetes tasks, encompassing tasks such as Python tasks, container tasks and pod tasks. +# - **Cache management**. It supports both cache serialization and cache overwriting for subtask executions. +# - **Intra-task checkpointing**. ArrayNode enables intra-task checkpointing, contributing to improved execution reliability. +# - **Workflow recovery**. Subtasks remain recoverable during the workflow recovery process. (This is a work in progress.) +# - **Subtask failure handling**. The mechanism handles subtask failures effectively, ensuring that running subtasks are appropriately aborted. +# - **Multiple input values**. Subtasks can be defined with multiple input values, enhancing their versatility. +# +# We expect the performance of ArrayNode map tasks to compare closely to standard map tasks. diff --git a/examples/advanced_composition/advanced_composition/merge_sort.py b/examples/advanced_composition/advanced_composition/merge_sort.py deleted file mode 100644 index 55d28e194..000000000 --- a/examples/advanced_composition/advanced_composition/merge_sort.py +++ /dev/null @@ -1,132 +0,0 @@ -# %% [markdown] -# (advanced_merge_sort)= -# -# # Implementing Merge Sort -# -# ```{eval-rst} -# .. tags:: Intermediate -# ``` -# -# FlyteIdl (the fundamental building block of the Flyte Language) allows various programming language features: -# conditionals, recursion, custom typing, and more. -# -# This tutorial will walk you through writing a simple Distributed Merge Sort algorithm. It'll show usage of conditions -# as well as recursion using dynamically generated workflows. Flyte imposes limitation on the depth of the recursion to -# avoid mis-use and potentially affecting the overall stability of the system. - -# %% -import typing -from datetime import datetime -from random import random, seed -from typing import Tuple - -from flytekit import conditional, dynamic, task, workflow - -# seed random number generator -seed(datetime.now().microsecond) - - -# %% [markdown] -# A simple split function that divides a list into two halves. - - -# %% -@task -def split(numbers: typing.List[int]) -> Tuple[typing.List[int], typing.List[int], int, int]: - return ( - numbers[0 : int(len(numbers) / 2)], - numbers[int(len(numbers) / 2) :], - int(len(numbers) / 2), - int(len(numbers)) - int(len(numbers) / 2), - ) - - -# %% [markdown] -# One sample implementation for merging. In a more real world example, this might merge file streams and only load -# chunks into the memory. -# %% -@task -def merge(sorted_list1: typing.List[int], sorted_list2: typing.List[int]) -> typing.List[int]: - result = [] - while len(sorted_list1) > 0 and len(sorted_list2) > 0: - # Check if current element of first array is smaller than current element of second array. If yes, - # store first array element and increment first array index. Otherwise do same with second array - if sorted_list1[0] < sorted_list2[0]: - result.append(sorted_list1.pop(0)) - else: - result.append(sorted_list2.pop(0)) - - result.extend(sorted_list1) - result.extend(sorted_list2) - - return result - - -# %% [markdown] -# Generally speaking, the algorithm will recurse through the list, splitting it in half until it reaches a size that we -# know is efficient enough to run locally. At which point it'll just use the python-builtin sorted function. - - -# %% [markdown] -# This runs the sorting completely locally. It's faster and more efficient to do so if the entire list fits in memory. -# %% -@task -def sort_locally(numbers: typing.List[int]) -> typing.List[int]: - return sorted(numbers) - - -# %% [markdown] -# Let's now define the typical merge sort algorithm. We split, merge-sort each half then finally merge. With the simple -# addition of the `@dynamic` annotation, this function will instead generate a plan of execution (a flyte workflow) with -# 4 different nodes that will all run remotely on potentially different hosts. Flyte takes care of ensuring references -# of data are properly passed around and order of execution is maintained with maximum possible parallelism. -# %% -@dynamic -def merge_sort_remotely(numbers: typing.List[int], run_local_at_count: int) -> typing.List[int]: - split1, split2, new_count1, new_count2 = split(numbers=numbers) - sorted1 = merge_sort(numbers=split1, numbers_count=new_count1, run_local_at_count=run_local_at_count) - sorted2 = merge_sort(numbers=split2, numbers_count=new_count2, run_local_at_count=run_local_at_count) - return merge(sorted_list1=sorted1, sorted_list2=sorted2) - - -# %% [markdown] -# Putting it all together, this is the workflow that also serves as the entry point of execution. Given an unordered set -# of numbers, their length as well as the size at which to sort locally, it runs a condition on the size. The condition -# should look and flow naturally to a python developer. Binary arithmetic and logical operations on simple types as well -# as logical operations on conditions are supported. This condition checks if the current size of the numbers is below -# the cut-off size to run locally, if so, it runs the sort_locally task. Otherwise it runs the above dynamic workflow -# that recurse down the list. -# %% -@workflow -def merge_sort(numbers: typing.List[int], numbers_count: int, run_local_at_count: int = 10) -> typing.List[int]: - return ( - conditional("terminal_case") - .if_(numbers_count <= run_local_at_count) - .then(sort_locally(numbers=numbers)) - .else_() - .then(merge_sort_remotely(numbers=numbers, run_local_at_count=run_local_at_count)) - ) - - -# %% [markdown] -# A helper function to generate inputs for running the workflow locally. -# %% -def generate_inputs(numbers_count: int) -> typing.List[int]: - generated_list = [] - # generate random numbers between 0-1 - for _ in range(numbers_count): - value = int(random() * 10000) - generated_list.append(value) - - return generated_list - - -# %% [markdown] -# The entire workflow can be executed locally as follows: -# -# %% -if __name__ == "__main__": - count = 20 - x = generate_inputs(count) - print(x) - print(f"Running Merge Sort Locally...{merge_sort(numbers=x, numbers_count=count)}") diff --git a/examples/advanced_composition/advanced_composition/subworkflow.py b/examples/advanced_composition/advanced_composition/subworkflow.py new file mode 100644 index 000000000..c61d0c5ba --- /dev/null +++ b/examples/advanced_composition/advanced_composition/subworkflow.py @@ -0,0 +1,152 @@ +# %% [markdown] +# (subworkflow)= +# +# # Subworkflows +# +# ```{eval-rst} +# .. tags:: Intermediate +# ``` +# +# Subworkflows share similarities with {ref}`launch plans `, as both enable users to initiate one workflow from within another. +# The distinction lies in the analogy: think of launch plans as "pass by pointer" and subworkflows as "pass by value." +# +# ## When to use subworkflows? +# +# Subworkflows offer an elegant solution for managing parallelism between a workflow and its launched sub-flows, +# as they execute within the same context as the parent workflow. +# Consequently, all nodes of a subworkflow adhere to the overall constraints imposed by the parent workflow. +# +# Consider this scenario: when workflow `A` is integrated as a subworkflow of workflow `B`, +# running workflow `B` results in the entire graph of workflow `A` being duplicated into workflow `B` at the point of invocation. +# +# Here's an example illustrating the calculation of slope, intercept and the corresponding y-value. +# %% +from flytekit import task, workflow + + +@task +def slope(x: list[int], y: list[int]) -> float: + sum_xy = sum([x[i] * y[i] for i in range(len(x))]) + sum_x_squared = sum([x[i] ** 2 for i in range(len(x))]) + n = len(x) + return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2) + + +@task +def intercept(x: list[int], y: list[int], slope: float) -> float: + mean_x = sum(x) / len(x) + mean_y = sum(y) / len(y) + intercept = mean_y - slope * mean_x + return intercept + + +@workflow +def slope_intercept_wf(x: list[int], y: list[int]) -> (float, float): + slope_value = slope(x=x, y=y) + intercept_value = intercept(x=x, y=y, slope=slope_value) + return (slope_value, intercept_value) + + +@task +def regression_line(val: int, slope_value: float, intercept_value: float) -> float: + return (slope_value * val) + intercept_value # y = mx + c + + +@workflow +def regression_line_wf(val: int = 5, x: list[int] = [-3, 0, 3], y: list[int] = [7, 4, -2]) -> float: + slope_value, intercept_value = slope_intercept_wf(x=x, y=y) + return regression_line(val=val, slope_value=slope_value, intercept_value=intercept_value) + + +# %% [markdown] +# The `slope_intercept_wf` computes the slope and intercept of the regression line. +# Subsequently, the `regression_line_wf` triggers `slope_intercept_wf` and then computes the y-value. +# +# To execute the workflow locally, use the following: +# %% +if __name__ == "__main__": + print(f"Executing regression_line_wf(): {regression_line_wf()}") + + +# %% [markdown] +# It's possible to nest a workflow that contains a subworkflow within another workflow. +# Workflows can be easily constructed from other workflows, even if they function as standalone entities. +# Each workflow in this module has the capability to exist and run independently. +# %% +@workflow +def nested_regression_line_wf() -> float: + return regression_line_wf() + + +# %% [markdown] +# You can run the nested workflow locally as well. +# %% +if __name__ == "__main__": + print(f"Running nested_regression_line_wf(): {nested_regression_line_wf()}") + +# %% [markdown] +# ## External workflow +# +# When launch plans are employed within a workflow to initiate the execution of a pre-defined workflow, +# a new external execution is triggered. This results in a distinct execution ID and can be identified +# as a separate entity. +# +# These external invocations of a workflow, initiated using launch plans from a parent workflow, +# are termed as external workflows. They may have separate parallelism constraints since the context is not shared. +# +# :::{tip} +# If your deployment uses {ref}`multiple Kubernetes clusters `, +# external workflows may offer a way to distribute the workload of a workflow across multiple clusters. +# ::: +# +# Here's an example that illustrates the concept of external workflows: +# %% + +from flytekit import LaunchPlan + +launch_plan = LaunchPlan.get_or_create( + regression_line_wf, "regression_line_workflow", default_inputs={"val": 7, "x": [-3, 0, 3], "y": [7, 4, -2]} +) + + +@workflow +def nested_regression_line_lp() -> float: + # Trigger launch plan from within a workflow + return launch_plan() + + +# %% [markdown] +# :::{figure} https://raw.githubusercontent.com/flyteorg/static-resources/main/flytesnacks/user_guide/flyte_external_workflow_execution.png +# :alt: External workflow execution +# :class: with-shadow +# ::: +# +# In the console screenshot above, note that the launch plan execution ID differs from that of the workflow. +# +# You can run a workflow containing an external workflow locally as follows: +# %% +if __name__ == "__main__": + print(f"Running nested_regression_line_lp(): {nested_regression_line_lp}") + +# %% [markdown] +# ## Run the example on the Flyte cluster +# +# To run the provided workflows on the Flyte cluster, use the following commands: +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/subworkflow.py \ +# regression_line_wf +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/subworkflow.py \ +# nested_regression_line_wf +# ``` +# +# ``` +# pyflyte run --remote \ +# https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/subworkflow.py \ +# nested_regression_line_lp +# ``` diff --git a/examples/advanced_composition/advanced_composition/subworkflows.py b/examples/advanced_composition/advanced_composition/subworkflows.py deleted file mode 100644 index be4eff792..000000000 --- a/examples/advanced_composition/advanced_composition/subworkflows.py +++ /dev/null @@ -1,194 +0,0 @@ -# %% [markdown] -# (subworkflows)= -# -# # SubWorkflows -# -# ```{eval-rst} -# .. tags:: Intermediate -# ``` -# -# Subworkflows are similar to {ref}`launch plans ` since they allow users to kick off one workflow from within another. -# -# What's the Difference? -# Consider launch plans as pass by pointer and subworkflows as pass by value. -# -# :::{note} -# Flyte's handling of dynamic workflows necessitates the use of subworkflows. -# We provide this capability at the user level rather than hiding it. The pros and cons of -# using subworkflows are discussed below. -# ::: -# -# ## When Should I Use SubWorkflows? -# -# Subworkflows provide a clean solution to control parallelism between a workflow and its launched sub-flows -# because they execute within the same context as the parent workflow. -# Thus, all nodes of a subworkflow are bound by the total constraint on the parent workflow. -# -# Consider this: When Workflow A is included as a subworkflow of Workflow B, and when Workflow B is run, the entire graph of workflow A is -# copied into workflow B at the point where it is invoked. -# -# Let's look at an example of subworkflow. - -# %% [markdown] -# ## Example -# -# Import the required dependencies into the environment. -# %% -import typing -from typing import Tuple - -from flytekit import task, workflow - -# %% [markdown] -# Next, define a task that uses named outputs. -# As a best practice, usually try and define `NamedTuple` as a distinct type (although it can be defined inline). -# %% -op = typing.NamedTuple("OutputsBC", t1_int_output=int, c=str) - - -@task -def t1(a: int) -> op: - return op(a + 2, "world") - - -# %% [markdown] -# Then define a subworkflow like a typical workflow. -# %% -@workflow -def my_subwf(a: int = 42) -> Tuple[str, str]: - x, y = t1(a=a) - u, v = t1(a=x) - return y, v - - -# %% [markdown] -# We call the above-mentioned workflow above in a `parent` workflow below -# which demonstrates how to override the node name of a task (or subworkflow in this case). -# -# Nodes are typically named sequentially: `n0`, `n1`, and so on. Since the inner `my_subwf` also has a `n0`, you might -# want to modify the first node's name. Because node IDs must be different within a workflow graph, -# Flyte automatically prepends an attribute to the inner `n0`. -# %% -@workflow -def parent_wf(a: int) -> Tuple[int, str, str]: - x, y = t1(a=a).with_overrides(node_name="node-t1-parent") - u, v = my_subwf(a=x) - return x, u, v - - -# %% [markdown] -# :::{note} -# For improved presentation or readability, the `with_overrides` method provides a new name to the graph-node. -# ::: - -# %% [markdown] -# You can run the subworkflows locally. -# %% -if __name__ == "__main__": - print(f"Running parent_wf(a=3) {parent_wf(a=3)}") - - -# %% [markdown] -# Interestingly, we can nest a workflow that has a subworkflow within a workflow. -# Workflows can be simply composed from other workflows, even if they are standalone entities. Each of the -# workflows in this module can exist and run independently. -# %% -@workflow -def nested_parent_wf(a: int) -> Tuple[int, str, str, str]: - x, y = my_subwf(a=a) - m, n, o = parent_wf(a=a) - return m, n, o, y - - -# %% [markdown] -# You can run the nested workflows locally as well. -# %% -if __name__ == "__main__": - print(f"Running nested_parent_wf(a=3) {nested_parent_wf(a=3)}") - -# %% [markdown] -# :::{note} -# You can {ref}`chain and execute subworkflows ` similar to chained flyte tasks. -# ::: - -# %% [markdown] -# ## External Workflow -# -# When launch plans are used within a workflow to launch the execution of a previously defined workflow, a new -# external execution is launched, with a separate execution ID and can be observed as a distinct entity in -# FlyteConsole/Flytectl. -# -# They may have separate parallelism constraints since the context is not shared. -# We refer to such external invocations of a workflow using launch plans from a parent workflow as `External Workflows`. -# -# :::{tip} -# If your deployment uses {ref}`multiple Kubernetes clusters `, then external workflows may allow you to distribute the workload of a workflow to multiple clusters. -# ::: -# -# Here's an example demonstrating external workflows: - -# %% [markdown] -# Import the required dependencies into the environment. -# %% -import typing # noqa: E402 -from collections import Counter # noqa: E402 -from typing import Dict, Tuple # noqa: E402 - -from flytekit import LaunchPlan, task, workflow # noqa: E402 - - -# %% [markdown] -# Define a task that computes the frequency of each word in a string, and returns a dictionary mapping every word to its count. -# %% -@task -def count_freq_words(input_string1: str) -> Dict: - # input_string = "The cat sat on the mat" - words = input_string1.split() - wordCount = dict(Counter(words)) - return wordCount - - -# %% [markdown] -# Construct a workflow that executes the previously-defined task. -# %% -@workflow -def ext_workflow(my_input: str) -> Dict: - result = count_freq_words(input_string1=my_input) - return result - - -# %% [markdown] -# Next, create a launch plan. -# %% -external_lp = LaunchPlan.get_or_create( - ext_workflow, - "parent_workflow_execution", -) - - -# %% [markdown] -# Define another task that returns the repeated keys (in our case, words) from a dictionary. -# %% -@task -def count_repetitive_words(word_counter: Dict) -> typing.List[str]: - repeated_words = [key for key, value in word_counter.items() if value > 1] - return repeated_words - - -# %% [markdown] -# Define a workflow that triggers the launch plan of the previously-defined workflow. -# %% -@workflow -def parent_workflow(my_input1: str) -> typing.List[str]: - my_op1 = external_lp(my_input=my_input1) - my_op2 = count_repetitive_words(word_counter=my_op1) - return my_op2 - - -# %% [markdown] -# Here, `parent_workflow` is an external workflow. This can also be run locally. -# -# %% -if __name__ == "__main__": - print("Running parent workflow...") - print(parent_workflow(my_input1="the cat took the apple and ate the apple")) diff --git a/examples/advanced_composition/advanced_composition/waiting_for_external_inputs.py b/examples/advanced_composition/advanced_composition/waiting_for_external_inputs.py index 9ff31ce65..0d5c6648d 100644 --- a/examples/advanced_composition/advanced_composition/waiting_for_external_inputs.py +++ b/examples/advanced_composition/advanced_composition/waiting_for_external_inputs.py @@ -3,7 +3,7 @@ # # *New in Flyte 1.3.0* # -# There are use cases where we want a workflow execution to pause, only to continue +# There are use cases where you may want a workflow execution to pause, only to continue # when some time has passed or when it receives some inputs that are external to # the workflow execution inputs. You can think of these as execution-time inputs, # since they need to be supplied to the workflow after it's launched. Examples of @@ -30,8 +30,7 @@ # functions, {func}`@dynamic `-decorated functions, or # {ref}`imperative workflows `. # ::: - -# %% [markdown] +# # ## Pause executions with the `sleep` node # # The simplest case is when you want your workflow to {py:func}`~flytekit.sleep` @@ -40,7 +39,6 @@ # Though this type of node may not be used often in a production setting, # you might want to use it, for example, if you want to simulate a delay in # your workflow to mock out the behavior of some long-running computation. - # %% from datetime import timedelta @@ -87,7 +85,6 @@ def sleep_wf(num: int) -> int: # but before publishing it you want to give it a custom title. You can achieve # this by defining a `wait_for_input` node that takes a `str` input and # finalizes the report: - # %% import typing @@ -168,8 +165,6 @@ def reporting_with_approval_wf(data: typing.List[float]) -> dict: # You can also use the output of the `approve` function as a promise, feeding # it to a subsequent task. Let's create a version of our report-publishing # workflow where the approval happens after `create_report`: - - # %% @workflow def approval_as_promise_wf(data: typing.List[float]) -> dict: @@ -188,14 +183,13 @@ def approval_as_promise_wf(data: typing.List[float]) -> dict: # %% [markdown] -# ## Working with Conditionals +# ## Working with conditionals # # The node constructs by themselves are useful, but they become even more # useful when we combine them with other Flyte constructs, like {ref}`conditionals `. # # To illustrate this, let's extend the report-publishing use case so that we -# produce and "invalid report" output in case we don't approve the final report: - +# produce an "invalid report" output in case we don't approve the final report: # %% from flytekit import conditional @@ -293,4 +287,3 @@ def conditional_wf(data: typing.List[float]) -> dict: # # node is in the `signals` list above # remote.set_signal("review-passes", execution.id.name, True) # ``` -# diff --git a/examples/advanced_composition/requirements.in b/examples/advanced_composition/requirements.in deleted file mode 100644 index 3cfb26e38..000000000 --- a/examples/advanced_composition/requirements.in +++ /dev/null @@ -1 +0,0 @@ -flytekit diff --git a/examples/advanced_composition/requirements.txt b/examples/advanced_composition/requirements.txt deleted file mode 100644 index a1a517f1b..000000000 --- a/examples/advanced_composition/requirements.txt +++ /dev/null @@ -1,348 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.8 -# by the following command: -# -# pip-compile --resolver=backtracking requirements.in -# -adlfs==2023.8.0 - # via flytekit -aiobotocore==2.5.4 - # via s3fs -aiohttp==3.8.5 - # via - # adlfs - # aiobotocore - # gcsfs - # s3fs -aioitertools==0.11.0 - # via aiobotocore -aiosignal==1.3.1 - # via aiohttp -arrow==1.2.3 - # via cookiecutter -async-timeout==4.0.3 - # via aiohttp -attrs==23.1.0 - # via aiohttp -azure-core==1.29.4 - # via - # adlfs - # azure-identity - # azure-storage-blob -azure-datalake-store==0.0.53 - # via adlfs -azure-identity==1.14.0 - # via adlfs -azure-storage-blob==12.17.0 - # via adlfs -binaryornot==0.4.4 - # via cookiecutter -botocore==1.31.17 - # via aiobotocore -cachetools==5.3.1 - # via google-auth -certifi==2023.7.22 - # via - # kubernetes - # requests -cffi==1.15.1 - # via - # azure-datalake-store - # cryptography -chardet==5.2.0 - # via binaryornot -charset-normalizer==3.2.0 - # via - # aiohttp - # requests -click==8.1.7 - # via - # cookiecutter - # flytekit - # rich-click -cloudpickle==2.2.1 - # via flytekit -cookiecutter==2.3.0 - # via flytekit -croniter==1.4.1 - # via flytekit -cryptography==41.0.3 - # via - # azure-identity - # azure-storage-blob - # msal - # pyjwt - # pyopenssl -dataclasses-json==0.5.9 - # via flytekit -decorator==5.1.1 - # via gcsfs -deprecated==1.2.14 - # via flytekit -diskcache==5.6.3 - # via flytekit -docker==6.1.3 - # via flytekit -docker-image-py==0.1.12 - # via flytekit -docstring-parser==0.15 - # via flytekit -flyteidl==1.5.17 - # via flytekit -flytekit==1.9.1 - # via -r requirements.in -frozenlist==1.4.0 - # via - # aiohttp - # aiosignal -fsspec==2023.9.0 - # via - # adlfs - # flytekit - # gcsfs - # s3fs -gcsfs==2023.9.0 - # via flytekit -gitdb==4.0.10 - # via gitpython -gitpython==3.1.35 - # via flytekit -google-api-core==2.11.1 - # via - # google-cloud-core - # google-cloud-storage -google-auth==2.22.0 - # via - # gcsfs - # google-api-core - # google-auth-oauthlib - # google-cloud-core - # google-cloud-storage - # kubernetes -google-auth-oauthlib==1.0.0 - # via gcsfs -google-cloud-core==2.3.3 - # via google-cloud-storage -google-cloud-storage==2.10.0 - # via gcsfs -google-crc32c==1.5.0 - # via google-resumable-media -google-resumable-media==2.6.0 - # via google-cloud-storage -googleapis-common-protos==1.60.0 - # via - # flyteidl - # flytekit - # google-api-core - # grpcio-status -grpcio==1.53.0 - # via - # flytekit - # grpcio-status -grpcio-status==1.53.0 - # via flytekit -idna==3.4 - # via - # requests - # yarl -importlib-metadata==6.8.0 - # via - # flytekit - # keyring -importlib-resources==6.0.1 - # via keyring -isodate==0.6.1 - # via azure-storage-blob -jaraco-classes==3.3.0 - # via keyring -jinja2==3.1.2 - # via cookiecutter -jmespath==1.0.1 - # via botocore -joblib==1.3.2 - # via flytekit -jsonpickle==3.0.2 - # via flytekit -keyring==24.2.0 - # via flytekit -kubernetes==27.2.0 - # via flytekit -markdown-it-py==3.0.0 - # via rich -markupsafe==2.1.3 - # via jinja2 -marshmallow==3.20.1 - # via - # dataclasses-json - # marshmallow-enum - # marshmallow-jsonschema -marshmallow-enum==1.5.1 - # via - # dataclasses-json - # flytekit -marshmallow-jsonschema==0.13.0 - # via flytekit -mdurl==0.1.2 - # via markdown-it-py -more-itertools==10.1.0 - # via jaraco-classes -msal==1.23.0 - # via - # azure-datalake-store - # azure-identity - # msal-extensions -msal-extensions==1.0.0 - # via azure-identity -multidict==6.0.4 - # via - # aiohttp - # yarl -mypy-extensions==1.0.0 - # via typing-inspect -natsort==8.4.0 - # via flytekit -numpy==1.24.4 - # via - # flytekit - # pandas - # pyarrow -oauthlib==3.2.2 - # via - # kubernetes - # requests-oauthlib -packaging==23.1 - # via - # docker - # marshmallow -pandas==1.5.3 - # via flytekit -portalocker==2.7.0 - # via msal-extensions -protobuf==4.24.3 - # via - # flyteidl - # google-api-core - # googleapis-common-protos - # grpcio-status - # protoc-gen-swagger -protoc-gen-swagger==0.1.0 - # via flyteidl -pyarrow==10.0.1 - # via flytekit -pyasn1==0.5.0 - # via - # pyasn1-modules - # rsa -pyasn1-modules==0.3.0 - # via google-auth -pycparser==2.21 - # via cffi -pygments==2.16.1 - # via rich -pyjwt[crypto]==2.8.0 - # via msal -pyopenssl==23.2.0 - # via flytekit -python-dateutil==2.8.2 - # via - # arrow - # botocore - # croniter - # flytekit - # kubernetes - # pandas -python-json-logger==2.0.7 - # via flytekit -python-slugify==8.0.1 - # via cookiecutter -pytimeparse==1.1.8 - # via flytekit -pytz==2023.3.post1 - # via - # flytekit - # pandas -pyyaml==6.0.1 - # via - # cookiecutter - # flytekit - # kubernetes -regex==2023.8.8 - # via docker-image-py -requests==2.31.0 - # via - # azure-core - # azure-datalake-store - # cookiecutter - # docker - # flytekit - # gcsfs - # google-api-core - # google-cloud-storage - # kubernetes - # msal - # requests-oauthlib -requests-oauthlib==1.3.1 - # via - # google-auth-oauthlib - # kubernetes -rich==13.5.2 - # via - # cookiecutter - # flytekit - # rich-click -rich-click==1.6.1 - # via flytekit -rsa==4.9 - # via google-auth -s3fs==2023.9.0 - # via flytekit -six==1.16.0 - # via - # azure-core - # google-auth - # isodate - # kubernetes - # python-dateutil -smmap==5.0.0 - # via gitdb -sortedcontainers==2.4.0 - # via flytekit -statsd==3.3.0 - # via flytekit -text-unidecode==1.3 - # via python-slugify -typing-extensions==4.7.1 - # via - # aioitertools - # azure-core - # azure-storage-blob - # flytekit - # rich - # typing-inspect -typing-inspect==0.9.0 - # via dataclasses-json -urllib3==1.26.16 - # via - # botocore - # docker - # flytekit - # google-auth - # kubernetes - # requests -websocket-client==1.6.2 - # via - # docker - # kubernetes -wheel==0.41.2 - # via flytekit -wrapt==1.15.0 - # via - # aiobotocore - # deprecated - # flytekit -yarl==1.9.2 - # via aiohttp -zipp==3.16.2 - # via - # importlib-metadata - # importlib-resources diff --git a/examples/basics/Dockerfile b/examples/basics/Dockerfile index 5140e65c6..d0ebb2333 100644 --- a/examples/basics/Dockerfile +++ b/examples/basics/Dockerfile @@ -10,7 +10,7 @@ ENV LANG C.UTF-8 ENV LC_ALL C.UTF-8 ENV PYTHONPATH /root -RUN apt-get update && apt-get install -y libsm6 libxext6 libxrender-dev ffmpeg build-essential curl +RUN apt-get update && apt-get install -y build-essential curl # Virtual environment ENV VENV /opt/venv diff --git a/examples/basics/basics/workflow.py b/examples/basics/basics/workflow.py index 83e16d00e..ab9e47778 100644 --- a/examples/basics/basics/workflow.py +++ b/examples/basics/basics/workflow.py @@ -76,7 +76,7 @@ def simple_wf(x: list[int], y: list[int]) -> float: # # :::{note} # You can learn more about creating dynamic Flyte workflows by referring -# to {ref}`dynamic workflows `. +# to {ref}`dynamic workflows `. # In a dynamic workflow, unlike a simple workflow, the inputs are pre-materialized. # However, each task invocation within the dynamic workflow still generates a promise that is evaluated lazily. # Bear in mind that a workflow can have tasks, other workflows and dynamic workflows. diff --git a/examples/data_types_and_io/data_types_and_io/attribute_access.py b/examples/data_types_and_io/data_types_and_io/attribute_access.py index 4a2621c86..35697f3fb 100644 --- a/examples/data_types_and_io/data_types_and_io/attribute_access.py +++ b/examples/data_types_and_io/data_types_and_io/attribute_access.py @@ -1,159 +1,146 @@ # %% [markdown] # (attribute_access)= # -# # Attribute Access +# # Accessing Attributes # # ```{eval-rst} # .. tags:: Basic # ``` # -# Flyte allows users to access attributes directly on output promises for List, Dict, Dataclass, and combinations of these types. This allows users to pass attributes of the output directly in workflows, making it more convenient to work with complex data structures. +# You can directly access attributes on output promises for lists, dicts, dataclasses and combinations of these types in Flyte. +# This functionality facilitates the direct passing of output attributes within workflows, +# enhancing the convenience of working with complex data structures. # -# First, import the necessary dependencies and define a common task for later use. +# To begin, import the required dependencies and define a common task for subsequent use. # %% from dataclasses import dataclass -from typing import Dict, List from dataclasses_json import dataclass_json -from flytekit import WorkflowFailurePolicy, task, workflow +from flytekit import task, workflow @task -def print_str(a: str): - print(a) +def print_message(message: str): + print(message) return # %% [markdown] # ## List -# You can access the output list by index. +# You can access an output list using index notation. +# # :::{important} -# Currently, Flyte doesn't support accessing output promises by list slicing. +# Flyte currently does not support output promise access through list slicing. # ::: # %% @task -def list_task() -> List[str]: - return ["a", "b"] +def list_task() -> list[str]: + return ["apple", "banana"] @workflow def list_wf(): - o = list_task() - print_str(a=o[0]) - + items = list_task() + first_item = items[0] + print_message(message=first_item) -# %% [markdown] -# You can run the workflow locally. -# %% -if __name__ == "__main__": - list_wf() # %% [markdown] -# ## Dict -# You can access the output dict by key. +# ## Dictionary +# Access the output dictionary by specifying the key. # %% @task -def dict_task() -> Dict[str, str]: - return {"a": "b"} +def dict_task() -> dict[str, str]: + return {"fruit": "banana"} @workflow def dict_wf(): - o = dict_task() - print_str(a=o["a"]) + fruit_dict = dict_task() + print_message(message=fruit_dict["fruit"]) # %% [markdown] -# You can run the workflow locally. -# %% -if __name__ == "__main__": - dict_wf() - -# %% [markdown] -# ## Python Dataclass -# You can also access an attribute of a dataclass. +# ## Data class +# Directly access an attribute of a dataclass. # %% @dataclass_json @dataclass -class foo: - a: str +class Fruit: + name: str @task -def dataclass_task() -> foo: - return foo(a="b") +def dataclass_task() -> Fruit: + return Fruit(name="banana") @workflow def dataclass_wf(): - o = dataclass_task() - print_str(a=o.a) - + fruit_instance = dataclass_task() + print_message(message=fruit_instance.name) -# %% [markdown] -# You can run the workflow locally. -# %% -if __name__ == "__main__": - dataclass_wf() # %% [markdown] -# ## Complex Examples -# Combinations of List, Dict, and Dataclass also work. +# ## Complex type +# Combinations of list, dict and dataclass also work effectively. # %% @task -def advance_task() -> (Dict[str, List[str]], List[Dict[str, str]], Dict[str, foo]): - return {"a": ["b"]}, [{"a": "b"}], {"a": foo(a="b")} +def advance_task() -> (dict[str, list[str]], list[dict[str, str]], dict[str, Fruit]): + return {"fruits": ["banana"]}, [{"fruit": "banana"}], {"fruit": Fruit(name="banana")} @task -def print_list(a: List[str]): - print(a) +def print_list(fruits: list[str]): + print(fruits) @task -def print_dict(a: Dict[str, str]): - print(a) +def print_dict(fruit_dict: dict[str, str]): + print(fruit_dict) @workflow def advanced_workflow(): - dl, ld, dd = advance_task() - print_str(a=dl["a"][0]) - print_str(a=ld[0]["a"]) - print_str(a=dd["a"].a) + dictionary_list, list_dict, dict_dataclass = advance_task() + print_message(message=dictionary_list["fruits"][0]) + print_message(message=list_dict[0]["fruit"]) + print_message(message=dict_dataclass["fruit"].name) - print_list(a=dl["a"]) - print_dict(a=ld[0]) + print_list(fruits=dictionary_list["fruits"]) + print_dict(fruit_dict=list_dict[0]) # %% [markdown] -# You can run the workflow locally. +# You can run all the workflows locally as follows: # %% if __name__ == "__main__": + list_wf() + dict_wf() + dataclass_wf() advanced_workflow() # %% [markdown] -# ## Failed Examples -# The workflows will fail when there is an exception (e.g. out of range). -# %% -@task -def failed_task() -> (List[str], Dict[str, str], foo): - return ["a", "b"], {"a": "b"}, foo(a="b") - - -@workflow( - # The workflow will not fail if one of the nodes encounters an error, as long as there are other nodes that can still be executed. - failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE -) -def failed_workflow(): - # This workflow is supposed to fail due to exceptions - l, d, f = failed_task() - print_str(a=l[100]) - print_str(a=d["b"]) - # This task will fail at compile time - # print_str(a=f.b) - - -# %% [markdown] -# failed_workflow should fail. +# ## Failure scenario +# The following workflow fails because it attempts to access indices and keys that are out of range: +# +# ```python +# from flytekit import WorkflowFailurePolicy +# +# +# @task +# def failed_task() -> (list[str], dict[str, str], Fruit): +# return ["apple", "banana"], {"fruit": "banana"}, Fruit(name="banana") +# +# +# @workflow( +# # The workflow remains unaffected if one of the nodes encounters an error, as long as other executable nodes are still available +# failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE +# ) +# def failed_workflow(): +# fruits_list, fruit_dict, fruit_instance = failed_task() +# print_message(message=fruits_list[100]) # Accessing an index that doesn't exist +# print_message(message=fruit_dict["fruits"]) # Accessing a non-existent key +# print_message(message=fruit_instance.fruit) # Accessing a non-existent param +# ``` diff --git a/examples/house_price_prediction/README.md b/examples/house_price_prediction/README.md index 5ebece507..45fc22441 100644 --- a/examples/house_price_prediction/README.md +++ b/examples/house_price_prediction/README.md @@ -19,7 +19,7 @@ In this example, we will train our data on the XGBoost model to predict house pr House price prediction pipeline for one region doesn't require a {py:func}`~flytekit:flytekit.dynamic` workflow. When multiple regions are involved, to iterate through the regions at run-time and thereby build the DAG, Flyte workflow has to be {py:func}`~flytekit:flytekit.dynamic`. ```{tip} -Refer to {ref}`dynamic_workflows` section to learn more about dynamic workflows. +Refer to {ref}`dynamic_workflow` section to learn more about dynamic workflows. ``` ## Dataset diff --git a/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py b/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py index 8c57dcb3a..24d292249 100644 --- a/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py +++ b/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py @@ -1,7 +1,7 @@ # %% [markdown] # # Predicting House Price in Multiple Regions Using XGBoost and Dynamic Workflows # -# In this tutorial, we will understand how to predict house prices in multiple regions using XGBoost, and {ref}`dynamic workflows ` in Flyte. +# In this tutorial, we will understand how to predict house prices in multiple regions using XGBoost, and {ref}`dynamic workflows ` in Flyte. # # We will split the generated dataset into train, test and validation set. # @@ -72,6 +72,7 @@ # %% [markdown] # Next, we create a {py:func}`~flytekit:flytekit.dynamic` workflow to generate and split the data for multiple regions. + # %% @dynamic(cache=True, cache_version="0.1", limits=Resources(mem="600Mi")) def generate_and_split_data_multiloc( @@ -126,7 +127,6 @@ def parallel_fit_predict( def multi_region_house_price_prediction_model_trainer( seed: int = 7, number_of_houses: int = NUM_HOUSES_PER_LOCATION ) -> typing.List[typing.List[float]]: - # generate and split the data split_data_vals = generate_and_split_data_multiloc( locations=LOCATIONS, diff --git a/flyte_tests.txt b/flyte_tests.txt index 5de785ff6..dfc1da541 100644 --- a/flyte_tests.txt +++ b/flyte_tests.txt @@ -1,8 +1,8 @@ examples/advanced_composition/advanced_composition/chain_entities.py -examples/advanced_composition/advanced_composition/conditions.py +examples/advanced_composition/advanced_composition/conditional.py examples/advanced_composition/advanced_composition/decorating_tasks.py examples/advanced_composition/advanced_composition/decorating_workflows.py -examples/advanced_composition/advanced_composition/dynamics.py +examples/advanced_composition/advanced_composition/dynamic_workflow.py examples/advanced_composition/advanced_composition/map_task.py examples/advanced_composition/advanced_composition/waiting_for_external_inputs.py examples/basics/basics/documenting_workflows.py diff --git a/flyte_tests_manifest.json b/flyte_tests_manifest.json index 8e5590c37..8059e9c13 100644 --- a/flyte_tests_manifest.json +++ b/flyte_tests_manifest.json @@ -5,14 +5,20 @@ "path": "core", "examples": [ ["advanced_composition.chain_entities.chain_workflows_wf", {}], - ["advanced_composition.conditions.consume_outputs", { "my_input": 10.0 }], - ["advanced_composition.decorating_tasks.wf", { "x": 10 }], - ["advanced_composition.decorating_workflows.wf", { "x": 19.8 }], - ["advanced_composition.dynamics.wf", { "s1": "Pear", "s2": "Earth" }], [ - "advanced_composition.map_task.my_map_workflow", - { "a": [1, 2, 3, 4, 5] } + "advanced_composition.conditional.consume_task_output", + { "radius": 0.7 } ], + ["advanced_composition.decorating_tasks.decorating_task_wf", { "x": 10 }], + [ + "advanced_composition.decorating_workflows.decorating_workflow", + { "x": 19.8 } + ], + [ + "advanced_composition.dynamic_workflow.dynamic_wf", + { "s1": "Pear", "s2": "Earth" } + ], + ["advanced_composition.map_task.map_workflow_with_lists", {}], [ "advanced_composition.waiting_for_external_inputs.sleep_wf", { "num": 5 }