Skip to content

Commit

Permalink
Merge pull request #507 from moka-guys/feature/TSOsplitting
Browse files Browse the repository at this point in the history
Feature/tsosplitting (#507)

Co-Authored-By: MokaGuys <[email protected]>
  • Loading branch information
Aled Jones and mokaguys authored Oct 20, 2023
2 parents 363bddd + 0db3612 commit f374fae
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 54 deletions.
27 changes: 17 additions & 10 deletions automate_demultiplex_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
"999999_A01229_0182_AHM2TSO500",
]

# TSO500 batch size (for splitting samplesheet)
if testing:
batch_size = 2
else:
batch_size = 16

# path to log file which records the output of the upload agent
upload_and_setoff_workflow_logfile = (
"{document_root}/automate_demultiplexing_logfiles/upload_agent_script_logfiles/"
Expand Down Expand Up @@ -134,7 +140,7 @@
# MokaSNP ID
mokasnp_pipeline_ID = "5091"
# TSO500 pipeline ID
TSO_pipeline_ID = "5237"
TSO_pipeline_ID = "5288" #TSO v1.6

# -- Moka WES test status--
# Test Status = NextSEQ sequencing
Expand Down Expand Up @@ -170,9 +176,9 @@
congenica_app_path = "Apps/congenica_upload_v1.3.2"
congenica_SFTP_upload_app = "applet-GFfJpj80jy1x1Bz1P1Bk3vQf"

# TSO500 app
tso500_app = "applet-GPgkz0j0jy1Yf4XxkXjVgKfv" # Apps/TSO500_v1.5.1
tso500_app_name = "TSO500_v1.5.1"
# TSO500 app
tso500_app = "applet-GZgv0Jj0jy1Yfbx3QvqyKjzp" # Apps/TSO500_v1.6.0
tso500_app_name = "TSO500_v1.6.0"
tso500_docker_image = (
"project-ByfFPz00jy1fk6PjpZ95F27J:file-Fz9Zyx00b5j8xKVkKv4fZ6JB"
)
Expand Down Expand Up @@ -383,6 +389,7 @@
TSO500_samplesheet_stage = " -isamplesheet="
TSO500_analysis_options_stage = " -ianalysis_options="
TSO500_project_name_stage = " -iproject_name="
TSO500_runfolder_name_stage = " -irunfolder_name="

# app instance types
TSO500_analysis_instance_high_throughput = "mem1_ssd1_v2_x72"
Expand Down Expand Up @@ -639,7 +646,7 @@
"Pan5085",
"Pan5112",
"Pan5114",
] # note the settings from the first item in this list are used when setting off the TSO500_output_parser commands.
]


