
Related projects #71

Open

constantinpape opened this issue Apr 27, 2020 · 16 comments

@constantinpape
Member

constantinpape commented Apr 27, 2020

To follow up on #70, here are some related libraries I had a look at while implementing my research code for scalable algorithms in https://github.com/constantinpape/cluster_tools and how I think they relate to what we want to do:

  • Low-level computation libraries: generate the computation graph (usually a DAG) to execute a given job. It's probably a good idea to use one of these, but (from what I understand) they solve a more low-level computation scheduling problem.
    • dask is the de-facto standard library for this (see the small sketch after this list).
    • ray was developed with machine-learning applications in mind; I don't know whether it has gained much traction.
  • Job scheduling libraries: schedule a workflow of interdependent jobs, again usually represented as a DAG. From what I found, none of these really solves the "multi computation target" problem.
    • luigi: job dependencies are specified as Python classes. I am using this in cluster_tools, but I find it opaque for expressing more complex job relations and re-running tasks, so I would probably not choose it again. Initially developed at Spotify.
    • airflow: also purely Python based; I don't know much more. Initially developed at Airbnb.
    • metaflow: also purely Python based; I don't know much more. Initially developed at Netflix.
    • snakemake: workflow description similar to make. Popular in bioinformatics.
    • galaxy: as far as I understand, all individual jobs are containerized; I don't really know how workflows are specified. This comes closest to solving the "multi computation targets" problem. Also popular in bioinformatics.
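To make the "computation graph" point concrete, here is a minimal sketch of how such a DAG is built lazily and then executed with dask.delayed (the file names and functions are hypothetical, just for illustration):

```python
import dask


# Build a small computation graph (a DAG) lazily; nothing runs yet.
@dask.delayed
def load(path):
    return f"data from {path}"


@dask.delayed
def process(data):
    return data.upper()


@dask.delayed
def combine(a, b):
    return a + " | " + b


graph = combine(process(load("a.n5")), process(load("b.n5")))

# Only now is the DAG executed, by whichever scheduler is configured
# (threads, processes, or a distributed cluster).
result = graph.compute()
print(result)
```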

Additional resources mentioned by @wolny and @jkh1:

@k-dominik

@m-novikov @Tomaz-Vieira and I had a discussion about how to proceed with this and came up with some metrics for comparing the tools:

Metrics for comparing job runners

  • implementation language
  • extendability
  • activity (on github)
  • support for different executors
  • non-python compatibility / how to run generic executables
  • one-shot / server
  • minimal example to run an ilastik batch job

We'll look into the list and try to characterize the different tools according to these metrics.
Current distribution of work:

  • Maksim: airflow, snakemake
  • Tomaz: galaxy
  • Dominik: metaflow

@m-novikov
Contributor

| Metric | Airflow | Snakemake |
| --- | --- | --- |
| Language | Python | Python |
| Extendability | Extendable executors and operators | Runs arbitrary shell commands |
| Activity (GitHub) | 45 pull requests merged last week | 3 pull requests merged last week |
| Executors | Has a Kubernetes executor, seems possible to implement a UNICORE one | Communicates with the cluster using a templated command |
| Non-Python compat | Good, runs arbitrary bash | Good, runs arbitrary bash |
| One-shot / server | Server | One-shot |

@constantinpape
Member Author

> 45 pull requests merged last week

Wow!

@k-dominik

k-dominik commented May 28, 2020

Metaflow

| Metric | Metaflow |
| --- | --- |
| implementation language | Python |
| extendability | no developer docs |
| activity (on GitHub) | > 20 commits during the past month |
| support for different executors | not really; Titus and AWS |
| non-Python compatibility | probably yes ;) |
| one-shot / server | it monitors progress |

@jkh1

jkh1 commented May 29, 2020

Here are two lists of workflow management systems:
- https://github.com/common-workflow-language/common-workflow-language/wiki/Existing-Workflow-systems
- https://github.com/pditommaso/awesome-pipeline

It's hard to choose because some WfMSs are somewhat specialized, don't support complex workflows, or are tied to a technology or language (but many are also language-agnostic).
I'd suggest looking at those used in the bioinformatics communities, as you're more likely to find people in the lab with experience in them. Have a look at Nextflow.
I'd also be interested in trying to move your workflow to Galaxy. Think of Galaxy as a web front-end for an HPC.

@constantinpape
Member Author

Thanks @jkh1. I had a quick look at Nextflow yesterday, and I am not so sure it's such a good fit for us: the workflow specification looks rather static, and for us Python is a pretty big advantage over Groovy because it makes integration with our tools easier.
That being said, we should have a closer look.

Galaxy is on our "to-explore list", see #71 (comment). Once we have a first overview, it would be great to hear more of your or Yi's perspective.

@Tomaz-Vieira
Contributor

Tomaz-Vieira commented Jun 8, 2020

As I was looking through Galaxy, I stumbled upon this, which I think is very relevant:

Common Workflow Language (CWL)

| Metric | CWL |
| --- | --- |
| Language | YAML (wraps executables rather than libraries/function calls) |
| Extendability | Workflows can be composed and reused. Spec can use vendor extensions |
| Activity (GitHub) | 2 merged PRs last week for the spec repo. The reference runner (cwltool) and Galaxy (which can consume CWL files) are very active |
| Executors | There are several implementations of the spec. Galaxy has partial support, Airflow should be production-ready |
| Non-Python compat | Good, runs arbitrary binaries |
| One-shot / server | Implementation-dependent. cwltool seems to be one-shot, the others are mostly servers |

CWL is a specification for running workflows, written in YAML. It is a mechanism for wrapping command line executions, defining their inputs, outputs, environment, required resources, and the dependencies between the steps of a workflow, and it has enough room in its configuration space to make runs reproducible. There are multiple implementations of the standard, including a reference implementation built by the people responsible for the specification, which can be used to run workflows locally. Other implementations claim to be able to run workflows on top of pretty much anything (Slurm, Torque, OpenStack, AWS, Azure). There seems to be decent adoption of the spec in the genomics communities.

It seems to me like this would be the most flexible approach (at least compared to committing to Galaxy and their custom XML workflow descriptions), since it wouldn't tie anyone to any particular service/framework/runner; instead, users could choose the one they prefer (including, potentially, Galaxy, which is what I was supposed to look into in the first place =P).

@jkh1

jkh1 commented Jun 8, 2020

I support going with CWL. CWL is promoted by EOSC-Life, at least as a standard for capturing adequate descriptions of workflows. It does have some traction in bioinformatics, where it can be seen as competing with snakemake and nextflow.

> to run workflows on top of pretty much anything

This is not limited to CWL. Galaxy, snakemake and nextflow can do this.

@emilmelnikov
Member

emilmelnikov commented Jun 12, 2020

Dagster

| Metric | Dagster |
| --- | --- |
| implementation language | Python |
| extendability | tasks are composable, pipelines are not |
| activity (on GitHub) | 1.8k stars, 112 active issues last week (76 closed + 36 new), 1 merged PR last week |
| support for different executors | k8s, Celery, dask, Airflow, AWS, GCP; seems possible to implement a custom one |
| non-Python compatibility / how to run generic executables | using subprocess; some integrations exist (e.g. for bash) |
| one-shot / server | both (uses executors as servers) |

Dagster's main idea is to have a functional pipeline description, that is, you define the data dependencies between tasks rather than the execution order. All environment-related side effects (e.g. sending a notification or updating a status table) should be moved out of tasks so that they can be changed depending on the environment. All writes to external sources (filesystem, database, etc.) should be described in the task. The framework also encourages users to check the quality of the data as it flows through the system, using type annotations and validations. Conditional execution is supported: conditionally yield in the task body, but connect all outputs in the pipeline definition. Tasks can be generated programmatically, and tasks can be nested. Failed tasks can be re-executed (at least from the UI).

Example

example_pipeline.py

import os

import numpy
import scipy.ndimage

from dagster import execute_pipeline, pipeline, solid


# Define a task. "context" is a required parameter.
@solid
def create_data(context):
    data = numpy.random.default_rng(42).random((300, 200))
    context.log.info(f"created random data with shape {data.shape}")
    # Shortcut for "yield dagster.Output(data, name="result")"
    # Can also return multiple outputs just by using multiple "yield" statements.
    return data


# Type annotations are used as shortcuts for input/output definitions.
@solid
def gaussian(context, data, *, sigma: float):
    return scipy.ndimage.gaussian_filter(data, sigma)


@solid
def difference(context, a, b):
    return a - b


# Pipeline definition: just call solids without "context" as if they are functions.
@pipeline
def my_pipeline():
    data = create_data()
    # Aliases are needed to create 2 different DAG nodes from the same solid.
    # Note that "sigma" parameters are missing; they are read from external config.
    feature1 = gaussian.alias("gaussian_1")(data)
    feature2 = gaussian.alias("gaussian_2")(data)
    difference(feature1, feature2)

config.yaml

# Automatically save intermediate results in a filesystem
storage:
  filesystem:

# Configuration for unspecified values.
# Could be also configured from Python with dicts.
# The structure depends on the specific task, but one can define a config schema if needed.
solids:
  gaussian_1:
    inputs:
      sigma:
        value: 5.0
  gaussian_2:
    inputs:
      sigma:
        value: 3.0

Invocation: dagster pipeline execute -f example_pipeline.py -p my_pipeline -e config.yaml

It's also possible to run this inside a Python script, or start a dagit web UI and click "Execute" there.

[screenshot: dagit web UI showing an execution error of the same pipeline]
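The conditional execution mentioned above (yield only one of several optional outputs and connect all of them in the pipeline) is not shown in the example. Here is a minimal sketch of how it could look, assuming Dagster's `OutputDefinition(is_required=False)` API; the `branch`/`handle_*` solids are hypothetical:

```python
from dagster import Output, OutputDefinition, pipeline, solid


# Declare two optional outputs and yield only one of them at runtime.
@solid(
    output_defs=[
        OutputDefinition(name="small", is_required=False),
        OutputDefinition(name="large", is_required=False),
    ]
)
def branch(context, value: int):
    if value < 10:
        yield Output(value, "small")
    else:
        yield Output(value, "large")


@solid
def handle_small(context, value: int):
    context.log.info(f"small branch: {value}")


@solid
def handle_large(context, value: int):
    context.log.info(f"large branch: {value}")


# Connect both outputs in the pipeline definition; only the branch that
# actually yielded an Output is executed downstream.
@pipeline
def branching_pipeline():
    # "value" is not wired here; like "sigma" in the example above,
    # it would be supplied via the run config.
    small, large = branch()
    handle_small(small)
    handle_large(large)
```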

@emilmelnikov
Member

emilmelnikov commented Jun 12, 2020

Prefect

| Metric | Prefect |
| --- | --- |
| implementation language | Python |
| extendability | tasks are composable, pipelines are not |
| activity (on GitHub) | 2.6k stars, 29 active issues last week (15 closed + 14 new), 30 active PRs last week (23 merged + 7 proposed) |
| support for different executors | dask, k8s, AWS Fargate, custom environments |
| non-Python compatibility / how to run generic executables | ShellTask + a lot of integrations |
| one-shot / server | both (has a server backend with dashboard and DB) |

Prefect is "Airflow 2.0", started by people from the original Airflow team. Tasks are connected with data dependencies, and could be parametrized. There is a rich notion of a task state and conditions on which a task is supposed to be run. A task is run depending on it's trigger: it could be triggered by inbound tasks, or always run (suitable for shutting down things or clearing resources). Reference tasks define the success state of the whole pipeline (e.g. if all tasks are OK, but the server failed to shut down, consider it a success). Tasks can also be retried with the specified delay, skipped or paused until explicitly resumed (from within the task itself).

Example

example_flow.py

import numpy
import scipy.ndimage
import prefect

from prefect import task, Flow, Parameter


# Define a task.
# Tasks can also be defined as classes.
@task
def create_data():
    data = numpy.random.default_rng(42).random((300, 200))
    logger = prefect.context.get("logger")
    logger.info(f"created random data with shape {data.shape}")
    return data


@task
def gaussian(data, *, sigma: float):
    return scipy.ndimage.gaussian_filter(data, sigma)


@task
def difference(a, b):
    return a - b


# Functional way to create a pipeline.
# There is also a verbose imperative way.
with Flow("My Flow") as flow:
    data = create_data()
    sigma1 = Parameter("sigma1")
    sigma2 = Parameter("sigma2")
    feature1 = gaussian(data, sigma=sigma1)
    feature2 = gaussian(data, sigma=sigma2)
    difference(feature1, feature2)


# Run the flow with parameters.
state = flow.run(sigma1=3.0, sigma2=5.0)
assert state.is_successful()

# This will register the workflow with the server.
# flow.register()

Invocation: just run with python example_flow.py.
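The retry and trigger behaviour described above is configured per task. A minimal sketch of how that could look, assuming Prefect 0.x's `max_retries`/`retry_delay`/`trigger` task arguments (`fragile_step` and `cleanup` are hypothetical tasks, not part of the example above):

```python
import random
from datetime import timedelta

from prefect import task, Flow
from prefect.triggers import all_finished


# Retry up to 3 times, waiting 10 seconds between attempts.
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def fragile_step():
    if random.random() < 0.5:
        raise RuntimeError("transient failure")
    return 42


# Runs regardless of whether the upstream task succeeded,
# e.g. for shutting things down or cleaning up resources.
@task(trigger=all_finished)
def cleanup(result):
    print("cleaning up, upstream result was", result)


with Flow("retry-and-cleanup") as flow:
    cleanup(fragile_step())

state = flow.run()
```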

The web UI is packaged as docker/docker-compose (server, PostgreSQL DB, GraphQL engine) with a total size of ~3 GB.

@constantinpape
Member Author

constantinpape commented Jun 12, 2020

Thanks for posting the two other frameworks, @emilmelnikov. I think we can now start voting on which tools we should explore further by implementing the example workflow (I am working on this and will share it soon).

  • Airflow 👍
  • Snakemake 👀
  • Metaflow 😄
  • Galaxy 🎉
  • Nextflow 😕
  • Dagster ❤️
  • Prefect 🚀

Just use the corresponding emoji reaction to vote for each tool. I would say everyone has 3 votes, and in the end we should go with at least one of the Python tools (Airflow, Metaflow, Dagster, Prefect) and one of the non-Python tools (Snakemake, Galaxy, Nextflow).

@jkh1

jkh1 commented Jun 12, 2020

Do you have a list of features you require? Does it have to be Python-based? I would suggest looking more into those already widely used at EMBL and in the wider bioinformatics community. It seems to me that many of the features listed for the last two are available in at least some of the others, but would need to be looked at in more detail there. I am also thinking here about interoperability and about who is going to maintain and/or re-use this.
Also, on the Galaxy/CWL front: there is no CWL runner for Galaxy; it's in the works but won't be ready any time soon. What will soon be available is a CWL description of a Galaxy workflow.

@constantinpape
Member Author

constantinpape commented Jun 12, 2020

> Do you have a list of features you require?

A (probably not exhaustive) list of requirements:

  • extendable job runner specs to deploy locally, on Slurm, on some cloud solution, etc.
  • easy to integrate with our existing Python code base: ilastik / cloud-ilastik and our large data processing tools
  • robust to partial job failures; easy to rerun when intermediate results change
  • good logging / monitoring capabilities (e.g. web dashboards like those offered by luigi or airflow)
  • support for interactivity in the context of cloud-ilastik (we are not sure ourselves yet how exactly this will look)
  • it should be developer friendly, i.e. no need to containerize things just to run them

> Does it have to be Python-based?

It's not strictly necessary, but given that everything we need to support is in Python, it would make a lot of sense. If it's not Python based, the additional boilerplate code should really be minimal.
Also, given that we are all python developers, this is a big maintainability plus.

> I would suggest looking more into those already widely used at EMBL and in the wider bioinformatics community. It seems to me that many of the features listed for the last two are available in at least some of the others, but would need to be looked at in more detail there. I am also thinking here about interoperability and about who is going to maintain and/or re-use this.

Yes, it would be good to have interoperability with these tools.
However, my initial impression when looking at something like Galaxy or snakemake was always that these don't really fit our use cases well, and that it would require a lot of boilerplate to port one of my more complex workflows, which contain iterative or recursive job specifications, to them.

In any case, our current plan is to make a prototype implementation for an example workflow (still work in progress) with a couple of tools to actually quantify this.

> Also, on the Galaxy/CWL front: there is no CWL runner for Galaxy; it's in the works but won't be ready any time soon. What will soon be available is a CWL description of a Galaxy workflow.

Thanks, that's good to know!

@constantinpape
Member Author

constantinpape commented Jul 7, 2020

Mini Hackathon Summary

We had a mini hackathon where @emilmelnikov @Tomaz-Vieira @k-dominik and @m-novikov worked on implementing the example workflow I put together in some of the tools we discussed here.
The proof of concept implementations are here: https://github.com/constantinpape/cloud-ilastik-example-project/pulls.

Tools

We have tried the following tools:

  • @m-novikov: prefect: implementing the workflow was relatively straightforward and the code is easy to understand; it is also possible to implement the "scatter-gather" style tasks in prefect (see the sketch after this list). One remark by Maxim: it would be nice to hide some of the complexity of tasks like connected components in some sort of "meta node"; there seems to be a subflow concept for this, but it looks like it's still in the discussion stage. Maxim hasn't actually tried running in parallel mode yet.
  • @emilmelnikov: dagster: implementing the high-level dependencies of the workflow is easy (i.e. "Ilastik Prediction" -> "Threshold" ...); for parallelization this still uses a threadpool. Emil thinks that using dagster also for the parallelization part is possible, but probably not a perfect fit.
  • @k-dominik: galaxy: setting up a dev instance of galaxy was quite some work, and the galaxy lingo is pretty different from what we are used to. There is a galaxy instance running on the EMBL server, but in order to deploy something there it needs to go through an admin of the instance.
  • @Tomaz-Vieira: cwl: YAML based workflow description, supposed to be portable. The default runner seems quite limited in features, but is easy to use for development. The most sophisticated implementation seems to be Arvados. Not meant to express control flow (loops, branching). Can express the software environment in which workflows should be run, but how well the implementations honor this is still unexplored.
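As a rough illustration of the scatter-gather point above, a minimal sketch of how such a flow could look with Prefect's `map` (the block/merge tasks are hypothetical placeholders, not the actual hackathon code):

```python
from prefect import task, Flow


# "Scatter": produce one work item per block of the volume.
@task
def make_blocks():
    return [(0, 0), (0, 1), (1, 0), (1, 1)]


# Runs once per block when mapped over the block list.
@task
def process_block(block):
    return sum(block)


# "Gather": combine the per-block results again.
@task
def merge(results):
    return sum(results)


with Flow("scatter-gather") as flow:
    blocks = make_blocks()
    per_block = process_block.map(blocks)  # one task run per block
    merge(per_block)

state = flow.run()
```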

Of course all conclusions are preliminary after only a few hours of working on this.

Discussion

High-level vs low-level orchestration

We had a rather long discussion about the following point:
There are two separate concepts:

  • Orchestration of the high-level workflow (= task inter-dependencies)
  • Orchestration of (parallel) deployment of the actual computation

In principle both could be handled in the same large computational graph, but it's also possible to split it into one graph for the high-level workflow and separate graphs for the tasks.
Having it in one big graph would provide the most flexibility in how the computation is run:

  • Several block-wise (embarrassingly parallel) tasks could be piped from one gather task to the next, and one could perform all operations for one block without waiting for all blocks to finish for a given task.
  • This allows quite elegant error handling, because one can just stop processing the blocks which failed (and, if properly implemented, even exclude them from gather tasks and continue afterwards).

However, this approach also comes with some downsides:

  • Most workflow management tools seem to be designed with mostly high-level orchestration in mind.
  • Having too many nodes in the computational graph might incur a large cost.
  • Having everything in one framework would make it harder to incorporate other tools, e.g. Fiji. It's possible, but would definitely break the computational graph.

Note also that we deal with large 3d volumes, where parallelizing (non-trivial) computation creates dependencies between tasks (through scatter-gather tasks and the propagation of errors) that are not there for applications where one mostly needs to parallelize over independent files (e.g. a lot of 2d images).

For the first approach (one big computational graph), it looks like prefect is the way to go.
For the second approach (separate computational graphs), it would probably be best to use a tool more tailored for deploying computations (dask!) for the deployment/parallelization and then handle the task dependencies with one of the workflow managers (we could still go with prefect, but in this case it would also not be so hard to switch the workflow manager); a rough sketch of the dask side is below.
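For illustration, a minimal sketch of the dask side of the second approach, where the block-wise parallelization lives entirely inside a single task of the high-level workflow (random data stands in for a real volume; the filter and halo size are arbitrary placeholders):

```python
import dask.array as da
from scipy.ndimage import gaussian_filter

# Hypothetical stand-in for a large 3d volume, chunked into blocks.
volume = da.random.random((512, 512, 512), chunks=(128, 128, 128))

# Block-wise (embarrassingly parallel) computation; "depth" adds a halo so
# the filter sees neighbouring voxels across block borders.
smoothed = volume.map_overlap(gaussian_filter, depth=8, boundary="reflect", sigma=2.0)

# The high-level workflow manager would trigger something like this per task,
# on whatever dask deployment is configured (local, slurm, k8s, ...).
result = smoothed.compute()
print(result.shape)
```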

Scope

What usage modes / applications do we actually want to support? Who is the target audience?

  • Port the algorithms from cluster_tools so that they are maintainable and usable by others. More generally, create a framework for large 3d image processing for members of the lab and other developers.
  • Provide the computational backend for cloud_ilastik. Only for the (static) batch processing use case, or also for the interactive use case?
  • Make large 3d image processing available to users at EMBL (and HBP) through a convenient tool.

Next Steps

Implement the reference workflow with dask, and figure out if / how we can implement some of the additional requirements with dask, e.g. handling computation failures (a rough sketch of that is below).
Figure out the scope ;).
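A minimal sketch of what per-block failure handling could look like with dask.distributed (`flaky_block` is a hypothetical stand-in for a real per-block computation; `retries` and `errors="skip"` are standard dask.distributed options):

```python
import random

from dask.distributed import Client


def flaky_block(block_id):
    # Hypothetical per-block computation that sometimes fails.
    if random.random() < 0.2:
        raise RuntimeError(f"block {block_id} failed")
    return block_id ** 2


if __name__ == "__main__":
    client = Client()  # local cluster here; could equally be a slurm/k8s deployment
    futures = [client.submit(flaky_block, i, retries=2) for i in range(10)]

    # Keep the results of the blocks that succeeded instead of aborting everything.
    results = client.gather(futures, errors="skip")
    failed = [f for f in futures if f.status == "error"]
    print(f"{len(results)} blocks finished, {len(failed)} blocks failed")
```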

@jkh1

jkh1 commented Jul 7, 2020

The distinction between low/high level is what I was trying to get at, because indeed some workflow management systems are designed for the high-level case of chaining tools together. If the workflow is embedded in the application itself, then you need lower-level management, which usually means interfacing with the programming language used, which some WfMS (e.g. for Python) do.

Regarding scope and audience, systems like Galaxy are meant to make HPC accessible to non computer-savvy users. It is essentially a server providing a GUI around tools to compose workflows and transparently dispatch them to an HPC backend. There's a bit of infrastructure admin involved, but I believe this scales better in terms of number of users, and it is not limited to one scientific domain, which means that one EMBL instance could serve all of EMBL. This should be a consideration if support is handed over to some EMBL support team, as we don't want to scatter support over too many different systems. If the WfMS is to be embedded in the application, then obviously it's for the developers to support :)

@constantinpape
Member Author

Here's my thinking after digesting today's discussion a bit more:
For my use-cases, the option of separating low-level and high-level computation would make more sense.
I think writing some logic around the low-level computation framework (most likely dask) for error handling etc. and working on integrating it nicely with the high-level framework is more feasible than the all-in-one framework solution (for which, as far as I can see, none of the workflow managers is really made).
This has the additional benefit that it makes the whole approach more modular, and it might be possible to support more than one high-level framework (at least for some use cases).
I think this model would fit the ilastik batch prediction use case pretty well too.

However, it is not a good fit for the interactive ilastik use-case. But I think the requirements for that use-case are so different that we would end up with different systems anyway. If we could use dask for the interactive ilastik backend, we could still share functionality though. Let's see.

> Regarding scope and audience, systems like Galaxy are meant to make HPC accessible to non computer-savvy users. It is essentially a server providing a GUI around tools to compose workflows and transparently dispatch them to an HPC backend. There's a bit of infrastructure admin involved, but I believe this scales better in terms of number of users, and it is not limited to one scientific domain, which means that one EMBL instance could serve all of EMBL. This should be a consideration if support is handed over to some EMBL support team, as we don't want to scatter support over too many different systems. If the WfMS is to be embedded in the application, then obviously it's for the developers to support :)

I think the requirements for "More general, create framework for large 3d image processing for members of the lab and other developers." and "Make large 3d image processing available to users at EMBL (and HBP) through a convenient tool." are indeed very different.

For the former, Galaxy (and also CWL) are not a good fit.
Their model of execution is just not a good fit for developing and deploying large-scale research code, where neither the dependencies nor the steps of the workflow are fixed.

On the other hand, it could make sense to port some more stable workflows to Galaxy to make them more easily available.
