Merge pull request #35 from Sage-Bionetworks-Workflows/ORCA-297-update-demo

[ORCA-297] Fixing tutorial demo (`demo.py` and documentation)
jaymedina authored Jan 18, 2024
2 parents 433e7ae + 0517118 commit d15c5b7
Showing 2 changed files with 21 additions and 11 deletions.
README.md (13 changes: 10 additions & 3 deletions)
@@ -13,7 +13,7 @@ This Python package provides the components to connect various third-party servi

## Demonstration Script

-This repository includes a demonstration script called [`demo.py`](demo.py), which showcases how you can use `py-orca` to launch and monitor your workflows on Nextflow Tower. Specifically, it illustrates how to process an RNA-seq dataset using a series of workflow runs, namely `nf-synstage`, `nf-core/rnaseq`, and `nf-synindex`. `py-orca` can be used with any Python-compatible workflow management system to orchestrate each step (_e.g._ Airflow, Prefect, Dagster). The demonstration script uses [Metaflow](https://metaflow.org/) because it's easy to run locally and has an intuitive syntax.
+This repository includes a demonstration script called [`demo.py`](demo.py), which showcases how you can use `py-orca` to launch and monitor your workflows on Nextflow Tower. Specifically, it illustrates how to process an RNA-seq dataset using a series of workflow runs, namely `nf-synapse/synstage`, `nf-core/rnaseq`, and `nf-synindex`. `py-orca` can be used with any Python-compatible workflow management system to orchestrate each step (_e.g._ Airflow, Prefect, Dagster). The demonstration script uses [Metaflow](https://metaflow.org/) because it's easy to run locally and has an intuitive syntax.
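
For readers new to this pattern, here is a minimal sketch of how a Metaflow flow can drive `py-orca`. It assumes `NextflowTowerOps` can be constructed from environment-based configuration (which `demo.py` relies on); the flow and step names are hypothetical, and the `LaunchInfo` details are elided because they appear in the `demo.py` diff further down.

```python
# Minimal sketch (not the full demo.py): one Metaflow step launches a
# Tower workflow via py-orca, and the flow steps through linearly.
from metaflow import FlowSpec, step

from orca.services.nextflowtower import NextflowTowerOps


class MinimalTowerFlow(FlowSpec):
    """Hypothetical flow mirroring demo.py's launch/monitor pattern."""

    @step
    def start(self):
        # Assumes Tower connection settings are supplied via environment variables
        self.tower = NextflowTowerOps()
        self.next(self.launch)

    @step
    def launch(self):
        # demo.py builds a LaunchInfo here (see its diff below) and calls:
        #   self.workflow_id = self.tower.launch_workflow(launch_info, "spot")
        self.next(self.end)

    @step
    def end(self):
        print("Flow finished.")


if __name__ == "__main__":
    MinimalTowerFlow()
```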

The script assumes that the following environment variables are set. Before setting them, ensure that you have an AWS profile configured for a role that has access to the dev/ops tower workspace you plan to launch your workflows from. You can set these environment variables using whatever method you prefer (_e.g._ an `.env` file, a sourced shell script, etc.).
Refer to [`.env.example`](.env.example) for the format of their values as well as examples.
@@ -22,7 +22,7 @@ Refer to [`.env.example`](.env.example) for the format of their values as well a
- `SYNAPSE_CONNECTION_URI`
- `AWS_PROFILE` (or another source of AWS credentials)
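
For illustration, they can be exported directly in a shell session as follows; the values here are placeholders, so refer to `.env.example` for the actual format.

```bash
# Placeholder values for illustration; see .env.example for the real format
export SYNAPSE_CONNECTION_URI='<your-synapse-connection-uri>'
export AWS_PROFILE='<your-aws-profile>'
```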

-Once your environment is set, you can create a virtual environment, install the Python dependencies, and run the demonstration script (after downloading it) as follows. Note that you will need to update the `s3_prefix` parameter so that it points to an S3 bucket that is accessible to your Tower workspace.
+Once your environment variables are set, you can create a virtual environment, install the Python dependencies, and run the demonstration script (after downloading it) as follows. Note that you will need to update the `s3_prefix` parameter so that it points to an S3 bucket that is accessible to your Tower workspace.

### Creating and setting up your `py-orca` virtual environment and executing `demo.py`

@@ -34,11 +34,18 @@ source venv/bin/activate
```bash
# Install Python dependencies
python3 -m pip install 'py-orca[all]' 'metaflow' 'pyyaml' 's3fs'
```

+Before running the example below, ensure that the `s3_prefix` points to an S3 bucket your Nextflow `dev`
+or `prod` tower workspace has access to. In the example below, we will point to the `example-dev-project-tower-scratch` S3 bucket because we will be launching our workflows within the
+`example-dev-project` workspace in `tower-dev`.
```bash
# Run the script using an example dataset
-python3 demo.py run --dataset_id 'syn51514585' --s3_prefix 's3://orca-service-test-project-tower-bucket/outputs'
+python3 demo.py run --dataset_id 'syn51514585' --s3_prefix 's3://example-dev-project-tower-scratch/work'
```

+Once your run takes off, you can follow the output logs in your terminal, or stay updated on your workflow's progress in the web client. Make sure your `synstage` workflow run has a unique name and is not an iteration of a previous run (_e.g._ `my_test_dataset_synstage_2`, `my_test_dataset_synstage_3`, and so on), because `demo.py` cannot currently locate the staged samplesheet file if it was staged under a non-unique run name.
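
To make the staging convention concrete, the sketch below mirrors the path logic this commit adds to `get_staged_samplesheet` in `demo.py` (see the diff further down); the bucket, prefix, and run name are hypothetical.

```python
from pathlib import PurePosixPath


def get_staged_samplesheet(samplesheet: str, run_name: str) -> str:
    """Mirror of demo.py's staging-path logic, for illustration."""
    scheme, _, samplesheet_resource = samplesheet.partition("://")
    if scheme != "s3":
        raise ValueError("Expected an S3 URI.")
    path = PurePosixPath(samplesheet_resource)
    return f"{scheme}://{path.parent}/synstage/{run_name}/{path.name}"


# Hypothetical values for illustration:
print(get_staged_samplesheet(
    "s3://example-dev-project-tower-scratch/work/samplesheet.csv",
    "my_test_dataset_synstage",
))
# -> s3://example-dev-project-tower-scratch/work/synstage/my_test_dataset_synstage/samplesheet.csv
```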

The above dataset ID ([`syn51514585`](https://www.synapse.org/#!Synapse:syn51514585)) refers to the following YAML file, which should be accessible to Sage employees. Similarly, the samplesheet ID below ([`syn51514475`](https://www.synapse.org/#!Synapse:syn51514475)) should also be accessible to Sage employees. However, there is no secure way to make the output folder accessible to Sage employees, so the `synindex` step will fail if you attempt to run this script using the example dataset ID. This should be sufficient to get a feel for using `py-orca`, but feel free to create your own dataset YAML file on Synapse with an output folder that you own.

```yaml
# (file contents collapsed in the diff view)
```
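
Although the exact contents are collapsed above, the dataset YAML that `demo.py` consumes has roughly the following shape. The field names are inferred from `demo.py`, the samplesheet ID comes from the paragraph above, and the output folder ID is a placeholder.

```yaml
# Illustrative shape only, not the actual contents of syn51514585
id: my_test_dataset
samplesheet: syn51514475    # Synapse ID of the RNA-seq samplesheet
output_folder: syn00000000  # placeholder: a Synapse folder you own (used by synindex)
```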
demo.py (19 changes: 11 additions & 8 deletions)
@@ -40,13 +40,14 @@ def get_run_name(self, suffix: str) -> str:
        return f"{self.id}_{suffix}"

    def synstage_info(self, samplesheet_uri: str) -> LaunchInfo:
-        """Generate LaunchInfo for nf-synstage."""
+        """Generate LaunchInfo for nf-synapse/synstage."""
        run_name = self.get_run_name("synstage")
        return LaunchInfo(
            run_name=run_name,
-            pipeline="Sage-Bionetworks-Workflows/nf-synstage",
+            pipeline="Sage-Bionetworks-Workflows/nf-synapse",
            revision="main",
-            profiles=["sage"],
+            profiles=["docker"],
+            entry_name="NF_SYNSTAGE",
            params={
                "input": samplesheet_uri,
            },
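
In practice, this change means synstage is now launched from the `nf-synapse` repository and selected via a named workflow entrypoint. A hedged sketch of launching it directly with `py-orca` follows; the import paths are assumptions (both names exported from `orca.services.nextflowtower`), and the run name and samplesheet URI are hypothetical.

```python
from orca.services.nextflowtower import LaunchInfo, NextflowTowerOps

# Assumes Tower connection settings are available via environment variables
tower = NextflowTowerOps()
launch_info = LaunchInfo(
    run_name="my_test_dataset_synstage",
    pipeline="Sage-Bionetworks-Workflows/nf-synapse",
    revision="main",
    profiles=["docker"],
    entry_name="NF_SYNSTAGE",  # selects the synstage entrypoint within nf-synapse
    params={"input": "s3://example-dev-project-tower-scratch/work/samplesheet.csv"},
)
workflow_id = tower.launch_workflow(launch_info, "spot")  # the "spot" argument mirrors demo.py's usage
```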
@@ -124,13 +125,13 @@ class TowerRnaseqFlow(FlowSpec):
        help="S3 prefix for storing output files from different runs",
    )

-    def get_staged_samplesheet(self, samplesheet: str) -> str:
+    def get_staged_samplesheet(self, samplesheet: str, run_name: str) -> str:
        """Generate staged samplesheet based on synstage behavior."""
        scheme, _, samplesheet_resource = samplesheet.partition("://")
        if scheme != "s3":
            raise ValueError("Expected an S3 URI.")
        path = PurePosixPath(samplesheet_resource)
-        return f"{scheme}://{path.parent}/synstage/{path.name}"
+        return f"{scheme}://{path.parent}/synstage/{run_name}/{path.name}"

    def monitor_workflow(self, workflow_id):
        """Monitor any workflow run (wait until done)."""
@@ -171,21 +172,23 @@ def transfer_samplesheet_to_s3(self):

    @step
    def launch_synstage(self):
-        """Launch nf-synstage to stage Synapse files in samplesheet."""
+        """Launch nf-synapse/synstage to stage Synapse files in samplesheet."""
        launch_info = self.dataset.synstage_info(self.samplesheet_uri)
        self.synstage_id = self.tower.launch_workflow(launch_info, "spot")
        self.next(self.monitor_synstage)

    @step
    def monitor_synstage(self):
-        """Monitor nf-synstage workflow run (wait until done)."""
+        """Monitor nf-synapse/synstage workflow run (wait until done)."""
        self.monitor_workflow(self.synstage_id)
        self.next(self.launch_rnaseq)

    @step
    def launch_rnaseq(self):
        """Launch nf-core/rnaseq workflow to process RNA-seq data."""
-        staged_uri = self.get_staged_samplesheet(self.samplesheet_uri)
+        staged_uri = self.get_staged_samplesheet(
+            self.samplesheet_uri, self.dataset.get_run_name("synstage")
+        )
        launch_info = self.dataset.rnaseq_info(staged_uri, self.rnaseq_outdir)
        self.rnaseq_id = self.tower.launch_workflow(launch_info, "spot")
        self.next(self.monitor_rnaseq)
