Merge pull request friendsofstrandseq#41 from friendsofstrandseq/dev
2.2.2: fixes & docs
weber8thomas authored Sep 22, 2023
2 parents a285af1 + 38150e5 commit d5c9745
Showing 9 changed files with 162 additions and 56 deletions.
20 changes: 10 additions & 10 deletions .pre-commit-config.yaml
@@ -9,13 +9,13 @@ repos:
# id: no-commit-to-branch
repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.4.0
- hooks:
- id: snakefmt
repo: https://github.com/snakemake/snakefmt
rev: 0.4.0
- hooks:
- id: commitizen
stages:
- commit-msg
repo: https://github.com/commitizen-tools/commitizen
rev: v2.17.12
# - hooks:
# - id: snakefmt
# repo: https://github.com/snakemake/snakefmt
# rev: 0.4.0
# - hooks:
# - id: commitizen
# stages:
# - commit-msg
# repo: https://github.com/commitizen-tools/commitizen
# rev: v2.17.12
24 changes: 16 additions & 8 deletions README.md
@@ -19,6 +19,14 @@ sequencing data. The starting point is single-cell FASTQ files from Strand-seq
3. Sorting, Deduplicating and Indexing of BAM files through [Samtools](http://www.htslib.org/) & [sambamba](https://lomereiter.github.io/sambamba/docs/sambamba-view.html)
4. Generating features and using the [ashleys-qc](https://github.com/friendsofstrandseq/ashleys-qc) model to identify high-quality cells

---

**ℹ️ Important Note**

Since 2.2.0, you no longer need to clone both the [ashleys-qc-pipeline preprocessing module](https://github.com/friendsofstrandseq/ashleys-qc-pipeline) and [mosaicatcher-pipeline](https://github.com/friendsofstrandseq/mosaicatcher-pipeline). Setting `ashleys_pipeline_only=True` together with `ashleys_pipeline=True` directly in the MosaiCatcher configuration stops execution after the files related to ashleys-qc-pipeline have been generated. This lets you use a single pipeline, repository, and container, and limits the potential conflicts that arise when the same folder (data_location) is processed by different repositories and sets of files (including the workflow/data/ref_genomes reference files).
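
As an illustrative sketch only (the core count and data path below are placeholders; in practice these options are set in the MosaiCatcher configuration or on its command line):

```python
import subprocess

# Hedged sketch: run MosaiCatcher with the ashleys-qc preprocessing module
# enabled, stopping once the ashleys-qc-pipeline files are generated.
# "--cores 8" and the data path are placeholder values.
cmd = [
    "snakemake",
    "--cores", "8",
    "--config",
    "ashleys_pipeline=True",       # run the preprocessing module inside MosaiCatcher
    "ashleys_pipeline_only=True",  # stop after the ashleys-qc-pipeline outputs
    "data_location=/path/to/data_folder",
]
subprocess.run(cmd, check=True)
```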

---

# Quick Start on example data

0. [Optional] Install [Singularity](https://www.sylabs.io/guides/3.0/user-guide/)
@@ -216,11 +224,11 @@ The list of parameters is available through the command: `snakemake -c1 --config
### EMBL parameters

| Parameter | Comment | Parameter type | Default |
| ------------------------ | ----------------------------------------------------------------------------------------------------------------------------------- | -------------- | --------------------------------------------- | --- |
| ------------------------ | ----------------------------------------------------------------------------------------------------------------------------------- | -------------- | --------------------------------------------- |
| `genecore` | Enable/disable genecore mode to give directly the genecore run folder (genecore_date_folder) | Boolean | False |
| `genecore_date_folder` | Genecore folder name to be process (Ex: "2022-11-02-H372MAFX5") | String | "" |
| `genecore_prefix` | Genecore prefix name to retrieve the date folder specified | String | "/g/korbel/STOCKS/Data/Assay/sequencing/2023" |
| `genecore_regex_element` | Regex element used to distinguish cell name from cell number (Ex: "PE20 or iTRUE | PE20") | String | "" |
| `genecore_regex_element` | Regex element used to distinguish cell name from cell number (Ex: "PE20 or iTRUE | PE20") | String |
| `samples_to_process` | List of samples to be processed in the folder (default: all samples ; sample is defined by the name between "\*\_lane1" and "PE20") | List | [] |
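
For illustration, a genecore-mode invocation could be assembled the same way the watchdog script in this commit builds its command; the date folder reuses the table's example and the sample name is invented:

```python
import subprocess

# Hedged sketch of a genecore-mode run. The date folder is the example
# from the table above; "sampleA" is a made-up sample name.
cmd = [
    "snakemake",
    "--cores", "8",
    "--config",
    "genecore=True",
    "genecore_prefix=/g/korbel/STOCKS/Data/Assay/sequencing/2023",
    "genecore_date_folder=2022-11-02-H372MAFX5",
    "genecore_regex_element=PE20",
    'samples_to_process=["sampleA"]',
]
subprocess.run(cmd, check=True)
```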

## Snakemake arguments
@@ -362,12 +370,12 @@ Selected libraries BAM files can be retrieved at the path above and can be used
- [x] replace `input_bam_location` by `data_location` (harmonization with [mosaicatcher-pipeline](https://github.com/friendsofstrandseq/mosaicatcher-pipeline.git)) ([1.3.1](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.1))
- [x] Ashleys custom threshold parameter ([1.3.4](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.4))
- [x] Aesthetic start + mail logging ([1.3.5](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.5))
- [x] Plate plot ([1.3.5](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.5)
- [x] Jupyter Notebook update ([1.3.6](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.6)
- [x] List of commands available through list_commands parameter ([1.3.6](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.6)
- [x] `FastQC_analysis` boolean `GC_rowcol_analysis` parameters to enable/disable optional modules ([1.3.6](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.6)
- [x] publishdir: If specified, will copy important data (stats, plots, counts file) to a second place ([1.4.1](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.4.1)
- [x] MultiQC: If specified, will trigger MultiQC to generate a QC report using FastQC, samtools stats & flagstats ([2.1.1](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/2.1.1)
- [x] Plate plot ([1.3.5](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.5))
- [x] Jupyter Notebook update ([1.3.6](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.6))
- [x] List of commands available through list_commands parameter ([1.3.6](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.6))
- [x] `FastQC_analysis` boolean `GC_rowcol_analysis` parameters to enable/disable optional modules ([1.3.6](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.3.6))
- [x] publishdir: If specified, will copy important data (stats, plots, counts file) to a second place ([1.4.1](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/1.4.1))
- [x] MultiQC: If specified, will trigger MultiQC to generate a QC report using FastQC, samtools stats & flagstats ([2.1.1](https://github.com/friendsofstrandseq/ashleys-qc-pipeline/releases/tag/2.1.1))

### Experimental feature: hand-selection of cells via Jupyter notebook

119 changes: 99 additions & 20 deletions afac/watchdog_ashleys.py
@@ -25,14 +25,15 @@
# Set the path you want to watch
path_to_watch = sys.argv[1]

data_location = "/scratch/tweber/DATA/MC_DATA/STOCKS"
data_location = "/scratch/tweber/DATA/MC_DATA/STOCKS_DEV"
publishdir_location = "/g/korbel/WORKFLOW_RESULTS"
genecore_prefix = path_to_watch
profile_slurm = ["--profile", "workflow/snakemake_profiles/HPC/slurm_EMBL/"]
profile_dry_run = ["--profile", "workflow/snakemake_profiles/local/conda/"]
dry_run_options = ["-c", "1", "-n", "-q"]
snakemake_binary = "/g/korbel2/weber/miniconda3/envs/snakemake_latest/bin/snakemake"


# plates_processing_status = pd.read_csv("watchdog/processing_status.json", sep="\t")
# print(plates_processing_status)

@@ -49,11 +50,13 @@ def check_unprocessed_folder(self):
list_runs_processed = sorted([e for e in os.listdir(data_location) if e not in unwanted])
total_list_runs = sorted([e for e in os.listdir(path_to_watch) if e not in unwanted])
unprocessed_plates = set(total_list_runs).difference(list_runs_processed)
# for plate in ["2023-07-10-HLGVJAFX5"]:
for plate in unprocessed_plates:
# if plate not in plates_processing_status["plate"].values.tolist():
# plates_processing_status_plate_dict = collections.defaultdict(dict)
nb_txt_gz_files = len(glob.glob(f"{path_to_watch}/{plate}/*.txt.gz"))
if nb_txt_gz_files == 576:
# if nb_txt_gz_files == 576:
if (nb_txt_gz_files % 192) == 0:
print(f"PROCESSING {path_to_watch}/{plate}")
self.process_new_directory(f"{path_to_watch}/{plate}")
else:
@@ -67,12 +70,12 @@ def process_new_directory(self, directory_path):
start_time = time.time()

while True:
# Count the number of .txt.gz files in the new directory
# # Count the number of .txt.gz files in the new directory
txt_gz_files = glob.glob(directory_path + "/*.txt.gz")
num_files = len(txt_gz_files)

# If the desired number of files is found or timeout is reached, break the loop
if num_files == 576 or time.time() - start_time > timeout:
# # If the desired number of files is found or timeout is reached, break the loop
if (num_files % 192) != 0 or time.time() - start_time > timeout:
break

# Sleep for a while before the next poll
@@ -84,7 +87,7 @@ def process_txt_gz_files(self, directory_path, txt_gz_files, num_files):
def process_txt_gz_files(self, directory_path, txt_gz_files, num_files):
"""Process the found .txt.gz files and execute snakemake command if conditions are met."""

if num_files == 576:
if (num_files % 192) == 0:
logging.info(f"The new directory contains exactly 576 .txt.gz files.")
self.execute_snakemake(directory_path, txt_gz_files)

@@ -93,24 +96,46 @@ def process_txt_gz_files(self, directory_path, txt_gz_files, num_files):

def execute_snakemake(self, directory_path, txt_gz_files):
"""Execute the snakemake command based on the found prefixes."""
pattern = re.compile(r"_lane1(.*?)(iTRU|PE20)(.*?)([A-H]?)(\d{2})(?:_1_|_2_)")
prefixes = list()

pattern = re.compile(r"(iTRU|PE20)\d{3}")
prefixes = set()

for file_path in txt_gz_files:
for file_path in sorted(txt_gz_files):
match = pattern.search(file_path)
# print(file_path, match)
if match:
prefix = match.group()[:4] # Get the first 4 characters, which is the prefix
prefixes.add(prefix)

if len(prefixes) > 1:
prefix = match.group(2)
# print(sample_name)
# prefix = match.group(2) + match.group(4) + match.group(5) # Concatenate the prefix, optional letter, and two digits
prefixes.append(prefix)
# indexes.add(index)
# pattern = re.compile(r"(iTRU|PE20)\d{3}")
# prefixes = set()
#
# for file_path in txt_gz_files:
# match = pattern.search(file_path)
# print(file_path)
# if match:
# prefix = match.group()[:4] # Get the first 4 characters, which is the prefix
# prefixes.add(prefix)

if len(set(prefixes)) > 1:
logging.info("Multiple different prefixes found: %s", prefixes)
elif prefixes:
self.execute_command(directory_path, prefixes.pop())
for j, file_path in enumerate(sorted(txt_gz_files)):
if (j + 1) % 192 == 0:
match = pattern.search(file_path)
sample_name = match.group(1)
cell = f"{sample_name}{prefixes[0]}{match.group(3)}{match.group(4)}96"
# print(file_path, j, match, sample_name, cell)
# print([match.group(i) for i in range(6)])
self.execute_command(directory_path, prefixes[0], sample_name)

# Debug/dev purpose - target a specific file
# self.execute_command(directory_path, prefixes[0], sample_name, cell)
else:
logging.info("No match found in any file.")

def execute_command(self, directory_path, prefix):
def execute_command(self, directory_path, prefix, sample, cell=None):
"""Execute the command."""

# Change directory and run the snakemake command
@@ -123,6 +148,7 @@ def execute_command(self, directory_path, prefix):
f"genecore_prefix={genecore_prefix}",
f"genecore_date_folder={date_folder}",
f"genecore_regex_element={prefix}",
f'samples_to_process=["{sample}"]',
"multistep_normalisation=True",
"MultiQC=True",
"split_qc_plot=False",
@@ -132,6 +158,14 @@ def execute_command(self, directory_path, prefix):
"--nolock",
]

if cell:
cmd = cmd + [
f"{data_location}/{date_folder}/{sample}/multiqc/fastqc/{cell}_1_fastqc.html",
"--rerun-triggers",
"mtime",
"--force",
]

logging.info("Running command: %s", " ".join(cmd + profile_dry_run + dry_run_options))

process = subprocess.Popen(
@@ -153,13 +187,25 @@ def process_txt_gz_files(self, directory_path, txt_gz_files, num_files):

# Check the penultimate line
if str(process.returncode) == str(0):
self.run_second_command(cmd, profile_slurm, data_location, date_folder)
self.run_second_command(cmd, profile_slurm, data_location, date_folder, sample, cell)
else:
logging.info("\nThe output is not as expected.")

def run_second_command(self, cmd, profile_slurm, data_location, date_folder):
def run_second_command(self, cmd, profile_slurm, data_location, date_folder, sample, cell=None):
"""Run the second command and write the output to a log file."""

report_location = f"{publishdir_location}/{date_folder}/{sample}/reports/{sample}_ashleys-qc-pipeline_report.zip"
report_options = [
"--report",
f"{report_location}",
"--report-stylesheet",
"/g/korbel2/weber/workspace/mosaicatcher-update/workflow/report/custom-stylesheet.css",
]

pipeline = "ashleys-qc-pipeline"

# print(cmd + profile_slurm + report_options)

logging.info("\nThe output is as expected.")
logging.info("Running command: %s", " ".join(cmd + profile_slurm))

@@ -171,12 +217,45 @@ def run_second_command(self, cmd, profile_slurm, data_location, date_folder):
# Convert it to a string
current_time = now.strftime("%Y%m%d%H%M%S")

with open(f"watchdog/logs/per-run/{date_folder}_{current_time}.log", "w") as f:
process2 = subprocess.Popen(cmd + profile_slurm, stdout=f, stderr=f, universal_newlines=True)
with open(f"watchdog/logs/per-run/{date_folder}_{pipeline}_{current_time}.log", "w") as f:
process2 = subprocess.Popen(cmd + profile_dry_run, stdout=f, stderr=f, universal_newlines=True)
# process2 = subprocess.Popen(cmd + profile_slurm, stdout=f, stderr=f, universal_newlines=True)
process2.wait()

logging.info("Return code: %s", process2.returncode)

logging.info("Generating ashleys report.")
os.makedirs(os.path.dirname(report_location), exist_ok=True)
# os.makedirs(f"{publishdir_location}/{date_folder}/{sample}/reports/", exist_ok=True)
logging.info("Running command: %s", " ".join(cmd + profile_slurm + report_options))
# Change the permissions of the new directory
# subprocess.run(["chmod", "-R", "777", f"{data_location}/{date_folder}"])

with open(f"watchdog/logs/per-run/{date_folder}_{pipeline}_{current_time}_report.log", "w") as f:
print(cmd + profile_slurm + report_options)
process2 = subprocess.Popen(cmd + profile_dry_run + report_options, stdout=f, stderr=f, universal_newlines=True)
# process2 = subprocess.Popen(cmd + profile_slurm + report_options, stdout=f, stderr=f, universal_newlines=True)
process2.wait()

logging.info("Return code: %s", process2.returncode)

# ZIPFILE

import zipfile

# Check if the file exists and is a valid zip file
if zipfile.is_zipfile(report_location):
# Specify the directory where you want to extract the contents
# If you want to extract in the same directory as the zip file, just use the parent directory
extract_location = f"{publishdir_location}/{date_folder}/{sample}/reports/"

# Extract the zip file
with zipfile.ZipFile(report_location, "r") as zip_ref:
zip_ref.extractall(extract_location)
print(f"Extracted the archive to {extract_location}")
else:
print(f"{report_location} is not a valid zip file.")

# Change the permissions of the new directory
subprocess.run(["chmod", "-R", "777", f"{data_location}/{date_folder}"])

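For readers following the regex rework in `execute_snakemake` above, a minimal standalone sketch of how the new pattern splits a file name (the name below is invented; real GeneCore file names may differ):

```python
import re

# Hedged sketch of the filename pattern introduced in this commit;
# the example path is made up.
pattern = re.compile(r"_lane1(.*?)(iTRU|PE20)(.*?)([A-H]?)(\d{2})(?:_1_|_2_)")

file_path = "sampleA_lane1sampleAPE20301_1_sequence.txt.gz"
match = pattern.search(file_path)
if match:
    sample_name = match.group(1)  # text between "_lane1" and the library prefix
    prefix = match.group(2)       # library prefix: "iTRU" or "PE20"
    print(sample_name, prefix)    # -> sampleA PE20
```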
2 changes: 1 addition & 1 deletion config/config.yaml
@@ -1,7 +1,7 @@
# --------------------------------------------------------
# Ashleys-QC pipeline Configuration
# --------------------------------------------------------
version: 2.2.1
version: 2.2.2

# Email for notifications about the pipeline's status
email: ""
5 changes: 1 addition & 4 deletions workflow/Snakefile
@@ -1,9 +1,6 @@
from snakemake.utils import min_version

min_version("7.14.0")
configfile_location = "config/config.yaml"


configfile_location = "config/config.yaml"
configfile: configfile_location


25 changes: 19 additions & 6 deletions workflow/rules/common.smk
@@ -63,9 +63,13 @@ if config["mosaicatcher_pipeline"] == False:
)


def onsuccess_fct(log):
config_metadata = config_definitions = yaml.safe_load(open(configfile_location.replace("config.yaml", "config_metadata.yaml"), "r"))
log_path_new = make_log_useful_ashleys.make_log_useful(log, "SUCCESS", config, config_metadata)
def onsuccess_fct(log):
config_metadata = config_definitions = yaml.safe_load(
open(configfile_location.replace("config.yaml", "config_metadata.yaml"), "r")
)
log_path_new = make_log_useful_ashleys.make_log_useful(
log, "SUCCESS", config, config_metadata
)
shell(
'mail -s "[Snakemake] smk-wf-catalog/ashleys-qc-pipeline v{} - Run on {} - SUCCESS" {} < {}'.format(
config["version"], config["data_location"], config["email"], log_path_new
@@ -74,14 +78,19 @@ def onsuccess_fct(log):


def onerror_fct(log):
config_metadata = config_definitions = yaml.safe_load(open(configfile_location.replace("config.yaml", "config_metadata.yaml"), "r"))
log_path_new = make_log_useful_ashleys.make_log_useful(log, "ERROR", config, config_metadata)
config_metadata = config_definitions = yaml.safe_load(
open(configfile_location.replace("config.yaml", "config_metadata.yaml"), "r")
)
log_path_new = make_log_useful_ashleys.make_log_useful(
log, "ERROR", config, config_metadata
)
shell(
'mail -s "[Snakemake] smk-wf-catalog/ashleys-qc-pipeline v{} - Run on {} - ERRROR" {} < {}'.format(
config["version"], config["data_location"], config["email"], log_path_new
)
)


# Simple class to retrieve automatically files in the fastq/bam folder and create a config dataframe
class HandleInput:
def __init__(
@@ -264,7 +273,11 @@ class HandleInput:
"strandphaser",
]

for sample in [e for e in os.listdir(thisdir) if e not in exclude and e.endswith(".zip") is False]:
for sample in [
e
for e in os.listdir(thisdir)
if e not in exclude and e.endswith(".zip") is False
]:
# Create a list of files to process for each sample
l_files_all = [
f