diff --git a/docs/concepts/catalog.md b/docs/concepts/catalog.md index 384eb196..7b68d962 100644 --- a/docs/concepts/catalog.md +++ b/docs/concepts/catalog.md @@ -1,486 +1,85 @@ -!!! note "Opt out" +[tasks](task.md) might also need to pass ```files``` between them. - Pipelines need not use the ```catalog``` if they prefer other ways to transfer - data between tasks. The default configuration of ```do-nothing``` is no-op by design. - We kindly request to raise a feature request to make us aware of the eco-system. +For example: -# TODO: Simplify this +```python linenums="1" -Catalog provides a way to store and retrieve data generated by the individual steps of the dag to downstream -steps of the dag. It can be any storage system that indexes its data by a unique identifier. +def generate(): + with open("data.csv", "w"): + # write content + ... -For example, a local directory structure partitioned by a ```run_id``` or S3 bucket prefixed by ```run_id```. +def consume(): + with open("data.csv", "r"): + # read content + ... -!!! tip inline end "Checkpoint" +generate() +consume() - Cataloging happens even if the step execution eventually fails. This behavior - can be used to recover from a failed run from a checkpoint. - - - -The directory structure within a partition is the same as the project directory structure. This enables you to -get/put data in the catalog as if you are working with local directory structure. Every interaction with the catalog -(either by API or configuration) results in an entry in the [```run log```](../concepts/run-log.md/#step_log) - -Internally, runnable also uses the catalog to store execution logs of tasks i.e stdout and stderr from -[python](../concepts/task.md/#python) or [shell](../concepts/task.md/#shell) and executed notebook -from [notebook tasks](../concepts/task.md/#notebook). - -Since the catalog captures the data files flowing through the pipeline and the execution logs, it enables you -to debug failed pipelines or keep track of data lineage. - - - - -!!! warning "Storage considerations" - - Since the data is stored per-run, it might cause the catalog to inflate. - - Please consider some clean up - mechanisms to regularly prune catalog for executions that are not relevant. - - - - -## Example - - - -=== "Configuration" +``` - Below is a sample configuration that uses the local file system as a catalog store. - The default location of the catalog is ```.catalog``` and is configurable. - Every execution of the pipeline will create a sub-directory of name ```run_id``` to store the artifacts - generated from the execution of the pipeline. +## Runnable representation - ```yaml - --8<-- "examples/configs/fs-catalog.yaml" - ``` +The same can be represented in ```runnable``` as [catalog](../reference.md/#catalog). - 1. Use local file system as a central catalog, defaults to ```.catalog``` +For example, the above snippet would be: -=== "python sdk" +=== "sdk" - In the below example, the steps ```create_content_in_data_folder``` and ```create_content_in_another_folder``` - create content for downstream steps, i.e ```retrieve_content_from_both``` to consume. + ```python linenums="1" - !!! note "Delete?" + from runnable import PythonTask, Pipeline, Catalog - Since we are executing in local compute and creating sub-directory ```another```, it might be mistaken that - we are not cataloging anything. We delete ```another``` directory between steps - to demonstrate that we indeed move files in and out of the catalog. + write_catalog = Catalog(put=["data.csv"]) + read_catalog = Catalog(get=["read.csv"]) - The highlighted lines in the below example show how to specify the files to get/put from the catalog using python SDK. + generate_task = PythonTask(name="generate", function=generate, catalog=write_catalog) + consume_task = PythonTask(name="consume", function=consume, catalog=read_catalog) - ```python linenums="1" hl_lines="44 52 68" - --8<-- "examples/concepts/catalog.py" + pipeline = Pipeline(steps=[generate_task, consume_task]) + pipeline.execute() ``` === "yaml" - In the below example, the steps ```data_create``` and ```another_create``` create content for - downstream steps, i.e ```retrieve``` to consume. - - !!! note "Delete?" - - Since we are executing in local compute and creating sub-directory ```another```, it might be mistaken that - we are not cataloging anything. We delete ```another``` directory between steps - to demonstrate that we indeed move files in and out of the catalog. - - The highlighted lines in the below example show how to specify the files to get/put from the catalog using - yaml. - - - ```yaml linenums="1" hl_lines="19-21 26-28 38-40" - --8<-- "examples/concepts/catalog.yaml" + ```yaml linenums="1" + dag: + start_at: generate_data + steps: + generate: + type: task + command: examples.common.functions.write_files + catalog: + put: + - data.csv + next: consume + consume: + type: task + command_type: python + command: examples.common.functions.read_files + catalog: + get: + - df.csv + - data_folder/data.txt + next: success + success: + type: success + fail: + type: fail ``` -!!! note "glob pattern" - - We use [glob pattern](https://docs.python.org/3/library/glob.html) to search for files. - - Note that, the pattern to recursively match all directories is ```**/*``` - - -The execution results in the ```catalog``` populated with the artifacts and the execution logs of the tasks. - - -=== "Directory structure" - - The directory structure within the ```catalog``` for the execution, i.e meek-stonebraker-0626, resembles - the project directory structure. - - The execution logs of all the tasks are also present in the ```catalog```. +## Example - ``` - >>> tree .catalog - .catalog - └── meek-stonebraker-0626 - ├── another - │   └── world.txt - ├── create_content_in_another_folder.execution.log - ├── create_content_in_data_folder.execution.log - ├── data - │   └── hello.txt - ├── delete_another_folder.execution.log - └── retrieve_content_from_both.execution.log +=== "sdk" - 4 directories, 6 files + ```python linenums="1" + --8<-- "examples/04-catalog/catalog.py" ``` -=== "Run log" - - The run log captures the data identities of the data flowing through the catalog. - +=== "yaml" - ```json linenums="1" hl_lines="38-53 84-99 169-191" - { - "run_id": "meek-stonebraker-0626", - "dag_hash": "", - "use_cached": false, - "tag": "", - "original_run_id": "", - "status": "SUCCESS", - "steps": { - "create_content_in_data_folder": { - "name": "create_content_in_data_folder", - "internal_name": "create_content_in_data_folder", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "6029841c3737fe1163e700b4324d22a469993bb0", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-06 06:26:56.279278", - "end_time": "2024-01-06 06:26:56.284564", - "duration": "0:00:00.005286", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "create_content_in_data_folder.execution.log", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "meek-stonebraker-0626/create_content_in_data_folder.execution.log", - "catalog_handler_location": ".catalog", - "stage": "put" - }, - { - "name": "data/hello.txt", - "data_hash": "6ccad99847c78bfdc7a459399c9957893675d4fec2d675cec750b50ab4842542", - "catalog_relative_path": "meek-stonebraker-0626/data/hello.txt", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "create_content_in_another_folder": { - "name": "create_content_in_another_folder", - "internal_name": "create_content_in_another_folder", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "6029841c3737fe1163e700b4324d22a469993bb0", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-06 06:26:56.353734", - "end_time": "2024-01-06 06:26:56.357519", - "duration": "0:00:00.003785", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "create_content_in_another_folder.execution.log", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "meek-stonebraker-0626/create_content_in_another_folder.execution.log", - "catalog_handler_location": ".catalog", - "stage": "put" - }, - { - "name": "another/world.txt", - "data_hash": "869ae2ac8365d5353250fc502b084a28b2029f951ea7da0a6948f82172accdfd", - "catalog_relative_path": "meek-stonebraker-0626/another/world.txt", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "delete_another_folder": { - "name": "delete_another_folder", - "internal_name": "delete_another_folder", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "6029841c3737fe1163e700b4324d22a469993bb0", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-06 06:26:56.428437", - "end_time": "2024-01-06 06:26:56.450148", - "duration": "0:00:00.021711", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "delete_another_folder.execution.log", - "data_hash": "a9b49c92ed63cb54a8b02c0271a925d9fac254034ed45df83f3ff24c0bd53ef6", - "catalog_relative_path": "meek-stonebraker-0626/delete_another_folder.execution.log", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "retrieve_content_from_both": { - "name": "retrieve_content_from_both", - "internal_name": "retrieve_content_from_both", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "6029841c3737fe1163e700b4324d22a469993bb0", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-06 06:26:56.520948", - "end_time": "2024-01-06 06:26:56.530135", - "duration": "0:00:00.009187", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "data/hello.txt", - "data_hash": "6ccad99847c78bfdc7a459399c9957893675d4fec2d675cec750b50ab4842542", - "catalog_relative_path": "data/hello.txt", - "catalog_handler_location": ".catalog", - "stage": "get" - }, - { - "name": "another/world.txt", - "data_hash": "869ae2ac8365d5353250fc502b084a28b2029f951ea7da0a6948f82172accdfd", - "catalog_relative_path": "another/world.txt", - "catalog_handler_location": ".catalog", - "stage": "get" - }, - { - "name": "retrieve_content_from_both.execution.log", - "data_hash": "0a085cb15df6c70c5859b44cc62bfdc98383600ba4f2983124375a4f64f1ae83", - "catalog_relative_path": "meek-stonebraker-0626/retrieve_content_from_both.execution.log", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "success": { - "name": "success", - "internal_name": "success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "6029841c3737fe1163e700b4324d22a469993bb0", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-06 06:26:56.591948", - "end_time": "2024-01-06 06:26:56.592032", - "duration": "0:00:00.000084", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - }, - "parameters": {}, - "run_config": { - "executor": { - "service_name": "local", - "service_type": "executor", - "enable_parallel": false, - "placeholders": {} - }, - "run_log_store": { - "service_name": "buffered", - "service_type": "run_log_store" - }, - "secrets_handler": { - "service_name": "do-nothing", - "service_type": "secrets" - }, - "catalog_handler": { - "service_name": "file-system", - "service_type": "catalog" - }, - "experiment_tracker": { - "service_name": "do-nothing", - "service_type": "experiment_tracker" - }, - "pipeline_file": "", - "parameters_file": "", - "configuration_file": "examples/configs/fs-catalog.yaml", - "tag": "", - "run_id": "meek-stonebraker-0626", - "variables": {}, - "use_cached": false, - "original_run_id": "", - "dag": { - "start_at": "create_content_in_data_folder", - "name": "", - "description": "", - "internal_branch_name": "", - "steps": { - "create_content_in_data_folder": { - "type": "task", - "name": "create_content_in_data_folder", - "internal_name": "create_content_in_data_folder", - "internal_branch_name": "", - "is_composite": false - }, - "create_content_in_another_folder": { - "type": "task", - "name": "create_content_in_another_folder", - "internal_name": "create_content_in_another_folder", - "internal_branch_name": "", - "is_composite": false - }, - "retrieve_content_from_both": { - "type": "task", - "name": "retrieve_content_from_both", - "internal_name": "retrieve_content_from_both", - "internal_branch_name": "", - "is_composite": false - }, - "delete_another_folder": { - "type": "task", - "name": "delete_another_folder", - "internal_name": "delete_another_folder", - "internal_branch_name": "", - "is_composite": false - }, - "success": { - "type": "success", - "name": "success", - "internal_name": "success", - "internal_branch_name": "", - "is_composite": false - }, - "fail": { - "type": "fail", - "name": "fail", - "internal_name": "fail", - "internal_branch_name": "", - "is_composite": false - } - } - }, - "dag_hash": "", - "execution_plan": "chained" - } - } + ```yaml linenums="1" + --8<-- "examples/04-catalog/catalog.yaml" ``` - - - -## Using python API - -Files could also be cataloged using [python API](../interactions.md) - - -This functionality is possible in [python](../concepts/task.md/#python_functions) -and [notebook](../concepts/task.md/#notebook) tasks. - -```python linenums="1" hl_lines="11 23 35 45" ---8<-- "examples/concepts/catalog_api.py" -``` - - - - -## Passing Data Objects - -Data objects can be shared between [python](../concepts/task.md/#python_functions) or -[notebook](../concepts/task.md/#notebook) tasks, -instead of serializing data and deserializing to file structure, using -[get_object](../interactions.md/#runnable.get_object) and [put_object](../interactions.md/#runnable.put_object). - -Internally, we use [pickle](https:/docs.python.org/3/library/pickle.html) to serialize and -deserialize python objects. Please ensure that the object can be serialized via pickle. - -### Example - -In the below example, the step ```put_data_object``` puts a pydantic object into the catalog while the step -```retrieve_object``` retrieves the pydantic object from the catalog and prints it. - -You can run this example by ```python run examples/concepts/catalog_object.py``` - -```python linenums="1" hl_lines="10 30 38" ---8<-- "examples/concepts/catalog_object.py" -``` diff --git a/docs/concepts/index.md b/docs/concepts/index.md new file mode 100644 index 00000000..18f9eeb0 --- /dev/null +++ b/docs/concepts/index.md @@ -0,0 +1,53 @@ +Without any orchestrator, the simplest pipeline could be the below functions: + + +```python linenums="1" +def generate(): + ... + # write some files, data.csv + ... + # return objects or simple python data types. + return x, y + +def consume(x, y): + ... + # read from data.csv + # do some computation with x and y + + +# Stich the functions together +# This is the driver pattern. +x, y = generate() +consume(x, y) +``` + +## Runnable representation + +The same workflow in ```runnable``` would be: + +```python linenums="1" +from runnable import PythonTask, pickled, catalog, Pipeline + +generate_task = PythonTask(name="generate", function=generate, + returns=[pickled("x"), y], + catalog=Catalog(put=["data.csv"]) + +consume_task = PythonTask(name="consume", function=consume, + catalog=Catalog(get=["data.csv"]) + +pipeline = Pipeline(steps=[generate_task, consume_task]) +pipeline.execute() + +``` + + +- ```runnable``` exposes the functions ```generate``` and ```consume``` as [tasks](task.md). +- Tasks can [access and return](parameters.md/#access_returns) parameters. +- Tasks can also share files between them using [catalog](catalog.md). +- Tasks are stitched together as [pipeline](pipeline.md) +- The execution environment is configured via # todo + + +## Examples + +All the concepts are accompanied by [examples](https://github.com/AstraZeneca/runnable/tree/main/examples). diff --git a/docs/concepts/parameters.md b/docs/concepts/parameters.md index 1de3b83e..36af9ec3 100644 --- a/docs/concepts/parameters.md +++ b/docs/concepts/parameters.md @@ -1,48 +1,253 @@ -## TODO: Concretly show an example! +```parameters``` are data that can be passed from one ```task``` to another. -In runnable, ```parameters``` are python data types that can be passed from one ```task``` -to the next ```task```. These parameters can be accessed by the ```task``` either as -environment variables, arguments of the ```python function``` or using the -[API](../interactions.md). +For example, in the below snippet, the parameters ```x``` and ```y``` are passed from +```generate``` to ```consume```. -## Initial parameters +```python +x, y = generate() # returns x and y as output +consume(x, y) # consumes x, y as input arguments. +``` -The initial parameters of the pipeline can set by using a ```yaml``` file and presented -during execution +The data types of ```x``` and ```y``` can be: -```--parameters-file, -parameters``` while using the [runnable CLI](../usage.md/#usage) +- JSON serializable: int, string, float, list, dict including pydantic models. +- Objects: Any [dill](https://dill.readthedocs.io/en/latest/) friendly objects. -or by using ```parameters_file``` with [the sdk](..//sdk.md/#runnable.Pipeline.execute). -They can also be set using environment variables which override the parameters defined by the file. +## Compatibility + +Below table summarizes the input/output types of different task types. +For ex: notebooks can only take JSON serializable parameters as input +but can return json/pydantic/objects. + +| | Input | Output | +| -------- | :---------------------: | :----------------------: | +| python | json, pydantic, object via function arguments | json, pydantic, object as ```returns``` | +| notebook | json via cell tagged with ```parameters``` | json, pydantic, object as ```returns``` | +| shell | json via environment variables | json environmental variables as ```returns``` | + + + +## Project parameters + +Project parameters can be defined using a ```yaml``` file. These parameters can then be +over-ridden by tasks of the pipeline. + +They can also be provided by environment variables prefixed by ```RUNNABLE_PRM_```. +Environmental variables over-ride ```yaml``` parameters. + + +!!! warning inline end "Type casting" + + Annotating the arguments of python function ensures the right data type of arguments. + + It is advised to ```cast``` the parameters in notebook tasks or shell. === "yaml" Deeply nested yaml objects are supported. ```yaml - --8<-- "examples/concepts/parameters.yaml" + --8<-- "examples/common/initial_parameters.yaml" ``` === "environment variables" - Any environment variables prefixed with ```runnable_PRM_ ``` are interpreted as - parameters by the ```tasks```. - The yaml formatted parameters can also be defined as: ```shell - export runnable_PRM_spam="hello" - export runnable_PRM_eggs='{"ham": "Yes, please!!"}' + export runnable_PRM_integer="1" + export runnable_PRM_floater="3.14" + export runnable_PRM_stringer="hello" + export runnable_PRM_pydantic_param="{'x': 10, 'foo': bar}" + export runnable_PRM_chunks="[1, 2, 3]" ``` Parameters defined by environment variables override parameters defined by ```yaml```. This can be useful to do a quick experimentation without changing code. -## Parameters flow +### Accessing parameters + +=== "python" + + The functions have arguments that correspond to the project parameters. + + Without annotations for nested params, they are sent in as dictionary. + + ```python + --8<-- "examples/03-parameters/static_parameters_python.py" + ``` + +=== "notebook & shell" + + The notebook has cell tagged with ```parameters``` which are substituted at run time. + + The shell script has access to them as environmental variables. + + ```python + --8<-- "examples/03-parameters/static_parameters_non_python.py" + ``` + + + +## Access & returns + +### access + +The access of parameters returned by upstream tasks is similar to [project parameters](#project-parameters) + + +### returns + +Tasks can return parameters which can then be accessed by downstream tasks. + +The syntax is inspired by: + +```python +def generate(): + ... + return x, y + +def consume(x, y): + ... + +x, y = generate() # returns x and y as output +consume(x, y) # consumes x, y as input arguments. +``` + +and implemented in ```runnable``` as: + +=== "sdk" + + ```python + from runnable import PythonTask + # The returns syntax can be used for notebook and shell scripts too. + generate_task = PythonTask(function="generate", returns=["x", "y"]) + consume_task = PythonTask(function="consume") + + ``` +=== "yaml" + + ```yaml + generate: + type: task + command: generate + next: consume + returns: + - name: x + - name: y + consume: + ... + ``` + +!!! warning "order of returns" + + The order of ```returns``` should match the order of the python function returning them. + + +### marking returns as ```metric``` or ```object``` + +JSON style parameters can be marked as a ```metric``` in +[python functions](task.md/#python-functions), [notebook](task.md/#notebook), [shell](task.md/#shell). Metric parameters can be accessed as normal parameters in downstream steps. + +Returns marked as ```pickled``` in [python functions](task.md/#python-functions), [notebook](task.md/#notebook) are serialized using ```dill```. + +### Example + +```python +import pandas as pd + +# Assuming a function return a pandas dataframe and a score +def generate(): + ... + return df, score + +# Downstream step consuming the df and score +def consume(df: pd.Dataframe, score: float): + ... +``` + +=== "sdk" + + ```python + from runnable import metric, pickled, PythonTask + + generate_task = PythonTask(function="generate", + returns=[pickled("df"), # pickle df + metric("score")]) # mark score as metric + + consume_task = PythonTask(function="consume") + + ``` + +=== "yaml" + + ```yaml + generate: + type: task + command: generate + next: consume + returns: + - name: df + kind: object + - name: score + kind: metric + consume: + ... + ``` + + +## Complete Example + +=== "python" + + === "python" + + ```python linenums="1" hl_lines="28-34" + --8<-- "examples/03-parameters/passing_parameters_python.py" + ``` + + === "yaml" + + ```yaml linenums="1" hl_lines="25-32" + --8<-- "examples/03-parameters/passing_parameters_python.yaml" + ``` + +=== "notebook" + + To access parameters, the cell should be tagged with ```parameters```. Only + JSON style parameters can be injected in. + + Any python variable defined during the execution of the notebook matching the + name in ```returns``` is inferred as a parameter. The variable can be either + JSON type or objects. + + === "python" + + ```python linenums="1" hl_lines="24-29" + --8<-- "examples/03-parameters/passing_parameters_notebook.py" + ``` + + === "yaml" + + ```yaml linenums="1" hl_lines="21-28" + --8<-- "examples/03-parameters/passing_parameters_notebook.yaml" + ``` + +=== "shell" + + Shell tasks can only access/return JSON style parameters + + === "python" + + ```python linenums="1" hl_lines="30-36" + --8<-- "examples/03-parameters/passing_parameters_shell.py" + ``` + + === "yaml" -Tasks can access and return parameters and the patterns are specific to the -```command_type``` of the task nodes. Please refer to [tasks](../concepts/task.md) -for more information. + ```yaml linenums="1" hl_lines="26-31" + --8<-- "examples/03-parameters/passing_parameters_shell.yaml" + ``` diff --git a/docs/concepts/pipeline.md b/docs/concepts/pipeline.md index 5398eaaa..685582cc 100644 --- a/docs/concepts/pipeline.md +++ b/docs/concepts/pipeline.md @@ -1,231 +1,171 @@ -???+ tip inline end "Steps" - - In runnable, a step can be a simple ```task``` or ```stub``` or complex nested pipelines like - ```parallel``` branches, embedded ```dags``` or dynamic workflows. - - In this section, we use ```stub``` for convenience. For more in depth information about other types, - please see the relevant section. - - In **runnable**, we use the words -- ```dag```, ```workflows``` and ```pipeline``` interchangeably. +- ```workflows``` and ```pipeline``` interchangeably. - ```nodes```, ```steps``` interchangeably. +A ```workflow``` is a sequence of ```steps``` to perform. -Dag or directed acyclic graphs are a way to define your pipelines. -Its a graph representation of the list of tasks you want to perform and the order of it. - - -
- - -## Example -Below is an example pipeline. +!!! info "Composite pipelines" - -=== "yaml" - - ``` yaml linenums="1" - --8<-- "examples/concepts/traversal.yaml" - ``` - - -=== "python" - - ``` python linenums="1" - --8<-- "examples/concepts/traversal.py" - ``` + ```runnable``` pipelines are composable. For example, a pipeline can have + a parallel node which in itself has many pipelines running in parallel.
-A closer look at the example: +A visual example of a workflow: -## start_at - -- [x] start_at step is the starting node of the traversal. +```mermaid +stateDiagram-v2 + direction lr + state "hello stub" as start_at + state "hello python" as step_2 + state "hello notebook" as step_3 + state "hello shell" as step_4 + state "Success" as success -=== "yaml" + [*] --> start_at + start_at --> step_2 : #9989; + step_2 --> step_3 : #9989; + step_3 --> step_4 : #9989; + step_4 --> success : #9989; + success --> [*] +``` - The value should be valid key in ```steps``` +???+ abstract "Traversal" - ```yaml linenums="10" hl_lines="1" - --8<-- "examples/concepts/traversal.yaml:10:12" - ``` + Start at ```hello stub```. -=== "python" + If it is successful, go to ```next``` step of the pipeline until we reach the success state. - The node should be part of ```steps``` + Any failure in execution of step would, by default, go to the ```fail``` state. - ```python linenums="32" hl_lines="3" - --8<-- "examples/concepts/traversal.py:32:36" - ``` -By using a ```parallel``` node as starting node, you can get the behavior of multi-root graph. -
+## Syntax -## Steps +The above pipeline can be written in runnable as below. It is a mixed bag of +[python functions](task.md/#python-functions), [notebook](task.md/#notebook), [shell](task.md/#shell) +and [stub](task.md/#stub). -- [x] Apart from the terminal nodes (```success``` and ```fail```), the pipeline should have at least -one more node. +[API Documentation](../reference.md/#pipeline) +=== "sdk" -???+ warning inline end "Step names" + ```python linenums="1" + --8<-- "examples/02-sequential/traversal.py" + ``` - In runnable, the names of steps should not have ```%``` or ```.``` in them. + 1. Start the pipeline. + 2. The order of the steps is the execution order - You can name them as descriptive as you want. + - [x] The first step of the ```steps``` is the start of the workflow. + - [x] The order of execution follows the order of the tasks in the list. + - [x] The terminal nodes ```success``` and ```fail``` are added automatically. === "yaml" - ```yaml linenums="12" - --8<-- "examples/concepts/traversal.yaml:12:21" + ```yaml linenums="1" + --8<-- "examples/02-sequential/traversal.yaml" ``` -=== "python" + 1. Start the pipeline at this step. + 2. State the ```next``` node, if it succeeds. + 3. Add the success and fail nodes. - ```python linenums="14" hl_lines="1-6 19-23" - --8<-- "examples/concepts/traversal.py:14:36" - ``` + - [x] The first step is the step corresponding to ```start_at``` + - [x] The mapping defined in the steps. + - [x] The ```next``` step after a successful execution of a ```step```. + - [x] Needs explicit definition of ```success``` and ```fail``` nodes.
-## Linking - -- [x] All nodes except for ```success``` and ```fail``` nodes need to have a ```next``` -step to execute upon successful execution. +## on failure +By default, any failure during the execution of step will traverse to ```fail``` node +marking the execution as failed. +The ```fail``` node is implicitly added to the pipeline in python SDK while it +has to be stated in the yaml. -Visually, the above pipeline can be seen as: -???+ abstract inline end "Traversal" +This behavior can be over-ridden to follow a different path based on expected failures. - Start at step1. +### on failure success - If it is successful, go to ```next``` step of the pipeline until we reach the success state. - Any failure in execution of step would, by default, go to the fail state. +```step 1``` fails as the function raises an exception. +```step 4``` is an alternate node to a successful execution. +```step 4``` is the step to execution in case of the failure. +=== "pseudo code" -```mermaid -stateDiagram-v2 - state "Start at step 1" as start_at - state "step 2" as step_2 - state "step 3" as step_3 - state "Success" as success - state "Fail" as fail - - - [*] --> start_at - start_at --> step_2 : #9989; - step_2 --> step_3 : #9989; - step_3 --> success : #9989; - start_at --> fail: #10060; - step_2--> fail: #10060; - step_3--> fail: #10060; - success --> [*] - fail --> [*] -``` - + ```python -=== "yaml" + try: + raise_exception() + except: + # suppress exception + do_something() - ```yaml linenums="15" hl_lines="4 7 10" - --8<-- "examples/concepts/traversal.yaml:12:21" ``` -=== "python" +=== "sdk" - - ```python linenums="14" hl_lines="7-17" - --8<-- "examples/concepts/traversal.py:14:36" + ```python linenums="1" hl_lines="24 29 34 31" + --8<-- "examples/02-sequential/on_failure_succeed.py" ``` + 1. ```terminate_with_success``` is ```true``` traverses to success node. -### on failure - -By default, any failure during the execution of step will traverse to ```fail``` node -marking the execution as failed. You can override this behavior by using ```on_failure``` === "yaml" - ```yaml hl_lines="21" - --8<-- "examples/on-failure.yaml" + ```yaml linenums="1" hl_lines="23 25 32-34" + --8<-- "examples/02-sequential/on_failure_succeed.yaml" ``` -=== "python" - - ```python hl_lines="10" - --8<-- "examples/on_failure.py" - ``` - -=== "traversal" - - ```mermaid - stateDiagram-v2 - state "Start at step 1" as start_at - state "step 2" as step_2 - state "step 3" as step_3 - state "Success" as success +### On failure fail - [*] --> start_at - start_at --> step_2 : #10060; - start_at --> step_3 : #9989; - step_3 --> success : #9989; - success --> [*] - ``` - - -
+```step 1``` fails as the function raises an exception. +```step 4``` is an alternate node to a successful execution. -## Terminating -- [x] All pipelines should have one and only one Success and Fail state +```step 4``` is the step to execution in case of the failure. -Reaching one of these states as part of traversal indicates the status of the pipeline. +=== "pseudo code" -=== "yaml" - - The type determines the node to be a ```success``` or ``fail`` state. + ```python - The name can be anything that you prefer. + try: + raise_exception() + except: + # raise exception after doing something. + do_something() + raise - ``` yaml linenums="1" - --8<-- "examples/concepts/traversal.yaml:22:25" ``` -=== "python" - - Setting ```add_terminal_nodes``` to be ```true``` during pipeline creation adds - ```success``` and ```fail``` states with the names success and fail. +=== "sdk" - ``` python linenums="1" hl_lines="4" - --8<-- "examples/concepts/traversal.py:31:35" + ```python linenums="1" hl_lines="24 29 34 31" + --8<-- "examples/02-sequential/on_failure_fail.py" ``` - Individual steps can link + 1. ```terminate_with_failure``` is ```true``` traverses to fail node. - - success state by setting ```terminate_with_success``` to ```True``` - - fail state by setting ```terminate_with_fail``` to ```True``` - You can, alternatively, create a ```success``` and ```fail``` state and link them together. - - ```python - from runnable import Success, Fail - - success = Success(name="Custom Success") - fail = Fail(name="Custom Failure") +=== "yaml" + ```yaml linenums="1" hl_lines="23 25 32-34" + --8<-- "examples/02-sequential/on_failure_fail.yaml" ``` diff --git a/docs/concepts/stub.md b/docs/concepts/stub.md deleted file mode 100644 index f7e83b30..00000000 --- a/docs/concepts/stub.md +++ /dev/null @@ -1,41 +0,0 @@ -Stub nodes in runnable are just like -[```Pass``` state](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-pass-state.html) -in AWS Step Functions or ```pass``` in python code. It is a placeholder and useful when you want to debug or -design your pipeline. - -Stub nodes can take arbitrary number of parameters and is always a success. - -## Example - -!!! note annotate inline end "Intuition" - - Designing a pipeline is similar to writing a modular program. Stub nodes are handy to create a placeholder - for some step that will be implemented in the future. - - During debugging, changing a node to ```stub``` will let you focus on the actual bug without having to - execute the additional steps. - - -=== "yaml" - - In the below example, all the steps are ```stub``` nodes. The only required field is - the ```next``` which is needed for graph traversal. As seen in ```step 2``` definition, - they can have arbitrary fields. - - - ``` yaml hl_lines="20-24" - --8<-- "examples/mocking.yaml" - ``` - -=== "python" - - In the below example, all the steps are ```stub``` nodes. - - ``` python hl_lines="21-24" - --8<-- "examples/mocking.py" - ``` - -The only required field is the ```name```, ```next``` which is needed for graph traversal. - -- yaml definition needs ```next``` to be defined as part of the step definition. -- python SDK can define the ```next``` when linking the nodes as part of the pipeline. diff --git a/docs/concepts/task.md b/docs/concepts/task.md index ced25339..6a64511b 100644 --- a/docs/concepts/task.md +++ b/docs/concepts/task.md @@ -1,522 +1,144 @@ Task nodes are the execution units of the pipeline. -In runnable, a ```command``` in a task node can be [python functions](#python_functions), -[Jupyter notebooks](#notebook) or a [shell scripts](#shell). -All task nodes can take arguments, retrieve and create files/objects and return -arguments, though their access patterns are different. +They can be [python functions](#python_functions), [notebooks](#notebook), +[shell scripts](#shell) or [stubs](#stub) - -In the below examples, we define a pipeline either using python SDK or yaml format but both are equivalent -and all the pipelines can be expressed in either formats. +In the below examples, highlighted lines of the code are the relevant bits while +the rest of the python code (or yaml) defines and executes a pipeline that executes +the python function/notebook/shell script/stubs. --- ## Python functions -Python is the default ```command type``` of a task node. The ```command``` -should be the dotted path to the python function. - -!!! example "Dotted path" - - Assuming the below project structure: - - - The ```command``` for the ```outer_function``` should be ```outer_functions.outer_function``` - - - The ```command``` for ```inner_function``` should be ```module_inner.inner_functions.inner_function``` - - - ``` - .. - ├── outer_functions.py - │   ├── outer_function() - ├── module_inner - │   ├── inner_functions.py - │   | ├── inner_function() - .. - - ``` +[API Documentation](../reference.md/#pythontask) ### Example - -=== "python" +=== "sdk" !!! tip inline end "Structuring" It is best to keep the application specific functions in a different module than the pipeline definition, if you are using Python SDK. - In this example, we combined them as one module for convenience. - - You can execute this pipeline using ```examples/concepts/simple.py``` - - ```python linenums="1" hl_lines="4-8" - --8<-- "examples/concepts/simple.py" - ``` - -=== "yaml" - - You can execute this by runnable execute -f examples/concepts/simple.yaml - - ```yaml linenums="1" - --8<-- "examples/concepts/simple.yaml" - ``` - - -### Closer look - - -Lines 4-8 in the python code defines the function that we want to execute as - part of the pipeline. They are *plain old python functions*. - -The rest of the python code (or yaml) defines and executes a pipeline that executes a task whose ```command``` -is to execute this function. - - -### Fields - -- ```command``` : Should refer to the function in [dotted path notation](#python_functions). -- ```command_type```: Defaults to python and not needed for python task types. -- [next](pipeline.md/#linking): is required for any step of the pipeline except for success and fail steps. -- [on_failure](pipeline.md/#on_failure): Name of the step to execute if the step fails. -- catalog: Optional required for data access patterns from/to the central storage. - - -### Accessing parameters - -!!! tip "Mutability" - - Functions mutating the input parameters is idiomatic is python. However, functions as part of runnable - pipeline should return the mutated parameters for downstream steps to have access to them. - - For example, unless the function ```mutating_function``` returns the updated parameters, runnable will - not know about the change. - - - ```python - d = {"name": "monty"} - print(d) - ">>> {'name': 'monty'}" - - def mutating_function(input_dict): - input_dict["name"] = "python" - - - mutating_function(d) - print(d) - ">>>{'name': 'python'}" + ```python linenums="1" hl_lines="29-33" + --8<-- "examples/01-tasks/python_tasks.py" ``` + +=== "yaml" -Please refer to [Initial Parameters](parameters.md/#initial_parameters) for more information about setting -initial parameters. - -Lets assume that the initial parameters are: - -```yaml ---8<-- "examples/concepts/parameters.yaml" -``` - -- [x] Passing parameters between steps - - -=== "Natively" - - Internally, runnable stores the parameters in serialised json format. - - ### ^^Input arguments to the function^^ - - Any arguments passed into the function should be at the root level of the json object. - Arguments with type annotations will be casted appropriately. - Arguments with no type annotation will be sent in as ```dict```. - - In the below example, in line 13 and 28, arguments ```spam``` and ```eggs``` are at the root level in - the yaml representation and also are annotated in the function signature. They are sent in to the function - as arguments with proper type conversion. - - !!! warning "Annotation" - - Without annotations, runnable cannot determine the type and can cause unexpected behavior. - - This is especially true in distributed executors (eg: argo workflows). - - - ### ^^Output arguments of function^^ - - Only pydantic models are allowed to be return types of a function. There is no need - for any type annotation for return type but is advised for a cleaner code. - - Output arguments are stored in json format by - [model_dump](https://docs.pydantic.dev/latest/concepts/serialization/#modelmodel_dump), - respecting the alias. - - The model structure of the pydantic model would be added to the root structure. This is - useful when you want to add or modify parameters at the root level. For example, line 25 - would update all the initial parameters. - - To update a subset of existing parameters at the root level, you can either create a new model or - use [DynamicModel](https://docs.pydantic.dev/latest/concepts/models/#dynamic-model-creation). - For example, lines 42-45 create a dynamic model to update the ```eggs``` parameter. - - - !!! warning "caution" - - Returning "eggs" in line 42 would result in a new parameter "ham" at the root level - as it looses the nested structure. - - - You can run this example using: ```python run examples/concepts/task_native_parameters.py``` - - ```python linenums="1" - --8<-- "examples/concepts/task_native_parameters.py" - ``` - - -=== "Using the API" - - runnable also has [python API](../interactions.md) to access parameters. - - Use [get_parameter](../interactions.md/#runnable.get_parameter) to access a parameter at the root level. - You can optionally specify the ```type``` by using ```cast_as``` argument to the API. - For example, line 19 would cast ```eggs```parameter into ```EggsModel```. - Native python types do not need any explicit ```cast_as``` argument. - - Use [set_parameter](../interactions.md/#runnable.set_parameter) to set parameters at the root level. - Multiple parameters can be set at the same time, for example, line 26 would set both the ```spam``` - and ```eggs``` in a single call. - - The pydantic models would be serialised to json format using - [model_dump](https://docs.pydantic.dev/latest/concepts/serialization/#modelmodel_dump), respecting the alias. - - - You can run this example by: ```python run examples/concepts/task_api_parameters.py``` - - ```python linenums="1" - --8<-- "examples/concepts/task_api_parameters.py" - ``` + !!! example "Dotted path" -=== "Using environment variables" + Assuming the below project structure: - Any environment variable with ```runnable_PRM_``` is understood to be a parameter in runnable. + - The ```command``` for the ```outer_function``` should be ```outer_functions.outer_function``` - Before the execution of the ```command```, all the parameters at the root level are set as environment variables - with the key prefixed by ```runnable_PRM_```. Python functions that are called during the execution of the command - can also access them as environment variables. + - The ```command``` for ```inner_function``` should be ```module_inner.inner_functions.inner_function``` - After the execution of the ```command```, the environment is "scanned" again to identify changes to the existing - variables prefixed by ```runnable_PRM_```. All updated variables are stored at the root level. - Parameters set by environment variables over-ride the parameters defined by the initial parameters which can be - handy to quickly experiment without modifying code or to dynamically adjust behavior when running in - orchestrators like Argo or AWS step functions. + ``` + .. + ├── outer_functions.py + │   ├── outer_function() + ├── module_inner + │   ├── inner_functions.py + │   | ├── inner_function() + .. - You can run this example by: ```python run examples/concepts/task_env_parameters.py``` + ``` - ```python linenums="1" - --8<-- "examples/concepts/task_env_parameters.py" + ```yaml linenums="1" hl_lines="20-23" + --8<-- "examples/01-tasks/python_tasks.yaml" ``` -!!! abstract "Verbose?" - - We acknowledge that using pydantic models as our - [Data transfer objects](https://stackoverflow.com/questions/1051182/what-is-a-data-transfer-object-dto) is verbose in comparison to using - ```dict```. - - The advantages of using strongly typed DTO has long term advantages of implicit validation, typing hints - in editors. This choice is inspired from [FastAPI's](https://fastapi.tiangolo.com/features/#pydantic-features) - ways of working. - - -### Passing data and execution logs - -Please refer to [catalog](../concepts/catalog.md) for more details and examples on passing -data between tasks and the storage of execution logs. - ---- +
## Notebook -Jupyter notebooks are supported as part of a task step. We internally use +Jupyter notebooks are supported as tasks. We internally use [Ploomber engine](https://github.com/ploomber/ploomber-engine) for executing notebooks. The output is saved to the same location as the input notebook but with ```_out``` post-fixed to -the name of the notebook. This is configurable by ```notebook_output_path``` -while defining the task either via yaml or the sdk. +the name of the notebook and is also saved in the ```catalog``` for logging and ease of debugging. -The output notebook is also saved in the ```catalog``` for logging and ease of debugging. +[API Documentation](../reference.md/#notebooktask) ### Example -=== "Notebook" - - The below is a simple notebook for demonstration. - - Below is just a screenshot, the original notebook can be found at ```examples/concepts/simple_notebook.yaml```. - - -
- ![Image title](../assets/screenshots/simple_notebook.png){ width="800" height="600"} -
-
- -=== "Pipeline" +=== "sdk" - The same pipeline can also be defined via the SDK. - - ```yaml linenums="1" - --8<-- "examples/concepts/simple_notebook.yaml" + ```python linenums="1" hl_lines="29-33" + --8<-- "examples/01-tasks/notebook.py" ``` -### Closer look - -The structure of the pipeline remains the same as with any ```task```. In the pipeline definition, - ```command_type``` in line number 19 specifies the type of the task to be a notebook - while the ```command``` in line number 20 specifies the location of the notebook relative to the project root. - -The notebook executed in the same shell session, thanks to ploomber engine, so any libraries installed in -the current project are readily available. - - -### Fields - -- ```command``` : Should be the location of the notebook relative to the project root. It should end with ```.ipynb```. -- ```command_type```: Should be ```notebook``` to execute notebooks. -- ```notebook_output_path```: the location of the executed notebook. Defaults to the -notebook name defined in ```command``` with ```_out``` post-fixed. The location should be relative -to the project root and also would be stored in catalog in the same location. -- [next](../concepts/pipeline.md/#linking): is required for any step of the pipeline except for success and fail steps. -- [on_failure](../concepts/pipeline.md/#on_failure): Name of the step to execute if the step fails. -- catalog: Optional required for data access patterns from/to the central storage. - -### ploomber arguments - -Please refer to -[ploomber arguments](https://engine.ploomber.io/en/latest/api/api.html#execute-notebook) -for available arguments. During the execution of the notebook, we set - -- input_path: To refer to command -- output_path: to refer to notebook_output_path. -- parameters: To the dictionary of available parameters. -- log_output: True. -- progress_bar: False - -You can set additional arguments or override these by sending an optional dictionary -```optional_ploomber_args```. - - - - - -### Accessing parameters - -Please refer to [Initial Parameters](parameters.md/#initial_parameters) for more information about setting -initial parameters. - -Assume that the initial parameters are: - -```yaml ---8<-- "examples/concepts/parameters.yaml" -``` - -- [x] Passing parameters between steps - -=== "Natively" - - !!! note - - The actual notebook is available in examples/concepts/notebook_native_parameters.ipynb. Below are some - screenshots to show the detail. - - - === "pipeline definition" - - ```yaml title="Pipeline definition" - --8<-- "examples/concepts/notebook_native_parameters.yaml" - ``` - - === "Notebook" - -
- ![Image title](../assets/screenshots/notebook_native_parameters.png){ width="800" height="600"} -
-
- - - ### ^^Input parameters to the notebook^^ - - Input parameters to the notebook are "injected" into the notebook by tagging the cell as "parameters". - Please refer to - [Ploomber engine](https://engine.ploomber.io/en/latest/user-guide/running.html#parametrizing-notebooks) - for more details. - - - For example, the initial parameters will be passed to the notebook as shown below. - -
- ![Image title](../assets/screenshots/notebook_input_parameters.png){ width="800" height="600"} -
-
- - The cell with the ```parameters``` tag will be introspected and variables defined in that cell would be - replaced with the variables passed into the notebook during execution. - - Nested parameters will be sent in as a ```dict ```. - - ### ^^Output parameters from the notebook^^ - - Similar to the input parameters, outputs from the notebook ca be indicated by tagging the cell. Please - ensure The tagged cell should ```print``` the dictionary as the output and nothing else. - - The default ```tag``` to indicate output parameters is ```runnable_output``` but can be configured by - ```output_cell_tag``` while defining the task in both SDK and yaml. - - -
- ![Image title](../assets/screenshots/notebook_output_parameters.png){ width="800" height="600"} -
-
- - - -=== "Using the API" - - As seen in [python tasks](#python_functions), you can use the python API's to get and set parameters. - - - === "pipeline definition" - - The below pipeline reuses the native parameters notebook to modify the initial parameters, retrieves - them via the ```get_parameter``` API and updates the parameters by ```set_parameter``` API. - - ```yaml title="Pipeline definition" - --8<-- "examples/concepts/notebook_api_parameters.yaml" - ``` - - === "Notebook" - - Below is just a screenshot of the notebook, the original notebook can be found at - ```examples/concepts/notebook_api_parameters.ipynb``` - - -
- ![Image title](../assets/screenshots/notebook_api_parameters.png){ width="800" height="600"} -
-
- - - In the output notebook, you might see a cell with a tag ```injected_parameters``` at the - start of the notebook, this should not interfere with the way the notebook behaves. - - -=== "Using environment variables" - - As seen in [python tasks](#python_functions), you can get/set the parameters by using environment variables. - Any variable with prefix ```runnable_PRM_``` is identified to be a parameter. - - === "pipeline definition" - - The below pipeline reuses the native parameters notebook to modify the initial parameters, by using environment variables. - - ```yaml title="Pipeline definition" - --8<-- "examples/concepts/notebook_env_parameters.yaml" - ``` - - === "Notebook" - - Below is just a screenshot of the notebook, the original notebook can be found at - ```examples/concepts/notebook_env_parameters.ipynb``` - - -
- ![Image title](../assets/screenshots/notebook_env_parameters.png){ width="800" height="600"} -
-
- +=== "yaml" -### Passing data and execution logs + ```yaml linenums="1" hl_lines="27-31" + --8<-- "examples/01-tasks/notebook.yaml" + ``` -Please refer to [catalog](catalog.md) for more details and examples on passing -data between tasks and the storage of execution logs. ---- +
## Shell [Python functions](#python_functions) and [Jupyter notebooks](#notebook) provide a rich interface to the python ecosystem while shell provides a interface to non-python executables. -We internally use [Popen](https://docs.python.org/3.8/library/subprocess.html#subprocess.Popen) -to execute the command. +[API Documentation](../reference.md/#shelltask) ### Example -```yaml title="Pipeline definition" ---8<-- "examples/concepts/task_shell_simple.yaml" -``` - -### Fields - -- ```command``` : Should refer to the exact command to execute. Multiple commands can be run by using the ```&&``` delimiter. -- ```command_type```: Should be shell. -- [next](pipeline.md/#linking): is required for any step of the pipeline except for success and fail steps. -- [on_failure](pipeline.md/#on_failure): Name of the step to execute if the step fails. -- catalog: Optional required for data access patterns from/to the central storage. - - -### Accessing parameters - -Please refer to [Initial Parameters](parameters.md/#initial_parameters) for more information about setting -initial parameters. +=== "sdk" -Assuming the initial parameters are: - -```yaml ---8<-- "examples/concepts/parameters.yaml" -``` - -- [x] Passing parameters between steps - -The only way ```shell``` commands can pass parameters between steps is via the ```environment``` variables. -Any environment variable with prefix ```runnable_PRM_``` should be understood as a parameter inside the shell -script/command. Nested parameters are set in json string format. + ```python linenums="1" hl_lines="22-26" + --8<-- "examples/01-tasks/scripts.py" + ``` -To pass parameter to downstream steps, set/update environment variables with ```runnable_PRM_``` prefix. The -execution environment is "scanned" for updated environment variables and stored for downstream steps. +=== "yaml" -```yaml linenums="1" ---8<-- "examples/concepts/task_shell_parameters.yaml" -``` + ```yaml linenums="1" hl_lines="19-23" + --8<-- "examples/01-tasks/scripts.yaml" + ``` -In the above example, the execution is specified with initial parameters by the ```-p``` option. -In line 18, we just display the parameters prefixed by ```runnable_PRM_``. The next step ```modify_initial``` -updates the parameters by setting new environment variables in line 26 and 27. +## Stub -The next step ```display_again``` displays the updated parameters and updates them for downstream steps in -lines 33-35. +Stub nodes in runnable are just like ```pass``` or ```...``` in python code. +It is a placeholder and useful when you want to debug ordesign your pipeline. +Stub nodes can take arbitrary number of parameters and is always a success. -!!! note "Output" +[API Documentation](../reference.md/#stub) - You might notice that the output might have a few extra lines starting with ```runnable```. You can ignore - them as they are generated by internal mechanisms of runnable. +### Example +!!! note annotate inline end "Intuition" + Designing a pipeline is similar to writing a modular program. Stub nodes are handy to create a placeholder + for some step that will be implemented in the future. + During debugging, changing a node to ```stub``` will let you focus on the actual bug without having to + execute the additional steps. -### Passing data and execution logs -Please refer to [catalog](catalog.md) for more details and examples on passing -data between tasks and the storage of execution logs. +=== "sdk" + ```python linenums="1" hl_lines="23 28 30" + --8<-- "examples/01-tasks/stub.py" + ``` -## Experiment tracking +=== "yaml" -Please refer to [experiment tracking](experiment-tracking.md) for more details and examples on experiment tracking. + ```yaml linenums="1" hl_lines="19-29" + --8<-- "examples/01-tasks/stub.yaml" + ``` diff --git a/docs/concepts/the-big-picture.md b/docs/concepts/the-big-picture.md index cb62f5ff..998958d1 100644 --- a/docs/concepts/the-big-picture.md +++ b/docs/concepts/the-big-picture.md @@ -1,5 +1,17 @@ runnable revolves around the concept of pipelines or workflows and tasks that happen within them. + +# Invert this and give examples + +- Explain task, python, notebook or shell. +- Then a pipeline stitching various tasks. +- Then flow of parameters/metrics/objects. +- File flow. +- secrets. +- parallel +- map + + --- A [workflow](pipeline.md) is simply a series of steps that you want to execute for a desired outcome. diff --git a/docs/index.md b/docs/index.md index f03189ae..bdb5d771 100644 --- a/docs/index.md +++ b/docs/index.md @@ -12,7 +12,7 @@ ## Example -The data science specific code is a well-known +The below data science flavored code is a well-known [iris example from scikit-learn](https://scikit-learn.org/stable/auto_examples/linear_model/plot_iris_logistic.html). @@ -21,7 +21,7 @@ The data science specific code is a well-known ``` -1. Return objects X and Y. +1. Return two serialized objects X and Y. 2. Store the file `iris_logistic.png` for future reference. 3. Define the sequence of tasks. 4. Define a pipeline with the tasks @@ -70,13 +70,10 @@ The difference between native driver and runnable orchestration: ``` -1. Return objects X and Y. -2. Store the file `iris_logistic.png` for future reference. -3. Define the sequence of tasks. --- -- [x] Absolutely no change in data science code to make it `runnable` +- [x] ```Domain``` code remains completely independent of ```driver``` code. - [x] The ```driver``` function has an equivalent and intuitive runnable expression - [x] Reproducible by default, runnable stores metadata about code/data/config for every execution. - [x] The pipeline is `runnable` in any environment. @@ -84,64 +81,10 @@ The difference between native driver and runnable orchestration: ## But why runnable? -Obviously, there are a lot of orchestration tools in python. A well maintained and curated [list is +Obviously, there are a lot of orchestration tools. A well maintained and curated [list is available here](https://github.com/EthicalML/awesome-production-machine-learning/). -Below is a rough comparison of `runnable` to others. - - -|Feature|runnable|Airflow|Argo workflows|Metaflow|ZenML|Kedro| -|:---:|:---:|:---:|:---:|:---:|:---:|:---:| -|Cross platform|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:|:white_check_mark:| -|Bring your own infrastructure |:white_check_mark:|:x:|:x:|:x:|:x:|:white_check_mark:| -|Local executions|:white_check_mark:|:x:|:x:|:white_check_mark:|:white_check_mark:|:white_check_mark:| -|Bring your own code|:white_check_mark:|:x:|:x:|:x:|:x:|:x:| -|Reproducibility of executions|:white_check_mark:|:x:|:x:|:white_check_mark:|:white_check_mark:|:white_check_mark:| -|Easy to move on|:white_check_mark:|:X:|:x:|:x:|:x:|:white_check_mark:| -|End to end platform|:x:|:white_check_mark:|:x:|:white_check_mark:|:white_check_mark:|:x:| -|Task level orchestration|:x:|:white_check_mark:|:white_check_mark:|:x:|:x:|:x:| -|Notebook as tasks|:white_check_mark:|:x:|:x:|:x:|:x:|:x:| -|Unit testable pipelines|:white_check_mark:|:x:|:x:|:white_check_mark:|:white_check_mark:|:x:| -|Multi language support|:white_check_mark:|:white_check_mark:|:white_check_mark:|:X:|:x:|:x:| - - - - -They can be broadly classified in three categories: - -- __Native orchestrators__: These orchestrators are responsible for task level orchestration, -resource management on chosen infrastructure. Examples: - - - Airflow - - Argo workflows - - AWS step functions - - -#### runnable is complimentary to these orchestrators and is designed to enable data teams use them effectively. - -- __Platforms__: These are meant to provide end to end platform for training, deploying and -serving of ML models. Examples: - - - Dagster - - Prefect - - Flyte - - They have specific infrastructure requirements and are great if the entire organization buys into - their philosophy and ways of working. - -#### runnable is designed to work with your infrastructure and ways of working instead of dictating them. - - - -- __Meta orchestrators__: Orchestrators using the native orchestrators but provide a simplified -SDK tailored for typical data oriented tasks. Examples include: - - - Kedro: cross platform transpiler. - - Metaflow: A mix of platform and SDK. - - ZenML: A mix of platform and SDK. - -runnable is a _meta orchestrator_ with different design decisions. - +```runnable``` stands out based on these design principles.
@@ -151,8 +94,7 @@ runnable is a _meta orchestrator_ with different design decisions. Your application code remains as it is. Runnable exists outside of it. - - No API's or decorators or imposed structure. - - Most often it is a single file. + - No API's or decorators or any imposed structure. [:octicons-arrow-right-24: Getting started](concepts/the-big-picture.md) @@ -160,10 +102,9 @@ runnable is a _meta orchestrator_ with different design decisions. --- - Runnable can be adapted to your infrastructure stack instead of dictating it. + Minimal disruption to your current infrastructure patterns. - - Intentionally minimal in scope as a composer of pipelines in native orchestrators. - - Every execution is ready to be deployed to production. + - ```runnable``` composes pipeline definitions suited to your infrastructure. [:octicons-arrow-right-24: Infrastructure](configurations/overview.md) @@ -204,3 +145,5 @@ runnable is a _meta orchestrator_ with different design decisions.
+ +## Comparisons/alternatives diff --git a/docs/reference.md b/docs/reference.md new file mode 100644 index 00000000..0cd9cf34 --- /dev/null +++ b/docs/reference.md @@ -0,0 +1,146 @@ +## Catalog + +=== "sdk" + + ::: runnable.Catalog + options: + show_root_heading: true + show_bases: false + heading_level: 3 + +=== "yaml" + + + +
+ +## Stub + +=== "sdk" + + ::: runnable.Stub + options: + show_root_heading: true + show_bases: false + heading_level: 3 + +=== "yaml" + + + +
+ +## PythonTask + +=== "sdk" + + ::: runnable.PythonTask + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + heading_level: 3 + +=== "yaml" + + + +
+ +## ShellTask + +=== "sdk" + + ::: runnable.ShellTask + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + heading_level: 3 + +=== "yaml" + + + +
+ + +## NotebookTask + +=== "sdk" + + ::: runnable.NotebookTask + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + heading_level: 3 + +=== "yaml" + + +
+ +## Parallel + + +=== "sdk" + + ::: runnable.Parallel + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + heading_level: 3 + +=== "yaml" + + + +
+ +## Map + +=== "sdk" + + ::: runnable.Map + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + heading_level: 3 + +=== "yaml" + +
+ + + +::: runnable.Success + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + +
+ +::: runnable.Fail + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + +
+ +## Pipeline + +=== "sdk" + + ::: runnable.Pipeline + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + heading_level: 3 + +=== "yaml" diff --git a/docs/yaml.md b/docs/yaml.md new file mode 100644 index 00000000..f811d22b --- /dev/null +++ b/docs/yaml.md @@ -0,0 +1,8 @@ +::: runnable.tasks.PythonTaskType + options: + show_root_heading: true + show_bases: false + show_docstring_description: true + show_members: false + +
diff --git a/examples/01-tasks/notebook.yaml b/examples/01-tasks/notebook.yaml index f1826210..d0c48965 100644 --- a/examples/01-tasks/notebook.yaml +++ b/examples/01-tasks/notebook.yaml @@ -29,7 +29,3 @@ dag: command_type: notebook command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. next: success - success: - type: success - fail: - type: fail diff --git a/examples/01-tasks/python_tasks.py b/examples/01-tasks/python_tasks.py index 5a493c5a..0cf1f58e 100644 --- a/examples/01-tasks/python_tasks.py +++ b/examples/01-tasks/python_tasks.py @@ -3,7 +3,9 @@ python examples/01-tasks/python_tasks.py -The stdout of "Hello World!" would be captured as execution log and stored in the catalog. +The stdout of "Hello World!" would be captured as execution +log and stored in the catalog. + An example of the catalog structure: .catalog @@ -22,7 +24,8 @@ def main(): # Create a tasks which calls the function "hello" - # If this step executes successfully, the pipeline will terminate with success + # If this step executes successfully, + # the pipeline will terminate with success hello_task = PythonTask( name="hello", function=hello, diff --git a/examples/01-tasks/python_tasks.yaml b/examples/01-tasks/python_tasks.yaml index 1a86719c..c5e5d6d4 100644 --- a/examples/01-tasks/python_tasks.yaml +++ b/examples/01-tasks/python_tasks.yaml @@ -18,10 +18,6 @@ dag: start_at: hello_task steps: hello_task: - type: task # The functional unit of the pipeline which does the work. + type: task command: examples.common.functions.hello # dotted path to the function. - next: success # If this function succeeds, mark the pipeline as success - success: - type: success - fail: - type: fail + next: success diff --git a/examples/01-tasks/scripts.yaml b/examples/01-tasks/scripts.yaml index ee1687e3..1f04b1c4 100644 --- a/examples/01-tasks/scripts.yaml +++ b/examples/01-tasks/scripts.yaml @@ -1,27 +1,20 @@ dag: description: | - This is a sample pipeline with one step that executes a shell command. + This is a sample pipeline with one step that + executes a shell command. You can run this pipeline by: runnable execute -f examples/01-tasks/scripts.yaml - The command can be anything that can be executed in a shell. - The stdout/stderr of the execution is captured as execution log and - stored in the catalog. - For example: .catalog └── seasoned-perlman-1355 └── hello.execution.log - start_at: notebook + start_at: shell steps: - notebook: + shell: type: task command_type: shell command: echo "hello world!!" # The path is relative to the root of the project. next: success - success: - type: success - fail: - type: fail diff --git a/examples/01-tasks/stub.py b/examples/01-tasks/stub.py index ea3ac68c..8899fb30 100644 --- a/examples/01-tasks/stub.py +++ b/examples/01-tasks/stub.py @@ -4,10 +4,12 @@ step 1 >> step 2 >> step 3 >> success All the steps are mocked and they will just pass through. - Use this pattern to define the skeleton of your pipeline and flesh out the steps later. + Use this pattern to define the skeleton of your pipeline + and flesh out the steps later. - Note that you can give any arbitrary keys to the steps (like step 2). This is handy - to mock steps within mature pipelines. + Note that you can give any arbitrary keys to the steps + (like step 2). + This is handy to mock steps within mature pipelines. You can run this pipeline by: python examples/01-tasks/stub.py @@ -21,16 +23,21 @@ def main(): step1 = Stub(name="step1") # It takes arbitrary arguments - # Useful for temporarily silencing steps within mature pipelines + # Useful for temporarily silencing steps within + # mature pipelines step2 = Stub(name="step2", what="is this thing") step3 = Stub(name="step3", terminate_with_success=True) - pipeline = Pipeline(steps=[step1, step2, step3], add_terminal_nodes=True) + pipeline = Pipeline( + steps=[step1, step2, step3], + add_terminal_nodes=True, + ) pipeline.execute() - # A function that creates pipeline should always return a Pipeline object + # A function that creates pipeline should always return a + # Pipeline object return pipeline diff --git a/examples/01-tasks/stub.yaml b/examples/01-tasks/stub.yaml index 788828fe..f7001cb3 100644 --- a/examples/01-tasks/stub.yaml +++ b/examples/01-tasks/stub.yaml @@ -5,10 +5,12 @@ dag: step 1 >> step 2 >> step 3 >> success All the steps are mocked and they will just pass through. - Use this pattern to define the skeleton of your pipeline and flesh out the steps later. + Use this pattern to define the skeleton of your pipeline + and flesh out the steps later. - Note that you can give any arbitrary keys to the steps (like step 2). This is handy - to mock steps within mature pipelines. + Note that you can give any arbitrary keys to the steps + (like step 2). + This is handy to mock steps within mature pipelines. You can run this pipeline by: runnable execute -f examples/01-tasks/stub.yaml @@ -25,7 +27,3 @@ dag: step 3: type: stub next: success - success: - type: success - fail: - type: fail diff --git a/examples/02-sequential/default_fail.yaml b/examples/02-sequential/default_fail.yaml index 4802ccb3..ec7fe156 100644 --- a/examples/02-sequential/default_fail.yaml +++ b/examples/02-sequential/default_fail.yaml @@ -21,7 +21,3 @@ dag: step 3: type: stub # This will never execute next: success - success: - type: success - fail: - type: fail diff --git a/examples/02-sequential/on_failure_fail.py b/examples/02-sequential/on_failure_fail.py index ce7892c9..ef6fefe2 100644 --- a/examples/02-sequential/on_failure_fail.py +++ b/examples/02-sequential/on_failure_fail.py @@ -26,7 +26,7 @@ def main(): step_2 = Stub(name="step 2") step_3 = Stub(name="step 3", terminate_with_success=True) - step_4 = Stub(name="step 4", terminate_with_failure=True) + step_4 = Stub(name="step 4", terminate_with_failure=True) # (1) step_1.on_failure = step_4.name diff --git a/examples/02-sequential/on_failure_fail.yaml b/examples/02-sequential/on_failure_fail.yaml index 0521038e..2159b6c1 100644 --- a/examples/02-sequential/on_failure_fail.yaml +++ b/examples/02-sequential/on_failure_fail.yaml @@ -32,7 +32,3 @@ dag: step_4: type: stub next: fail - success: - type: success - fail: - type: fail diff --git a/examples/02-sequential/on_failure_succeed.py b/examples/02-sequential/on_failure_succeed.py index b21c2cf1..9a730d27 100644 --- a/examples/02-sequential/on_failure_succeed.py +++ b/examples/02-sequential/on_failure_succeed.py @@ -26,7 +26,7 @@ def main(): step_2 = Stub(name="step 2") step_3 = Stub(name="step 3", terminate_with_success=True) - step_4 = Stub(name="step 4", terminate_with_success=True) + step_4 = Stub(name="step 4", terminate_with_success=True) # (1) step_1.on_failure = step_4.name diff --git a/examples/02-sequential/on_failure_succeed.yaml b/examples/02-sequential/on_failure_succeed.yaml index 50c7c4b6..27a19b58 100644 --- a/examples/02-sequential/on_failure_succeed.yaml +++ b/examples/02-sequential/on_failure_succeed.yaml @@ -32,7 +32,3 @@ dag: step_4: type: stub next: success - success: - type: success - fail: - type: fail diff --git a/examples/02-sequential/traversal.py b/examples/02-sequential/traversal.py index bcb35254..d8457a4e 100644 --- a/examples/02-sequential/traversal.py +++ b/examples/02-sequential/traversal.py @@ -47,7 +47,14 @@ def main(): # The pipeline has a mix of tasks. # The order of execution follows the order of the tasks in the list. - pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task]) + pipeline = Pipeline( + steps=[ # (2) + stub_task, # (1) + python_task, + shell_task, + notebook_task, + ] + ) pipeline.execute() diff --git a/examples/02-sequential/traversal.yaml b/examples/02-sequential/traversal.yaml index 88ed3632..1b0413b3 100644 --- a/examples/02-sequential/traversal.yaml +++ b/examples/02-sequential/traversal.yaml @@ -24,11 +24,11 @@ dag: You can run this pipeline as: runnable execute -f examples/02-sequential/traversal.yaml - start_at: hello stub + start_at: hello stub # (1) steps: hello stub: type: stub - next: hello python + next: hello python # (2) hello python: type: task command_type: python @@ -43,8 +43,4 @@ dag: type: task command_type: notebook command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. - next: success - success: - type: success - fail: - type: fail + next: success # (3) diff --git a/examples/03-parameters/passing_parameters_notebook.yaml b/examples/03-parameters/passing_parameters_notebook.yaml index 4eb25c0c..aa56d28f 100644 --- a/examples/03-parameters/passing_parameters_notebook.yaml +++ b/examples/03-parameters/passing_parameters_notebook.yaml @@ -25,6 +25,7 @@ dag: - name: stringer - name: pydantic_param - name: score + kind: metric next: read_parameters read_parameters: type: task @@ -35,7 +36,3 @@ dag: command_type: notebook command: examples/common/read_parameters.ipynb next: success - success: - type: success - fail: - type: fail diff --git a/examples/03-parameters/passing_parameters_python.yaml b/examples/03-parameters/passing_parameters_python.yaml index b2d73b30..1919e786 100644 --- a/examples/03-parameters/passing_parameters_python.yaml +++ b/examples/03-parameters/passing_parameters_python.yaml @@ -29,13 +29,10 @@ dag: - name: stringer - name: pydantic_param - name: score + kind: metric next: read_parameters read_parameters: type: task command: examples.common.functions.read_parameter next: success - success: - type: success - fail: - type: fail diff --git a/examples/03-parameters/passing_parameters_shell.yaml b/examples/03-parameters/passing_parameters_shell.yaml index b12e48d0..51977425 100644 --- a/examples/03-parameters/passing_parameters_shell.yaml +++ b/examples/03-parameters/passing_parameters_shell.yaml @@ -49,7 +49,3 @@ dag: exit 1; fi next: success - success: - type: success - fail: - type: fail diff --git a/examples/03-parameters/static_parameters_non_python.yaml b/examples/03-parameters/static_parameters_non_python.yaml index cf7809d5..35f4d0d9 100644 --- a/examples/03-parameters/static_parameters_non_python.yaml +++ b/examples/03-parameters/static_parameters_non_python.yaml @@ -44,7 +44,3 @@ dag: exit 1; fi next: success - success: - type: success - fail: - type: fail diff --git a/examples/03-parameters/static_parameters_python.py b/examples/03-parameters/static_parameters_python.py index abe8aeff..6d783150 100644 --- a/examples/03-parameters/static_parameters_python.py +++ b/examples/03-parameters/static_parameters_python.py @@ -32,11 +32,30 @@ def main(): + """ + Signature of read_initial_params_as_pydantic + def read_initial_params_as_pydantic( + integer: int, + floater: float, + stringer: str, + pydantic_param: ComplexParams, + envvar: str, + ): + """ read_params_as_pydantic = PythonTask( function=read_initial_params_as_pydantic, name="read_params_as_pydantic", ) + """ + Signature of read_initial_params_as_json + def read_initial_params_as_json( + integer: int, + floater: float, + stringer: str, + pydantic_param: Dict[str, Union[int, str]], + ): + """ read_params_as_json = PythonTask( function=read_initial_params_as_json, terminate_with_success=True, diff --git a/examples/03-parameters/static_parameters_python.yaml b/examples/03-parameters/static_parameters_python.yaml index f86302d7..8448cc1f 100644 --- a/examples/03-parameters/static_parameters_python.yaml +++ b/examples/03-parameters/static_parameters_python.yaml @@ -31,7 +31,3 @@ dag: type: task command: examples.common.functions.read_initial_params_as_json next: success - success: - type: success - fail: - type: fail diff --git a/examples/04-catalog/catalog.yaml b/examples/04-catalog/catalog.yaml index 64d90e24..07d8abcf 100644 --- a/examples/04-catalog/catalog.yaml +++ b/examples/04-catalog/catalog.yaml @@ -113,7 +113,3 @@ dag: rm df.csv || true && \ rm data_folder/data.txt || true next: success - success: - type: success - fail: - type: fail diff --git a/examples/06-parallel/nesting.yaml b/examples/06-parallel/nesting.yaml index 32b189f3..7e202ad0 100644 --- a/examples/06-parallel/nesting.yaml +++ b/examples/06-parallel/nesting.yaml @@ -24,10 +24,6 @@ branch: &simple_branch command_type: notebook command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. next: success - success: - type: success - fail: - type: fail # This branch is similar to a branch parallel.yaml @@ -40,10 +36,6 @@ nested_branch: &nested_branch branches: branch1: *simple_branch branch2: *simple_branch - success: - type: success - failure: - type: fail # The pipeline of nested parallel branches @@ -56,7 +48,3 @@ dag: branches: branch1: *nested_branch branch2: *nested_branch - success: - type: success - failure: - type: fail diff --git a/examples/06-parallel/parallel.yaml b/examples/06-parallel/parallel.yaml index c76d7502..a4675ce2 100644 --- a/examples/06-parallel/parallel.yaml +++ b/examples/06-parallel/parallel.yaml @@ -23,10 +23,6 @@ branch: &branch command_type: notebook command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. next: success - success: - type: success - fail: - type: fail dag: @@ -47,7 +43,3 @@ dag: branches: branch1: *branch branch2: *branch - success: - type: success - failure: - type: fail diff --git a/examples/07-map/custom_reducer.yaml b/examples/07-map/custom_reducer.yaml index 3189924e..e88b14b6 100644 --- a/examples/07-map/custom_reducer.yaml +++ b/examples/07-map/custom_reducer.yaml @@ -33,10 +33,6 @@ branch: &branch type: task command: examples.common.functions.read_processed_chunk next: success - success: - type: success - fail: - type: fail dag: description: | @@ -75,7 +71,3 @@ dag: type: task command: examples.common.functions.assert_custom_reducer next: success - success: - type: success - fail: - type: fail diff --git a/examples/07-map/map.yaml b/examples/07-map/map.yaml index d61828cd..0c8c0b7f 100644 --- a/examples/07-map/map.yaml +++ b/examples/07-map/map.yaml @@ -33,10 +33,6 @@ branch: &branch type: task command: examples.common.functions.read_processed_chunk next: success - success: - type: success - fail: - type: fail dag: @@ -76,7 +72,3 @@ dag: type: task command: examples.common.functions.assert_default_reducer next: success - success: - type: success - fail: - type: fail diff --git a/examples/09-retry/python_tasks.yaml b/examples/09-retry/python_tasks.yaml index f86302d7..8448cc1f 100644 --- a/examples/09-retry/python_tasks.yaml +++ b/examples/09-retry/python_tasks.yaml @@ -31,7 +31,3 @@ dag: type: task command: examples.common.functions.read_initial_params_as_json next: success - success: - type: success - fail: - type: fail diff --git a/examples/catalog.py b/examples/catalog.py deleted file mode 100644 index c94071f2..00000000 --- a/examples/catalog.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -Example pipeline to demonstrate passing data files between tasks. - -You can run this pipeline by: - python run examples/catalog.py -""" - -from runnable import Catalog, Pipeline, ShellTask, Stub - - -def main(): - # Make the data folder if it does not exist - set_up = ShellTask( - name="Setup", - command="mkdir -p data", - ) - - # create a catalog instruction to put a file into the catalog - create_catalog = Catalog(put=["data/hello.txt"]) - # This task will create a file in the data folder and attaches the instruction - # to put the file into the catalog. - create = ShellTask( - name="Create Content", - command='echo "Hello from runnable" >> data/hello.txt', - catalog=create_catalog, - ) - - # We remove the data folder to ensure that the data folder is cleaned up. - # This is to show that the retrieve step just does not read from existing data - # This step is stubbed to prevent any accidental deletion, make it a ShellTask - first_clean = Stub( - name="Clean up to get again", - command="rm -rf data", - ) - - # We create a catalog instruction to retrieve a file from the catalog - # Here we use "compute_folder_name" to point to the directory of interest. - # You can alteratively ignore compute_folder_name and get "data/hello.txt" - # You can use wild card following glob patterns to retrieve multiple files. - get_catalog = Catalog(get=["data/hello.txt"]) - # This task will retrieve the file from the catalog and attach the instruction - # to retrieve the file from the catalog before execution. - retrieve = ShellTask( - name="Retrieve Content", - command="cat data/hello.txt", - catalog=get_catalog, - ) - - # We clean up. Note that this step is stubbed to prevent any accidental deletion, - # Make it a ShellTask to actually clean up. - clean_up = Stub( - name="Clean up", - command="rm -rf data", - terminate_with_success=True, - ) - - pipeline = Pipeline( - steps=[set_up, create, first_clean, retrieve, clean_up], - add_terminal_nodes=True, - ) - - # override the default configuration to use file-system catalog. - pipeline.execute(configuration_file="examples/configs/fs-catalog.yaml") - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/catalog.yaml b/examples/catalog.yaml deleted file mode 100644 index b70489f1..00000000 --- a/examples/catalog.yaml +++ /dev/null @@ -1,52 +0,0 @@ -dag: - description: | - This is a simple pipeline that demonstrates passing data between steps. - - 1. Setup: We setup a data folder, we ignore if it is already present - 2. Create Content: We create a "hello.txt" and "put" the file in catalog - 3. Clean up to get again: We remove the data folder. Note that this is stubbed to prevent - accidental deletion of your contents. You can change type to task to make really run. - 4. Retrieve Content: We "get" the file "hello.txt" from the catalog and show the contents - 5. Cleanup: We remove the data folder. Note that this is stubbed to prevent accidental deletion. - - - You can run this pipeline by: - runnable execute -f examples/catalog.yaml -c examples/configs/fs-catalog.yaml - start_at: Setup - steps: - Setup: - type: task - command_type: shell - command: mkdir -p data # (1) - next: Create Content - Create Content: - type: task - command_type: shell - command: | - echo "Hello from runnable" >> data/hello.txt - next: Clean up to get again - catalog: # (2) - put: - - data/hello.txt - Clean up to get again: - type: stub # (3) - command_type: shell - command: rm -rf data - next: Retrieve Content - Retrieve Content: - type: task - command_type: shell - command: cat data/hello.txt # (4) - catalog: - get: - - "data/hello.txt" # You can use wild cards following glob pattern - next: Clean up - Clean up: - type: stub # (6) - command_type: shell - command: rm -rf data - next: success - success: - type: success - fail: - type: fail diff --git a/examples/catalog_simple.py b/examples/catalog_simple.py deleted file mode 100644 index 02c719a0..00000000 --- a/examples/catalog_simple.py +++ /dev/null @@ -1,39 +0,0 @@ -""" -An example pipeline to demonstrate the use of file-system catalog. - -Run this pipeline by: - python examples/concepts/catalog_simple.py - -""" - -from runnable import Catalog, Pipeline, ShellTask - - -def main(): - # Make the data folder if it does not exist - set_up = ShellTask(name="Setup", command="mkdir -p data") - - # create a catalog instruction to put a file into the catalog - create_catalog = Catalog(put=["data/hello.txt"]) - # This task will create a file in the data folder and attaches the instruction - # to put the file into the catalog. - create = ShellTask( - name="Create Content", - command='echo "Hello from runnable" >> data/hello.txt', - catalog=create_catalog, - terminate_with_success=True, - ) - - pipeline = Pipeline( - steps=[set_up, create], - add_terminal_nodes=True, - ) - - # override the default configuration to use file-system catalog. - pipeline.execute(configuration_file="examples/configs/fs-catalog.yaml") - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/comparisions/README.md b/examples/comparisions/README.md new file mode 100644 index 00000000..769790f9 --- /dev/null +++ b/examples/comparisions/README.md @@ -0,0 +1 @@ +To avoid any subjective bias in comparisons, we take the same example and implement it in alternate frameworks. diff --git a/examples/iris_demo.py b/examples/iris_demo.py index 0283b310..b0f250f1 100644 --- a/examples/iris_demo.py +++ b/examples/iris_demo.py @@ -49,6 +49,7 @@ def generate_plots(X: np.ndarray, Y: np.ndarray, logreg: LogisticRegression): plt.savefig("iris_logistic.png") + # TODO: What is the right value? return 0.6 diff --git a/examples/parallel-fail.yaml b/examples/parallel-fail.yaml deleted file mode 100644 index 589a9710..00000000 --- a/examples/parallel-fail.yaml +++ /dev/null @@ -1,53 +0,0 @@ -dag: - description: | - This is a simple pipeline that does 2 parallel branches at step 2. - step1 inside branch_a fails resulting in step2 to fail and eventually the graph. - - Note that the branches schema is same as dag schema. - - You can run this pipeline by: - runnable execute -f examples/parallel-fail.yaml - start_at: step 1 - steps: - step 1: - type: stub - next: step 2 - step 2: - type: parallel - next: step 3 - branches: - branch_a: - start_at: step 1 - steps: - step 1: - type: task - command_type: shell - command: exit 1 # This will fail - next: step 2 - step 2: - type: stub - next: success - success: - type: success - fail: - type: fail - branch_b: - start_at: step 1 - steps: - step 1: - type: stub - next: step 2 - step 2: - type: stub - next: success - success: - type: success - fail: - type: fail - step 3: - type: stub - next: success - success: - type: success - fail: - type: fail diff --git a/examples/parameters.py b/examples/parameters.py deleted file mode 100644 index d039fa83..00000000 --- a/examples/parameters.py +++ /dev/null @@ -1,84 +0,0 @@ -""" -The initial parameters defined in the parameters file are: -simple: 1 -inner: - x: 10 - y: "hello" - -You can execute this pipeline by: python examples/parameters.py - -""" - -from typing import Tuple - -from pydantic import BaseModel - - -class InnerModel(BaseModel): - """ - Captures the "inner" part of the parameters. - The model definition can be as nested as you want. - """ - - x: int - y: str - - -class NestedModel(BaseModel): # (1) - """ - Captures the whole parameter space of the application. - """ - - simple: int - inner: InnerModel - - -def display(simple: int, inner: InnerModel): # (2) - """ - The parameter "simple" and "inner" can be accessed by name. - runnable understands the parameter "inner" as a pydantic model from - annotation and casts it as a pydantic model. - """ - print(simple) - print(inner) - - -def return_parameters(simple: int, inner: InnerModel) -> Tuple[int, InnerModel]: # (3) - """ - The parameter "simple" and "inner" can be accessed by name. - You can redefine the parameters by returning a pydantic model. - """ - simple = 2 - inner.x = 30 - inner.y = "Hello Universe!!" - - return simple, inner - - -def main(): - from runnable import Pipeline, PythonTask - - display_task = PythonTask(name="display", function=display) - - return_parameters_task = PythonTask( - name="return_parameters", - function=return_parameters, - returns=[ - "simple", - "inner", - ], - terminate_with_success=True, - ) - - pipeline = Pipeline( - steps=[display_task, return_parameters_task], - add_terminal_nodes=True, - ) - - _ = pipeline.execute(parameters_file="examples/parameters_initial.yaml") - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/parameters_env.yaml b/examples/parameters_env.yaml deleted file mode 100644 index 46d7e420..00000000 --- a/examples/parameters_env.yaml +++ /dev/null @@ -1,47 +0,0 @@ -dag: - description: | - This is a simple pipeline that demonstrates how to use - environment variables to access parameters. - - All parameters are prefixed by runnable_PRM_ in json serialized form. - To set a parameter, you need to set the environment variable with the prefix - - You can run this example: - runnable execute -f examples/parameters_env.yaml -p examples/parameters_initial.yaml - - start_at: display - steps: - display: - type: task - command_type: shell - command: | - env | grep simple - env | grep inner - # prints simple=1 - # prints inner={"x": 10, "y": "hello world!!"} - next: update params - update params: - type: task - command_type: shell - next: display again - command: | - export simple=10 && - export inner='{"x": 100, "y": "hello universe!!"}' - returns: # collect simple and inner from environment - - name: simple - kind: json - - name: inner - kind: json - display again: - type: task - command_type: shell - command: | - env | grep simple - env | grep inner - # prints simple=1 - # prints inner={"x": 100, "y": "hello universe!!"} - next: success - success: - type: success - fail: - type: fail diff --git a/examples/parameters_simple.py b/examples/parameters_simple.py deleted file mode 100644 index 28c20f6d..00000000 --- a/examples/parameters_simple.py +++ /dev/null @@ -1,93 +0,0 @@ -""" -The initial parameters defined in the parameters file are: -simple: 1 -inner: - x: 10 - y: "hello" - -You can execute this pipeline by: python examples/parameters.py - -""" - -from pydantic import BaseModel - - -class InnerModel(BaseModel): - """ - Captures the "inner" part of the parameters. - The model definition can be as nested as you want. - """ - - x: int - y: str - - -class NestedModel(BaseModel): # (1) - """ - Captures the whole parameter space of the application. - """ - - simple: int - inner: InnerModel - - -def display(simple: int, inner: InnerModel): # (2) - """ - The parameter "simple" and "inner" can be accessed by name. - runnable understands the parameter "inner" as a pydantic model from - annotation and casts it as a pydantic model. - """ - assert simple == 1 - assert inner.x == 10 - assert inner.y == "hello world!!" - - -class ObjectType: - def __init__(self): - self.salute = "hello" - - -def return_parameters(): - """ - The parameter "simple" and "inner" can be accessed by name. - You can redefine the parameters by returning a pydantic model. - """ - x = 2 - y = "hello Universe!!" - - return x, y, ObjectType() - - -def display_object(obj: ObjectType): - print(obj.salute) - - -def main(): - from runnable import Pipeline, PythonTask, pickled - - display_task = PythonTask(name="display", function=display) - - return_parameters_task = PythonTask( - name="return_parameters", - function=return_parameters, - returns=["x", "y", pickled("obj")], - ) - - display_object_task = PythonTask( - name="display_object", - function=display_object, - terminate_with_success=True, - ) - - pipeline = Pipeline( - steps=[display_task, return_parameters_task, display_object_task], - add_terminal_nodes=True, - ) - - _ = pipeline.execute(parameters_file="examples/parameters_initial.yaml") - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/python-tasks.yaml b/examples/python-tasks.yaml deleted file mode 100644 index fb8ad48d..00000000 --- a/examples/python-tasks.yaml +++ /dev/null @@ -1,33 +0,0 @@ -dag: - description: | - This is a simple pipeline that does 3 steps - in sequence. - - In this example: - 1. First step: returns a "parameter" x - as a Pydantic model - 2. Second step: Consumes that parameter - and prints it - - You can run this pipeline by: - runnable execute -f examples/python-tasks.yaml - start_at: step 1 # (1) - steps: - step 1: # (2) - type: task - command: examples.functions.return_parameter # (3) - returns: - - name: x - kind: json - - name: y - kind: json - next: step 2 # (4) - step 2: - type: task - command_type: python - command: examples.functions.display_parameter - next: success # (5) - success: - type: success # (6) - fail: - type: fail diff --git a/examples/retry-fail.yaml b/examples/retry-fail.yaml deleted file mode 100644 index 66eca916..00000000 --- a/examples/retry-fail.yaml +++ /dev/null @@ -1,41 +0,0 @@ -dag: - description: | - This is a simple pipeline that demonstrates retrying failures. - - 1. Setup: We setup a data folder, we ignore if it is already present - 2. Create Content: We create a "hello.txt" and "put" the file in catalog - 3. Retrieve Content: We "get" the file "hello.txt" from the catalog and show the contents - 5. Cleanup: We remove the data folder. Note that this is stubbed to prevent accidental deletion. - - - You can run this pipeline by: - runnable execute -f examples/retry-fail.yaml -c examples/configs/fs-catalog-run_log.yaml \ - --run-id wrong-file-name - start_at: Setup - steps: - Setup: - type: task - command_type: shell - command: mkdir -p data # (1) - next: Create Content - Create Content: - type: task - command_type: shell - command: | - echo "Hello from runnable" >> data/hello.txt - next: Retrieve Content - catalog: # (2) - put: - - data/hello.txt - Retrieve Content: - type: task - command_type: shell - command: cat data/hello1.txt # (3) - catalog: - get: - - "data/hello.txt" # You can use wild cards following glob pattern - next: success - success: - type: success - fail: - type: fail diff --git a/examples/retry-fixed.yaml b/examples/retry-fixed.yaml deleted file mode 100644 index 654d5709..00000000 --- a/examples/retry-fixed.yaml +++ /dev/null @@ -1,44 +0,0 @@ -dag: - description: | - This is a simple pipeline that demonstrates passing data between steps. - - 1. Setup: We setup a data folder, we ignore if it is already present - 2. Create Content: We create a "hello.txt" and "put" the file in catalog - 3. Clean up to get again: We remove the data folder. Note that this is stubbed to prevent - accidental deletion of your contents. You can change type to task to make really run. - 4. Retrieve Content: We "get" the file "hello.txt" from the catalog and show the contents - 5. Cleanup: We remove the data folder. Note that this is stubbed to prevent accidental deletion. - - - You can run this pipeline by: - runnable execute -f examples/retry-fixed.yaml -c examples/configs/fs-catalog-run_log.yaml \ - --use-cached wrong-file-name - - start_at: Setup - steps: - Setup: - type: task # (1) - command_type: shell - command: mkdir -p data - next: Create Content - Create Content: - type: stub # (2) - command_type: shell - command: | - echo "Hello from runnable" >> data/hello.txt - next: Retrieve Content - catalog: - put: - - data/hello.txt - Retrieve Content: - type: task - command_type: shell - command: cat data/hello.txt - catalog: - get: - - "data/hello.txt" # You can use wild cards following glob pattern - next: success - success: - type: success - fail: - type: fail diff --git a/mkdocs.yml b/mkdocs.yml index e49128fb..897e33cf 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -114,26 +114,15 @@ nav: - "runnable": - "Introduction": "index.md" - "Usage": "usage.md" - # - "Example": - # - "Pipeline Definition": "example/example.md" - # - "Steps": "example/steps.md" - # - "Flow of data": "example/dataflow.md" - # - "Reproducibility": "example/reproducibility.md" - # - "Experiment tracking": "example/experiment-tracking.md" - # - "Secrets": "example/secrets.md" - # - "Retry failures": "example/retry-after-failure.md" - "Why runnable?": "why-runnable.md" - "Define pipeline": - - "tl;dr": "concepts/the-big-picture.md" + - "concepts/index.md" + - "Tasks": concepts/task.md - "Pipeline": "concepts/pipeline.md" - # - "Executor": "concepts/executor.md" - #- "Parameters": "concepts/parameters.md" + - "Parameters": "concepts/parameters.md" - "Catalog": "concepts/catalog.md" - # - "Experiment tracking": "concepts/experiment-tracking.md" - "Secrets": "concepts/secrets.md" - "Nodes": - - "Stub": "concepts/stub.md" - - "Task": "concepts/task.md" - "Parallel": "concepts/parallel.md" - "Map": "concepts/map.md" - "Nesting": "concepts/nesting.md" @@ -152,4 +141,6 @@ nav: - "Secrets": "configurations/secrets.md" # - "Experiment tracking": "configurations/experiment-tracking.md" - "Python SDK": "sdk.md" + - "YAML": "yaml.md" + - "Reference": "reference.md" # - "Roadmap": "roadmap.md" diff --git a/runnable/graph.py b/runnable/graph.py index e66de8e6..c7cb4284 100644 --- a/runnable/graph.py +++ b/runnable/graph.py @@ -342,6 +342,8 @@ def create_graph(dag_config: Dict[str, Any], internal_branch_name: str = "") -> node = create_node(name, step_config=step_config, internal_branch_name=internal_branch_name) graph.add_node(node) + graph.add_terminal_nodes(internal_branch_name=internal_branch_name) + graph.check_graph() return graph diff --git a/runnable/sdk.py b/runnable/sdk.py index fd736f85..dff11912 100644 --- a/runnable/sdk.py +++ b/runnable/sdk.py @@ -220,7 +220,6 @@ def create_node(self) -> TaskNode: class PythonTask(BaseTask): """ An execution node of the pipeline of python functions. - Please refer to [concepts](concepts/task.md) for more information. Attributes: name (str): The name of the node.