Author: Lijun Yu
Email: [email protected]
A pipeline system for efficient execution.
pip install py-turbo
Pyturbo utilizes multiple levels of abstraction to efficiently execute parallel tasks.
- Worker: a process.
- Stage: a group of peer workers processing the same type of tasks.
- Task: a data unit transferred between stages. At each stage, a task is processed by one worker and results in one or more downstream tasks (see the sketch after this list).
- Pipeline: a set of sequential stages.
- Job: a data unit for a pipeline, typically a wrapped task for the first stage.
- Result: output of a job processed by one pipeline, typically a set of output tasks from the last stage.
- System: a set of peer pipelines processing the same type of jobs.
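For intuition, here is a minimal sketch of the stage/task relation. It assumes the incoming task payload is iterable and that whatever `process` yields becomes the downstream tasks; the class name and payload shape are hypothetical, not part of the library.

```python
from pyturbo import Stage

class SplitStage(Stage):
    """Hypothetical stage: one incoming task fans out into several downstream tasks."""

    def process(self, task):
        # Each yielded value is handed to the next stage as a downstream task,
        # where any of that stage's workers may pick it up in parallel.
        for item in task:  # assumes the task payload is iterable
            yield item
```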
```python
from pyturbo import ReorderStage, Stage, System

class Stage1(Stage):  # Define a stage
    def allocate_resource(self, resources, ...):
        ...  # Optional: split resources and determine number of workers.

    def process(self, task):
        ...  # Process function for each worker process. Returns one or a series of downstream tasks.

...  # Repeat for Stage2, Stage3

class Stage4(ReorderStage):  # Define a reorder stage, typically for the final stage
    def get_sequence_id(self, task):
        ...  # Return the order of each task. Start from 0.

    def process(self, task):
        ...

class MySystem(System):
    def get_stages(self, resources):
        ...  # Define the stages in a pipeline with given resources.

    def get_results(self, results_gen):
        ...  # Define how to extract final results from output tasks.

def main():
    system = MySystem(num_pipeline)  # Set debug=True to run in a single process
    system.start()  # Build and start system
    jobs = [...]
    system.add_jobs(jobs)  # Submit jobs
    for job in system.wait_jobs(len(jobs)):
        print(job.results)  # Process result
    system.end()  # End system
```
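Because a stage's workers run in parallel, tasks can finish out of their original order; a `ReorderStage` at the end of the pipeline uses `get_sequence_id` to emit results back in submission order. The sketch below assumes each task carries a hypothetical integer attribute `task.order`, assigned by an upstream stage and starting from 0.

```python
from pyturbo import ReorderStage

class OrderedSink(ReorderStage):
    """Hypothetical final stage that emits tasks in their original order."""

    def get_sequence_id(self, task):
        # The sequence id (starting from 0) defines the output order.
        # `task.order` is an assumed attribute set by an earlier stage.
        return task.order

    def process(self, task):
        # Pass the task through unchanged; only the output order changes.
        return task
```

In practice the final stage usually also formats its output; the pass-through here only keeps the sketch minimal.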
See options.md.
See demo.py for an example implementation.
See version.md.