diff --git a/docs/concepts/catalog.md b/docs/concepts/catalog.md
index 7b68d962..4d549f0a 100644
--- a/docs/concepts/catalog.md
+++ b/docs/concepts/catalog.md
@@ -1,5 +1,7 @@

 [tasks](task.md) might also need to pass ```files``` between them.

+## Concept
+
 For example:

 ```python linenums="1"
@@ -20,7 +22,7 @@

 consume()
 ```

-## Runnable representation
+## Syntax

 The same can be represented in ```runnable``` as [catalog](../reference.md/#catalog).

diff --git a/docs/concepts/index.md b/docs/concepts/index.md
index 18f9eeb0..c44aeaba 100644
--- a/docs/concepts/index.md
+++ b/docs/concepts/index.md
@@ -41,7 +41,7 @@

 pipeline.execute()

 ```

-- ```runnable``` exposes the functions ```generate``` and ```consume``` as [tasks](task.md).
+- ```runnable``` wraps the functions ```generate``` and ```consume``` as [tasks](task.md).
 - Tasks can [access and return](parameters.md/#access_returns) parameters.
 - Tasks can also share files between them using [catalog](catalog.md).
 - Tasks are stitched together as [pipeline](pipeline.md)
diff --git a/docs/concepts/map.md b/docs/concepts/map.md
index a98f96dd..aad92874 100644
--- a/docs/concepts/map.md
+++ b/docs/concepts/map.md
@@ -1,830 +1,186 @@
-```map``` nodes in runnable allows you to execute a sequence of nodes (i.e a pipeline) for all the items in a list. This is similar to
-[Map state of AWS Step functions](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html) or
-[loops in Argo workflows](https://argo-workflows.readthedocs.io/en/latest/walk-through/loops/).
+```map``` nodes in runnable allow you to execute a [pipeline](pipeline.md) for every item in a list.

-Conceptually, map node can be represented in python like below.
+## Concept

-```python
-#technically it is async for
-for i in iterable_parameter:
-    # a pipeline of steps
-    execute_first_step(i)
-    execute_second_step(i)
-    ...
-```
-
-You can control the parallelism by configuration of the executor.
-
-## Example
+A relatable example from data science is a grid search over hyperparameters, where the training pipeline needs
+to run for every hyperparameter.

-Below is an example of processing a inventory of files (50) in parallel batches of 10 files per batch.
-The ```stride``` parameter controls the chunk size and every batch is given the start index
-of the files to process.

-=== "visualization"
-
-    The step "chunk files" identifies the number of files to process and computes the start index of every
-    batch of files to process for a chunk size of 10, the stride.
-
-    "Process Chunk" pipelines are then triggered in parallel to process the chunk of files between ```start index```
-    and ```start index + stride```
-
-    ```mermaid
+
+```mermaid
    flowchart TD
-    chunkify([Chunk files]):::green
+    gridSearch([Grid Search]):::green
    success([Success]):::green

-    subgraph one[Process Chunk]
+    subgraph one[Parameter 1]
        process_chunk1([Process Chunk]):::yellow
        success_chunk1([Success]):::yellow

        process_chunk1 --> success_chunk1
    end

-    subgraph two[Process Chunk]
+    subgraph two[Parameter ...]
process_chunk2([Process Chunk]):::yellow success_chunk2([Success]):::yellow process_chunk2 --> success_chunk2 end - subgraph three[Process Chunk] + subgraph three[Parameter n] process_chunk3([Process Chunk]):::yellow success_chunk3([Success]):::yellow process_chunk3 --> success_chunk3 end - subgraph four[Process Chunk] - process_chunk4([Process Chunk]):::yellow - success_chunk4([Success]):::yellow + reduce([Reduce]):::green - process_chunk4 --> success_chunk4 - end - subgraph five[Process Chunk] - process_chunk5([Process Chunk]):::yellow - success_chunk5([Success]):::yellow + gridSearch --> one --> reduce + gridSearch --> two --> reduce + gridSearch --> three --> reduce + reduce --> success - process_chunk5 --> success_chunk5 - end + classDef yellow stroke:#FFFF00 + classDef green stroke:#0f0 +``` +The ```reduce``` step is part of the ```map``` state definition. - chunkify -- (stride=10, start_index=0)--> one --> success - chunkify -- (stride=10, start_index=10)--> two --> success - chunkify -- (stride=10, start_index=20)--> three --> success - chunkify -- (stride=10, start_index=30)--> four --> success - chunkify -- (stride=10, start_index=40)--> five --> success +[API Documentation](../reference.md/#map) - classDef yellow stroke:#FFFF00 - classDef green stroke:#0f0 - ``` +Conceptually, map node can be represented in python as: -=== "python sdk" +```python +for i in iterable_parameter: + # a pipeline of steps + x = execute_first_step(i) + score = execute_second_step(i, x) - The ```start_index``` argument for the function ```process_chunk``` is dynamically set by iterating - over ```chunks```. +reduce(score) # could be as simple as a list of scores indexed by i or a custom reducer function/lambda +... +``` - If the argument ```start_index``` is not provided, you can still access the current - value by ```runnable_MAP_VARIABLE``` environment variable. - The environment variable ```runnable_MAP_VARIABLE``` is a dictionary with keys as - ```iterate_as``` - This instruction is set while defining the map node. +## Syntax - You can run this example by ```python examples/concepts/map.py``` +The ```runnable``` syntax for the above example: - ```python linenums="1" hl_lines="30-31 35 68-74" - --8<-- "examples/concepts/map.py" - ``` +=== "sdk" -=== "pipeline in yaml" + ```python linenums="1" + from runnable import PythonTask, Map, Pipeline - The ```start_index``` argument for the function ```process_chunk``` is dynamically set by iterating - over ```chunks```. + def execute_first_step(i): # (1) + ... - This instruction is set while defining the map node. - Note that the ```branch``` of the map node has a similar schema of the pipeline. + return x # (2) - You can run this example by ```runnable execute examples/concepts/map.yaml``` + def execute_second_step(i, x): # (3) + ... - ```yaml linenums="1" hl_lines="23-26" - --8<-- "examples/concepts/map.yaml" - ``` + def get_iterable_branch(): # (4) + first_step_task = PythonTask(name="execute_first_step", + function="execute_first_step", + returns=["x"]) + + second_step_task = PythonTask(name="execute_second_step", + function="execute_second_step", + terminate_with_success=True) -=== "pipeline with shell tasks" + pipeline = Pipeline(steps=[first_step_task,second_step_task]) - The task ```chunk files``` sets the parameters ```stride``` and ```chunks``` similar to the python - functions. 
+        return pipeline  # the branch pipeline is handed to the map step below
+
+ def main(): + generate_task = PythonTask(name="generate_task", + function="generate", + returns=["iterable_parameter"]) # (5) - The map branch "iterate and execute" iterates over chunks and exposes the current start_index of - as environment variable ```runnable_MAP_VARIABLE```. + iterate_task = Map(name="iterate", + branch=get_iterable_branch(), + iterate_on="iterable_parameter", # (6) + iterate_as="i", + terminate_with_success=True) # (7) - The environment variable ```runnable_MAP_VARIABLE``` is a json string with keys of the ```iterate_as```. + pipeline = Pipeline(steps=[generate_task, iterate_task]) - You can run this example by ```runnable execute examples/concepts/map_shell.yaml``` + pipeline.execute() + return pipeline + + if __name__ == "__main__": + main() - ```yaml linenums="1" hl_lines="26-27 29-32" - --8<-- "examples/concepts/map_shell.yaml" ``` -=== "Run log" - - The step log of the ```iterate and execute``` has branches for every dynamically executed branch - of the format ```iterate and execute.```. - - ```json linenums="1" - { - "run_id": "simple-turing-0153", - "dag_hash": "", - "use_cached": false, - "tag": "", - "original_run_id": "", - "status": "SUCCESS", - "steps": { - "chunk files": { - "name": "chunk files", - "internal_name": "chunk files", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.038461", - "end_time": "2024-01-18 01:54:00.045343", - "duration": "0:00:00.006882", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "chunk_files.execution.log", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "simple-turing-0153/chunk_files.execution.log", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "iterate and execute": { - "name": "iterate and execute", - "internal_name": "iterate and execute", - "status": "SUCCESS", - "step_type": "map", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [], - "user_defined_metrics": {}, - "branches": { - "iterate and execute.0": { - "internal_name": "iterate and execute.0", - "status": "SUCCESS", - "steps": { - "iterate and execute.0.execute": { - "name": "execute", - "internal_name": "iterate and execute.0.execute", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.221240", - "end_time": "2024-01-18 01:54:00.222560", - "duration": "0:00:00.001320", - "status": "SUCCESS", - "message": "", - "parameters": { - 
"chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "execute.execution.log_0", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "simple-turing-0153/execute.execution.log_0", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "iterate and execute.0.success": { - "name": "success", - "internal_name": "iterate and execute.0.success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.301335", - "end_time": "2024-01-18 01:54:00.302161", - "duration": "0:00:00.000826", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - } - }, - "iterate and execute.10": { - "internal_name": "iterate and execute.10", - "status": "SUCCESS", - "steps": { - "iterate and execute.10.execute": { - "name": "execute", - "internal_name": "iterate and execute.10.execute", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.396194", - "end_time": "2024-01-18 01:54:00.397462", - "duration": "0:00:00.001268", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "execute.execution.log_10", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "simple-turing-0153/execute.execution.log_10", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "iterate and execute.10.success": { - "name": "success", - "internal_name": "iterate and execute.10.success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.469211", - "end_time": "2024-01-18 01:54:00.470266", - "duration": "0:00:00.001055", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - } - }, - "iterate and execute.20": { - "internal_name": "iterate and execute.20", - "status": "SUCCESS", - "steps": { - "iterate and execute.20.execute": { - "name": "execute", - "internal_name": "iterate and 
execute.20.execute", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.558053", - "end_time": "2024-01-18 01:54:00.561472", - "duration": "0:00:00.003419", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "execute.execution.log_20", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "simple-turing-0153/execute.execution.log_20", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "iterate and execute.20.success": { - "name": "success", - "internal_name": "iterate and execute.20.success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.660092", - "end_time": "2024-01-18 01:54:00.661215", - "duration": "0:00:00.001123", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - } - }, - "iterate and execute.30": { - "internal_name": "iterate and execute.30", - "status": "SUCCESS", - "steps": { - "iterate and execute.30.execute": { - "name": "execute", - "internal_name": "iterate and execute.30.execute", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.765689", - "end_time": "2024-01-18 01:54:00.766705", - "duration": "0:00:00.001016", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "execute.execution.log_30", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "simple-turing-0153/execute.execution.log_30", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "iterate and execute.30.success": { - "name": "success", - "internal_name": "iterate and execute.30.success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - 
"code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.851112", - "end_time": "2024-01-18 01:54:00.852454", - "duration": "0:00:00.001342", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - } - }, - "iterate and execute.40": { - "internal_name": "iterate and execute.40", - "status": "SUCCESS", - "steps": { - "iterate and execute.40.execute": { - "name": "execute", - "internal_name": "iterate and execute.40.execute", - "status": "SUCCESS", - "step_type": "task", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:00.950911", - "end_time": "2024-01-18 01:54:00.952000", - "duration": "0:00:00.001089", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [ - { - "name": "execute.execution.log_40", - "data_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - "catalog_relative_path": "simple-turing-0153/execute.execution.log_40", - "catalog_handler_location": ".catalog", - "stage": "put" - } - ] - }, - "iterate and execute.40.success": { - "name": "success", - "internal_name": "iterate and execute.40.success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:01.032790", - "end_time": "2024-01-18 01:54:01.034254", - "duration": "0:00:00.001464", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - } - } - }, - "data_catalog": [] - }, - "success": { - "name": "success", - "internal_name": "success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "30ca73bb01ac45db08b1ca75460029da142b53fa", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 01:54:01.141928", - "end_time": "2024-01-18 01:54:01.142928", - "duration": "0:00:00.001000", - "status": "SUCCESS", - "message": "", - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - } - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - }, - "parameters": { - "chunks": [ - 0, - 10, - 20, - 30, - 40 - ], - "stride": 10 - }, - "run_config": { - "executor": { - "service_name": "local", - "service_type": "executor", - "enable_parallel": false, - "placeholders": 
{} - }, - "run_log_store": { - "service_name": "file-system", - "service_type": "run_log_store" - }, - "secrets_handler": { - "service_name": "do-nothing", - "service_type": "secrets" - }, - "catalog_handler": { - "service_name": "file-system", - "service_type": "catalog" - }, - "experiment_tracker": { - "service_name": "do-nothing", - "service_type": "experiment_tracker" - }, - "pipeline_file": "", - "parameters_file": "", - "configuration_file": "examples/configs/fs-catalog-run_log.yaml", - "tag": "", - "run_id": "simple-turing-0153", - "variables": {}, - "use_cached": false, - "original_run_id": "", - "dag": { - "start_at": "chunk files", - "name": "", - "description": "", - "steps": { - "chunk files": { - "type": "task", - "name": "chunk files", - "next": "iterate and execute", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1, - "command": "examples.concepts.map.chunk_files", - "node_name": "chunk files" - }, - "iterate and execute": { - "type": "map", - "name": "iterate and execute", - "is_composite": true, - "next": "success", - "on_failure": "", - "executor_config": {}, - "iterate_on": "chunks", - "iterate_as": "start_index", - "branch": { - "start_at": "execute", - "name": "", - "description": "", - "steps": { - "execute": { - "type": "task", - "name": "execute", - "next": "success", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1, - "command": "examples.concepts.map.process_chunk", - "node_name": "execute" - }, - "success": { - "type": "success", - "name": "success" - }, - "fail": { - "type": "fail", - "name": "fail" - } - } - } - }, - "success": { - "type": "success", - "name": "success" - }, - "fail": { - "type": "fail", - "name": "fail" - } - } - }, - "dag_hash": "", - "execution_plan": "chained" - } - } + 1. Takes in an input parameter ```i```, the current value of the iteration. + 2. returns a parameter ```x```. + 3. ```i``` is the current value of iteration, ```x``` is the return parameter of function call at iteration ```i```. + 4. returns a ```pipeline``` whose tasks are dependent on an iterable ```i``` + 5. returns the parameter ```iterable_parameter```. + 6. loop over ```iterable_parameter``` executing ```iterable_branch``` over each value of ```i```. + 7. Present ```i``` as input argument to all tasks of ```iterable_branch```. + + +=== "yaml" + + ```yaml linenums="1" + branch: &branch # (1) + start_at: execute_first_step + steps: + execute_first_step: # (2) + type: task + command: execute_first_step + next: execute_second_step + returns: + - x # (3) + execute_second_step: + type: task + command: execute_second_step # (4) + next: success + + + dag: + start_at: generate_task + steps: + generate_task: + type: task + command: generate + returns: + - iterable_parameter # (5) + iterate_task: + type: map + branch: *branch # (6) + iterate_on: iterable_parameter # (7) + iterate_as: i # (8) + next: success ``` + 1. The pipeline to iterate over an iterable parameter + 2. The ```task``` expects ```i```, the current value of iteration. + 3. The ```task``` returns ```x```. + 4. The ```task``` expects ```i```, the current value of iteration and ```x``` at the current iteration. + 5. returns a iterable, ```iterable_parameter```. + 6. the branch to iterate over + 7. the parameter to iterate on, returned by a task ```generate_task```. + 8. present the current value of iteration as ```i``` to all the tasks of the branch. 
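+
+To make the iteration and reduction concrete, the sketch below shows, in plain python, what the map step does at
+runtime. The names mirror the example above; the list-building behavior is the default reducer described in the
+Reduce section below.
+
+```python
+iterable_parameter = [1, 2, 3]  # returned by generate_task
+
+collected = []
+for i in iterable_parameter:  # one branch execution per item
+    x = i * 10  # stand-in for the branch's execute_first_step/execute_second_step
+    collected.append(x)
+
+# default reducer: lambda *x: list(x)
+x = (lambda *args: list(args))(*collected)
+assert x == [10, 20, 30]  # downstream steps see x as a list indexed by the iteration order
+```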
+
+
+## Reduce
+
+### Default behavior
+
+The [returns](parameters.md/#access_returns) of the tasks of the iterable branch are reduced to a list indexed
+by the order of the ```iterable```. In the above example, a parameter would be available for downstream steps of
+```iterate_task```; it is a list of all the ```x```s observed during the iteration.
+
+For clarity, the default reducer is: ```lambda *x: list(x)  # returns a list of the args```
+
+### Custom reduce
+
+The ```map``` state also accepts an argument ```reducer```, which can be a ```lambda``` or a ```function``` that
+accepts ```*args``` (a non-keyword variable-length argument list) and returns a reduced value.
+The downstream steps of ```iterate_task``` would then use the reduced value.
+

 ## Traversal

@@ -835,16 +191,37 @@

 redirected to ```success``` if that is the desired behavior.

 The map step is considered successful only if all the branches of the step have terminated successfully.

-## Parameters

-All the tasks defined in the branches of the map pipeline can
-[access to parameters and data as usual](../concepts/task.md).
+## Complete example
+
+=== "Default reducer"
+
+    Uses the default reducer.
+
+    === "sdk"
+
+        ```python linenums="1"
+        --8<-- "examples/07-map/map.py"
+        ```
+
+    === "yaml"
+
+        ```yaml linenums="1"
+        --8<-- "examples/07-map/map.yaml"
+        ```
+
+=== "Custom reducer"
+
+    Replaces the default reducer with a ```lambda *x: max(x)``` reducer.
+
+    === "sdk"

-!!! warning

+        ```python linenums="1"
+        --8<-- "examples/07-map/custom_reducer.py"
+        ```

-    The parameters can be updated by all the tasks and the last task to execute overwrites
-    the previous changes.
+    === "yaml"

-    Since the order of execution is not guaranteed, its best to avoid mutating the same parameters in
-    the steps belonging to map step.
+        ```yaml linenums="1"
+        --8<-- "examples/07-map/custom_reducer.yaml"
+        ```
diff --git a/docs/concepts/parallel.md b/docs/concepts/parallel.md
index 1c2f882c..0c9afc36 100644
--- a/docs/concepts/parallel.md
+++ b/docs/concepts/parallel.md
@@ -1,558 +1,115 @@
-Parallel nodes in runnable allows you to run multiple pipelines in parallel and use your compute resources efficiently.
+```parallel``` nodes in runnable embed multiple ```pipelines``` as branches.

-## Example
+[API Documentation](../reference.md/#parallel)

-!!! note "Only stubs?"
+---

-    All the steps in the below example are ```stubbed``` for convenience. The functionality is similar
-    even if the steps are execution units like ```tasks``` or any other nodes.
+## Concept

-    We support deeply [nested steps](../concepts/nesting.md). For example, a step in the parallel branch can be a ```map``` which internally
-    loops over a ```dag``` and so on. Though this functionality is useful, it can be difficult to debug and
-    understand in large code bases.
+The below diagram shows training a baseline model and a CNN model in parallel and picking the best
+model for inference.

-Below is a stubbed out example of a pipeline that trains two models in parallel and create an ensemble model to
-do the inference. The models XGBoost and Random Forest (RF model) are trained in parallel and training of the
-ensemble model happens only after both models are (successfully) trained.
-
-=== "Visualisation"
-
-    In the below visualisation, the green lined steps happen in sequence and wait for the previous step to
-    successfully complete.
-
-    The branches lined in yellow run in parallel to each other but sequential within the branch.
- - - - ```mermaid - flowchart TD +```mermaid + flowchart LR getFeatures([Get Features]):::green trainStep(Train Models):::green - ensembleModel([Ensemble Modelling]):::green + chooseBest([Evaluate Models]):::green inference([Run Inference]):::green success([Success]):::green - prepareXG([Prepare for XGBoost]):::yellow - trainXG([Train XGBoost]):::yellow - successXG([XGBoost success]):::yellow - prepareXG --> trainXG --> successXG + prepareBase([Prepare for baseline model]):::yellow + trainBase([Train XGBoost]):::yellow + successBase([success]):::yellow + prepareBase --> trainBase --> successBase - trainRF([Train RF model]):::yellow - successRF([RF Model success]):::yellow - trainRF --> successRF + trainCNN([Train CNN model]):::yellow + successCNN([CNNModel success]):::yellow + trainCNN --> successCNN getFeatures --> trainStep - trainStep --> prepareXG - trainStep --> trainRF - successXG --> ensembleModel - successRF --> ensembleModel - ensembleModel --> inference + trainStep --> prepareBase + trainStep --> trainCNN + successBase --> chooseBest + successCNN --> chooseBest + chooseBest --> inference inference --> success classDef yellow stroke:#FFFF00 classDef green stroke:#0f0 +``` - ``` - -=== "Pipeline in yaml" +The branch for training the ```baseline``` and ```cnn``` are pipelines themselves are defined as any other [pipeline](pipeline.md). - ```yaml linenums="1" - --8<-- "examples/concepts/parallel.yaml" - ``` +The step ```Train Models``` is a parallel step that has the ```branches``` as the individual pipelines. -=== "python sdk" +## Syntax - You can run this example by: ```python examples/concepts/parallel.py``` +=== "sdk" ```python linenums="1" - --8<-- "examples/concepts/parallel.py" + from runnable import Pipeline, Parallel + def get_baseline_pipeline(): + ... + pipeline = Pipeline(...) + return pipeline + + def get_cnn_pipeline(): + ... + pipeline = Pipeline(...) + return pipeline + + def main(): + train_models = Parallel(name="train models", + branches={'baseline': get_baseline_pipeline, 'cnn': get_cnn_pipeline()}, + terminate_with_success=True) + pipeline = Pipeline(steps=[train_models]) + + pipeline.execute + + return pipeline ``` -=== "Run log" - - The step log for the parallel branch ```Train models``` has branches which have similar - structure to a run log. 
- - ```json linenums="1" - { - "run_id": "savory-pike-0201", - "dag_hash": "", - "use_cached": false, - "tag": "", - "original_run_id": "", - "status": "SUCCESS", - "steps": { - "Get Features": { - "name": "Get Features", - "internal_name": "Get Features", - "status": "SUCCESS", - "step_type": "stub", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:10.978646", - "end_time": "2024-01-18 02:01:10.978665", - "duration": "0:00:00.000019", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - }, - "Train Models": { - "name": "Train Models", - "internal_name": "Train Models", - "status": "SUCCESS", - "step_type": "parallel", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [], - "user_defined_metrics": {}, - "branches": { - "Train Models.XGBoost": { - "internal_name": "Train Models.XGBoost", - "status": "SUCCESS", - "steps": { - "Train Models.XGBoost.Prepare for XGBoost": { - "name": "Prepare for XGBoost", - "internal_name": "Train Models.XGBoost.Prepare for XGBoost", - "status": "SUCCESS", - "step_type": "stub", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.132822", - "end_time": "2024-01-18 02:01:11.132840", - "duration": "0:00:00.000018", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - }, - "Train Models.XGBoost.Train XGBoost": { - "name": "Train XGBoost", - "internal_name": "Train Models.XGBoost.Train XGBoost", - "status": "SUCCESS", - "step_type": "stub", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.216418", - "end_time": "2024-01-18 02:01:11.216430", - "duration": "0:00:00.000012", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - }, - "Train Models.XGBoost.success": { - "name": "success", - "internal_name": "Train Models.XGBoost.success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": 
"https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.291222", - "end_time": "2024-01-18 02:01:11.292140", - "duration": "0:00:00.000918", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - } - }, - "Train Models.RF Model": { - "internal_name": "Train Models.RF Model", - "status": "SUCCESS", - "steps": { - "Train Models.RF Model.Train RF": { - "name": "Train RF", - "internal_name": "Train Models.RF Model.Train RF", - "status": "SUCCESS", - "step_type": "stub", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.379438", - "end_time": "2024-01-18 02:01:11.379453", - "duration": "0:00:00.000015", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - }, - "Train Models.RF Model.success": { - "name": "success", - "internal_name": "Train Models.RF Model.success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.458716", - "end_time": "2024-01-18 02:01:11.459695", - "duration": "0:00:00.000979", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - } - } - }, - "data_catalog": [] - }, - "Ensemble Modelling": { - "name": "Ensemble Modelling", - "internal_name": "Ensemble Modelling", - "status": "SUCCESS", - "step_type": "stub", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.568072", - "end_time": "2024-01-18 02:01:11.568085", - "duration": "0:00:00.000013", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - }, - "Run Inference": { - "name": "Run Inference", - "internal_name": "Run Inference", - "status": "SUCCESS", - "step_type": "stub", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.650023", - "end_time": "2024-01-18 02:01:11.650037", - "duration": "0:00:00.000014", - "status": "SUCCESS", - "message": "", - "parameters": {} - 
} - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - }, - "success": { - "name": "success", - "internal_name": "success", - "status": "SUCCESS", - "step_type": "success", - "message": "", - "mock": false, - "code_identities": [ - { - "code_identifier": "f0a2719001de9be30c27069933e4b4a64a065e2b", - "code_identifier_type": "git", - "code_identifier_dependable": true, - "code_identifier_url": "https://github.com/AstraZeneca/runnable-core.git", - "code_identifier_message": "" - } - ], - "attempts": [ - { - "attempt_number": 1, - "start_time": "2024-01-18 02:01:11.727802", - "end_time": "2024-01-18 02:01:11.728651", - "duration": "0:00:00.000849", - "status": "SUCCESS", - "message": "", - "parameters": {} - } - ], - "user_defined_metrics": {}, - "branches": {}, - "data_catalog": [] - } - }, - "parameters": {}, - "run_config": { - "executor": { - "service_name": "local", - "service_type": "executor", - "enable_parallel": false, - "placeholders": {} - }, - "run_log_store": { - "service_name": "file-system", - "service_type": "run_log_store" - }, - "secrets_handler": { - "service_name": "do-nothing", - "service_type": "secrets" - }, - "catalog_handler": { - "service_name": "file-system", - "service_type": "catalog" - }, - "experiment_tracker": { - "service_name": "do-nothing", - "service_type": "experiment_tracker" - }, - "pipeline_file": "", - "parameters_file": "", - "configuration_file": "examples/configs/fs-catalog-run_log.yaml", - "tag": "", - "run_id": "savory-pike-0201", - "variables": {}, - "use_cached": false, - "original_run_id": "", - "dag": { - "start_at": "Get Features", - "name": "", - "description": "", - "steps": { - "Get Features": { - "type": "stub", - "name": "Get Features", - "next": "Train Models", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1 - }, - "Train Models": { - "type": "parallel", - "name": "Train Models", - "next": "Ensemble Modelling", - "on_failure": "", - "executor_config": {}, - "branches": { - "XGBoost": { - "start_at": "Prepare for XGBoost", - "name": "", - "description": "", - "steps": { - "Prepare for XGBoost": { - "type": "stub", - "name": "Prepare for XGBoost", - "next": "Train XGBoost", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1 - }, - "Train XGBoost": { - "type": "stub", - "name": "Train XGBoost", - "next": "success", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1 - }, - "success": { - "type": "success", - "name": "success" - }, - "fail": { - "type": "fail", - "name": "fail" - } - } - }, - "RF Model": { - "start_at": "Train RF", - "name": "", - "description": "", - "steps": { - "Train RF": { - "type": "stub", - "name": "Train RF", - "next": "success", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1 - }, - "success": { - "type": "success", - "name": "success" - }, - "fail": { - "type": "fail", - "name": "fail" - } - } - } - } - }, - "Ensemble Modelling": { - "type": "stub", - "name": "Ensemble Modelling", - "next": "Run Inference", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1 - }, - "Run Inference": { - "type": "stub", - "name": "Run Inference", - "next": "success", - "on_failure": "", - "executor_config": {}, - "catalog": null, - "max_attempts": 1 - }, - "success": { - "type": "success", - "name": "success" - }, - "fail": { - "type": "fail", - "name": "fail" - } - } - }, - "dag_hash": "", - "execution_plan": "chained" - } - } - ``` +=== "yaml" 
+    ```yaml linenums="1"
+    baseline_branch: &baseline # (1)
+      start_at: prepare
+      steps:
+        ...
+
+    cnn_branch: &cnn
+      start_at: train
+      steps:
+        ...
+
+    dag:
+      description: |
+        This example demonstrates the use of the parallel step.
+
+        The parallel step takes a mapping of branches, which are pipelines themselves.
+
+      start_at: parallel_step
+      steps:
+        parallel_step:
+          type: parallel
+          next: success
+          branches:
+            baseline: *baseline
+            cnn: *cnn
+
+    ```
+
+    1. The anchor keys are named uniquely so the document remains valid yaml.
+
+!!! warning "Execution"

-All pipelines, nested or parent, have the same structure as defined in
-[pipeline definition](../concepts/pipeline.md).
+    The pipelines of the parallel branches should not execute while the ```parallel``` step is being defined.

-The parent pipeline defines a step ```Train models``` which is a parallel step.
-The branches, XGBoost and RF model, are pipelines themselves.
+    If you want to execute the individual branches in isolation, use a flag to control it.
+
+    e.g: the functions ```get_baseline``` and ```get_cnn``` can take an argument ```execute```, which defaults to ```True```.
+    During the composition of the ```parallel``` step, pass in ```execute``` as ```False```.

 ## Traversal

@@ -562,17 +119,16 @@

 redirected to ```success``` if that is the desired behavior.

 The parallel step is considered successful only if all the branches of the step have terminated successfully.

+## Complete example

-## Parameters
+=== "sdk"

-All the tasks defined in the branches of the parallel pipeline can
-[access to parameters and data as usual](../concepts/task.md).
-
-
-!!! warning
+    ```python linenums="1" hl_lines="53-57"
+    --8<-- "examples/06-parallel/parallel.py"
+    ```

-    The parameters can be updated by all the tasks and the last task to execute overwrites
-    the previous changes.
+=== "yaml"

-    Since the order of execution is not guaranteed, its best to avoid mutating the same parameters in
-    the steps belonging to parallel step.
+    ```yaml linenums="1" hl_lines="40-45"
+    --8<-- "examples/06-parallel/parallel.yaml"
+    ```
diff --git a/docs/concepts/parameters.md b/docs/concepts/parameters.md
index 36af9ec3..fb92c1b0 100644
--- a/docs/concepts/parameters.md
+++ b/docs/concepts/parameters.md
@@ -1,5 +1,7 @@

 ```parameters``` are data that can be passed from one ```task``` to another.

+## Concept
+
 For example, in the below snippet, the parameters ```x``` and ```y``` are passed from
 ```generate``` to ```consume```.

diff --git a/docs/concepts/pipeline.md b/docs/concepts/pipeline.md
index 685582cc..b373ec78 100644
--- a/docs/concepts/pipeline.md
+++ b/docs/concepts/pipeline.md
@@ -15,6 +15,7 @@

 A ```workflow``` is a sequence of ```steps``` to perform.

+## Concept

 A visual example of a workflow:

@@ -57,6 +58,10 @@

 and [stub](task.md/#stub).

 === "sdk"

+    - [x] The first step of the ```steps``` is the start of the workflow.
+    - [x] The order of execution follows the order of the tasks in the list.
+    - [x] The terminal nodes ```success``` and ```fail``` are added automatically.
+
     ```python linenums="1"
     --8<-- "examples/02-sequential/traversal.py"
     ```

@@ -64,13 +69,17 @@

     1. Start the pipeline.

     2. The order of the steps is the execution order

-    - [x] The first step of the ```steps``` is the start of the workflow.
-    - [x] The order of execution follows the order of the tasks in the list.
-    - [x] The terminal nodes ```success``` and ```fail``` are added automatically.
+

 === "yaml"

+    - [x] The first step is the step corresponding to ```start_at```.
+    - [x] The steps are defined in the ```steps``` mapping.
+    - [x] Execution moves to the ```next``` step after a ```step``` completes successfully.
+    - [x] ```success``` as the ```next``` node implies successful execution of the pipeline.
+
     ```yaml linenums="1"
     --8<-- "examples/02-sequential/traversal.yaml"
     ```

@@ -80,10 +89,7 @@

     3. Add the success and fail nodes.

-    - [x] The first step is the step corresponding to ```start_at```
-    - [x] The mapping defined in the steps.
-    - [x] The ```next``` step after a successful execution of a ```step```.
-    - [x] Needs explicit definition of ```success``` and ```fail``` nodes.
+
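+
+A minimal sketch of the sdk traversal rules above (the step names and functions are illustrative, not taken
+from the repository's examples):
+
+```python
+from runnable import Pipeline, PythonTask
+
+def fetch():
+    ...
+
+def train():
+    ...
+
+# execution order follows the order of the list; success/fail terminal nodes are implicit
+pipeline = Pipeline(steps=[
+    PythonTask(name="fetch", function=fetch),
+    PythonTask(name="train", function=train, terminate_with_success=True),
+])
+pipeline.execute()
+```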
@@ -93,8 +99,7 @@

 By default, any failure during the execution of step will traverse to ```fail``` node
 marking the execution as failed.

-The ```fail``` node is implicitly added to the pipeline in python SDK while it
-has to be stated in the yaml.
+The ```fail``` node is implicitly added to the pipeline.

 This behavior can be over-ridden to follow a different path based on expected failures.

diff --git a/docs/concepts/task.md b/docs/concepts/task.md
index 6a64511b..441dbc6e 100644
--- a/docs/concepts/task.md
+++ b/docs/concepts/task.md
@@ -12,6 +12,8 @@

 the python function/notebook/shell script/stubs.

 ## Python functions

+Python functions can be used as tasks.
+
 [API Documentation](../reference.md/#pythontask)

 ### Example

@@ -106,7 +108,7 @@

 === "yaml"

-    ```yaml linenums="1" hl_lines="19-23"
+    ```yaml linenums="1" hl_lines="16-23"
     --8<-- "examples/01-tasks/scripts.yaml"
     ```

diff --git a/docs/reference.md b/docs/reference.md
index 0cd9cf34..0104737c 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -1,44 +1,88 @@
-## Catalog
+## PythonTask

 === "sdk"

-    ::: runnable.Catalog
+    ::: runnable.PythonTask
     options:
        show_root_heading: true
        show_bases: false
+        show_docstring_description: true
        heading_level: 3

 === "yaml"

-
+    Attributes:
+
+    - ```name```: the name of the task
+    - ```command```: the dotted path reference to the function.
+    - ```next```: the next node to call if the function succeeds. Use ```success``` to terminate
+    the pipeline successfully or ```fail``` to terminate with failure.
+    - ```on_failure```: the next node in case of failure.
+    - ```catalog```: mapping of cataloging items
+    - ```overrides```: mapping of step overrides from global configuration.
+
+    ```yaml
+    dag:
+      steps:
+        name: <>
+        type: task
+        command: <>
+        next: <>
+        on_failure: <>
+        catalog: # Any cataloging to be done.
+        overrides: # mapping of overrides of global configuration
+    ```
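+
+    A filled-in sketch of the template above (the module path and step name are illustrative):
+
+    ```yaml
+    dag:
+      start_at: train
+      steps:
+        train:
+          type: task
+          command: my_project.steps.train_model # dotted path to the python function
+          next: success
+    ```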
-## Stub

+
+## NotebookTask

 === "sdk"

-    ::: runnable.Stub
+    ::: runnable.NotebookTask
     options:
        show_root_heading: true
        show_bases: false
+        show_docstring_description: true
        heading_level: 3

 === "yaml"

+    Attributes:
+
+    - ```name```: the name of the task
+    - ```command```: the path to the notebook relative to the project root.
+    - ```next```: the next node to call if the notebook succeeds. Use ```success``` to terminate
+    the pipeline successfully or ```fail``` to terminate with failure.
+    - ```on_failure```: the next node in case of failure.
+    - ```catalog```: mapping of cataloging items
+    - ```overrides```: mapping of step overrides from global configuration.
+
+    ```yaml
+    dag:
+      steps:
+        name: <>
+        type: task
+        command: <>
+        next: <>
+        on_failure: <>
+        catalog: # Any cataloging to be done.
+        overrides: # mapping of overrides of global configuration
+    ```
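+
+    A filled-in sketch of the template above. The notebook path and step name are illustrative, and the
+    ```command_type: notebook``` key is an assumption, mirroring how the shell examples mark non-python tasks:
+
+    ```yaml
+    dag:
+      start_at: explore
+      steps:
+        explore:
+          type: task
+          command_type: notebook # assumption, by analogy with command_type: shell
+          command: notebooks/explore.ipynb # path relative to the project root
+          next: success
+    ```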
-## PythonTask + +## Catalog === "sdk" - ::: runnable.PythonTask + ::: runnable.Catalog options: show_root_heading: true show_bases: false - show_docstring_description: true heading_level: 3 === "yaml" @@ -47,15 +91,14 @@
-## ShellTask +## Stub === "sdk" - ::: runnable.ShellTask + ::: runnable.Stub options: show_root_heading: true show_bases: false - show_docstring_description: true heading_level: 3 === "yaml" @@ -65,11 +108,12 @@
-## NotebookTask + +## ShellTask === "sdk" - ::: runnable.NotebookTask + ::: runnable.ShellTask options: show_root_heading: true show_bases: false @@ -79,8 +123,12 @@ === "yaml" +
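+    Attributes (a sketch mirroring the other task types; ```command_type: shell``` is taken from the shell
+    examples elsewhere in this repository):
+
+    - ```name```: the name of the task
+    - ```command```: the shell command to run.
+    - ```next```: the next node to call if the command succeeds.
+    - ```on_failure```: the next node in case of failure.
+
+    ```yaml
+    dag:
+      steps:
+        cleanup:
+          type: task
+          command_type: shell
+          command: rm -rf another/ # the shell command to execute
+          next: success
+    ```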
+ + + ## Parallel diff --git a/docs/yaml.md b/docs/yaml.md deleted file mode 100644 index f811d22b..00000000 --- a/docs/yaml.md +++ /dev/null @@ -1,8 +0,0 @@ -::: runnable.tasks.PythonTaskType - options: - show_root_heading: true - show_bases: false - show_docstring_description: true - show_members: false - -
diff --git a/examples/06-parallel/parallel.py b/examples/06-parallel/parallel.py index 89be2d77..4be49423 100644 --- a/examples/06-parallel/parallel.py +++ b/examples/06-parallel/parallel.py @@ -15,7 +15,7 @@ from runnable import NotebookTask, Parallel, Pipeline, PythonTask, ShellTask, Stub -def traversal(execute: bool = True): +def traversal(): """ Use the pattern of using "execute" to control the execution of the pipeline. @@ -46,9 +46,6 @@ def traversal(execute: bool = True): # The order of execution follows the order of the tasks in the list. pipeline = Pipeline(steps=[stub_task, python_task, shell_task, notebook_task]) - if execute: # Do not execute the pipeline if we are using it as a branch - pipeline.execute() - return pipeline @@ -56,7 +53,7 @@ def main(): parallel_step = Parallel( name="parallel_step", terminate_with_success=True, - branches={"branch1": traversal(execute=False), "branch2": traversal(execute=False)}, + branches={"branch1": traversal(), "branch2": traversal()}, ) pipeline = Pipeline(steps=[parallel_step]) diff --git a/examples/concepts/catalog.py b/examples/concepts/catalog.py deleted file mode 100644 index 921732d1..00000000 --- a/examples/concepts/catalog.py +++ /dev/null @@ -1,87 +0,0 @@ -""" -A pipeline to demonstrate using the catalog service to create and retrieve content. - -You can run this pipeline by: - python run examples/concepts/catalog.py -""" - -from pathlib import Path - - -def create_content_in_data_folder(): - """ - Create a data directory and write a file "hello.txt" in the data folder. - """ - Path("data").mkdir(parents=True, exist_ok=True) - with open(Path("data") / "hello.txt", "w") as f: - f.write("Hello from data folder!!") - - -def create_content_in_another_folder(): - """ - Create a "another" directory and write a file "world.txt" in it. - """ - Path("another").mkdir(parents=True, exist_ok=True) - with open(Path("another") / "world.txt", "w") as f: - f.write("Hello from another folder!!") - - -def retrieve_content_from_both(): - """ - Display the contents of the files in data and "another" folder - """ - with open(Path("data") / "hello.txt", "r") as f: - print(f.read()) - - with open(Path("another") / "world.txt", "r") as f: - print(f.read()) - - -def main(): - from runnable import Catalog, Pipeline, PythonTask, ShellTask - - # This step creates a file in the data folder and syncs it to the catalog. - data_catalog = Catalog(put=["data/hello.txt"]) - data_create = PythonTask( - name="create_content_in_data_folder", - function=create_content_in_data_folder, - catalog=data_catalog, - ) - - # This step creates a file in the another folder and syncs it to the catalog. - another_catalog = Catalog(put=["another/world.txt"]) - another_create = PythonTask( - name="create_content_in_another_folder", - function=create_content_in_another_folder, - catalog=another_catalog, - ) - - # Delete the another folder to showcase that the folder will be recreated - # when we run the retrieve task. - delete_another_folder = ShellTask( - name="delete_another_folder", - command="rm -rf another/", - ) - - # This step retrieves the file from the catalog and prints its content. 
- all_catalog = Catalog(get=["**/*"]) - retrieve = PythonTask( - name="retrieve_content_from_both", - function=retrieve_content_from_both, - catalog=all_catalog, - terminate_with_success=True, - ) - - pipeline = Pipeline( - steps=[data_create, another_create, delete_another_folder, retrieve], - add_terminal_nodes=True, - ) - - # Override the default configuration file with the one that has file-system as the catalog. - _ = pipeline.execute(configuration_file="examples/configs/fs-catalog.yaml") - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/concepts/catalog.yaml b/examples/concepts/catalog.yaml deleted file mode 100644 index f671eca7..00000000 --- a/examples/concepts/catalog.yaml +++ /dev/null @@ -1,45 +0,0 @@ -dag: - description: | - An example pipeline to showcase catalog functionality. - - The pipeline consists of four steps: - create_content_in_data_folder: Creates a file in "data" folder and syncs it to catalog - create_content_in_another_folder: Creates another file in "another" folder and syncs it to catalog - delete_another_folder: Deletes the another folder to showcase that it is recreated later. - retrieve_content_from_both: Retrieves the content from both "data" and "another - - You can run this pipeline by: - runnable execute -f examples/concepts/catalog.yaml -c examples/configs/fs-catalog.yaml - - start_at: create_content_in_data_folder - steps: - create_content_in_data_folder: - type: task - command: examples.concepts.catalog.create_content_in_data_folder - catalog: - put: - - "data/hello.txt" - next: create_content_in_another_folder - create_content_in_another_folder: - type: task - command: examples.concepts.catalog.create_content_in_another_folder - catalog: - put: - - "another/world.txt" - next: delete_another_folder - delete_another_folder: - type: task - command_type: shell - command: rm -rf another - next: retrieve_content_from_both - retrieve_content_from_both: - type: task - command: examples.concepts.catalog.retrieve_content_from_both - catalog: - get: - - "**/*" - next: success - success: - type: success - fail: - type: fail diff --git a/examples/concepts/map.py b/examples/concepts/map.py deleted file mode 100644 index b3ceb4c0..00000000 --- a/examples/concepts/map.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -An example pipeline of using "map" to process a sequence of nodes repeatedly over a iterable -parameter. - -The start_index argument for the function process_chunk is dynamically set by iterating over chunks. - -If the argument start_index is not provided, you can still access the current value by -runnable_MAP_VARIABLE environment variable. The environment variable runnable_MAP_VARIABLE -is a dictionary with keys as iterate_as. - -Run this pipeline by: - python examples/concepts/map.py -""" - - -def chunk_files(): - """ - Identify the number of chunks and files to execute per batch. - - Set the parameter "chunks" to be the start indexes of batch. - Set the parameter "stride" to be the number of files to - execute per batch. - """ - return 10, list(range(0, 50, 10)) - # create_model( - # "DynamicModel", - # chunks=(List[int], list(range(0, 50, 10))), - # stride=(int, 10), - # )() - - -def process_chunk(stride: int, start_index: int): - """ - The function processes a chunk of files. - The files between the start_index and the start_index + stride - are processed per chunk. 
- """ - for i in range(start_index, start_index + stride, stride): - pass - - return stride * start_index - - -def main(): - """ - The pythonic equivalent of the following pipeline. - - chunks = chunk_files() - - for start_index in chunks.chunks: - process_chunk(chunks.stride, start_index) - - """ - from runnable import Map, Pipeline, PythonTask - - execute = PythonTask( - name="execute", - function=process_chunk, - returns=["me"], - terminate_with_success=True, - ) - - execute_branch = Pipeline(steps=[execute], add_terminal_nodes=True) - - generate = PythonTask( - name="chunk files", - function=chunk_files, - returns=["stride", "chunks"], - ) - iterate_and_execute = Map( - name="iterate and execute", - branch=execute_branch, - iterate_on="chunks", # iterate on chunks parameter set by execute step - iterate_as="start_index", # expose the current start_index as the iterate_as parameter - terminate_with_success=True, - ) - - pipeline = Pipeline(steps=[generate, iterate_and_execute], add_terminal_nodes=True) - - _ = pipeline.execute(configuration_file="examples/configs/fs-catalog-chunked_run_log.yaml") - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/concepts/map.yaml b/examples/concepts/map.yaml deleted file mode 100644 index cee8e0b7..00000000 --- a/examples/concepts/map.yaml +++ /dev/null @@ -1,48 +0,0 @@ -dag: - description: | - This pipeline demonstrates the usage of map state to dynamically - execute workflows in parallel. - - The step "chunk files" identifies the total number of batches to - execute in parallel and sets the parameters - - start_index of every batch to process, chunks - - number of files to process per batch, stride. - - The step "iterate and execute" iterates on "chunks" and the - parameter name per chunk is set to be "start_index". - - Run this example by: - runnable execute -f examples/concepts/map.yaml - start_at: chunk files - steps: - chunk files: - type: task - command_type: python - command: "examples.concepts.map.chunk_files" - returns: - - name: stride - kind: json - - name: chunks - kind: json - next: iterate and execute - iterate and execute: - type: map - iterate_on: chunks - iterate_as: start_index - next: success - branch: - start_at: execute - steps: - execute: - type: task - command_type: python - command: "examples.concepts.map.process_chunk" - next: success - success: - type: success - fail: - type: fail - success: - type: success - fail: - type: fail diff --git a/examples/concepts/map_returns.py b/examples/concepts/map_returns.py deleted file mode 100644 index 07a530fb..00000000 --- a/examples/concepts/map_returns.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -An example pipeline of using "map" to process a sequence of nodes repeatedly over a iterable -parameter. - -The start_index argument for the function process_chunk is dynamically set by iterating over chunks. - -If the argument start_index is not provided, you can still access the current value by -runnable_MAP_VARIABLE environment variable. The environment variable runnable_MAP_VARIABLE -is a dictionary with keys as iterate_as. - -Run this pipeline by: - python examples/concepts/map.py -""" - - -def chunk_files(): - """ - Identify the number of chunks and files to execute per batch. - - Set the parameter "chunks" to be the start indexes of batch. - Set the parameter "stride" to be the number of files to - execute per batch. - """ - return 10, list(range(0, 50, 10)) - - -def process_chunk(stride: int, start_index: int): - """ - The function processes a chunk of files. 
diff --git a/examples/concepts/map_returns.py b/examples/concepts/map_returns.py
deleted file mode 100644
index 07a530fb..00000000
--- a/examples/concepts/map_returns.py
+++ /dev/null
@@ -1,94 +0,0 @@
-"""
-An example pipeline of using "map" to process a sequence of nodes repeatedly over an iterable
-parameter.
-
-The start_index argument for the function process_chunk is dynamically set by iterating over chunks.
-
-If the argument start_index is not provided, you can still access the current value via the
-runnable_MAP_VARIABLE environment variable. The environment variable runnable_MAP_VARIABLE
-is a dictionary with keys as iterate_as.
-
-Run this pipeline by:
-   python examples/concepts/map_returns.py
-"""
-
-
-def chunk_files():
-    """
-    Identify the number of chunks and files to execute per batch.
-
-    Set the parameter "chunks" to be the start indexes of each batch.
-    Set the parameter "stride" to be the number of files to
-    execute per batch.
-    """
-    return 10, list(range(0, 50, 10))
-
-
-def process_chunk(stride: int, start_index: int):
-    """
-    The function processes a chunk of files.
-    The files between the start_index and the start_index + stride
-    are processed per chunk.
-    """
-    for i in range(start_index, start_index + stride):
-        pass
-
-    return f"processed {start_index} to {start_index + stride}"
-
-
-def read_processed_chunk(processed: str):
-    print(processed)
-
-
-def main():
-    """
-    The pythonic equivalent of the following pipeline.
-
-    chunks = chunk_files()
-
-    for start_index in chunks.chunks:
-        process_chunk(chunks.stride, start_index)
-
-    """
-    from runnable import Map, Pipeline, PythonTask
-
-    execute = PythonTask(
-        name="execute",
-        function=process_chunk,
-        returns=["processed"],
-    )
-
-    read_chunk = PythonTask(
-        name="read processed chunk",
-        function=read_processed_chunk,
-        terminate_with_success=True,
-    )
-
-    execute_branch = Pipeline(
-        steps=[execute, read_chunk],
-        add_terminal_nodes=True,
-    )
-
-    generate = PythonTask(
-        name="chunk files",
-        function=chunk_files,
-        returns=["stride", "chunks"],
-    )
-    iterate_and_execute = Map(
-        name="iterate and execute",
-        branch=execute_branch,
-        iterate_on="chunks",  # iterate on the "chunks" parameter set by the "chunk files" step
-        iterate_as="start_index",  # expose the current item as the start_index parameter
-        reducer="lambda *x: [f'reduced {y}' for y in x]",
-        terminate_with_success=True,
-    )
-
-    pipeline = Pipeline(steps=[generate, iterate_and_execute], add_terminal_nodes=True)
-
-    _ = pipeline.execute(configuration_file="examples/configs/fs-catalog-run_log.yaml")
-
-    return pipeline
-
-
-if __name__ == "__main__":
-    main()
diff --git a/examples/concepts/map_shell.yaml b/examples/concepts/map_shell.yaml
deleted file mode 100644
index 1037b31f..00000000
--- a/examples/concepts/map_shell.yaml
+++ /dev/null
@@ -1,57 +0,0 @@
-dag:
-  description: |
-    This pipeline demonstrates the usage of map state to dynamically
-    execute workflows in parallel.
-
-    The step "chunk files" identifies the total number of batches to
-    execute in parallel and sets the parameters
-     - chunks: the start index of every batch to process
-     - stride: the number of files to process per batch.
-
-    The step "iterate and execute" iterates on "chunks" and the
-    parameter name per chunk is set to be "start_index".
-
-    The shell script can access the start_index via the
-    runnable_MAP_VARIABLE environment variable.
-
-    Run this pipeline by:
-      runnable execute -f examples/concepts/map_shell.yaml
-
-  start_at: chunk files
-  steps:
-    chunk files:
-      type: task
-      command_type: shell
-      returns:
-        - name: stride
-          kind: json
-        - name: chunks
-          kind: json
-      command: |
-        export stride=10 &&
-        export chunks="[0, 10, 20, 30, 40]"
-      next: iterate and execute
-    iterate and execute:
-      type: map
-      iterate_on: chunks
-      iterate_as: start_index
-      next: success
-      branch:
-        start_at: execute
-        steps:
-          execute:
-            type: task
-            command_type: shell
-            command: |
-              echo $stride
-              echo $start_index
-              # prints 10 and 0, 10, 20, 30, 40
-            next: success
-          success:
-            type: success
-          fail:
-            type: fail
-  success:
-    type: success
-  fail:
-    type: fail
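Both deleted examples lean on the ```runnable_MAP_VARIABLE``` environment variable when a branch task does not take ```iterate_as``` as an argument. A minimal sketch of reading it from inside a branch task, assuming the JSON-dictionary encoding the docstrings above describe (the exact serialization is not verified here):

```python
import json
import os


def process_chunk(stride: int = 10) -> str:
    # runnable_MAP_VARIABLE is described above as a dictionary keyed by
    # iterate_as; JSON encoding is an assumption of this sketch
    map_variable = json.loads(os.environ.get("runnable_MAP_VARIABLE", "{}"))
    start_index = map_variable["start_index"]  # the iterate_as of the examples above
    return f"processed {start_index} to {start_index + stride}"
```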
diff --git a/examples/concepts/nesting.py b/examples/concepts/nesting.py
deleted file mode 100644
index 7d1b85df..00000000
--- a/examples/concepts/nesting.py
+++ /dev/null
@@ -1,68 +0,0 @@
-"""
-An example to demonstrate nesting workflows within workflows.
-
-Run this pipeline by:
-   python examples/concepts/nesting.py
-
-"""
-
-from typing import List
-
-from runnable import Map, Parallel, Pipeline, PythonTask, Stub
-
-
-def generate_list() -> List[int]:
-    return list(range(2))
-
-
-def main():
-    stub = Stub(name="executable", terminate_with_success=True)
-    # A dummy pipeline that does nothing interesting
-    stubbed_pipeline = Pipeline(steps=[stub], add_terminal_nodes=True)
-
-    # A map step that executes the stubbed pipeline dynamically.
-    # This step represents 2 executions, one per item of "array".
-    inner_most_map = Map(
-        name="inner most",
-        branch=stubbed_pipeline,
-        iterate_on="array",  # parameter set by the "generate list" step
-        iterate_as="y",
-        terminate_with_success=True,
-    )
-
-    # A pipeline with a map state.
-    map_pipeline = Pipeline(steps=[inner_most_map], add_terminal_nodes=True)
-
-    # A parallel step that executes the map pipeline on two branches.
-    # By nesting a map within the parallel step, the total number of executions is 4 (2 x 2).
-    nested_parallel = Parallel(
-        name="nested parallel",
-        branches={"a": map_pipeline, "b": map_pipeline},
-        terminate_with_success=True,
-    )
-
-    # A pipeline with one nested parallel step
-    nested_parallel_pipeline = Pipeline(steps=[nested_parallel], add_terminal_nodes=True)
-
-    list_generator = PythonTask(name="generate list", function=generate_list, returns=["array"])
-
-    # A map step that iterates over array and executes nested_parallel_pipeline.
-    # The total number of executions is now 8 (2 x 2 x 2).
-    outer_most_map = Map(
-        name="outer most",
-        branch=nested_parallel_pipeline,
-        iterate_on="array",
-        iterate_as="x",
-        terminate_with_success=True,
-    )
-
-    root_pipeline = Pipeline(steps=[list_generator, outer_most_map], add_terminal_nodes=True)
-
-    _ = root_pipeline.execute(configuration_file="examples/configs/fs-catalog-run_log.yaml")
-
-    return root_pipeline
-
-
-if __name__ == "__main__":
-    main()
diff --git a/examples/concepts/nesting.yaml b/examples/concepts/nesting.yaml
deleted file mode 100644
index 93f594f9..00000000
--- a/examples/concepts/nesting.yaml
+++ /dev/null
@@ -1,81 +0,0 @@
-dag:
-  description: |
-    An example of nesting pipelines within pipelines.
- - Run this pipeline by: - runnable execute -f examples/concepts/nesting.yaml - - start_at: generate_list - steps: - generate_list: - type: task - command_type: shell - returns: - - name: array - kind: json - command: export array="[0, 1]" - next: outer most map - outer most map: - type: map - iterate_on: array - iterate_as: xarg - next: success - branch: - start_at: nested parallel - steps: - nested parallel: - type: parallel - next: success - branches: - a: - start_at: inner most map - steps: - inner most map: - type: map - iterate_on: array - iterate_as: yarg - next: success - branch: - start_at: executable - steps: - executable: - type: stub - next: success - success: - type: success - fail: - type: fail - success: - type: success - fail: - type: fail - b: - start_at: inner most map - steps: - inner most map: - type: map - iterate_on: array - iterate_as: yarg - next: success - branch: - start_at: executable - steps: - executable: - type: stub - next: success - success: - type: success - fail: - type: fail - success: - type: success - fail: - type: fail - success: - type: success - fail: - type: fail - success: - type: success - fail: - type: fail diff --git a/examples/concepts/notebook_native_parameters.ipynb b/examples/concepts/notebook_native_parameters.ipynb deleted file mode 100644 index 33a29b75..00000000 --- a/examples/concepts/notebook_native_parameters.ipynb +++ /dev/null @@ -1,88 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": {}, - "outputs": [], - "source": [ - "from pydantic import BaseModel\n", - "\n", - "\n", - "class EggsModel(BaseModel):\n", - " ham: str\n", - "\n", - "\n", - "class EverythingModel(BaseModel):\n", - " spam: str\n", - " eggs: EggsModel\n", - "\n", - "\n", - "\n", - "class CustomObject:\n", - " def __init__(self, value):\n", - " self.value = 42" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e7f0aab2", - "metadata": { - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "# Note the tag parameters\n", - "spam = \"Change me\" \n", - "eggs =\"Change me\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0e04f11a", - "metadata": {}, - "outputs": [], - "source": [ - "eggs = EggsModel.model_validate(eggs) # Cast the dict to EggsModel object" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", - "metadata": {}, - "outputs": [], - "source": [ - "spam =\"World\"\n", - "eggs = EggsModel(ham=\"No, Thank you!!\")\n", - "custom = CustomObject(value=42)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/concepts/notebook_native_parameters.yaml b/examples/concepts/notebook_native_parameters.yaml deleted file mode 100644 index 0144d119..00000000 --- a/examples/concepts/notebook_native_parameters.yaml +++ /dev/null @@ -1,37 +0,0 @@ -dag: - description: | - This is a sample pipeline with one step that executes a notebook. 
- - The step name "notebook" has the "command_type" to be notebook to - let runnable know to execute a notebook while the command is the - path to the notebook relative to the project root. - - The notebook is executed in the same environment as the current - project, you can import any module that was installed for the project. - - You can run this pipeline as: - runnable execute -f examples/concepts/notebook_native_parameters.yaml -p examples/concepts/parameters.yaml - - start_at: notebook - steps: - notebook: - type: task - command_type: notebook - command: examples/concepts/notebook_native_parameters.ipynb - returns: - - name: spam - kind: json - - name: eggs - kind: json - - name: custom - kind: object - next: consume_notebook - consume_notebook: - type: task - command_type: notebook - command: examples/concepts/notebook_native_parameters_consume.ipynb - next: success - success: - type: success - fail: - type: fail diff --git a/examples/concepts/notebook_native_parameters_consume.ipynb b/examples/concepts/notebook_native_parameters_consume.ipynb deleted file mode 100644 index ae5ca22e..00000000 --- a/examples/concepts/notebook_native_parameters_consume.ipynb +++ /dev/null @@ -1,82 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": {}, - "outputs": [], - "source": [ - "from pydantic import BaseModel\n", - "\n", - "\n", - "class EggsModel(BaseModel):\n", - " ham: str\n", - "\n", - "\n", - "class EverythingModel(BaseModel):\n", - " spam: str\n", - " eggs: EggsModel\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e7f0aab2", - "metadata": { - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "# Note the tag parameters\n", - "spam = \"Change me\" \n", - "eggs =\"Change me\"\n", - "custom = \"change me\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0e04f11a", - "metadata": {}, - "outputs": [], - "source": [ - "eggs = EggsModel.model_validate(eggs) # Cast the dict to EggsModel object" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", - "metadata": {}, - "outputs": [], - "source": [ - "spam =\"World\"\n", - "eggs = EggsModel(ham=\"No, Thank you!!\")\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/concepts/notebook_native_parameters_consume_out.ipynb b/examples/concepts/notebook_native_parameters_consume_out.ipynb deleted file mode 100644 index ec796d3f..00000000 --- a/examples/concepts/notebook_native_parameters_consume_out.ipynb +++ /dev/null @@ -1,121 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.894586, - "timestamp_start": 1712723378.892554 - } - }, - "outputs": [], - "source": [ - "from pydantic import BaseModel\n", - "\n", - "\n", - "class EggsModel(BaseModel):\n", - " ham: str\n", - "\n", - "\n", - "class EverythingModel(BaseModel):\n", - " spam: str\n", - " eggs: EggsModel\n" - ] - }, - { - "cell_type": "code", - 
"execution_count": 2, - "id": "e7f0aab2", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.894782, - "timestamp_start": 1712723378.894609 - }, - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "# Note the tag parameters\n", - "spam = \"Change me\" \n", - "eggs =\"Change me\"\n", - "custom = \"change me\"" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "59862ac0", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.894933, - "timestamp_start": 1712723378.894796 - }, - "tags": [ - "injected-parameters" - ] - }, - "outputs": [], - "source": [ - "# Injected parameters\n", - "spam = \"World\"\n", - "eggs = {\"ham\": \"No, Thank you!!\"}\n" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "0e04f11a", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.895087, - "timestamp_start": 1712723378.894946 - } - }, - "outputs": [], - "source": [ - "eggs = EggsModel.model_validate(eggs) # Cast the dict to EggsModel object" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.895227, - "timestamp_start": 1712723378.895099 - } - }, - "outputs": [], - "source": [ - "spam =\"World\"\n", - "eggs = EggsModel(ham=\"No, Thank you!!\")\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/concepts/notebook_native_parameters_out.ipynb b/examples/concepts/notebook_native_parameters_out.ipynb deleted file mode 100644 index 4546caaf..00000000 --- a/examples/concepts/notebook_native_parameters_out.ipynb +++ /dev/null @@ -1,127 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.486399, - "timestamp_start": 1712723378.482792 - } - }, - "outputs": [], - "source": [ - "from pydantic import BaseModel\n", - "\n", - "\n", - "class EggsModel(BaseModel):\n", - " ham: str\n", - "\n", - "\n", - "class EverythingModel(BaseModel):\n", - " spam: str\n", - " eggs: EggsModel\n", - "\n", - "\n", - "\n", - "class CustomObject:\n", - " def __init__(self, value):\n", - " self.value = 42" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "e7f0aab2", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.486896, - "timestamp_start": 1712723378.486585 - }, - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "# Note the tag parameters\n", - "spam = \"Change me\" \n", - "eggs =\"Change me\"" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "bb75b0e8", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.487227, - "timestamp_start": 1712723378.486928 - }, - "tags": [ - "injected-parameters" - ] - }, - "outputs": [], - "source": [ - "# Injected parameters\n", - "spam = \"Hello\"\n", - "eggs = {\"ham\": \"Yes, please!!\"}\n" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "0e04f11a", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.487586, - "timestamp_start": 1712723378.487256 - } - }, - "outputs": [], - 
"source": [ - "eggs = EggsModel.model_validate(eggs) # Cast the dict to EggsModel object" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", - "metadata": { - "ploomber": { - "timestamp_end": 1712723378.488026, - "timestamp_start": 1712723378.487615 - } - }, - "outputs": [], - "source": [ - "spam =\"World\"\n", - "eggs = EggsModel(ham=\"No, Thank you!!\")\n", - "custom = CustomObject(value=42)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/concepts/parallel.py b/examples/concepts/parallel.py deleted file mode 100644 index 8b40596a..00000000 --- a/examples/concepts/parallel.py +++ /dev/null @@ -1,66 +0,0 @@ -""" -This is a stubbed pipeline that demonstrates parallel - pipeline execution. - Note that the branches schema is same as dag schema. - - All the steps are mocked and they will just pass through. - Use this pattern to define the skeleton of your pipeline and - flesh out the steps later. - - You can run this pipeline by: - python examples/concepts/parallel.py -""" - -from runnable import Parallel, Pipeline, Stub - - -def main(): - # The steps in XGBoost training pipeline - prepare_xgboost = Stub(name="Prepare for XGBoost") - train_xgboost = Stub(name="Train XGBoost", terminate_with_success=True) - - # prepare_xgboost >> train_xgboost - - # The pipeline for XGBoost training - xgboost = Pipeline( - name="XGBoost", - steps=[prepare_xgboost, train_xgboost], - # start_at=prepare_xgboost, - add_terminal_nodes=True, - ) - - # The steps and pipeline in Random Forest training - train_rf = Stub(name="Train RF", terminate_with_success=True) - rfmodel = Pipeline( - steps=[train_rf], - # start_at=train_rf, - add_terminal_nodes=True, - ) - - # The steps in parent pipeline - get_features = Stub(name="Get Features") - # The parallel step definition. - # Branches are just pipelines themselves - train_models = Parallel( - name="Train Models", - branches={"XGBoost": xgboost, "RF Model": rfmodel}, - ) - ensemble_model = Stub(name="Ensemble Modelling") - run_inference = Stub(name="Run Inference", terminate_with_success=True) - - # get_features >> train_models >> ensemble_model >> run_inference - - # The parent pipeline - pipeline = Pipeline( - steps=[get_features, train_models, ensemble_model, run_inference], - # start_at=get_features, - add_terminal_nodes=True, - ) - - _ = pipeline.execute(configuration_file="examples/configs/fs-catalog-run_log.yaml") - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/concepts/parallel.yaml b/examples/concepts/parallel.yaml deleted file mode 100644 index 9fd54690..00000000 --- a/examples/concepts/parallel.yaml +++ /dev/null @@ -1,54 +0,0 @@ -dag: - description: | - This is a stubbed pipeline that demonstrates parallel - pipeline execution. - Note that the branches schema is same as dag schema. - - All the steps are mocked and they will just pass through. - Use this pattern to define the skeleton of your pipeline and - flesh out the steps later. 
- - You can run this pipeline by: - runnable execute -f examples/concepts/parallel.yaml - start_at: Get Features - steps: - Get Features: - type: stub - next: Train Models - Train Models: - type: parallel - next: Ensemble Modelling - branches: - XGBoost: - start_at: Prepare for XGBoost - steps: - Prepare for XGBoost: - type: stub - next: Train XGBoost - Train XGBoost: - type: stub - next: XGBoost success - XGBoost success: - type: success - fail: - type: fail - RF model: - start_at: Train RF Model - steps: - Train RF Model: - type: stub - next: RF Model Success - RF Model Success: - type: success - fail: - type: fail - Ensemble Modelling: - type: stub - next: Run Inference - Run Inference: - type: stub - next: success - success: - type: success - fail: - type: fail diff --git a/examples/concepts/parameters.yaml b/examples/concepts/parameters.yaml deleted file mode 100644 index 4bc1a8ac..00000000 --- a/examples/concepts/parameters.yaml +++ /dev/null @@ -1,3 +0,0 @@ -spam: "Hello" -eggs: - ham: "Yes, please!!" diff --git a/examples/concepts/simple.py b/examples/concepts/simple.py deleted file mode 100644 index 069c86dd..00000000 --- a/examples/concepts/simple.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -A simple pipeline with a simple function that just prints "Hello World!". - -Run this pipeline by: - python examples/concepts/simple.py -""" - -from runnable import Pipeline, PythonTask - - -def simple_function(): - """ - A simple function that just prints "Hello World!". - """ - print("Hello World!") - - -def main(): - simple_task = PythonTask( - name="simple", - function=simple_function, - terminate_with_success=True, - ) - - pipeline = Pipeline( - steps=[simple_task], - add_terminal_nodes=True, - ) - - pipeline.execute() # (1) - - return pipeline - - -if __name__ == "__main__": - main() diff --git a/examples/concepts/simple.yaml b/examples/concepts/simple.yaml deleted file mode 100644 index 1854c11b..00000000 --- a/examples/concepts/simple.yaml +++ /dev/null @@ -1,17 +0,0 @@ -dag: - description: | - A simple pipeline with a simple function that just prints "Hello World!". 
- - Run this pipeline by: - runnable execute -f examples/concepts/simple.yaml - start_at: simple - steps: - simple: - type: task - command: "examples.concepts.simple.simple_function" - command_type: python - next: success - success: - type: success - fail: - type: fail diff --git a/examples/concepts/simple_notebook.ipynb b/examples/concepts/simple_notebook.ipynb deleted file mode 100644 index 167f26de..00000000 --- a/examples/concepts/simple_notebook.ipynb +++ /dev/null @@ -1,46 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": {}, - "outputs": [], - "source": [ - "def function():\n", - " print(\"hello world\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", - "metadata": {}, - "outputs": [], - "source": [ - "function()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/concepts/simple_notebook.yaml b/examples/concepts/simple_notebook.yaml deleted file mode 100644 index 27f8de77..00000000 --- a/examples/concepts/simple_notebook.yaml +++ /dev/null @@ -1,29 +0,0 @@ -dag: - description: | - This is a sample pipeline with one step that executes a notebook. - - The step name "notebook" has the "command_type" to be notebook to - let runnable know to execute a notebook while the command is the - path to the notebook relative to the project root. - - The notebook is executed in the same environment as the current - project, you can import any module that was installed for the project. 
- - You can run this pipeline as: - runnable execute -f examples/concepts/simple_notebook.yaml - - start_at: notebook - steps: - notebook: - type: task - command_type: notebook - returns: - - name: a - - name: b - - name: c - command: examples/concepts/simple_notebook.ipynb - next: success - success: - type: success - fail: - type: fail diff --git a/examples/concepts/simple_notebook_out.ipynb b/examples/concepts/simple_notebook_out.ipynb deleted file mode 100644 index 410609b9..00000000 --- a/examples/concepts/simple_notebook_out.ipynb +++ /dev/null @@ -1,117 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "0ee2a616", - "metadata": { - "ploomber": { - "timestamp_end": 1712723380.314318, - "timestamp_start": 1712723380.314131 - }, - "tags": [ - "injected-parameters" - ] - }, - "outputs": [], - "source": [ - "# Injected parameters\n" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "3e98e89e-765c-42d4-81ea-c371c2eab14d", - "metadata": { - "ploomber": { - "timestamp_end": 1712723380.314528, - "timestamp_start": 1712723380.314339 - } - }, - "outputs": [], - "source": [ - "def add(x, y):\n", - " return x + y" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "9f1cbac6-cada-42b0-8fb1-ddb25a88836c", - "metadata": { - "ploomber": { - "timestamp_end": 1712723380.31542, - "timestamp_start": 1712723380.314542 - } - }, - "outputs": [], - "source": [ - "def multiply(x, y):\n", - " return x * y\n", - "\n", - "from pydantic import BaseModel\n", - "\n", - "class EggsModel(BaseModel):\n", - " ham: str\n", - "\n", - "\n", - "class ObjectType:\n", - " def __init__(self):\n", - " self.salute = \"hello\"" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "9dcadc93-aa77-4a0a-9465-2e33eef4da44", - "metadata": { - "ploomber": { - "timestamp_end": 1712723380.315571, - "timestamp_start": 1712723380.315436 - } - }, - "outputs": [], - "source": [ - "a = add(40, 2)" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "7b872cdf-820b-47b5-8f22-15c4b69c8637", - "metadata": { - "ploomber": { - "timestamp_end": 1712723380.315741, - "timestamp_start": 1712723380.315585 - } - }, - "outputs": [], - "source": [ - "b = multiply(2, 100)\n", - "\n", - "c = EggsModel(ham=\"hello\")" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.16" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples/concepts/task_shell_parameters.yaml b/examples/concepts/task_shell_parameters.yaml deleted file mode 100644 index a4b2ff80..00000000 --- a/examples/concepts/task_shell_parameters.yaml +++ /dev/null @@ -1,49 +0,0 @@ -dag: - description: | - This is a sample pipeline to show the parameter flow for shell types. - - The step "access initial" just displays the initial parameters defined in examples/concepts/parameters.yaml - The step modify_initial updates the parameters and sets them back as environment variables. - The step display_again displays the updated parameters from modify_initial and updates them. 
-
-    You can run this pipeline as:
-       runnable execute -f examples/concepts/task_shell_parameters.yaml -p examples/concepts/parameters.yaml
-
-  start_at: access initial
-  steps:
-    access initial:
-      type: task
-      command_type: shell
-      command: |
-        env
-      next: modify initial
-    modify initial:
-      type: task
-      command_type: shell
-      returns:
-        - name: spam
-          kind: json
-        - name: eggs
-          kind: json
-      command: |
-        export spam='World'
-        export eggs='{"ham": "No, Thank you!!"}'
-      next: display again
-    display again:
-      type: task
-      command_type: shell
-      returns:
-        - name: spam
-          kind: json
-        - name: eggs
-          kind: json
-
-      command: |
-        env && \
-        export spam='Universe' && \
-        export eggs='{"ham": "Maybe, one more.."}'
-      next: success
-  success:
-    type: success
-  fail:
-    type: fail
diff --git a/examples/concepts/task_shell_simple.yaml b/examples/concepts/task_shell_simple.yaml
deleted file mode 100644
index afe55bcc..00000000
--- a/examples/concepts/task_shell_simple.yaml
+++ /dev/null
@@ -1,22 +0,0 @@
-dag:
-  description: |
-    This is a sample pipeline with one step that executes a shell command.
-
-    The step named "shell" sets "command_type" to shell to let runnable
-    know to run a shell command; the command is executed directly in the
-    current environment.
-
-    You can run this pipeline as:
-       runnable execute -f examples/concepts/task_shell_simple.yaml
-
-  start_at: shell
-  steps:
-    shell:
-      type: task
-      command_type: shell
-      command: echo "Hello world!!"
-      next: success
-  success:
-    type: success
-  fail:
-    type: fail
diff --git a/mkdocs.yml b/mkdocs.yml
index 897e33cf..eafbba23 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -122,11 +122,9 @@ nav:
       - "Parameters": "concepts/parameters.md"
       - "Catalog": "concepts/catalog.md"
       - "Secrets": "concepts/secrets.md"
-      - "Nodes":
-          - "Parallel": "concepts/parallel.md"
-          - "Map": "concepts/map.md"
-          - "Nesting": "concepts/nesting.md"
-      - "Reproducibility": "concepts/run-log.md"
+      - "Parallel": "concepts/parallel.md"
+      - "Map": "concepts/map.md"
+      - "Reproducibility": "concepts/run-log.md"
   - "Run pipeline":
       - "Overview": "configurations/overview.md"
       - "Executor":
@@ -140,7 +138,7 @@ nav:
       - "Catalog": "configurations/catalog.md"
       - "Secrets": "configurations/secrets.md"
       # - "Experiment tracking": "configurations/experiment-tracking.md"
-  - "Python SDK": "sdk.md"
-  - "YAML": "yaml.md"
+  # - "Python SDK": "sdk.md"
+  # - "YAML": "yaml.md"
   - "Reference": "reference.md"
   # - "Roadmap": "roadmap.md"
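The shell examples above return parameters by exporting environment variables that runnable then captures as the declared ```returns```. A conceptual sketch of that mechanism, not runnable's actual implementation: run the command in a shell, re-read the environment afterwards, and parse the returns declared with ```kind: json```.

```python
import json
import subprocess

# conceptual sketch of the shell-return mechanism described above; how
# runnable implements this internally is assumed, not shown, here
command = "export spam='World' && export eggs='{\"ham\": \"No, Thank you!!\"}' && env"
result = subprocess.run(["bash", "-c", command], capture_output=True, text=True, check=True)

# rebuild the environment after the command ran and pick up the declared returns
env_after = dict(
    line.split("=", 1) for line in result.stdout.splitlines() if "=" in line
)
spam = env_after["spam"]              # returns: spam
eggs = json.loads(env_after["eggs"])  # returns: eggs, kind: json -> {'ham': 'No, Thank you!!'}
print(spam, eggs)
```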