diff --git a/.gitignore b/.gitignore index ee5beae4..f53c03b2 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ __pycache__/ # C extensions *.so +# examples run time +*_out* + # Distribution / packaging .Python build/ diff --git a/examples/01-tasks/notebook.py b/examples/01-tasks/notebook.py index 68447032..afc99e40 100644 --- a/examples/01-tasks/notebook.py +++ b/examples/01-tasks/notebook.py @@ -16,7 +16,7 @@ │   └── simple_notebook_out.ipynb └── notebook.execution.log -The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!". +The notebook simple_notebook__out.ipynb has the captured stdout of "Hello World!". """ from runnable import NotebookTask, Pipeline diff --git a/examples/01-tasks/notebook.yaml b/examples/01-tasks/notebook.yaml index db79591a..f1826210 100644 --- a/examples/01-tasks/notebook.yaml +++ b/examples/01-tasks/notebook.yaml @@ -17,7 +17,7 @@ dag: │   └── simple_notebook_out.ipynb └── notebook.execution.log - The notebook simple_notebook_out.ipynb has the captured stdout of "Hello World!". + The notebook simple_notebook__out.ipynb has the captured stdout of "Hello World!". You can run this pipeline as: runnable execute -f examples/01-tasks/notebook.yaml diff --git a/examples/02-sequential/default_fail.py b/examples/02-sequential/default_fail.py index 92a4e578..c504e36d 100644 --- a/examples/02-sequential/default_fail.py +++ b/examples/02-sequential/default_fail.py @@ -8,7 +8,8 @@ step 1 >> step 2 >> fail -You can run this example by: python examples/02-sequential/default_fail.py +You can run this example by: + python examples/02-sequential/default_fail.py """ from examples.common.functions import raise_ex diff --git a/examples/02-sequential/default_fail.yaml b/examples/02-sequential/default_fail.yaml index f8e423f9..4802ccb3 100644 --- a/examples/02-sequential/default_fail.yaml +++ b/examples/02-sequential/default_fail.yaml @@ -6,7 +6,8 @@ dag: The default behavior is to traverse to step type fail and mark the run as failed. - You can run this pipeline by: runnable execute -f examples/02-sequential/default_fail.yaml + You can run this pipeline by: + runnable execute -f examples/02-sequential/default_fail.yaml start_at: step 1 steps: step 1: diff --git a/examples/02-sequential/on_failure_fail.py b/examples/02-sequential/on_failure_fail.py index cb4fa6cd..ce7892c9 100644 --- a/examples/02-sequential/on_failure_fail.py +++ b/examples/02-sequential/on_failure_fail.py @@ -12,7 +12,8 @@ This pattern is handy when you need to do something before eventually failing (eg: sending a notification, updating status, etc...) -Run this pipeline as: python examples/02-sequential/on_failure_fail.py +Run this pipeline as: + python examples/02-sequential/on_failure_fail.py """ from examples.common.functions import raise_ex diff --git a/examples/02-sequential/on_failure_fail.yaml b/examples/02-sequential/on_failure_fail.yaml index 6e94242a..0521038e 100644 --- a/examples/02-sequential/on_failure_fail.yaml +++ b/examples/02-sequential/on_failure_fail.yaml @@ -12,6 +12,9 @@ dag: This pattern is handy when you need to do something before eventually failing (eg: sending a notification, updating status, etc...) 
+ + Run this pipeline as: + runnable execute -f examples/02-sequential/on_failure_fail.yaml start_at: step_1 steps: step_1: diff --git a/examples/02-sequential/on_failure_succeed.py b/examples/02-sequential/on_failure_succeed.py index 6015bd01..b21c2cf1 100644 --- a/examples/02-sequential/on_failure_succeed.py +++ b/examples/02-sequential/on_failure_succeed.py @@ -12,7 +12,8 @@ This pattern is handy when you are expecting a failure of a step and have ways to handle it. -Run this pipeline: python examples/02-sequential/on_failure_succeed.py +Run this pipeline: + python examples/02-sequential/on_failure_succeed.py """ from examples.common.functions import raise_ex diff --git a/examples/02-sequential/on_failure_succeed.yaml b/examples/02-sequential/on_failure_succeed.yaml index 3977e175..50c7c4b6 100644 --- a/examples/02-sequential/on_failure_succeed.yaml +++ b/examples/02-sequential/on_failure_succeed.yaml @@ -31,7 +31,7 @@ dag: next: success step_4: type: stub - next: fail + next: success success: type: success fail: diff --git a/examples/03-parameters/passing_parameters_notebook.py b/examples/03-parameters/passing_parameters_notebook.py index ecb00783..4ada02c1 100644 --- a/examples/03-parameters/passing_parameters_notebook.py +++ b/examples/03-parameters/passing_parameters_notebook.py @@ -1,3 +1,18 @@ +""" +Demonstrates passing parameters to and from a notebook. + +runnable can extract JSON serializable types, pydantic models, and objects from a notebook. +eg: write_parameters_from_notebook + +But can only inject JSON type parameters into a notebook. +eg: read_parameters_in_notebook +pydantic parameters are injected as a dictionary. + +Run the below example as: + python examples/03-parameters/passing_parameters_notebook.py + +""" + from examples.common.functions import read_parameter from runnable import NotebookTask, Pipeline, PythonTask, metric, pickled diff --git a/examples/03-parameters/passing_parameters_notebook.yaml b/examples/03-parameters/passing_parameters_notebook.yaml index e69de29b..4eb25c0c 100644 --- a/examples/03-parameters/passing_parameters_notebook.yaml +++ b/examples/03-parameters/passing_parameters_notebook.yaml @@ -0,0 +1,41 @@ +dag: + description: | + Demonstrates passing parameters to and from a notebook. + + runnable can extract JSON serializable types, pydantic models, and objects from a notebook. + eg: write_parameters_from_notebook + + But can only inject JSON type parameters into a notebook. + eg: read_parameters_in_notebook + pydantic parameters are injected as a dictionary.
+ + Run the below example as: + runnable execute examples/03-parameters/passing_parameters_notebook.yaml + start_at: write_parameters_from_notebook + steps: + write_parameters_from_notebook: + type: task + command_type: notebook + command: examples/common/write_parameters.ipynb + returns: + - name: df + kind: object + - name: integer + - name: floater + - name: stringer + - name: pydantic_param + - name: score + next: read_parameters + read_parameters: + type: task + command: examples.common.functions.read_parameter + next: read_parameters_in_notebook + read_parameters_in_notebook: + type: task + command_type: notebook + command: examples/common/read_parameters.ipynb + next: success + success: + type: success + fail: + type: fail diff --git a/examples/03-parameters/passing_parameters_python.py b/examples/03-parameters/passing_parameters_python.py index baf9c1e5..ee9a72d6 100644 --- a/examples/03-parameters/passing_parameters_python.py +++ b/examples/03-parameters/passing_parameters_python.py @@ -3,7 +3,7 @@ tasks of the pipeline. The function, set_parameter, returns - - simple python data types (int, float, str) + - JSON serializable types - pydantic models - pandas dataframe, any "object" type @@ -13,6 +13,9 @@ Use pickled even for python data types is advised for reasonably large collections. +Run the below example as: + python examples/03-parameters/passing_parameters_python.py + """ from examples.common.functions import read_parameter, write_parameter diff --git a/examples/03-parameters/passing_parameters_python.yaml b/examples/03-parameters/passing_parameters_python.yaml index e69de29b..b2d73b30 100644 --- a/examples/03-parameters/passing_parameters_python.yaml +++ b/examples/03-parameters/passing_parameters_python.yaml @@ -0,0 +1,41 @@ +dag: + description: | + The below example shows how to set/get parameters in python + tasks of the pipeline. + + The function, set_parameter, returns + - JSON serializable + - pydantic models + - pandas dataframe, any "object" type + + pydantic models are implicitly handled by runnable + but "object" types should be marked as "pickled". + + Use pickled even for python data types is advised for + reasonably large collections. + + Run the pipeline as: + runnable execute -f examples/03-parameters/passing_parameters_python.yaml + start_at: write_parameters + steps: + write_parameters: + type: task + command: examples.common.functions.write_parameter + returns: + - name: df + kind: object + - name: integer + - name: floater + - name: stringer + - name: pydantic_param + - name: score + + next: read_parameters + read_parameters: + type: task + command: examples.common.functions.read_parameter + next: success + success: + type: success + fail: + type: fail diff --git a/examples/03-parameters/passing_parameters_shell.py b/examples/03-parameters/passing_parameters_shell.py index 3ef14bc2..20bbaf90 100644 --- a/examples/03-parameters/passing_parameters_shell.py +++ b/examples/03-parameters/passing_parameters_shell.py @@ -1,3 +1,18 @@ +""" +Demonstrates passing parameters to and from shell scripts. + +We can extract only JSON serializable parameters from shell scripts. +eg: write_parameters_in_shell + +We can only read json style parameters from shell scripts. +eg: read_parameters_in_shell +pydantic parameters are injected as json. 
+ +Run the below example as: + python examples/03-parameters/passing_parameters_shell.py + +""" + from examples.common.functions import read_unpickled_parameter from runnable import Pipeline, PythonTask, ShellTask, metric @@ -25,11 +40,28 @@ def main(): read_parameters = PythonTask( function=read_unpickled_parameter, name="read_parameters", + ) + + read_parameters_command = """ + if [ "$integer" = 1 ] \ + && [ "$floater" = 3.14 ] \ + && [ "$stringer" = "hello" ] \ + && [ "$pydantic_param" = '{"x": 10, "foo": "bar"}' ]; then + echo "yaay" + exit 0; + else + echo "naay" + exit 1; + fi + """ + read_parameters_in_shell = ShellTask( + name="read_parameters_in_shell", + command=read_parameters_command, terminate_with_success=True, ) pipeline = Pipeline( - steps=[write_parameters_in_shell, read_parameters], + steps=[write_parameters_in_shell, read_parameters, read_parameters_in_shell], ) _ = pipeline.execute() diff --git a/examples/03-parameters/passing_parameters_shell.yaml b/examples/03-parameters/passing_parameters_shell.yaml index e69de29b..b12e48d0 100644 --- a/examples/03-parameters/passing_parameters_shell.yaml +++ b/examples/03-parameters/passing_parameters_shell.yaml @@ -0,0 +1,55 @@ +dag: + description: | + Demonstrates passing parameters to and from shell scripts. + + We can extract only json style parameters from shell scripts. + eg: write_parameters_in_shell + + We can only read json style parameters from shell scripts. + eg: read_parameters_in_shell + pydantic parameters are injected as json. + + Run the pipeline as: + runnable execute -f examples/03-parameters/passing_parameters_shell.yaml + + start_at: write_parameters_in_shell + steps: + write_parameters_in_shell: + type: task + command_type: shell + command: | + export integer=1 + export floater=3.14 + export stringer="hello" + export pydantic_param='{"x": 10, "foo": "bar"}' + export score=0.9 + returns: + - name: integer + - name: floater + - name: stringer + - name: pydantic_param + - name: score + next: read_parameters + read_parameters: + type: task + command: examples.common.functions.read_unpickled_parameter + next: read_parameters_in_shell + read_parameters_in_shell: + type: task + command_type: shell + command: | + if [ "$integer" = 1 ] \ + && [ "$floater" = 3.14 ] \ + && [ "$stringer" = "hello" ] \ + && [ "$pydantic_param" = '{"x": 10, "foo": "bar"}' ]; then + echo "yaay" + exit 0; + else + echo "naay" + exit 1; + fi + next: success + success: + type: success + fail: + type: fail diff --git a/examples/03-parameters/static_parameters_non_python.py b/examples/03-parameters/static_parameters_non_python.py index 41eae659..0a095879 100644 --- a/examples/03-parameters/static_parameters_non_python.py +++ b/examples/03-parameters/static_parameters_non_python.py @@ -11,8 +11,14 @@ foo: bar runnable exposes the nested parameters as dictionary for notebook based tasks -as a json string for the shell based tasks. +and as a json string for the shell based tasks. +You can set the initial parameters from environment variables as well. 
+eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable + + +Run this pipeline as: + python examples/03-parameters/static_parameters_non_python.py """ from runnable import NotebookTask, Pipeline, ShellTask diff --git a/examples/03-parameters/static_parameters_non_python.yaml b/examples/03-parameters/static_parameters_non_python.yaml index ba581fa8..cf7809d5 100644 --- a/examples/03-parameters/static_parameters_non_python.yaml +++ b/examples/03-parameters/static_parameters_non_python.yaml @@ -12,7 +12,16 @@ dag: foo: bar runnable exposes the nested parameters as dictionary for notebook based tasks - as a json string for the shell based tasks. + and as a json string for the shell based tasks. + + You can set the initial parameters from environment variables as well. + eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable + + + Run this pipeline as: + runnable execute -f examples/03-parameters/static_parameters_non_python.yaml \ + -p examples/common/initial_parameters.yaml + start_at: read_params_in_notebook steps: read_params_in_notebook: diff --git a/examples/03-parameters/static_parameters_python.py b/examples/03-parameters/static_parameters_python.py index 7c82ea3a..abe8aeff 100644 --- a/examples/03-parameters/static_parameters_python.py +++ b/examples/03-parameters/static_parameters_python.py @@ -14,8 +14,16 @@ If no annotation is provided, the parameter is assumed to be a dictionary. eg: read_initial_params_as_json +You can set the initial parameters from environment variables as well. +eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable + +Run this pipeline as: + python examples/03-parameters/static_parameters_python.py + """ +import os + from examples.common.functions import ( read_initial_params_as_json, read_initial_params_as_pydantic, @@ -46,4 +54,6 @@ def main(): if __name__ == "__main__": + # Any parameter prefixed by "RUNNABLE_PRM_" will be picked up by runnable + os.environ["RUNNABLE_PRM_envvar"] = "from env" main() diff --git a/examples/03-parameters/static_parameters_python.yaml b/examples/03-parameters/static_parameters_python.yaml index ea0b8b7a..f86302d7 100644 --- a/examples/03-parameters/static_parameters_python.yaml +++ b/examples/03-parameters/static_parameters_python.yaml @@ -14,6 +14,13 @@ dag: If no annotation is provided, the parameter is assumed to be a dictionary. eg: read_initial_params_as_json + + You can set the initial parameters from environment variables as well. + eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable + + Run this pipeline by: + runnable execute -f examples/03-parameters/static_parameters_python.yaml \ + -p examples/common/initial_parameters.yaml start_at: read_params_as_pydantic steps: read_params_as_pydantic: diff --git a/examples/04-catalog/catalog.py b/examples/04-catalog/catalog.py new file mode 100644 index 00000000..f6288031 --- /dev/null +++ b/examples/04-catalog/catalog.py @@ -0,0 +1,131 @@ +""" +Demonstrates moving files within tasks.
+ +- generate_data: creates df.csv and data_folder/data.txt + +- delete_local_after_generate: deletes df.csv and data_folder/data.txt + This step ensures that the local files are deleted after the step + +- read_data_py: reads df.csv and data_folder/data.txt + +- delete_local_after_python_get: deletes df.csv and data_folder/data.txt + This step ensures that the local files are deleted after the step + +- read_data_shell: reads df.csv and data_folder/data.txt + +- delete_local_after_shell_get: deletes df.csv and data_folder/data.txt + This step ensures that the local files are deleted after the step + +- read_data_notebook: reads df.csv and data_folder/data.txt + +- delete_local_after_notebook_get: deletes df.csv and data_folder/data.txt + +Use this pattern to move files that are not dill friendly. + +All the files are stored in catalog. + +.catalog +└── silly-joliot-0610 + ├── data_folder + │   └── data.txt + ├── deleteaftergenerate.execution.log + ├── deleteaftergeneratenotebook.execution.log + ├── deleteaftergeneratepython.execution.log + ├── deleteaftergenerateshell.execution.log + ├── df.csv + ├── examples + │   └── common + │   └── read_files_out.ipynb + ├── generatedata.execution.log + ├── readdatanotebook.execution.log + ├── readdatapy.execution.log + └── readdatashell.execution.log + +5 directories, 11 files + +Run this pipeline as: + python examples/04-catalog/catalog.py + +""" + +from examples.common.functions import read_files, write_files +from runnable import Catalog, NotebookTask, Pipeline, PythonTask, ShellTask + + +def main(): + write_catalog = Catalog(put=["df.csv", "data_folder/data.txt"]) + generate_data = PythonTask( + name="generate_data", + function=write_files, + catalog=write_catalog, + ) + + delete_files_command = """ + rm df.csv || true && \ + rm data_folder/data.txt || true + """ + # delete from local files after generate + # since its local catalog, we delete to show "get from catalog" + delete_local_after_generate = ShellTask( + name="delete_after_generate", + command=delete_files_command, + ) + + read_catalog = Catalog(get=["df.csv", "data_folder/data.txt"]) + read_data_python = PythonTask( + name="read_data_py", + function=read_files, + catalog=read_catalog, + ) + + delete_local_after_python_get = ShellTask( + name="delete_after_generate_python", + command=delete_files_command, + ) + + read_data_shell_command = """ + (ls df.csv >> /dev/null 2>&1 && echo yes) || exit 1 && \ + (ls data_folder/data.txt >> /dev/null 2>&1 && echo yes) || exit 1 + """ + read_data_shell = ShellTask( + name="read_data_shell", + command=read_data_shell_command, + catalog=read_catalog, + ) + + delete_local_after_shell_get = ShellTask( + name="delete_after_generate_shell", + command=delete_files_command, + ) + + read_data_notebook = NotebookTask( + notebook="examples/common/read_files.ipynb", + name="read_data_notebook", + catalog=read_catalog, + ) + + delete_local_after_notebook_get = ShellTask( + name="delete_after_generate_notebook", + command=delete_files_command, + terminate_with_success=True, + ) + + pipeline = Pipeline( + steps=[ + generate_data, + delete_local_after_generate, + read_data_python, + delete_local_after_python_get, + read_data_shell, + delete_local_after_shell_get, + read_data_notebook, + delete_local_after_notebook_get, + ] + ) + _ = pipeline.execute() + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/04-catalog/catalog.yaml b/examples/04-catalog/catalog.yaml new file mode 100644 index 00000000..64d90e24 --- /dev/null +++ 
b/examples/04-catalog/catalog.yaml @@ -0,0 +1,119 @@ +dag: + description: | + Demonstrates moving files within tasks. + + - generate_data: creates df.csv and data_folder/data.txt + + - delete_local_after_generate: deletes df.csv and data_folder/data.txt + This step ensures that the local files are deleted after the step + + - read_data_py: reads df.csv and data_folder/data.txt + + - delete_local_after_python_get: deletes df.csv and data_folder/data.txt + This step ensures that the local files are deleted after the step + + - read_data_shell: reads df.csv and data_folder/data.txt + + - delete_local_after_shell_get: deletes df.csv and data_folder/data.txt + This step ensures that the local files are deleted after the step + + - read_data_notebook: reads df.csv and data_folder/data.txt + + - delete_local_after_notebook_get: deletes df.csv and data_folder/data.txt + + Use this pattern to move files that are not dill friendly. + + All the files are stored in catalog. + + .catalog + └── silly-joliot-0610 + ├── data_folder + │   └── data.txt + ├── deleteaftergenerate.execution.log + ├── deleteaftergeneratenotebook.execution.log + ├── deleteaftergeneratepython.execution.log + ├── deleteaftergenerateshell.execution.log + ├── df.csv + ├── examples + │   └── common + │   └── read_files_out.ipynb + ├── generatedata.execution.log + ├── readdatanotebook.execution.log + ├── readdatapy.execution.log + └── readdatashell.execution.log + + 5 directories, 11 files + + Run this pipeline as: + runnable execute -f examples/04-catalog/catalog.yaml + start_at: generate_data + steps: + generate_data: + type: task + command: examples.common.functions.write_files + catalog: + put: + - df.csv + - data_folder/data.txt + next: delete_files_after_generate + delete_files_after_generate: + type: task + command_type: shell + command: | + rm df.csv || true && \ + rm data_folder/data.txt || true + next: read_data_python + read_data_python: + type: task + command_type: python + command: examples.common.functions.read_files + catalog: + get: + - df.csv + - data_folder/data.txt + next: delete_local_after_python_get + delete_local_after_python_get: + type: task + command_type: shell + command: | + rm df.csv || true && \ + rm data_folder/data.txt || true + next: read_data_shell + read_data_shell: + type: task + command_type: shell + command: | + (ls df.csv >> /dev/null 2>&1 && echo yes) || exit 1 && \ + (ls data_folder/data.txt >> /dev/null 2>&1 && echo yes) || exit 1 + catalog: + get: + - df.csv + - data_folder/data.txt + next: delete_local_after_shell_get + delete_local_after_shell_get: + type: task + command_type: shell + command: | + rm df.csv || true && \ + rm data_folder/data.txt || true + next: read_data_notebook + read_data_notebook: + type: task + command_type: notebook + command: "examples/common/read_files.ipynb" + catalog: + get: + - df.csv + - data_folder/data.txt + next: delete_local_after_notebook_get + delete_local_after_notebook_get: + type: task + command_type: shell + command: | + rm df.csv || true && \ + rm data_folder/data.txt || true + next: success + success: + type: success + fail: + type: fail diff --git a/examples/06-parallel/nesting.py b/examples/06-parallel/nesting.py new file mode 100644 index 00000000..27bf543c --- /dev/null +++ b/examples/06-parallel/nesting.py @@ -0,0 +1,81 @@ +""" +Example to show case nesting of parallel steps. + +runnable does not put a limit on the nesting of parallel steps. +Deeply nested pipelines can be hard to read and not all +executors support it. 
+ +Run this pipeline as: + python examples/06-parallel/nesting.py +""" + +from examples.common.functions import hello +from runnable import NotebookTask, Parallel, Pipeline, PythonTask, ShellTask, Stub + + +def traversal(execute: bool = True): + """ + Use the pattern of using "execute" to control the execution of the pipeline. + + The same pipeline can be run independently from the command line. + + WARNING: If the execution is not controlled by "execute", the pipeline will be executed + even during the definition of the branch in parallel steps. + """ + stub_task = Stub(name="hello stub") + + python_task = PythonTask( + name="hello python", + function=hello, + ) + + shell_task = ShellTask( + name="hello shell", + command="echo 'Hello World!'", + ) + + notebook_task = NotebookTask( + name="hello notebook", + notebook="examples/common/simple_notebook.ipynb", + terminate_with_success=True, + ) + + # The pipeline has a mix of tasks. + # The order of execution follows the order of the tasks in the list. + pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task]) + + if execute: # Do not execute the pipeline if we are using it as a branch + pipeline.execute() + + return pipeline + + +def parallel_pipeline(execute: bool = True): + parallel_step = Parallel( + name="parallel step", + terminate_with_success=True, + branches={"branch1": traversal(execute=False), "branch2": traversal(execute=False)}, + ) + + pipeline = Pipeline(steps=[parallel_step]) + + if execute: + pipeline.execute() + return pipeline + + +def main(): + # Create a parallel step with parallel steps as branches. + parallel_step = Parallel( + name="nested_parallel", + terminate_with_success=True, + branches={"branch1": parallel_pipeline(execute=False), "branch2": parallel_pipeline(execute=False)}, + ) + + pipeline = Pipeline(steps=[parallel_step]) + pipeline.execute() + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/06-parallel/nesting.yaml b/examples/06-parallel/nesting.yaml new file mode 100644 index 00000000..32b189f3 --- /dev/null +++ b/examples/06-parallel/nesting.yaml @@ -0,0 +1,62 @@ +branch: &simple_branch + description: | + Use this pattern to define repeatable branch + + This pipeline is similar to one defined in: + examples/02-sequential/traversal.yaml + start_at: hello stub + steps: + hello stub: + type: stub + next: hello python + hello python: + type: task + command_type: python + command: examples.common.functions.hello # dotted path to the function. + next: hello shell + hello shell: + type: task + command_type: shell + command: echo "Hello World!" # Command to run + next: hello notebook + hello notebook: + type: task + command_type: notebook + command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. 
+ next: success + success: + type: success + fail: + type: fail + + +# This branch is similar to a branch parallel.yaml +nested_branch: &nested_branch + start_at: parallel_step + steps: + parallel_step: + type: parallel + next: success + branches: + branch1: *simple_branch + branch2: *simple_branch + success: + type: success + failure: + type: fail + + +# The pipeline of nested parallel branches +dag: + start_at: parallel_step + steps: + parallel_step: + type: parallel + next: success + branches: + branch1: *nested_branch + branch2: *nested_branch + success: + type: success + failure: + type: fail diff --git a/examples/06-parallel/parallel.py b/examples/06-parallel/parallel.py new file mode 100644 index 00000000..89be2d77 --- /dev/null +++ b/examples/06-parallel/parallel.py @@ -0,0 +1,69 @@ +""" +This example demonstrates the use of the Parallel step. + +The branches of the parallel step are themselves pipelines and can be defined +as shown in 02-sequential/traversal.py. + +WARNING, the function returning the pipeline should not executed +during the definition of the branch in parallel steps. + +Run this pipeline as: + python examples/06-parallel/parallel.py +""" + +from examples.common.functions import hello +from runnable import NotebookTask, Parallel, Pipeline, PythonTask, ShellTask, Stub + + +def traversal(execute: bool = True): + """ + Use the pattern of using "execute" to control the execution of the pipeline. + + The same pipeline can be run independently from the command line. + + WARNING: If the execution is not controlled by "execute", the pipeline will be executed + even during the definition of the branch in parallel steps. + """ + stub_task = Stub(name="hello stub") + + python_task = PythonTask( + name="hello python", + function=hello, + ) + + shell_task = ShellTask( + name="hello shell", + command="echo 'Hello World!'", + ) + + notebook_task = NotebookTask( + name="hello notebook", + notebook="examples/common/simple_notebook.ipynb", + terminate_with_success=True, + ) + + # The pipeline has a mix of tasks. + # The order of execution follows the order of the tasks in the list. + pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task]) + + if execute: # Do not execute the pipeline if we are using it as a branch + pipeline.execute() + + return pipeline + + +def main(): + parallel_step = Parallel( + name="parallel_step", + terminate_with_success=True, + branches={"branch1": traversal(execute=False), "branch2": traversal(execute=False)}, + ) + + pipeline = Pipeline(steps=[parallel_step]) + + pipeline.execute() + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/06-parallel/parallel.yaml b/examples/06-parallel/parallel.yaml new file mode 100644 index 00000000..c76d7502 --- /dev/null +++ b/examples/06-parallel/parallel.yaml @@ -0,0 +1,53 @@ +branch: &branch + description: | + Use this pattern to define repeatable branch + + This pipeline is the same as the one defined in examples/02-sequential/traversal.yaml + start_at: hello stub + steps: + hello stub: + type: stub + next: hello python + hello python: + type: task + command_type: python + command: examples.common.functions.hello # dotted path to the function. + next: hello shell + hello shell: + type: task + command_type: shell + command: echo "Hello World!" # Command to run + next: hello notebook + hello notebook: + type: task + command_type: notebook + command: examples/common/simple_notebook.ipynb # The path is relative to the root of the project. 
+ next: success + success: + type: success + fail: + type: fail + + +dag: + description: | + This example demonstrates the use of the Parallel step. + + parallel step takes a mapping of branches which are pipelines themselves. + + Run this pipeline as: + runnable execute -f examples/06-parallel/parallel.yaml + + + start_at: parallel_step + steps: + parallel_step: + type: parallel + next: success + branches: + branch1: *branch + branch2: *branch + success: + type: success + failure: + type: fail diff --git a/examples/07-map/custom_reducer.py b/examples/07-map/custom_reducer.py new file mode 100644 index 00000000..9d178cfa --- /dev/null +++ b/examples/07-map/custom_reducer.py @@ -0,0 +1,125 @@ +""" +map states allows to repeat a branch for each value of an iterable. + +The below example can written, in python, as: + +chunks = [1, 2, 3] + +for chunk in chunks: + # Any task within the pipeline can access the value of chunk as an argument. + processed = process_chunk(chunk) + + # The value of processed for every iteration is the value returned by the steps + # of the current execution. For example, the value of processed + # for chunk=1, is chunk*10 = 10 for downstream steps. + read_processed_chunk(chunk, processed) + +It is possible to use a custom reducer, for example, this reducer is a max of the collection. +# Once the reducer is applied, processed is reduced to a single value. +assert processed == max(chunk * 10 for chunk in chunks) +""" + +from examples.common.functions import ( + assert_custom_reducer, + process_chunk, + read_processed_chunk, +) +from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask + + +def iterable_branch(execute: bool = True): + """ + Use the pattern of using "execute" to control the execution of the pipeline. + + The same pipeline can be run independently from the command line. + + WARNING: If the execution is not controlled by "execute", the pipeline will be executed + even during the definition of the branch in parallel steps. + """ + # The python function to process a single chunk of data. + # In the example, we are multiplying the chunk by 10. + process_chunk_task_python = PythonTask( + name="execute_python", + function=process_chunk, + returns=["processed_python"], + ) + + # return parameters within a map branch have to be unique + # The notebook takes in the value of processed_python as an argument. + # and returns a new parameter "processed_notebook" which is 10*processed_python + process_chunk_task_notebook = NotebookTask( + name="execute_notebook", + notebook="examples/common/process_chunk.ipynb", + returns=["processed_notebook"], + ) + + # following the pattern, the shell takes in the value of processed_notebook as an argument. + # and returns a new parameter "processed_shell" which is 10*processed_notebook. + shell_command = """ + if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \ + && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then + echo "yaay" + else + echo "naay" + exit 1; + fi + export processed_shell=$( expr 10 '*' "$processed_notebook") + """ + + process_chunk_task_shell = ShellTask( + name="execute_shell", + command=shell_command, + returns=["processed_shell"], + ) + + # A downstream step of process_ which reads the parameter "processed". + # The value of processed is within the context of the branch. 
+ # For example, for chunk=1, the value of processed_python is chunk*10 = 10 + # the value of processed_notebook is processed_python*10 = 100 + # the value of processed_shell is processed_notebook*10 = 1000 + read_chunk = PythonTask( + name="read processed chunk", + function=read_processed_chunk, + terminate_with_success=True, + ) + + pipeline = Pipeline( + steps=[process_chunk_task_python, process_chunk_task_notebook, process_chunk_task_shell, read_chunk], + add_terminal_nodes=True, + ) + + if execute: + pipeline.execute() + + return pipeline + + +def main(): + # Create a map state which iterates over a list of chunks. + # chunk is the value of the iterable. + # Upon completion of the map state, all the parameters of the tasks + # within the pipeline will be processed by the reducer. + # In this case, the reducer is the max of all the processed chunks. + map_state = Map( + name="map state", + iterate_on="chunks", + iterate_as="chunk", + reducer="lambda *x: max(x)", + branch=iterable_branch(execute=False), + ) + + collect = PythonTask( + name="collect", + function=assert_custom_reducer, + terminate_with_success=True, + ) + + pipeline = Pipeline(steps=[map_state, collect]) + + pipeline.execute(parameters_file="examples/common/initial_parameters.yaml") + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/07-map/custom_reducer.yaml b/examples/07-map/custom_reducer.yaml new file mode 100644 index 00000000..3189924e --- /dev/null +++ b/examples/07-map/custom_reducer.yaml @@ -0,0 +1,81 @@ +branch: &branch + start_at: execute_python + steps: + execute_python: + type: task + command: examples.common.functions.process_chunk + returns: + - name: processed_python + next: execute_notebook + execute_notebook: + type: task + command_type: notebook + command: examples/common/process_chunk.ipynb + returns: + - name: processed_notebook + next: execute_shell + execute_shell: + type: task + command_type: shell + command: | + if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \ + && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then + echo "yaay" + else + echo "naay" + exit 1; + fi + export processed_shell=$( expr 10 '*' "$processed_notebook") + returns: + - name: processed_shell + next: read_chunk + read_chunk: + type: task + command: examples.common.functions.read_processed_chunk + next: success + success: + type: success + fail: + type: fail + +dag: + description: | + map states allows to repeat a branch for each value of an iterable. + + The below example can written, in python, as: + + chunks = [1, 2, 3] + + for chunk in chunks: + # Any task within the pipeline can access the value of chunk as an argument. + processed = process_chunk(chunk) + + # The value of processed for every iteration is the value returned by the steps + # of the current execution. For example, the value of processed + # for chunk=1, is chunk*10 = 10 for downstream steps. + read_processed_chunk(chunk, processed) + + It is possible to use a custom reducer, for example, this reducer is a max of the collection. + # Once the reducer is applied, processed is reduced to a single value. 
+ assert processed == max(chunk * 10 for chunk in chunks) + + Run this pipeline as: + runnable execute -f examples/07-map/custom_reducer.yaml \ + -p examples/common/initial_parameters.yaml + start_at: map_state + steps: + map_state: + type: map + branch: *branch + iterate_on: chunks + iterate_as: chunk + reducer: "lambda *x: max(x)" + next: collect + collect: + type: task + command: examples.common.functions.assert_custom_reducer + next: success + success: + type: success + fail: + type: fail diff --git a/examples/07-map/map.py b/examples/07-map/map.py new file mode 100644 index 00000000..e8afb0e8 --- /dev/null +++ b/examples/07-map/map.py @@ -0,0 +1,129 @@ +""" +map states allows to repeat a branch for each value of an iterable. + +The below example can written, in python, as: + +chunks = [1, 2, 3] + +for chunk in chunks: + # Any task within the pipeline can access the value of chunk as an argument. + processed = process_chunk(chunk) + + # The value of processed for every iteration is the value returned by the steps + # of the current execution. For example, the value of processed + # for chunk=1, is chunk*10 = 10 for downstream steps. + read_processed_chunk(chunk, processed) + +# Outside of loop, processed is a list of all the processed chunks. +# This is also called as the reduce pattern. +assert processed == [chunk * 10 for chunk in chunks] + +Run this pipeline as: + python examples/07-map/map.py +""" + +from examples.common.functions import ( + assert_default_reducer, + process_chunk, + read_processed_chunk, +) +from runnable import Map, NotebookTask, Pipeline, PythonTask, ShellTask + + +def iterable_branch(execute: bool = True): + """ + Use the pattern of using "execute" to control the execution of the pipeline. + + The same pipeline can be run independently from the command line. + + WARNING: If the execution is not controlled by "execute", the pipeline will be executed + even during the definition of the branch in parallel steps. + """ + # The python function to process a single chunk of data. + # In the example, we are multiplying the chunk by 10. + process_chunk_task_python = PythonTask( + name="execute_python", + function=process_chunk, + returns=["processed_python"], + ) + + # return parameters within a map branch have to be unique + # The notebook takes in the value of processed_python as an argument. + # and returns a new parameter "processed_notebook" which is 10*processed_python + process_chunk_task_notebook = NotebookTask( + name="execute_notebook", + notebook="examples/common/process_chunk.ipynb", + returns=["processed_notebook"], + ) + + # following the pattern, the shell takes in the value of processed_notebook as an argument. + # and returns a new parameter "processed_shell" which is 10*processed_notebook. + shell_command = """ + if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \ + && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then + echo "yaay" + else + echo "naay" + exit 1; + fi + export processed_shell=$( expr 10 '*' "$processed_notebook") + """ + + process_chunk_task_shell = ShellTask( + name="execute_shell", + command=shell_command, + returns=["processed_shell"], + ) + + # A downstream step of process_ which reads the parameter "processed". + # The value of processed is within the context of the branch. 
+ # For example, for chunk=1, the value of processed_python is chunk*10 = 10 + # the value of processed_notebook is processed_python*10 = 100 + # the value of processed_shell is processed_notebook*10 = 1000 + read_chunk = PythonTask( + name="read processed chunk", + function=read_processed_chunk, + terminate_with_success=True, + ) + + pipeline = Pipeline( + steps=[process_chunk_task_python, process_chunk_task_notebook, process_chunk_task_shell, read_chunk], + add_terminal_nodes=True, + ) + + if execute: + pipeline.execute() + + return pipeline + + +def main(): + # Create a map state which iterates over a list of chunks. + # chunk is the value of the iterable. + map_state = Map( + name="map state", + iterate_on="chunks", + iterate_as="chunk", + branch=iterable_branch(execute=False), + ) + + # Outside of the loop, processed is a list of all the processed chunks. + # This is also called as the reduce pattern. + # the value of processed_python is [10, 20, 30] + # the value of processed_notebook is [100, 200, 300] + # the value of processed_shell is [1000, 2000, 3000] + collect = PythonTask( + name="collect", + function=assert_default_reducer, + terminate_with_success=True, + ) + + pipeline = Pipeline(steps=[map_state, collect]) + + pipeline.execute(parameters_file="examples/common/initial_parameters.yaml") + + return pipeline + + +if __name__ == "__main__": + main() diff --git a/examples/07-map/map.yaml b/examples/07-map/map.yaml new file mode 100644 index 00000000..d61828cd --- /dev/null +++ b/examples/07-map/map.yaml @@ -0,0 +1,82 @@ +branch: &branch + start_at: execute_python + steps: + execute_python: + type: task + command: examples.common.functions.process_chunk + returns: + - name: processed_python + next: execute_notebook + execute_notebook: + type: task + command_type: notebook + command: examples/common/process_chunk.ipynb + returns: + - name: processed_notebook + next: execute_shell + execute_shell: + type: task + command_type: shell + command: | + if [ "$processed_python" = $( expr 10 '*' "$chunk" ) ] \ + && [ "$processed_notebook" = $( expr 10 '*' "$processed_python" ) ] ; then + echo "yaay" + else + echo "naay" + exit 1; + fi + export processed_shell=$( expr 10 '*' "$processed_notebook") + returns: + - name: processed_shell + next: read_chunk + read_chunk: + type: task + command: examples.common.functions.read_processed_chunk + next: success + success: + type: success + fail: + type: fail + + +dag: + description: | + map states allows to repeat a branch for each value of an iterable. + + The below example can written, in python, as: + + chunks = [1, 2, 3] + + for chunk in chunks: + # Any task within the pipeline can access the value of chunk as an argument. + processed = process_chunk(chunk) + + # The value of processed for every iteration is the value returned by the steps + # of the current execution. For example, the value of processed + # for chunk=1, is chunk*10 = 10 for downstream steps. + read_processed_chunk(chunk, processed) + + # Outside of loop, processed is a list of all the processed chunks. + # This is also called as the reduce pattern. 
+ assert processed == [chunk * 10 for chunk in chunks] + + Run this pipeline as: + runnable execute -f examples/07-map/map.yaml \ + -p examples/common/initial_parameters.yaml + + start_at: map_state + steps: + map_state: + type: map + branch: *branch + iterate_on: chunks + iterate_as: chunk + next: collect + collect: + type: task + command: examples.common.functions.assert_default_reducer + next: success + success: + type: success + fail: + type: fail diff --git a/examples/08-mocking/default.yaml b/examples/08-mocking/default.yaml new file mode 100644 index 00000000..deca7551 --- /dev/null +++ b/examples/08-mocking/default.yaml @@ -0,0 +1,13 @@ +# Mocked executors are handy to temporarily disable +# all executions in the pipeline. +# Traversal nodes, like task, parallel, map work +# still function + +# Any executor can be made mocking by changing the type to "mocked" +# Arbitrary key words are allowed in mocked executor. + + +executor: + type: mocked + config: + what: I am allowed diff --git a/examples/08-mocking/patching.yaml b/examples/08-mocking/patching.yaml new file mode 100644 index 00000000..c81e64a0 --- /dev/null +++ b/examples/08-mocking/patching.yaml @@ -0,0 +1,19 @@ +# Mocked executors are handy to temporarily disable +# all executions in the pipeline. +# Traversal nodes, like task, parallel, map work +# still function + +# Any executor can be made mocking by changing the type to "mocked" +# Arbitrary key words are allowed in mocked executor. + + +executor: + type: mocked + config: + patches: + hello python: + command: examples.common.functions.mocked_hello + hello shell: + command: echo "hello from mocked" + hello notebook: + command: examples/common/simple_notebook_mocked.ipynb diff --git a/examples/09-retry/config.yaml b/examples/09-retry/config.yaml new file mode 100644 index 00000000..60aaa083 --- /dev/null +++ b/examples/09-retry/config.yaml @@ -0,0 +1,4 @@ +executor: + type: retry + config: + run_id: grating-hugle-0551 diff --git a/examples/09-retry/python_tasks.py b/examples/09-retry/python_tasks.py new file mode 100644 index 00000000..7c4648f5 --- /dev/null +++ b/examples/09-retry/python_tasks.py @@ -0,0 +1,60 @@ +""" +The below example showcases setting up known initial parameters for a pipeline +of only python tasks + +The initial parameters as defined in the yaml file are: + simple: 1 + complex_param: + x: 10 + y: "hello world!!" + +runnable allows using pydantic models for deeply nested parameters and +casts appropriately based on annotation. eg: read_initial_params_as_pydantic + +If no annotation is provided, the parameter is assumed to be a dictionary. +eg: read_initial_params_as_json + +You can set the initial parameters from environment variables as well. 
+eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable + +Run this pipeline as: + python examples/03-parameters/static_parameters_python.py + +""" + +import os + +from examples.common.functions import ( + read_initial_params_as_json, + read_initial_params_as_pydantic, +) +from runnable import Pipeline, PythonTask + + +def main(): + read_params_as_pydantic = PythonTask( + function=read_initial_params_as_pydantic, + name="read_params_as_pydantic", + ) + + read_params_as_json = PythonTask( + function=read_initial_params_as_json, + terminate_with_success=True, + name="read_params_json", + ) + + pipeline = Pipeline( + steps=[read_params_as_pydantic, read_params_as_json], + add_terminal_nodes=True, + ) + + _ = pipeline.execute(parameters_file="examples/common/initial_parameters.yaml") + + return pipeline + + +if __name__ == "__main__": + # Any parameter prefixed by "RUNNABLE_PRM_" will be picked up by runnable + os.environ["RUNNABLE_PRM_envvar"] = "from env" + main() + del os.environ["RUNNABLE_PRM_envvar"] diff --git a/examples/09-retry/python_tasks.yaml b/examples/09-retry/python_tasks.yaml new file mode 100644 index 00000000..f86302d7 --- /dev/null +++ b/examples/09-retry/python_tasks.yaml @@ -0,0 +1,37 @@ +dag: + description: | + The below example showcases setting up known initial parameters for a pipeline + of only python tasks + + The initial parameters as defined in the yaml file are: + simple: 1 + complex_param: + x: 10 + y: "hello world!!" + + runnable allows using pydantic models for deeply nested parameters and + casts appropriately based on annotation. eg: read_initial_params_as_pydantic + + If no annotation is provided, the parameter is assumed to be a dictionary. + eg: read_initial_params_as_json + + You can set the initial parameters from environment variables as well. + eg: Any environment variable prefixed by "RUNNABLE_PRM_" will be picked up by runnable + + Run this pipeline by: + runnable execute -f 03-parameters/static_parameters_python.yaml \ + -p examples/common/initial_parameters.yaml + start_at: read_params_as_pydantic + steps: + read_params_as_pydantic: + type: task + command: examples.common.functions.read_initial_params_as_pydantic + next: read_params_json + read_params_json: + type: task + command: examples.common.functions.read_initial_params_as_json + next: success + success: + type: success + fail: + type: fail diff --git a/examples/README.md b/examples/README.md index 476478ae..5e743f21 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,7 +4,7 @@ All examples have both python SDK and yaml representations. Please use this as an index to find specific example. -- common: Has python functions/notebooks/scripts that are used across the examples +- [common](./common/): Has python functions/notebooks/scripts that are used across the examples - 01-tasks: Examples of the tasks that can be part of the pipeline. @@ -12,51 +12,82 @@ Please use this as an index to find specific example. - [python_tasks.py](./01-tasks/python_tasks.py), [python_tasks.yaml](./01-tasks/python_tasks.yaml): uses python functions as tasks. The stdout/stderr of all the tasks are captured and stored in the catalog. + - [notebook.py](./01-tasks/notebook.py), [notebook.yaml](./01-tasks/notebook.yaml): uses notebooks as tasks The executed notebook is captured in the catalog. + - [scripts.py](./01-tasks/scripts.py), [scripts.yaml](./01-tasks/scripts.yaml): uses shell scripts as tasks The stdout/stderr of all scripts are captured and stored in the catalog. 
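
  For orientation, a minimal single-task pipeline built with the SDK looks roughly like the sketch below. It reuses the `hello` function from `examples/common/functions.py`; the step name is arbitrary and the snippet is only an illustrative sketch, not one of the shipped examples.

```python
from examples.common.functions import hello
from runnable import Pipeline, PythonTask


def main():
    # Wrap a plain python function as a task; its stdout is captured in the catalog.
    hello_task = PythonTask(
        name="hello",
        function=hello,
        terminate_with_success=True,
    )

    # A pipeline is an ordered list of steps; execute() runs it with the default (local) executor.
    pipeline = Pipeline(steps=[hello_task])
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
```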
--- -The above examples showcase executable units of the pipeline. -The next section has examples on stitching these tasks together for complex operations. +This section has examples on stitching these tasks together for complex operations. +We only show sequential pipelines while parallel and dynamic pipelines are +shown in later sections. - 02-sequential: Examples of stitching tasks together including behavior in case of failures. - - [traversal.py](./02-sequential/traversal.py), [traversal.yaml](./02-sequential/traversal.yaml): A pipeline which is a mixed bag of notebooks, python functions and - shell scripts. + - [traversal.py](./02-sequential/traversal.py), [traversal.yaml](./02-sequential/traversal.yaml): A pipeline which is a mixed bag of notebooks, python functions and shell scripts. + - [default_fail.py](./02-sequential/default_fail.py), [default_fail.yaml](./02-sequential/default_fail.yaml): The default failure behavior. + - [on_failure_fail](./02-sequential/on_failure_fail.py), [on_failure_fail.yaml](./02-sequential/on_failure_fail.yaml): On failure of a step, do some action and fail - - [on_failure_success.py](./02-sequential/on_failure_succeed.py), [on_failure_success.yaml](./02-sequential/on_failure_succeed.yaml): On failure of a step, take a different route + + - [on_failure_success.py](./02-sequential/on_failure_succeed.py), [on_failure_success.yaml](./02-sequential/on_failure_succeed.yaml): On failure of a step, take a different route and succeed --- -The above examples show stitching complex operations of the pipeline. -The next section has examples on communicating between tasks during execution. +This section has examples on communicating between tasks during execution. +We focus only on "parameters" while the next section focuses on "files". - 03: Examples of passing parameters between tasks of a pipeline. Below table summarizes the input/output types of different task types. For ex: notebooks can only take JSON serializable parameters as input but can return json/pydantic/objects. Any python object that could be serialized using "dill" can be used. - | | Input | Output | - | -------- | :---------------------: | :----------------------: | - | python | json,pydantic, object | json, pydantic, object | - | notebook | json | json, pydantic, object | - | shell | json | json | + | | Input | Output | + | -------- | :---------------------: | :----------------------: | + | python | json, pydantic, object | json, pydantic, object | + | notebook | json | json, pydantic, object | + | shell | json | json | - [static_parameters_python.py](./03-parameters/static_parameters_python.py), [static_parameters_python.yaml](./03-parameters/static_parameters_python.yaml): A pipeline to show the access of static or known parameters by python tasks. + Any environment variables prefixed by RUNNABLE_PRM_ are recognized as parameters and + can override parameters defined by the file. + - [static_parameters_non_python.py](./03-parameters/static_parameters_non_python.py), [static_parameters_non_python.yaml](./03-parameters/static_parameters_non_python.yaml): A pipeline to show the access of static or known parameters by python tasks. - - [passing_parameters_python.py](./03-parameters/passing_parameters_python.py), [passing_parameters_python.yaml](./03-parameters/passing_parameters_python.yaml): shows the mechanism of passing parameters (simple python datatypes, objects, pydantic models) and registering metrics between python tasks.
+ Any environment variables prefixed by RUNNABLE_PRM_ are recognized as parameters and + can override parameters defined by the file. + + - [passing_parameters_python.py](./03-parameters/passing_parameters_python.py), [passing_parameters_python.yaml](./03-parameters/passing_parameters_python.yaml): shows the mechanism of passing parameters (JSON serializable, objects, pydantic models) and registering metrics between python tasks. - [passing_parameters_notebook.py](./03-parameters/passing_parameters_notebook.py), [passing_parameters_notebook.yaml](./03-parameters/passing_parameters_notebook.yaml): shows the mechanism of passing parameters between notebook tasks. Please note that - we cannot inject pydantic models or objects into the notebook. + we cannot inject pydantic models or objects into the notebook but can capture them as return values. - [passing_parameters_shell.py](./03-parameters/passing_parameters_shell.py), [passing_parameters_shell.yaml](./03-parameters/passing_parameters_shell.yaml): shows the mechanism of passing parameters between shell tasks. Please note that - we cannot inject pydantic models or objects into shells. + we cannot inject/capture pydantic models or objects in shells. + +--- + +This section focusses on moving files between tasks. + +- 04: Examples of moving files between tasks of the pipeline. + + - [catalog.py](./04-catalog/catalog.py), [catalog.yaml](./04-catalog/catalog.yaml): demonstrate moving files between python, shell and notebook tasks. + +--- + +This section focusses on exposing secrets to tasks. All secrets are exposed as environment +variables. The secrets are destroyed after the completion of the task. + + +--- + +Below are the examples of constructing parallel graphs and nested graphs. + +Creating parallel graphs is simple as the branches are themselves pipelines. diff --git a/examples/common/functions.py b/examples/common/functions.py index 36eb3d9a..489f1a64 100644 --- a/examples/common/functions.py +++ b/examples/common/functions.py @@ -1,4 +1,5 @@ -from typing import Dict, Union +from pathlib import Path +from typing import Dict, List, Union import pandas as pd from pydantic import BaseModel @@ -9,6 +10,11 @@ def hello(): print("Hello World!") +def mocked_hello(): + "Mock of the hello function" + print("Hello from mock") + + def raise_ex(): "A function that raises an exception" raise Exception("This is an exception") @@ -24,12 +30,15 @@ def read_initial_params_as_pydantic( floater: float, stringer: str, pydantic_param: ComplexParams, + envvar: str, ): + print(envvar) assert integer == 1 assert floater == 3.14 assert stringer == "hello" assert pydantic_param.x == 10 assert pydantic_param.foo == "bar" + assert envvar == "from env" def read_initial_params_as_json( @@ -87,3 +96,64 @@ def read_unpickled_parameter( assert pydantic_param.x == 10 assert pydantic_param.foo == "bar" assert score == 0.9 + + +def write_files(): + data = {"calories": [420, 380, 390], "duration": [50, 40, 45]} + df = pd.DataFrame(data) + + df.to_csv("df.csv", index=False) + + Path("data_folder").mkdir(parents=True, exist_ok=True) + with open("data_folder/data.txt", "w", encoding="utf-8") as f: + f.write("hello world") + + +def read_files(): + df = pd.read_csv("df.csv") + assert df.shape == (3, 2) + + with open("data_folder/data.txt", "r", encoding="utf-8") as f: + data = f.read() + + assert data.strip() == "hello world" + + +def process_chunk(chunk: int): + """ + An example function that processes a chunk of data. + We are multiplying the chunk by 10. 
+ """ + return chunk * 10 + + +def read_processed_chunk(chunk: int, processed_python: int, processed_notebook: int, processed_shell: int): + """ + A downstream step of process_chunk of map state which reads the processed chunk. + Since the process_chunk returns the chunk multiplied by 10, we assert that. + """ + assert chunk * 10 == processed_python + assert processed_python * 10 == processed_notebook + assert processed_notebook * 10 == processed_shell + + +def assert_default_reducer( + processed_python: List[int], processed_notebook: List[int], processed_shell: List[int], chunks: List[int] +) -> int: + """ + Demonstrates the default reducer which just returns the list of processed chunks. + """ + assert processed_python == [chunk * 10 for chunk in chunks] + assert processed_notebook == [chunk * 100 for chunk in chunks] + assert processed_shell == [chunk * 1000 for chunk in chunks] + + +def assert_custom_reducer( + processed_python: int, processed_notebook: int, processed_shell: int, chunks: List[int] +) -> int: + """ + Asserts the custom reducer returns the max of all the processed chunks. + """ + assert processed_python == max(chunk * 10 for chunk in chunks) + assert processed_notebook == max(chunk * 100 for chunk in chunks) + assert processed_shell == max(chunk * 1000 for chunk in chunks) diff --git a/examples/common/initial_parameters.yaml b/examples/common/initial_parameters.yaml index eb987ed4..a60e9a47 100644 --- a/examples/common/initial_parameters.yaml +++ b/examples/common/initial_parameters.yaml @@ -4,3 +4,5 @@ stringer : hello pydantic_param: x: 10 foo: bar + +chunks: [1, 2, 3] diff --git a/examples/common/process_chunk.ipynb b/examples/common/process_chunk.ipynb new file mode 100644 index 00000000..2ac01390 --- /dev/null +++ b/examples/common/process_chunk.ipynb @@ -0,0 +1,60 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "41a71aa7", + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "chunk = None\n", + "processed_python = None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "764f661d", + "metadata": {}, + "outputs": [], + "source": [ + "assert chunk*10 == processed_python" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": {}, + "outputs": [], + "source": [ + "processed_notebook = processed_python*10" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/read_files.ipynb b/examples/common/read_files.ipynb new file mode 100644 index 00000000..a9093c14 --- /dev/null +++ b/examples/common/read_files.ipynb @@ -0,0 +1,43 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "41a71aa7", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "df = pd.read_csv(\"df.csv\")\n", + "assert df.shape == (3, 2)\n", + "\n", + "with open(\"data_folder/data.txt\", \"r\", encoding=\"utf-8\") as f:\n", + " data = f.read()\n", + "\n", + "assert data.strip() == \"hello world\"" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + 
"language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/read_parameters_out.ipynb b/examples/common/read_parameters_out.ipynb deleted file mode 100644 index ea6525f8..00000000 --- a/examples/common/read_parameters_out.ipynb +++ /dev/null @@ -1,105 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "41a71aa7", - "metadata": { - "ploomber": { - "timestamp_end": 1713380823.765499, - "timestamp_start": 1713380823.765069 - }, - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "integer = None\n", - "stringer = None\n", - "floater = None\n", - "pydantic_param = None\n", - "score = None" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "d88d58c6", - "metadata": { - "ploomber": { - "timestamp_end": 1713380823.765846, - "timestamp_start": 1713380823.765527 - }, - "tags": [ - "injected-parameters" - ] - }, - "outputs": [], - "source": [ - "# Injected parameters\n", - "integer = 1\n", - "floater = 3.14\n", - "stringer = \"hello\"\n", - "pydantic_param = {\"x\": 10, \"foo\": \"bar\"}\n" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": { - "ploomber": { - "timestamp_end": 1713380823.766088, - "timestamp_start": 1713380823.765864 - } - }, - "outputs": [], - "source": [ - "assert integer == 1\n", - "assert stringer == \"hello\"\n", - "assert floater == 3.14" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "faf6769e", - "metadata": { - "ploomber": { - "timestamp_end": 1713380823.766474, - "timestamp_start": 1713380823.766105 - } - }, - "outputs": [], - "source": [ - "from examples.common.functions import ComplexParams\n", - "\n", - "pydantic_param = ComplexParams(**pydantic_param)\n", - "assert pydantic_param.x == 10\n", - "assert pydantic_param.foo == \"bar\"" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/common/simple_notebook_mocked.ipynb b/examples/common/simple_notebook_mocked.ipynb new file mode 100644 index 00000000..cb0218d1 --- /dev/null +++ b/examples/common/simple_notebook_mocked.ipynb @@ -0,0 +1,46 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", + "metadata": {}, + "outputs": [], + "source": [ + "def function():\n", + " print(\"hello from mock\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8eac7a3f", + "metadata": {}, + "outputs": [], + "source": [ + "function()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": 
"ipython3", + "version": "3.8.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/common/simple_notebook_out.ipynb b/examples/common/simple_notebook_out.ipynb index 91e156fe..e3562adc 100644 --- a/examples/common/simple_notebook_out.ipynb +++ b/examples/common/simple_notebook_out.ipynb @@ -3,11 +3,11 @@ { "cell_type": "code", "execution_count": 1, - "id": "c8a68d0d", + "id": "bd34d156", "metadata": { "ploomber": { - "timestamp_end": 1713380822.228675, - "timestamp_start": 1713380822.228447 + "timestamp_end": 1714453073.951735, + "timestamp_start": 1714453073.951505 }, "tags": [ "injected-parameters" @@ -24,8 +24,8 @@ "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", "metadata": { "ploomber": { - "timestamp_end": 1713380822.22899, - "timestamp_start": 1713380822.228748 + "timestamp_end": 1714453073.951955, + "timestamp_start": 1714453073.95176 } }, "outputs": [], @@ -40,8 +40,8 @@ "id": "8eac7a3f", "metadata": { "ploomber": { - "timestamp_end": 1713380822.229158, - "timestamp_start": 1713380822.229008 + "timestamp_end": 1714453073.952089, + "timestamp_start": 1714453073.951969 } }, "outputs": [ diff --git a/examples/common/write_parameters_out.ipynb b/examples/common/write_parameters_out.ipynb deleted file mode 100644 index e6c06d4a..00000000 --- a/examples/common/write_parameters_out.ipynb +++ /dev/null @@ -1,100 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "41a71aa7", - "metadata": { - "ploomber": { - "timestamp_end": 1713380822.509565, - "timestamp_start": 1713380822.508958 - }, - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "import pandas as pd\n", - "\n", - "from examples.common.functions import ComplexParams\n", - "\n", - "pydantic_param = ComplexParams(x=10, foo=\"bar\")\n" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "d53507d8", - "metadata": { - "ploomber": { - "timestamp_end": 1713380822.509736, - "timestamp_start": 1713380822.509595 - }, - "tags": [ - "injected-parameters" - ] - }, - "outputs": [], - "source": [ - "# Injected parameters\n" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "764f661d", - "metadata": { - "ploomber": { - "timestamp_end": 1713380822.511416, - "timestamp_start": 1713380822.509754 - } - }, - "outputs": [], - "source": [ - "data = {\"calories\": [420, 380, 390], \"duration\": [50, 40, 45]}\n", - "\n", - "df = pd.DataFrame(data)" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": { - "ploomber": { - "timestamp_end": 1713380822.511728, - "timestamp_start": 1713380822.51144 - } - }, - "outputs": [], - "source": [ - "integer = 1\n", - "floater = 3.14\n", - "stringer = \"hello\"\n", - "score = 0.9" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/configs/argo-config-full.yaml b/examples/configs/argo-config-full.yaml index 0600cc5b..ced0ab90 100644 --- a/examples/configs/argo-config-full.yaml +++ b/examples/configs/argo-config-full.yaml @@ -5,12 +5,14 @@ executor: max_workflow_duration_in_seconds: 86400 # Apply to spec node_selector: parallelism: 1 #apply to spec - 
service_account_name: pipeline-runner + service_account_name1: pipeline-runner resources: limits: memory: 140Mi cpu: 100m requests: + memory: 100Mi + cpu: 10m retry_strategy: limit: 0 #int retryPolicy: "always" diff --git a/examples/configs/fs-catalog-run_log.yaml b/examples/configs/fs-catalog-run_log.yaml deleted file mode 100644 index 03487500..00000000 --- a/examples/configs/fs-catalog-run_log.yaml +++ /dev/null @@ -1,5 +0,0 @@ -catalog: - type: file-system # (1) - -run_log_store: - type: file-system # (1) diff --git a/examples/configs/fs-catalog.yaml b/examples/configs/fs-catalog.yaml deleted file mode 100644 index 48597735..00000000 --- a/examples/configs/fs-catalog.yaml +++ /dev/null @@ -1,2 +0,0 @@ -catalog: - type: file-system # (1) diff --git a/examples/configs/fs-run_log.yaml b/examples/configs/fs-run_log.yaml deleted file mode 100644 index 896955d3..00000000 --- a/examples/configs/fs-run_log.yaml +++ /dev/null @@ -1,2 +0,0 @@ -run_log_store: - type: file-system diff --git a/examples/configs/local-container.yaml b/examples/configs/local-container.yaml index b16589a4..ac46db23 100644 --- a/examples/configs/local-container.yaml +++ b/examples/configs/local-container.yaml @@ -2,14 +2,6 @@ executor: type: "local-container" # (1) config: docker_image: runnable:latest # (2) - environment: - key: value # (3) run_log_store: # (4) - type: file-system - -catalog: - type: file-system - -secrets: - type: do-nothing + type: chunked-fs diff --git a/examples/configs/secrets-env-default.yaml b/examples/configs/secrets-env-default.yaml deleted file mode 100644 index 33975b9b..00000000 --- a/examples/configs/secrets-env-default.yaml +++ /dev/null @@ -1,2 +0,0 @@ -secrets: - type: env-secrets-manager diff --git a/examples/configs/secrets-env-ps.yaml b/examples/configs/secrets-env-ps.yaml deleted file mode 100644 index 3aaad3bd..00000000 --- a/examples/configs/secrets-env-ps.yaml +++ /dev/null @@ -1,4 +0,0 @@ -secrets: - type: env-secrets-manager - config: - prefix: "runnable_" diff --git a/pyproject.toml b/pyproject.toml index 6781373a..86726c1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,7 +97,6 @@ runnable = 'runnable.cli:cli' [tool.poetry.plugins."secrets"] "do-nothing" = "runnable.secrets:DoNothingSecretManager" "dotenv" = "runnable.extensions.secrets.dotenv.implementation:DotEnvSecrets" -"env-secrets-manager" = "runnable.extensions.secrets.env_secrets.implementation:EnvSecretsManager" # Plugins for Run Log store [tool.poetry.plugins."run_log_store"] diff --git a/runnable/__init__.py b/runnable/__init__.py index e21c7739..2228f6e6 100644 --- a/runnable/__init__.py +++ b/runnable/__init__.py @@ -2,6 +2,7 @@ # TODO: Might need to add Rich to pyinstaller part import logging +import os from logging.config import dictConfig from rich.console import Console @@ -29,6 +30,8 @@ pickled, ) +os.environ["_PLOOMBER_TELEMETRY_DEBUG"] = "false" + ## TODO: Summary should be a bit better for catalog. ## If the execution fails, hint them about the retry executor. # Make the retry executor loose! 
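The entrypoints.py hunk further below makes the configuration file discoverable through the RUNNABLE_CONFIGURATION_FILE environment variable, and the reworked test contexts at the end of this diff pair that with the RUNNABLE_PRM_ parameter prefix described in the examples README above. A minimal sketch of how the two mechanisms combine is shown here; the pipeline path, parameters file, and the `envvar` parameter name are borrowed from the examples exercised elsewhere in this diff and are illustrative assumptions, not part of the change itself.

```python
import os

from runnable.entrypoints import execute

# When set, this variable wins over the configuration_file argument passed to
# prepare_configurations(), so the same pipeline can be re-targeted per run.
os.environ["RUNNABLE_CONFIGURATION_FILE"] = "examples/configs/local-container.yaml"

# Any RUNNABLE_PRM_<name> variable is surfaced as the parameter <name> and
# overrides (or supplements) the value coming from the parameters file.
os.environ["RUNNABLE_PRM_envvar"] = "from env"

execute(
    pipeline_file="examples/03-parameters/static_parameters_python.yaml",  # assumed example path
    parameters_file="examples/common/initial_parameters.yaml",
)
```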
diff --git a/runnable/datastore.py b/runnable/datastore.py index bdf41a9e..c7125351 100644 --- a/runnable/datastore.py +++ b/runnable/datastore.py @@ -402,7 +402,10 @@ def search_step_by_internal_name(self, i_name: str) -> Tuple[StepLog, Union[Bran """ dot_path = i_name.split(".") if len(dot_path) == 1: - return self.steps[i_name], None + try: + return self.steps[i_name], None + except KeyError as e: + raise exceptions.StepLogNotFoundError(self.run_id, i_name) from e current_steps = self.steps current_step = None diff --git a/runnable/entrypoints.py b/runnable/entrypoints.py index bdddeb7e..4a1b6da8 100644 --- a/runnable/entrypoints.py +++ b/runnable/entrypoints.py @@ -60,6 +60,8 @@ def prepare_configurations( variables = utils.gather_variables() templated_configuration = {} + configuration_file = os.environ.get("RUNNABLE_CONFIGURATION_FILE", configuration_file) + if configuration_file: templated_configuration = utils.load_yaml(configuration_file) or {} @@ -144,8 +146,8 @@ def prepare_configurations( def execute( - configuration_file: str, pipeline_file: str, + configuration_file: str = "", tag: str = "", run_id: str = "", parameters_file: str = "", @@ -196,6 +198,10 @@ def execute( run_context.progress = progress executor.execute_graph(dag=run_context.dag) # type: ignore + if not executor._local: + executor.send_return_code(stage="traversal") + return + run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_context.run_id, full=False) if run_log.status == defaults.SUCCESS: @@ -205,6 +211,10 @@ def execute( except Exception as e: # noqa: E722 console.print(e, style=defaults.error_style) progress.update(pipeline_execution_task, description="[red] Errored execution", completed=True) + run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_context.run_id, full=False) + run_log.status = defaults.FAIL + run_context.run_log_store.add_branch_log(run_log, run_context.run_id) + raise e executor.send_return_code() @@ -235,6 +245,8 @@ def execute_single_node( """ from runnable import nodes + configuration_file = os.environ.get("RUNNABLE_CONFIGURATION_FILE", configuration_file) + run_context = prepare_configurations( configuration_file=configuration_file, pipeline_file=pipeline_file, @@ -422,6 +434,8 @@ def fan( """ from runnable import nodes + configuration_file = os.environ.get("RUNNABLE_CONFIGURATION_FILE", configuration_file) + run_context = prepare_configurations( configuration_file=configuration_file, pipeline_file=pipeline_file, diff --git a/runnable/extensions/executor/__init__.py b/runnable/extensions/executor/__init__.py index 791c483f..4353c9e3 100644 --- a/runnable/extensions/executor/__init__.py +++ b/runnable/extensions/executor/__init__.py @@ -185,14 +185,11 @@ def _sync_catalog(self, stage: str, synced_catalogs=None) -> Optional[List[DataC data_catalogs = [] for name_pattern in node_catalog_settings.get(stage) or []: if stage == "get": - get_catalog_progress = self._context.progress.add_task(f"Getting from catalog {name_pattern}", total=1) data_catalog = self._context.catalog_handler.get( name=name_pattern, run_id=self._context.run_id, compute_data_folder=compute_data_folder ) - self._context.progress.update(get_catalog_progress, completed=True, visible=False, refresh=True) elif stage == "put": - put_catalog_progress = self._context.progress.add_task(f"Putting in catalog {name_pattern}", total=1) data_catalog = self._context.catalog_handler.put( name=name_pattern, run_id=self._context.run_id, @@ -200,8 +197,6 @@ def _sync_catalog(self, stage: str, 
synced_catalogs=None) -> Optional[List[DataC synced_catalogs=synced_catalogs, ) - self._context.progress.update(put_catalog_progress, completed=True, visible=False) - logger.debug(f"Added data catalog: {data_catalog} to step log") data_catalogs.extend(data_catalog) diff --git a/runnable/extensions/executor/argo/implementation.py b/runnable/extensions/executor/argo/implementation.py index 135f3e6b..ef894e7b 100644 --- a/runnable/extensions/executor/argo/implementation.py +++ b/runnable/extensions/executor/argo/implementation.py @@ -1033,6 +1033,9 @@ def _gather_task_templates_of_dag( if working_on.node_type not in ["success", "fail"] and working_on._get_on_failure_node(): failure_node = dag.get_node_by_name(working_on._get_on_failure_node()) + render_obj = get_renderer(working_on)(executor=self, node=failure_node) + render_obj.render(list_of_iter_values=list_of_iter_values.copy()) + failure_template_name = self.get_clean_name(failure_node) # If a task template for clean name exists, retrieve it failure_template = templates.get( @@ -1040,7 +1043,6 @@ def _gather_task_templates_of_dag( DagTaskTemplate(name=failure_template_name, template=failure_template_name), ) failure_template.depends.append(f"{clean_name}.Failed") - templates[failure_template_name] = failure_template # If we are in a map node, we need to add the values as arguments diff --git a/runnable/extensions/executor/local_container/implementation.py b/runnable/extensions/executor/local_container/implementation.py index f1e794f1..be711283 100644 --- a/runnable/extensions/executor/local_container/implementation.py +++ b/runnable/extensions/executor/local_container/implementation.py @@ -5,7 +5,7 @@ from pydantic import Field from rich import print -from runnable import defaults, integration, utils +from runnable import defaults, utils from runnable.datastore import StepLog from runnable.defaults import TypeMapVariable from runnable.extensions.executor import GenericExecutor @@ -145,16 +145,6 @@ def trigger_job(self, node: BaseNode, map_variable: TypeMapVariable = None, **kw logger.debug("Here is the resolved executor config") logger.debug(executor_config) - if executor_config.get("run_in_local", False): - # Do not change config but only validate the configuration. - # Trigger the job on local system instead of a container - integration.validate(self, self._context.run_log_store) - integration.validate(self, self._context.catalog_handler) - integration.validate(self, self._context.secrets_handler) - - self.execute_node(node=node, map_variable=map_variable, **kwargs) - return - command = utils.get_node_execution_command(node, map_variable=map_variable) self._spin_container( @@ -172,7 +162,7 @@ def trigger_job(self, node: BaseNode, map_variable: TypeMapVariable = None, **kw "Note: If you do not see any docker issue from your side and the code works properly on local execution" "please raise a bug report." ) - logger.warning(msg) + logger.error(msg) step_log.status = defaults.FAIL self._context.run_log_store.add_step_log(step_log, self._context.run_id) @@ -212,6 +202,7 @@ def _spin_container( f"Please provide a docker_image using executor_config of the step {node.name} or at global config" ) + print("container", self._volumes) # TODO: Should consider using getpass.getuser() when running the docker container? 
Volume permissions container = client.containers.create( image=docker_image, @@ -260,7 +251,9 @@ class LocalContainerComputeFileSystemRunLogstore(BaseIntegration): service_provider = "file-system" # The actual implementation of the service def configure_for_traversal(self, **kwargs): - from runnable.extensions.run_log_store.file_system.implementation import FileSystemRunLogstore + from runnable.extensions.run_log_store.file_system.implementation import ( + FileSystemRunLogstore, + ) self.executor = cast(LocalContainerExecutor, self.executor) self.service = cast(FileSystemRunLogstore, self.service) @@ -272,7 +265,9 @@ def configure_for_traversal(self, **kwargs): } def configure_for_execution(self, **kwargs): - from runnable.extensions.run_log_store.file_system.implementation import FileSystemRunLogstore + from runnable.extensions.run_log_store.file_system.implementation import ( + FileSystemRunLogstore, + ) self.executor = cast(LocalContainerExecutor, self.executor) self.service = cast(FileSystemRunLogstore, self.service) @@ -280,6 +275,40 @@ def configure_for_execution(self, **kwargs): self.service.log_folder = self.executor._container_log_location +class LocalContainerComputeChunkedFS(BaseIntegration): + """ + Integration pattern between Local container and File System catalog + """ + + executor_type = "local-container" + service_type = "run_log_store" # One of secret, catalog, datastore + service_provider = "chunked-fs" # The actual implementation of the service + + def configure_for_traversal(self, **kwargs): + from runnable.extensions.run_log_store.chunked_file_system.implementation import ( + ChunkedFileSystemRunLogStore, + ) + + self.executor = cast(LocalContainerExecutor, self.executor) + self.service = cast(ChunkedFileSystemRunLogStore, self.service) + + write_to = self.service.log_folder + self.executor._volumes[str(Path(write_to).resolve())] = { + "bind": f"{self.executor._container_log_location}", + "mode": "rw", + } + + def configure_for_execution(self, **kwargs): + from runnable.extensions.run_log_store.chunked_file_system.implementation import ( + ChunkedFileSystemRunLogStore, + ) + + self.executor = cast(LocalContainerExecutor, self.executor) + self.service = cast(ChunkedFileSystemRunLogStore, self.service) + + self.service.log_folder = self.executor._container_log_location + + class LocalContainerComputeFileSystemCatalog(BaseIntegration): """ Integration pattern between Local container and File System catalog @@ -290,7 +319,9 @@ class LocalContainerComputeFileSystemCatalog(BaseIntegration): service_provider = "file-system" # The actual implementation of the service def configure_for_traversal(self, **kwargs): - from runnable.extensions.catalog.file_system.implementation import FileSystemCatalog + from runnable.extensions.catalog.file_system.implementation import ( + FileSystemCatalog, + ) self.executor = cast(LocalContainerExecutor, self.executor) self.service = cast(FileSystemCatalog, self.service) @@ -302,7 +333,9 @@ def configure_for_traversal(self, **kwargs): } def configure_for_execution(self, **kwargs): - from runnable.extensions.catalog.file_system.implementation import FileSystemCatalog + from runnable.extensions.catalog.file_system.implementation import ( + FileSystemCatalog, + ) self.executor = cast(LocalContainerExecutor, self.executor) self.service = cast(FileSystemCatalog, self.service) diff --git a/runnable/extensions/executor/mocked/implementation.py b/runnable/extensions/executor/mocked/implementation.py index ab6c6dd3..e6e74802 100644 --- 
a/runnable/extensions/executor/mocked/implementation.py +++ b/runnable/extensions/executor/mocked/implementation.py @@ -18,7 +18,7 @@ def create_executable(params: Dict[str, Any], model: Type[BaseTaskType], node_na class EasyModel(model): # type: ignore model_config = ConfigDict(extra="ignore") - swallow_all = EasyModel(**params, node_name=node_name) + swallow_all = EasyModel(node_name=node_name, **params) return swallow_all @@ -26,6 +26,8 @@ class MockedExecutor(GenericExecutor): service_name: str = "mocked" _local_executor: bool = True + model_config = ConfigDict(extra="ignore") + patches: Dict[str, Any] = Field(default_factory=dict) @property @@ -64,6 +66,10 @@ def execute_from_graph(self, node: BaseNode, map_variable: TypeMapVariable = Non step_log.step_type = node.node_type step_log.status = defaults.PROCESSING + self._context.run_log_store.add_step_log(step_log, self._context.run_id) + + logger.info(f"Executing node: {node.get_summary()}") + # Add the step log to the database as per the situation. # If its a terminal node, complete it now if node.node_type in ["success", "fail"]: @@ -132,3 +138,17 @@ def _resolve_executor_config(self, node: BaseNode): def execute_job(self, node: TaskNode): pass + + def execute_node(self, node: BaseNode, map_variable: TypeMapVariable = None, **kwargs): + """ + The entry point for all executors apart from local. + We have already prepared for node execution. + + Args: + node (BaseNode): The node to execute + map_variable (dict, optional): If the node is part of a map, send in the map dictionary. Defaults to None. + + Raises: + NotImplementedError: _description_ + """ + ... diff --git a/runnable/extensions/executor/retry/implementation.py b/runnable/extensions/executor/retry/implementation.py index 09256dd1..4f4cb376 100644 --- a/runnable/extensions/executor/retry/implementation.py +++ b/runnable/extensions/executor/retry/implementation.py @@ -6,6 +6,7 @@ from runnable.datastore import RunLog from runnable.defaults import TypeMapVariable from runnable.extensions.executor import GenericExecutor +from runnable.extensions.nodes import TaskNode from runnable.nodes import BaseNode logger = logging.getLogger(defaults.LOGGER_NAME) @@ -31,6 +32,7 @@ class RetryExecutor(GenericExecutor): _local: bool = True _original_run_log: Optional[RunLog] = None + _restart_initiated: bool = False @property def _context(self): @@ -38,7 +40,7 @@ def _context(self): @cached_property def original_run_log(self): - self.original_run_log = self._context.run_log_store.get_run_log_by_id( + return self._context.run_log_store.get_run_log_by_id( run_id=self.run_id, full=True, ) @@ -140,10 +142,14 @@ def _is_step_eligible_for_rerun(self, node: BaseNode, map_variable: TypeMapVaria node_step_log_name = node._get_step_log_name(map_variable=map_variable) logger.info(f"Scanning previous run logs for node logs of: {node_step_log_name}") + if self._restart_initiated: + return True + try: previous_attempt_log, _ = self.original_run_log.search_step_by_internal_name(node_step_log_name) except exceptions.StepLogNotFoundError: logger.warning(f"Did not find the node {node.name} in previous run log") + self._restart_initiated = True return True # We should re-run the node. 
logger.info(f"The original step status: {previous_attempt_log.status}") @@ -152,7 +158,11 @@ def _is_step_eligible_for_rerun(self, node: BaseNode, map_variable: TypeMapVaria return False # We need not run the node logger.info(f"The new execution should start executing graph from this node {node.name}") + self._restart_initiated = True return True def execute_node(self, node: BaseNode, map_variable: TypeMapVariable = None, **kwargs): self._execute_node(node, map_variable=map_variable, **kwargs) + + def execute_job(self, node: TaskNode): + pass diff --git a/runnable/extensions/nodes.py b/runnable/extensions/nodes.py index 21477585..dd76bd5f 100644 --- a/runnable/extensions/nodes.py +++ b/runnable/extensions/nodes.py @@ -5,7 +5,7 @@ from collections import OrderedDict from copy import deepcopy from datetime import datetime -from typing import Any, Dict, List, Optional, Tuple, Union, cast +from typing import Annotated, Any, Callable, Dict, List, Optional, Tuple, Union, cast from pydantic import ( ConfigDict, @@ -14,10 +14,15 @@ field_serializer, field_validator, ) -from typing_extensions import Annotated from runnable import datastore, defaults, utils -from runnable.datastore import JsonParameter, MetricParameter, ObjectParameter, StepLog +from runnable.datastore import ( + JsonParameter, + MetricParameter, + ObjectParameter, + Parameter, + StepLog, +) from runnable.defaults import TypeMapVariable from runnable.graph import Graph, create_graph from runnable.nodes import CompositeNode, ExecutableNode, TerminalNode @@ -46,8 +51,6 @@ def parse_from_config(cls, config: Dict[str, Any]) -> "TaskNode": task_config = {k: v for k, v in config.items() if k not in TaskNode.model_fields.keys()} node_config = {k: v for k, v in config.items() if k in TaskNode.model_fields.keys()} - task_config["node_name"] = config.get("name") - executable = create_task(task_config) return cls(executable=executable, **node_config, **task_config) @@ -543,10 +546,14 @@ def execute_as_graph(self, map_variable: TypeMapVariable = None, **kwargs): iterate_on = None try: iterate_on = self._context.run_log_store.get_parameters(self._context.run_id)[self.iterate_on].get_value() - except KeyError: + except KeyError as e: raise Exception( - f"Expected parameter {self.iterate_on} not present in Run Log parameters, was it ever set before?" - ) + ( + f"Expected parameter {self.iterate_on}", + "not present in Run Log parameters", + "was it ever set before?", + ) + ) from e if not isinstance(iterate_on, list): raise Exception("Only list is allowed as a valid iterator type") @@ -599,29 +606,44 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs): # The final value of the parameter is the result of the reduce function. reducer_f = self.get_reducer_function() - if map_variable: - # If we are in a map state already, the param should have an index of the map variable. 
- for _, v in map_variable.items(): - for branch_return in self.branch_returns: - param_name, _ = branch_return - to_reduce = [] - for iter_variable in iterate_on: - to_reduce.append(params[f"{iter_variable}_{param_name}"].get_value()) + def update_param(params: Dict[str, Parameter], reducer_f: Callable, map_prefix: str = ""): + from runnable.extensions.executor.mocked.implementation import ( + MockedExecutor, + ) - param_name = f"{v}_{param_name}" - params[param_name].value = reducer_f(to_reduce) - params[param_name].reduced = True - else: for branch_return in self.branch_returns: param_name, _ = branch_return to_reduce = [] for iter_variable in iterate_on: - to_reduce.append(params[f"{iter_variable}_{param_name}"].get_value()) - - params[param_name].value = reducer_f(*to_reduce) + try: + to_reduce.append(params[f"{iter_variable}_{param_name}"].get_value()) + except KeyError as e: + if isinstance(self._context.executor, MockedExecutor): + pass + else: + raise Exception( + ( + f"Expected parameter {iter_variable}_{param_name}", + "not present in Run Log parameters", + "was it ever set before?", + ) + ) from e + + param_name = f"{map_prefix}{param_name}" + if to_reduce: + params[param_name].value = reducer_f(*to_reduce) + else: + params[param_name].value = "" params[param_name].reduced = True + if map_variable: + # If we are in a map state already, the param should have an index of the map variable. + for _, v in map_variable.items(): + update_param(params, reducer_f, map_prefix=f"{v}_") + else: + update_param(params, reducer_f) + self._context.run_log_store.set_parameters(parameters=params, run_id=self._context.run_id) diff --git a/runnable/extensions/secrets/env_secrets/__init__.py b/runnable/extensions/secrets/env_secrets/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/runnable/extensions/secrets/env_secrets/implementation.py b/runnable/extensions/secrets/env_secrets/implementation.py deleted file mode 100644 index 5b88d1bd..00000000 --- a/runnable/extensions/secrets/env_secrets/implementation.py +++ /dev/null @@ -1,42 +0,0 @@ -import logging -import os - -from runnable import defaults, exceptions -from runnable.secrets import BaseSecrets - -logger = logging.getLogger(defaults.LOGGER_NAME) - - -class EnvSecretsManager(BaseSecrets): - """ - A secret manager via environment variables. - - This secret manager returns nothing if the key does not match - """ - - service_name: str = "env-secrets-manager" - prefix: str = "" - suffix: str = "" - - def get(self, name: str = "", **kwargs) -> str: - """ - If a name is provided, we look for that in the environment. - If a environment variable by that name is not found, we raise an Exception. - - If a name is not provided, we return an empty dictionary. - - Args: - name (str): The name of the secret to retrieve - - Raises: - Exception: If the secret by the name is not found. 
- - Returns: - [type]: [description] - """ - - try: - return os.environ[f"{self.prefix}{name}{self.suffix}"] - except KeyError as _e: - logger.exception(f"Secret {self.prefix}{name}{self.suffix} not found in environment") - raise exceptions.SecretNotFoundError(secret_name=name, secret_setting="environment") from _e diff --git a/runnable/graph.py b/runnable/graph.py index f9a54172..e66de8e6 100644 --- a/runnable/graph.py +++ b/runnable/graph.py @@ -74,6 +74,7 @@ def get_node_by_internal_name(self, internal_name: str) -> "BaseNode": for _, value in self.nodes.items(): if value.internal_name == internal_name: return value + print("graph", internal_name) raise exceptions.NodeNotFoundError(internal_name) def __str__(self): # pragma: no cover diff --git a/runnable/parameters.py b/runnable/parameters.py index 7cc59680..d1767644 100644 --- a/runnable/parameters.py +++ b/runnable/parameters.py @@ -36,7 +36,7 @@ def get_user_set_parameters(remove: bool = False) -> Dict[str, JsonParameter]: try: parameters[key.lower()] = JsonParameter(kind="json", value=json.loads(value)) except json.decoder.JSONDecodeError: - logger.error(f"Parameter {key} could not be JSON decoded, adding the literal value") + logger.warning(f"Parameter {key} could not be JSON decoded, adding the literal value") parameters[key.lower()] = JsonParameter(kind="json", value=value) if remove: diff --git a/runnable/sdk.py b/runnable/sdk.py index d14d8ce6..fd736f85 100644 --- a/runnable/sdk.py +++ b/runnable/sdk.py @@ -2,6 +2,7 @@ import logging import os +import re from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Union @@ -25,7 +26,7 @@ from rich.table import Column from typing_extensions import Self -from runnable import console, defaults, entrypoints, graph, utils +from runnable import console, defaults, entrypoints, exceptions, graph, utils from runnable.extensions.nodes import ( FailNode, MapNode, @@ -310,8 +311,6 @@ class NotebookTask(BaseTask): """ notebook: str = Field(serialization_alias="command") - - notebook_output_path: Optional[str] = Field(default=None, alias="notebook_output_path", validate_default=True) optional_ploomber_args: Optional[Dict[str, Any]] = Field(default=None, alias="optional_ploomber_args") @computed_field @@ -591,6 +590,7 @@ def model_post_init(self, __context: Any) -> None: Any definition of pipeline should have one node that terminates with success. """ + # TODO: Bug with repeat names success_path: List[StepType] = [] on_failure_paths: List[List[StepType]] = [] @@ -637,7 +637,8 @@ def model_post_init(self, __context: Any) -> None: self._dag.check_graph() def return_dag(self) -> graph.Graph: - return self._dag + dag_definition = self._dag.model_dump(by_alias=True, exclude_none=True) + return graph.create_graph(dag_definition) def execute( self, @@ -708,7 +709,8 @@ def execute( caller_stack = inspect.stack()[1] relative_to_root = str(Path(caller_stack.filename).relative_to(Path.cwd())) - module_to_call = f"{relative_to_root.replace('/', '.').replace('.py', '')}.{caller_stack.function}" + module_name = re.sub(r"\b.py\b", "", relative_to_root.replace("/", ".")) + module_to_call = f"{module_name}.{caller_stack.function}" run_context.pipeline_file = f"{module_to_call}.py" @@ -728,15 +730,20 @@ def execute( pipeline_execution_task = progress.add_task("[dark_orange] Starting execution .. 
", total=1) run_context.executor.execute_graph(dag=run_context.dag) + if not run_context.executor._local: + return {} + run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_context.run_id, full=False) if run_log.status == defaults.SUCCESS: progress.update(pipeline_execution_task, description="[green] Success", completed=True) else: progress.update(pipeline_execution_task, description="[red] Failed", completed=True) + raise exceptions.ExecutionFailedError(run_context.run_id) except Exception as e: # noqa: E722 console.print(e, style=defaults.error_style) progress.update(pipeline_execution_task, description="[red] Errored execution", completed=True) + raise if run_context.executor._local: return run_context.run_log_store.get_run_log_by_id(run_id=run_context.run_id) diff --git a/runnable/tasks.py b/runnable/tasks.py index 3b2403c8..98d90997 100644 --- a/runnable/tasks.py +++ b/runnable/tasks.py @@ -8,11 +8,14 @@ import subprocess import sys from datetime import datetime +from pathlib import Path from pickle import PicklingError from string import Template -from typing import Any, Dict, List, Literal, Optional, Tuple +from typing import Any, Dict, List, Literal, Tuple -from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator + +# from rich import print from rich.console import Console from stevedore import driver @@ -34,9 +37,6 @@ # TODO: Can we add memory peak, cpu usage, etc. to the metrics? -console = Console(file=io.StringIO()) - - class TaskReturns(BaseModel): name: str kind: Literal["json", "object", "metric"] = Field(default="json") @@ -46,7 +46,6 @@ class BaseTaskType(BaseModel): """A base task class which does the execution of command defined by the user.""" task_type: str = Field(serialization_alias="command_type") - node_name: str = Field(exclude=True) secrets: List[str] = Field(default_factory=list) returns: List[TaskReturns] = Field(default_factory=list, alias="returns") @@ -153,7 +152,7 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex: if not allow_complex: params = {key: value for key, value in params.items() if isinstance(value, JsonParameter)} - log_file_name = self.node_name # + ".execution.log" + log_file_name = self._context.executor._context_node.internal_name if map_variable: for _, value in map_variable.items(): log_file_name += "_" + str(value) @@ -163,16 +162,17 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex: log_file = open(log_file_name, "w") parameters_in = copy.deepcopy(params) - f = io.StringIO() + task_console = Console(file=io.StringIO()) try: with contextlib.redirect_stdout(f): # with contextlib.nullcontext(): - yield params - print(console.file.getvalue()) # type: ignore + yield params, task_console + print(task_console.file.getvalue()) # type: ignore except Exception as e: # pylint: disable=broad-except logger.exception(e) finally: + task_console = None # type: ignore print(f.getvalue()) # print to console log_file.write(f.getvalue()) # Print to file @@ -233,7 +233,7 @@ def execute_command( """Execute the notebook as defined by the command.""" attempt_log = StepAttempt(status=defaults.FAIL, start_time=str(datetime.now())) - with self.execution_context(map_variable=map_variable) as params, self.expose_secrets() as _: + with self.execution_context(map_variable=map_variable) as (params, task_console), self.expose_secrets() as _: module, func = utils.get_module_and_attr_names(self.command) 
sys.path.insert(0, os.getcwd()) # Need to add the current directory to path imported_module = importlib.import_module(module) @@ -245,7 +245,7 @@ def execute_command( logger.info(f"Calling {func} from {module} with {filtered_parameters}") user_set_parameters = f(**filtered_parameters) # This is a tuple or single value except Exception as e: - console.log(e, style=defaults.error_style, markup=False) + task_console.log(e, style=defaults.error_style, markup=False) raise exceptions.CommandCallError(f"Function call: {self.command} did not succeed.\n") from e attempt_log.input_parameters = params.copy() @@ -289,8 +289,8 @@ def execute_command( except Exception as _e: msg = f"Call to the function {self.command} did not succeed.\n" attempt_log.message = msg - console.print_exception(show_locals=False) - console.log(_e, style=defaults.error_style) + task_console.print_exception(show_locals=False) + task_console.log(_e, style=defaults.error_style) attempt_log.end_time = str(datetime.now()) @@ -302,25 +302,25 @@ class NotebookTaskType(BaseTaskType): task_type: str = Field(default="notebook", serialization_alias="command_type") command: str - notebook_output_path: Optional[str] = Field(default=None, validate_default=True) optional_ploomber_args: dict = {} @field_validator("command") @classmethod - def notebook_should_end_with_ipynb(cls, command: str): + def notebook_should_end_with_ipynb(cls, command: str) -> str: if not command.endswith(".ipynb"): raise Exception("Notebook task should point to a ipynb file") return command - @field_validator("notebook_output_path") - @classmethod - def correct_notebook_output_path(cls, notebook_output_path: str, info: ValidationInfo): - if notebook_output_path: - return notebook_output_path + @property + def notebook_output_path(self) -> str: + node_name = self._context.executor._context_node.internal_name + sane_name = "".join(x for x in node_name if x.isalnum()) + + output_path = Path(".", self.command) + file_name = output_path.parent / (output_path.stem + f"{sane_name}_out.ipynb") - command = info.data["command"] - return "".join(command.split(".")[:-1]) + "_out.ipynb" + return str(file_name) def get_cli_options(self) -> Tuple[str, dict]: return "notebook", {"command": self.command, "notebook-output-path": self.notebook_output_path} @@ -344,17 +344,26 @@ def execute_command( import ploomber_engine as pm from ploomber_engine.ipython import PloomberClient - notebook_output_path = self.notebook_output_path or "" + notebook_output_path = self.notebook_output_path - with self.execution_context( - map_variable=map_variable, allow_complex=False - ) as params, self.expose_secrets() as _: + with self.execution_context(map_variable=map_variable, allow_complex=False) as ( + params, + _, + ), self.expose_secrets() as _: if map_variable: for key, value in map_variable.items(): notebook_output_path += "_" + str(value) - params[key] = value + params[key] = JsonParameter(kind="json", value=value) + + # Remove any {v}_unreduced parameters from the parameters + copy_params = copy.deepcopy(params) + unprocessed_params = [k for k, v in copy_params.items() if not v.reduced] - notebook_params = {k: v.get_value() for k, v in params.items()} + for key in list(copy_params.keys()): + if any(key.endswith(f"_{k}") for k in unprocessed_params): + del copy_params[key] + + notebook_params = {k: v.get_value() for k, v in copy_params.items()} ploomber_optional_args = self.optional_ploomber_args @@ -377,6 +386,11 @@ def execute_command( try: for task_return in self.returns: param_name = 
Template(task_return.name).safe_substitute(map_variable) # type: ignore + + if map_variable: + for _, v in map_variable.items(): + param_name = f"{v}_{param_name}" + output_parameters[param_name] = task_return_to_parameter( task_return=task_return, value=namespace[task_return.name], @@ -453,95 +467,98 @@ def execute_command( secret_value = context.run_context.secrets_handler.get(key) subprocess_env[key] = secret_value - with self.execution_context(map_variable=map_variable, allow_complex=False) as params: - subprocess_env.update({k: v.get_value() for k, v in params.items()}) - - # Json dumps all runnable environment variables - for key, value in subprocess_env.items(): - if isinstance(value, str): - continue - subprocess_env[key] = json.dumps(value) - - collect_delimiter = "=== COLLECT ===" - - command = self.command.strip() + f" && echo '{collect_delimiter}' && env" - logger.info(f"Executing shell command: {command}") - - capture = False - return_keys = {x.name: x for x in self.returns} - - proc = subprocess.Popen( - command, - shell=True, - env=subprocess_env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - result = proc.communicate() - logger.debug(result) - logger.info(proc.returncode) - - if proc.returncode != 0: - msg = ",".join(result[1].split("\n")) - attempt_log.status = defaults.FAIL - attempt_log.end_time = str(datetime.now()) - attempt_log.message = msg - console.print(msg, style=defaults.error_style) - return attempt_log - - # for stderr - for line in result[1].split("\n"): - if line.strip() == "": - continue - console.print(line, style=defaults.warning_style) + try: + with self.execution_context(map_variable=map_variable, allow_complex=False) as (params, task_console): + subprocess_env.update({k: v.get_value() for k, v in params.items()}) + + # Json dumps all runnable environment variables + for key, value in subprocess_env.items(): + if isinstance(value, str): + continue + subprocess_env[key] = json.dumps(value) + + collect_delimiter = "=== COLLECT ===" + + command = self.command.strip() + f" && echo '{collect_delimiter}' && env" + logger.info(f"Executing shell command: {command}") + + capture = False + return_keys = {x.name: x for x in self.returns} + + proc = subprocess.Popen( + command, + shell=True, + env=subprocess_env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + result = proc.communicate() + logger.debug(result) + logger.info(proc.returncode) + + if proc.returncode != 0: + msg = ",".join(result[1].split("\n")) + task_console.print(msg, style=defaults.error_style) + raise exceptions.CommandCallError(msg) + + # for stderr + for line in result[1].split("\n"): + if line.strip() == "": + continue + task_console.print(line, style=defaults.warning_style) - output_parameters: Dict[str, Parameter] = {} - metrics: Dict[str, Parameter] = {} + output_parameters: Dict[str, Parameter] = {} + metrics: Dict[str, Parameter] = {} - # only from stdout - for line in result[0].split("\n"): - if line.strip() == "": - continue + # only from stdout + for line in result[0].split("\n"): + if line.strip() == "": + continue - logger.info(line) - console.print(line) + logger.info(line) + task_console.print(line) - if line.strip() == collect_delimiter: - # The lines from now on should be captured - capture = True - continue + if line.strip() == collect_delimiter: + # The lines from now on should be captured + capture = True + continue - if capture: - key, value = line.strip().split("=", 1) - if key in return_keys: - task_return = return_keys[key] + 
if capture: + key, value = line.strip().split("=", 1) + if key in return_keys: + task_return = return_keys[key] - try: - value = json.loads(value) - except json.JSONDecodeError: - value = value + try: + value = json.loads(value) + except json.JSONDecodeError: + value = value - output_parameter = task_return_to_parameter( - task_return=task_return, - value=value, - ) + output_parameter = task_return_to_parameter( + task_return=task_return, + value=value, + ) - if task_return.kind == "metric": - metrics[task_return.name] = output_parameter + if task_return.kind == "metric": + metrics[task_return.name] = output_parameter - param_name = task_return.name - if map_variable: - for _, v in map_variable.items(): - param_name = f"{param_name}_{v}" + param_name = task_return.name + if map_variable: + for _, v in map_variable.items(): + param_name = f"{v}_{param_name}" - output_parameters[param_name] = output_parameter + output_parameters[param_name] = output_parameter - attempt_log.output_parameters = output_parameters - attempt_log.user_defined_metrics = metrics - params.update(output_parameters) + attempt_log.output_parameters = output_parameters + attempt_log.user_defined_metrics = metrics + params.update(output_parameters) - attempt_log.status = defaults.SUCCESS + attempt_log.status = defaults.SUCCESS + except exceptions.CommandCallError as e: + msg = f"Call to the command {self.command} did not succeed" + logger.exception(msg) + logger.exception(e) + attempt_log.status = defaults.FAIL attempt_log.end_time = str(datetime.now()) return attempt_log diff --git a/tests/runnable/extensions/secrets/test_env_secrets_manager.py b/tests/runnable/extensions/secrets/test_env_secrets_manager.py deleted file mode 100644 index 60294d45..00000000 --- a/tests/runnable/extensions/secrets/test_env_secrets_manager.py +++ /dev/null @@ -1,48 +0,0 @@ -import pytest -import os - -from runnable.extensions.secrets.env_secrets.implementation import EnvSecretsManager -from runnable import exceptions - - -def test_env_secrets_manager_raises_error_if_name_provided_and_not_present(): - manager = EnvSecretsManager() - - with pytest.raises(exceptions.SecretNotFoundError): - manager.get("environment") - - -def test_env_secrets_returns_secret_if_present_in_environment(monkeypatch): - monkeypatch.setenv("TEST_SECRET", "test_secret") - - manager = EnvSecretsManager() - assert manager.get("TEST_SECRET") == "test_secret" - - -def test_env_secrets_returns_secret_if_present_in_environment_with_prefix(monkeypatch): - monkeypatch.setenv("PREFIX_TEST_SECRET", "test_secret") - - manager = EnvSecretsManager(prefix="PREFIX_") - assert manager.get("TEST_SECRET") == "test_secret" - - -def test_env_secrets_returns_secret_if_present_in_environment_with_suffix(monkeypatch): - monkeypatch.setenv("TEST_SECRET_SUFFIX", "test_secret") - - manager = EnvSecretsManager(suffix="_SUFFIX") - assert manager.get("TEST_SECRET") == "test_secret" - - -def test_env_secrets_returns_secret_if_present_in_environment_with_suffix_and_prefix(monkeypatch): - monkeypatch.setenv("PREFIX_TEST_SECRET_SUFFIX", "test_secret") - - manager = EnvSecretsManager(suffix="_SUFFIX", prefix="PREFIX_") - assert manager.get("TEST_SECRET") == "test_secret" - - -def test_env_secrets_returns_matched_secrets_with_suffix(monkeypatch): - monkeypatch.setenv("TEST_SECRET_SUFFIX", "test_secret") - - manager = EnvSecretsManager(suffix="_SUFFIX") - - assert manager.get("TEST_SECRET") == "test_secret" diff --git a/tests/runnable/extensions/test_node_extensions.py 
b/tests/runnable/extensions/test_node_extensions.py index 06c43844..3218b933 100644 --- a/tests/runnable/extensions/test_node_extensions.py +++ b/tests/runnable/extensions/test_node_extensions.py @@ -2,7 +2,6 @@ from runnable import defaults from runnable.extensions import nodes as nodes - from runnable.tasks import BaseTaskType @@ -13,7 +12,7 @@ def instantiable_base_class(monkeypatch): def test_task_node_parse_from_config_seperates_task_from_node_confifg(mocker, monkeypatch): - base_task = BaseTaskType(node_name="test", task_type="dummy") + base_task = BaseTaskType(task_type="dummy") mock_create_task = mocker.MagicMock(return_value=base_task) command_config = {"to_be_sent_to_task": "yes"} @@ -26,8 +25,6 @@ def test_task_node_parse_from_config_seperates_task_from_node_confifg(mocker, mo monkeypatch.setattr(nodes, "create_task", mock_create_task) task_node = nodes.TaskNode.parse_from_config({**node_config, **command_config}) - command_config["node_name"] = "test" - mock_create_task.assert_called_once_with(command_config) assert task_node.executable == base_task @@ -39,7 +36,7 @@ def test_task_node_mocks_if_mock_is_true(mocker, monkeypatch): monkeypatch.setattr(nodes.TaskNode, "_context", mock_context) mock_context.run_log_store.create_attempt_log = mocker.MagicMock(return_value=mock_attempt_log) - base_task = BaseTaskType(node_name="test", task_type="dummy") + base_task = BaseTaskType(task_type="dummy") task_node = nodes.TaskNode(name="test", internal_name="test", next_node="next_node", executable=base_task) attempt_log = task_node.execute(mock=True) diff --git a/tests/runnable/test_tasks.py b/tests/runnable/test_tasks.py deleted file mode 100644 index 145efbce..00000000 --- a/tests/runnable/test_tasks.py +++ /dev/null @@ -1,21 +0,0 @@ -import pytest - - -from runnable import tasks - - -@pytest.fixture -def configuration(): - return {"node_name": "dummy", "task_type": "dummy"} - - -def test_base_task_execute_command_raises_not_implemented_error(configuration): - base_execution_type = tasks.BaseTaskType(**configuration) - - with pytest.raises(NotImplementedError): - base_execution_type.execute_command() - - -def test_notebook_raises_exception_if_command_is_not_a_notebook(): - with pytest.raises(Exception): - tasks.NotebookTaskType(command="path to notebook") diff --git a/tests/test_examples.py b/tests/test_examples.py index 9eabf0da..974a4263 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -1,181 +1,174 @@ import importlib import os import subprocess -from contextlib import contextmanager, nullcontext -from pathlib import Path +from contextlib import contextmanager import pytest from runnable import exceptions from runnable.entrypoints import execute -# # (file, is_fail?, kwargs) -python_examples = [ - ("01-tasks/notebook", False, None), - ("01-tasks/python_tasks", False, None), - ("01-tasks/scripts", False, None), - ("01-tasks/stub", False, None), - ("02-sequential/default_fail", False, None), - ("02-sequential/on_failure_fail", False, None), - ("02-sequential/on_failure_succeed", False, None), - ("02-sequential/traversal", False, None), - ("03-parameters/passing_parameters_notebook", False, None), - ("03-parameters/passing_parameters_python", False, None), - ("03-parameters/passing_parameters_shell", False, None), - ("03-parameters/static_parameters_non_python", False, None), - ("03-parameters/static_parameters_python", False, None), -] - def list_python_examples(): for example in python_examples: yield example +@contextmanager +def container_context(): + 
os.environ["RUNNABLE_CONFIGURATION_FILE"] = "examples/configs/local-container.yaml" + os.environ["RUNNABLE_PRM_envvar"] = "from env" + yield + del os.environ["RUNNABLE_CONFIGURATION_FILE"] + del os.environ["RUNNABLE_PRM_envvar"] + + +@contextmanager +def chunked_fs_context(): + os.environ["RUNNABLE_CONFIGURATION_FILE"] = "examples/configs/chunked-fs-run_log.yaml" + os.environ["RUNNABLE_PRM_envvar"] = "from env" + yield + del os.environ["RUNNABLE_CONFIGURATION_FILE"] + del os.environ["RUNNABLE_PRM_envvar"] + + +@contextmanager +def mocked_context(): + os.environ["RUNNABLE_CONFIGURATION_FILE"] = "examples/08-mocking/default.yaml" + os.environ["RUNNABLE_PRM_envvar"] = "from env" + yield + del os.environ["RUNNABLE_CONFIGURATION_FILE"] + del os.environ["RUNNABLE_PRM_envvar"] + + +@contextmanager +def patched_context(): + os.environ["RUNNABLE_CONFIGURATION_FILE"] = "examples/08-mocking/patching.yaml" + os.environ["RUNNABLE_PRM_envvar"] = "from env" + yield + del os.environ["RUNNABLE_CONFIGURATION_FILE"] + del os.environ["RUNNABLE_PRM_envvar"] + + +@contextmanager +def default_context(): + os.environ["RUNNABLE_PRM_envvar"] = "from env" + yield + del os.environ["RUNNABLE_PRM_envvar"] + + +@contextmanager +def argo_context(): + os.environ["RUNNABLE_CONFIGURATION_FILE"] = "examples/configs/argo-config.yaml" + yield + subprocess.run(["argo", "lint", "--offline", "argo-pipeline.yaml"], check=True) + del os.environ["RUNNABLE_CONFIGURATION_FILE"] + + +contexts = [default_context, chunked_fs_context, mocked_context, argo_context] + +python_examples = [ + ("01-tasks/notebook", False, []), + ("01-tasks/python_tasks", False, []), + ("01-tasks/scripts", False, []), + ("01-tasks/stub", False, []), + ("02-sequential/default_fail", True, []), + ("02-sequential/on_failure_fail", True, []), + ("02-sequential/on_failure_succeed", False, []), + ("02-sequential/traversal", False, []), + ("03-parameters/passing_parameters_notebook", False, []), + ("03-parameters/passing_parameters_python", False, []), + ("03-parameters/passing_parameters_shell", False, []), + ("03-parameters/static_parameters_non_python", False, []), + ("03-parameters/static_parameters_python", False, []), + ("04-catalog/catalog", False, [mocked_context]), + ("06-parallel/parallel", False, []), + ("06-parallel/nesting", False, []), + ("07-map/map", False, []), + ("07-map/custom_reducer", False, []), +] + + @pytest.mark.parametrize("example", list_python_examples()) +@pytest.mark.parametrize("context", contexts) # @pytest.mark.no_cover @pytest.mark.e2e -def test_python_examples(example): +def test_python_examples(example, context): print(f"Testing {example}...") - mod, status, context = example + mod, status, ignore_contexts = example + if context in ignore_contexts: + return - if not context: - context = nullcontext() - else: - context = context() + context = context() imported_module = importlib.import_module(f"examples.{mod.replace('/', '.')}") f = getattr(imported_module, "main") - try: - with context: + with context: + try: f() - except exceptions.ExecutionFailedError: - if not status: - raise + except exceptions.ExecutionFailedError: + print("Example failed") + if not status: + raise -# examples = [ -# ("concepts/catalog.yaml", False, {"configuration_file": "examples/configs/fs-catalog.yaml"}), -# ("concepts/map.yaml", False, {}), -# ("concepts/map_shell.yaml", False, {}), -# ("concepts/nesting.yaml", False, {}), -# ("concepts/notebook_native_parameters.yaml", False, {"parameters_file": "examples/concepts/parameters.yaml"}), -# 
("concepts/parallel.yaml", False, {}), -# ("concepts/simple_notebook.yaml", False, {}), -# ("concepts/simple.yaml", False, {}), -# ("catalog.yaml", False, {"configuration_file": "examples/configs/fs-catalog.yaml"}), -# ("default-fail.yaml", True, {}), -# ("on-failure.yaml", False, {}), -# ("parallel-fail.yaml", True, {}), -# ] +@pytest.mark.parametrize("example", list_python_examples()) +@pytest.mark.parametrize("context", contexts) +# @pytest.mark.no_cover +@pytest.mark.e2e +def test_yaml_examples(example, context): + print(f"Testing {example}...") + file, status, ignore_contexts = example + if context in ignore_contexts: + return -# def list_examples(): -# for example in examples: -# yield example + context = context() + example_file = f"examples/{file}.yaml" + parameters_file = "examples/common/initial_parameters.yaml" + with context: + try: + execute(pipeline_file=example_file, parameters_file=parameters_file) + except exceptions.ExecutionFailedError: + if not status: + raise -# @pytest.mark.parametrize("example", list_examples()) -# @pytest.mark.no_cover -# @pytest.mark.e2e -# def test_yaml_examples(example): -# print(f"Testing {example}...") -# examples_path = Path("examples") -# file_path, status, kwargs = example -# try: -# full_file_path = examples_path / file_path -# configuration_file = kwargs.pop("configuration_file", "") -# execute(configuration_file=configuration_file, pipeline_file=str(full_file_path.resolve()), **kwargs) -# except exceptions.ExecutionFailedError: -# if not status: -# raise - - -# @pytest.mark.parametrize("example", list_examples()) -# @pytest.mark.no_cover -# @pytest.mark.e2e -# def test_yaml_examples_argo(example): -# print(f"Testing {example}...") -# examples_path = Path("examples") -# file_path, status, kwargs = example -# try: -# full_file_path = examples_path / file_path -# kwargs.pop("configuration_file", "") -# configuration_file = "examples/configs/argo-config.yaml" -# execute(configuration_file=configuration_file, pipeline_file=str(full_file_path.resolve()), **kwargs) -# subprocess.run(["argo", "lint", "--offline", "argo-pipeline.yaml"], check=True) -# except exceptions.ExecutionFailedError: -# if not status: -# raise - - -# @pytest.mark.parametrize("example", list_examples()) -# @pytest.mark.no_cover -# @pytest.mark.e2e_container -# def test_yaml_examples_container(example): -# print(f"Testing {example}...") -# examples_path = Path("examples") -# file_path, status, kwargs = example -# try: -# full_file_path = examples_path / file_path -# kwargs.pop("configuration_file", "") -# configuration_file = "examples/configs/local-container.yaml" -# os.environ["runnable_VAR_default_docker_image"] = "runnable:3.8" -# execute(configuration_file=configuration_file, pipeline_file=str(full_file_path), **kwargs) -# except exceptions.ExecutionFailedError: -# if not status: -# raise - - -# @contextmanager -# def secrets_env_context(): -# os.environ["secret"] = "secret_value" -# os.environ["runnable_CONFIGURATION_FILE"] = "examples/configs/secrets-env-default.yaml" -# yield -# del os.environ["secret"] -# del os.environ["runnable_CONFIGURATION_FILE"] - - -# # function, success, context -# python_examples = [ -# ("catalog", False, None), -# ("catalog_simple", False, None), -# ("mocking", False, None), -# ("on_failure", False, None), -# ("parameters", False, None), -# ("parameters_simple", False, None), -# ("concepts.catalog", False, None), -# ("concepts.map", False, None), -# ("concepts.nesting", False, None), -# ("concepts.parallel", False, None), -# 
("concepts.simple", False, None), -# ] - - -# def list_python_examples(): -# for example in python_examples: -# yield example - - -# @pytest.mark.parametrize("example", list_python_examples()) -# @pytest.mark.no_cover -# @pytest.mark.e2e -# def test_python_examples(example): -# print(f"Testing {example}...") - -# mod, status, context = example - -# if not context: -# context = nullcontext() -# else: -# context = context() - -# imported_module = importlib.import_module(f"examples.{mod}") -# f = getattr(imported_module, "main") -# try: -# with context: -# f() -# except exceptions.ExecutionFailedError: -# if not status: -# raise + +@pytest.mark.parametrize("example", list_python_examples()) +@pytest.mark.container +def test_python_examples_container(example): + print(f"Testing {example}...") + + mod, status, _ = example + context = container_context() + + imported_module = importlib.import_module(f"examples.{mod.replace('/', '.')}") + f = getattr(imported_module, "main") + with context: + try: + f() + except exceptions.ExecutionFailedError: + print("Example failed") + if not status: + raise + + +@pytest.mark.parametrize("example", list_python_examples()) +@pytest.mark.container +def test_yaml_examples_container(example): + print(f"Testing {example}...") + file, status, _ = example + + context = container_context() + + example_file = f"examples/{file}.yaml" + parameters_file = "examples/common/initial_parameters.yaml" + + with context: + try: + execute(pipeline_file=example_file, parameters_file=parameters_file) + except exceptions.ExecutionFailedError: + if not status: + raise diff --git a/tox.ini b/tox.ini index 32ca062d..e303c793 100644 --- a/tox.ini +++ b/tox.ini @@ -6,9 +6,11 @@ envlist = python3.9, mypy [testenv] whitelist_externals = poetry +setenv = + _PLOOMBER_TELEMETRY_DEBUG = false commands = poetry install -E docker -E notebook --without docs,binary,perf,tutorial - poetry run python -m pytest -m "not e2e_container" --cov=runnable/ tests/ + poetry run python -m pytest -m "not container" --cov=runnable/ tests/ [testenv:mypy] whitelist_externals = poetry