default_panel_properties = {
Expand Down Expand Up @@ -1255,7 +1262,7 @@
},
"Pan4969": { # TSO500 no UTRs. TERT promoter
"TSO500": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand All @@ -1264,7 +1271,7 @@
"Pan5085": { # TSO500 High throughput Synnovis. no UTRs. TERT promoter
"TSO500": True,
"TSO500_high_throughput": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand All @@ -1273,7 +1280,7 @@
"Pan5112": { # TSO500 High throughput BSPS. no UTRs. TERT promoter
"TSO500": True,
"TSO500_high_throughput": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand All @@ -1283,7 +1290,7 @@
"Pan5114": { # TSO500 High throughput Control. no UTRs. TERT promoter
"TSO500": True,
"TSO500_high_throughput": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand Down Expand Up @@ -1764,7 +1771,7 @@
}

duty_csv_id = (
"project-ByfFPz00jy1fk6PjpZ95F27J:applet-GQg9J280jy1Zf79KGx9gk5K3"
"project-ByfFPz00jy1fk6PjpZ95F27J:applet-GZYx3Kj0kKj3YBV7qgK6VjXQ"
)
duty_csv_inputs = {
# tso_pannumbers should not include the dry lab pan number
Expand Down
213 changes: 169 additions & 44 deletions upload_and_setoff_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ def __init__(self, runfolder):
+ self.runfolder_name
+ "_congenica_upload_commands.sh"
)
self.TSO500_post_run_command_script = (
config.DNA_Nexus_workflow_logfolder
+ self.runfolder_name
+ "_TSO_post_run_commands.sh"
)
#TODO copy lines above to create separate dx run commands output script for TSO (to be run by duty binfx)
self.nexus_project_name = ""
self.nexus_path = ""
self.nexus_project_id = ""
Expand Down Expand Up @@ -148,6 +154,9 @@ def __init__(self, runfolder, now, debug_mode=False):
# list of fastqs to get ngs run number and WES batch
self.list_of_processed_samples = []

#list of TSO samplesheets
self.TSO500_samplesheets_list = []

# DNA Nexus commands to be built on later
self.source_command = "#!/bin/bash\n. %s" % (
config.sdk_source_cmd
Expand Down Expand Up @@ -336,7 +345,7 @@ def quarterback(self):
if TSO500_sample_list:
self.list_of_processed_samples, self.fastq_string = (
TSO500_sample_list,
self.runfolder_obj.runfolder_samplesheet_path,
self.runfolder_obj.runfolder_samplesheet_path, #this sets the fastq_string to be the samplesheet path
)

else:
Expand Down Expand Up @@ -367,9 +376,12 @@ def quarterback(self):
view_users_list, admin_users_list
).rstrip()
)
# split tso samplesheet and write split versions to the runfolder
# build upload agent command for fastq upload and write stdout to ua_stdout_log
# pass path to function which checks files were uploaded without error
if TSO500_sample_list:
# split TSO samplesheet to multiple sheets with <=16 samples/sheet
self.TSO500_samplesheets_list = self.split_TSO500_samplesheet()
backup_attempt_count = 1
while backup_attempt_count < 5:
self.loggers.script.info(
Expand All @@ -385,6 +397,7 @@ def quarterback(self):
# increase backup count
backup_attempt_count += 1

#upload fastqs. if TSO500 run, this uploads the samplesheet to the project root
self.look_for_upload_errors(self.upload_fastqs())

# upload cluster density files and check upload was successful.
Expand Down Expand Up @@ -624,6 +637,68 @@ def check_for_TSO500(self):
open(self.loggers.upload_agent.filepath, "w").close()
return sample_list

def split_TSO500_samplesheet(self):
"""
take TSO500 samplesheet and split in to parts with <=16 samples/sheet
write samplesheets to runfolder
return list of samplesheet paths? or just names (if they're saved in the runfolder,
they'll be uploaded to DNAnexus, can access from there for dx run cmds)
"""
# samplesheet in the runfolder
samplesheet_file = os.path.join(self.runfolder_obj.runfolderpath, self.runfolder_obj.runfolder_samplesheet_name)

samplesheet_header = []
samples = []
no_sample_lines = 0
expected_data_headers = ["Sample_ID", "Sample_Name", "index"]

# Read all lines from the sample sheet
with open(samplesheet_file) as samplesheet:
for line in reversed(samplesheet.readlines()):
# stop when get to data headers section
if any(header in line for header in expected_data_headers):
break
# skip empty lines (check first element of the line, after splitting on comma)
elif len(line.split(",")[0]) < 2:
pass
# If its a line containing a sample::
elif line.startswith("TSO"):
samples.append(line)
no_sample_lines += 1
# get header
with open(samplesheet_file) as samplesheet:
for line in samplesheet.readlines():
# stop when get to data headers section- add header line to header then break
if any(header in line for header in expected_data_headers):
samplesheet_header.append(line)
break
else:
samplesheet_header.append(line)

# reverse samples list to get back in correct order (starting at sample 1)
samples.reverse()

# Split samples into batches (size specified in config)
# batches is a list of lists, where each list is a subset of the samples from the samplesheet
# e.g. if batch_size=16, each list will contain up to 16 samples
batches = [samples[i:i + config.batch_size] for i in range(0, len(samples), config.batch_size)]

# Write batches to separate files named "PartXofY", and add samplesheet to list
samplesheet_list = []
number_of_batches = len(batches)
samplesheet_base_name = samplesheet_file.split(".csv")[0]
for samplesheet_count, batch in enumerate(batches, start=1):
#capture samplesheet file path to write samplesheet paths to the runfolder
samplesheet_filepath = "%sPart%sof%s.csv" % (samplesheet_base_name,samplesheet_count,number_of_batches)
# capture samplesheet name to write to list- use runfolder name
samplesheet_name = "%s_SampleSheetPart%sof%s.csv" % (self.runfolder_obj.runfolder_name,samplesheet_count,number_of_batches)
samplesheet_list.append(samplesheet_name)
with open(samplesheet_filepath, "a") as new_samplesheet:
new_samplesheet.writelines(samplesheet_header)
new_samplesheet.writelines(batch)

return(samplesheet_list)

def check_for_development_run(self):
"""
Read samplesheet looking for development pan number.
Expand Down Expand Up @@ -1561,47 +1636,23 @@ def start_building_dx_run_cmds(self):
for cmd in self.determine_exome_depth_requirements(pannnumber_list):
commands_list.append(cmd)

# write TSO commands if a TSO run.
if TSO500:
commands_list.append("#The TSOapp is set off first. This utilises the --wait flag, so the bash script waits until this job finishes before running the coverage, hap.py and fastqc commands using the samplesheet to determine expected files and thier locations ")
commands_list.append("#All jobs apart from control samples are added to the depends on list used to delay multiqc")
# build command for the TSO500 app and set off fastqc commands
commands_list.append(self.create_tso500_command())
commands_list.append(self.add_to_depends_list("TSO500", 'depends_list'))
commands_list.append("#The TSOapp is set off once for each samplesheet made")
commands_list.append("#Other jobs must be set off manually by running the file once the pipeline has finished")
# build commands for the TSO500 app and set off fastqc commands (need a command per samplesheet)
for samplesheet in self.TSO500_samplesheets_list:
commands_list.append(self.create_tso500_command(samplesheet))

self.build_TSO500_post_run_commands()

# TSO500 multiqc commands are written to a separate file with a function called above
if not TSO500:
commands_list.append(self.create_multiqc_command())
commands_list.append(self.add_to_depends_list("MultiQC", 'depends_list'))
commands_list.append(self.create_upload_multiqc_command(TSO500))
commands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list'))

# For TSO samples, the fastqs are created within DNAnexus and the
# commands are generated using sample names parsed from the
# samplesheet. If for whatever reason those fastqs are not created
# by the DNAnexus app, the downstream job will not set off and
# therefore will produce no job ID to provide to the depends_list,
# which will create an error/ slack alert. To solve this problem,
# the job ID is only added to the depends list if it exits
for sample in self.list_of_processed_samples:
pannumber = re.search(r"Pan\d+", sample).group()
commands_list.append(
self.create_fastqc_command(sample)
)
# Only add to depends_list if job ID from previous command
# is not empty
commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list'))

commands_list.append(self.create_sambamba_cmd(sample, pannumber))
# Exclude negative controls from the depends list as the NTC
# coverage calculation can often fail. We want the coverage
# report for the NTC sample to help assess contamination.
# Only add to depends_list if job ID from previous command
# is not empty
commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list'))

if "HD200" in sample:
commands_list.append(self.create_sompy_cmd(sample, pannumber))
# Only add to depends_list if job ID from previous command
# is not empty
commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list("sompy", 'depends_list'))

commands_list.append(self.create_multiqc_command())
commands_list.append(self.add_to_depends_list("MultiQC", 'depends_list'))
commands_list.append(self.create_upload_multiqc_command(TSO500))
commands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list'))
# setoff the below commands later as they are not depended upon by
# MultiQC but are required for duty_csv
if rpkm_list:
Expand All @@ -1613,10 +1664,81 @@ def start_building_dx_run_cmds(self):
commands_list.append(self.add_to_depends_list("rpkm", 'depends_list'))
commands_list.append(self.add_to_depends_list("depends", 'depends_list_recombined'))

commands_list.append(self.create_duty_csv_command())
if not TSO500:
commands_list.append(self.create_duty_csv_command())

return commands_list

def build_TSO500_post_run_commands(self):
"""
Function to build TSO500 commands to run after pipeline, i.e.
Fastqc, sambamba, sompy, multiqc, upload multiqc and duty_csv
Commands must be written to file _TSO_post_run_commands.sh
which can be run manually once pipeline done.
For TSO samples, the fastqs are created within DNAnexus and the
commands are generated using sample names parsed from the
samplesheet. If for whatever reason those fastqs are not created
by the DNAnexus app, the downstream job will not set off and
therefore will produce no job ID to provide to the depends_list,
which will create an error/ slack alert. To solve this problem,
the job ID is only added to the depends list if it exits
"""
# Update script log file to say what is being done.
self.loggers.script.info("Building dx run commands for TSO500 post pipeline processing")

# list to hold all commands.
TSO500 = True
TSOcommands_list = []
TSOcommands_list.append(self.source_command)
TSOcommands_list.append(self.empty_depends)
TSOcommands_list.append(self.empty_gatk_depends)

for sample in self.list_of_processed_samples:
pannumber = re.search(r"Pan\d+", sample).group()
TSOcommands_list.append(
self.create_fastqc_command(sample)
)
# Only add to depends_list if job ID from previous command
# is not empty
TSOcommands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list'))
TSOcommands_list.append(self.if_jobid_exists_depends % ('echo ${jobid}'))

TSOcommands_list.append(self.create_sambamba_cmd(sample, pannumber))
# Exclude negative controls from the depends list as the NTC
# coverage calculation can often fail. We want the coverage
# report for the NTC sample to help assess contamination.
# Only add to depends_list if job ID from previous command
# is not empty
TSOcommands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list'))
TSOcommands_list.append(self.if_jobid_exists_depends % ('echo ${jobid}'))

if "HD200" in sample:
TSOcommands_list.append(self.create_sompy_cmd(sample, pannumber))
# Only add to depends_list if job ID from previous command
# is not empty
TSOcommands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list("sompy", 'depends_list'))
TSOcommands_list.append(self.if_jobid_exists_depends % ('echo ${jobid}'))

TSOcommands_list.append(self.create_multiqc_command())
TSOcommands_list.append(self.add_to_depends_list("MultiQC", 'depends_list'))
TSOcommands_list.append(self.if_jobid_exists_depends % ('echo ${jobid}'))
TSOcommands_list.append(self.create_upload_multiqc_command(TSO500))
TSOcommands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list'))
TSOcommands_list.append(self.if_jobid_exists_depends % ('echo ${jobid}'))

TSOcommands_list.append(self.create_duty_csv_command())
TSOcommands_list.append(self.if_jobid_exists_depends % ('echo ${jobid}'))

with open(
self.runfolder_obj.TSO500_post_run_command_script, "w"
) as TSO500_commands:
# remove any None values from the command_list
TSO500_commands.writelines(
[line + "\n" for line in filter(None, TSOcommands_list)]
)

return TSOcommands_list

def determine_exome_depth_requirements(self,pannnumber_list):
"""
This function takes a list of all pan numbers found on this run.
Expand Down Expand Up @@ -1806,7 +1928,7 @@ def create_fastqc_command(self, fastqs):

return dx_command

def create_tso500_command(self):
def create_tso500_command(self,samplesheet):
"""
Build dx run command for tso500 docker app.
Will assess if it's a novaseq or not from the runfoldername and if it's
Expand Down Expand Up @@ -1860,13 +1982,16 @@ def create_tso500_command(self):
config.TSO500_samplesheet_stage,
self.runfolder_obj.nexus_project_id
+ ":"
+ self.runfolder_obj.runfolder_samplesheet_name,
+ self.runfolder_subdir
+ "/"
+ samplesheet,
config.TSO500_project_name_stage,
self.runfolder_obj.nexus_project_name,
config.TSO500_runfolder_name_stage,
self.runfolder_subdir,
config.TSO500_analysis_options_stage,
TSO500_analysis_options,
instance_type,
"--wait ",
self.dest,
self.dest_cmd,
self.token,
Expand Down

0 comments on commit f374fae

Please sign in to comment.