Break out or change the scheduler part #12

Open
druvus opened this issue Jan 23, 2018 · 1 comment
druvus (Collaborator) commented Jan 23, 2018

We discussed earlier finding a better way of submitting/distributing jobs. I have used Snakemake quite a lot, and I think using its API instead could be an option for a more stable solution. It supports several schedulers plus Kubernetes.
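
As a rough sketch of what that could look like (assuming the snakemake.snakemake() entry point from the Python API; the Snakefile path and the submit command are just placeholders):

#!/usr/bin/env python3
# Sketch: hand the whole pipeline over to Snakemake and let it submit the
# individual jobs through a scheduler-specific command.
from snakemake import snakemake

ok = snakemake(
    "Snakefile",                            # placeholder path
    cores=8,
    cluster="qsub -pe threaded {threads}",  # placeholder submit command
    printshellcmds=True,
)
if not ok:
    raise SystemExit("workflow failed")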

I include a small example I found below.

#!/usr/bin/env python3
"""
rule all:
    input:
        "reads.counts"
rule unpack_fastq:
    '''Unpack a FASTQ file'''
    output: "{file}.fastq"
    input: "{file}.fastq.gz"
    resources: time=60, mem=100
    params: "{file}.params"
    threads: 8
    log: 'unpack.log'
    shell:
        '''zcat {input} > {output}
        echo finished > {log} 2>&1
        '''
rule count:
    '''Count reads in a FASTQ file'''
    output: counts="{file}.counts"
    input: fastq="{file}.fastq"
    run:
        n = 0
        with open(input.fastq) as f:
            for _ in f:
                n += 1
        with open(output.counts, 'w') as f:
            print(n // 4, file=f)  # four lines per FASTQ record
"""

In pure Python this is equivalent to the following code (the output of snakemake --print-compilation):

workflow.include("pipeline.conf")

shell.prefix("set -euo pipefail;")

@workflow.rule(name='all', lineno=6, snakefile='.../Snakefile')
@workflow.input("reads.counts")
@workflow.norun()
@workflow.run
def __all(input, output, params, wildcards, threads, resources, log, version):
    pass


@workflow.rule(name='unpack_fastq', lineno=17, snakefile='.../Snakefile')
@workflow.docstring("""Unpack a FASTQ file""")
@workflow.output("{file}.fastq")
@workflow.input("{file}.fastq.gz")
@workflow.resources(time=60, mem=100)
@workflow.params("{file}.params")
@workflow.threads(8)
@workflow.log('unpack.log')
@workflow.shellcmd(
    """zcat {input} > {output}
        echo finished > {log} 2>&1
        """
)
@workflow.run
def __unpack_fastq(input, output, params, wildcards, threads, resources, log, version):
    shell("""zcat {input} > {output}
        echo finished > {log} 2>&1
        """
)


@workflow.rule(name='count', lineno=52, snakefile='.../Snakefile')
@workflow.docstring("""Count reads in a FASTQ file""")
@workflow.output(counts = "{file}.counts")
@workflow.input(fastq = "{file}.fastq")
@workflow.run
def __count(input, output, params, wildcards, threads, resources, log, version):
    n = 0
    with open(input.fastq) as f:
        for _ in f:
            n += 1
    with open(output.counts, 'w') as f:
        print(n // 4, file=f)  # four lines per FASTQ record


### End of output from snakemake --print-compilation


workflow.check()
print("Dry run first ...")
workflow.execute(dryrun=True, updated_files=[])
print("And now for real")
workflow.execute(dryrun=False, updated_files=[], resources=dict())

Another option that I have used earlier is ipython-cluster-helper, but there are probably other options available.
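
For reference, the usage pattern is roughly this (a minimal sketch based on its README; scheduler, queue and file names are placeholders):

# Sketch: map a function over inputs on an IPython cluster that
# ipython-cluster-helper starts through the batch scheduler.
from cluster_helper.cluster import cluster_view

def count_reads(fastq):
    # four lines per FASTQ record
    with open(fastq) as f:
        return sum(1 for _ in f) // 4

if __name__ == "__main__":
    files = ["sample1.fastq", "sample2.fastq"]  # placeholders
    with cluster_view(scheduler="slurm", queue="core", num_jobs=2) as view:
        counts = view.map(count_reads, files)
    print(dict(zip(files, counts)))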

RickardSjogren (Contributor) commented

My suggestion is to sub-class or change the PipelineGenerator class. That class parses the config files and should contain all the information needed to output a Snakemake workflow. Currently new_pipeline_collection outputs a dict of script strings, so an alternative to that function should be enough. The workflow DAGs from our work will be very simple.
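
Roughly along these lines (everything except the PipelineGenerator and new_pipeline_collection names is hypothetical, including the import path and the assumption that new_pipeline_collection is a method returning {step_name: script_string}):

# Hypothetical sketch: render the per-step script strings into a Snakemake
# workflow instead of standalone scripts.
from doepipeline.generator import PipelineGenerator  # assumed import path


class SnakemakePipelineGenerator(PipelineGenerator):
    """Outputs a Snakefile string instead of a dict of script strings."""

    def new_snakemake_workflow(self, *args, **kwargs):
        # Assumes new_pipeline_collection returns {step_name: script_string}.
        scripts = self.new_pipeline_collection(*args, **kwargs)
        rules = [
            'rule {0}:\n    shell:\n        """\n{1}\n        """'.format(name, script)
            for name, script in scripts.items()
        ]
        # Real dependencies would be declared via input/output file patterns;
        # our DAGs are simple enough that a linear chain may be all we need.
        return "\n\n".join(rules)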

A simple executor class is needed as well, to start the pipeline and format the output into something the optimizer can use.
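
Something as small as this might do (hypothetical sketch; it only assumes snakemake.snakemake() from the Python API, and the results file and response format are placeholders):

# Hypothetical executor sketch: write the generated workflow to disk, run it,
# and hand a parsed response back to the optimizer.
import tempfile

from snakemake import snakemake


class SnakemakeExecutor:
    """Runs a generated workflow and returns a response for the optimizer."""

    def run_pipeline_collection(self, snakefile_contents):
        # Write the generated workflow to a temporary Snakefile and execute it.
        with tempfile.NamedTemporaryFile("w", suffix=".smk", delete=False) as f:
            f.write(snakefile_contents)
            snakefile = f.name
        if not snakemake(snakefile, cores=1):
            raise RuntimeError("workflow failed")
        # Placeholder: parse whatever response value the optimizer expects.
        with open("results.txt") as results:
            return float(results.read())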
