diff --git a/beagle/__init__.py b/beagle/__init__.py index 52e551355..d9888a95a 100644 --- a/beagle/__init__.py +++ b/beagle/__init__.py @@ -1 +1 @@ -__version__ = "1.89.0" +__version__ = "1.89.1" diff --git a/runner/operator/access/heme/nucleo/heme_nucleo.py b/runner/operator/access/heme/nucleo/heme_nucleo.py index 8a3ee4a17..a3491bebf 100644 --- a/runner/operator/access/heme/nucleo/heme_nucleo.py +++ b/runner/operator/access/heme/nucleo/heme_nucleo.py @@ -7,6 +7,7 @@ from collections import defaultdict from runner.operator.operator import Operator +from runner.operator.helper import pair_samples from runner.run.objects.run_creator_object import RunCreator from file_system.repository.file_repository import FileRepository @@ -85,7 +86,7 @@ def construct_sample_inputs(samples, request_id): sample_group = list(sample_group) sample_id = sample_group[0]["metadata"][settings.CMO_SAMPLE_NAME_METADATA_KEY] - fgbio_fastq_to_bam_input = group_by_run(sample_group) + fgbio_fastq_to_bam_input = pair_samples(sample_group) fgbio_fastq_to_bam_input = [ [ {"class": "File", "location": "juno://" + s[0]["path"]}, diff --git a/runner/operator/access/v2_1_0/nucleo/access_nucleo.py b/runner/operator/access/v2_1_0/nucleo/access_nucleo.py index b4ab6c360..cdefbfea5 100644 --- a/runner/operator/access/v2_1_0/nucleo/access_nucleo.py +++ b/runner/operator/access/v2_1_0/nucleo/access_nucleo.py @@ -7,6 +7,7 @@ from collections import defaultdict from runner.operator.operator import Operator +from runner.operator.helper import pair_samples from runner.run.objects.run_creator_object import RunCreator from file_system.repository.file_repository import FileRepository @@ -45,12 +46,6 @@ def group_by_sample_id(samples): return sample_pairs -def group_by_run(samples): - samples.sort(key=lambda s: s["path"].split("/")[-1]) - fastqs = zip(samples[::2], samples[1::2]) - return list(fastqs) - - def calc_avg(sample_files, field): samples_with_field = list(filter(lambda s: field in s["metadata"] and 
s["metadata"][field], sample_files)) field_count = len(samples_with_field) @@ -85,7 +80,7 @@ def construct_sample_inputs(samples, request_id): sample_group = list(sample_group) sample_id = sample_group[0]["metadata"][settings.CMO_SAMPLE_NAME_METADATA_KEY] - fgbio_fastq_to_bam_input = group_by_run(sample_group) + fgbio_fastq_to_bam_input = pair_samples(sample_group) fgbio_fastq_to_bam_input = [ [ {"class": "File", "location": "juno://" + s[0]["path"]}, diff --git a/runner/operator/cmo_ch/v2_1_0/nucleo/nucleo_operator.py b/runner/operator/cmo_ch/v2_1_0/nucleo/nucleo_operator.py index f9356fd30..f86dd7665 100644 --- a/runner/operator/cmo_ch/v2_1_0/nucleo/nucleo_operator.py +++ b/runner/operator/cmo_ch/v2_1_0/nucleo/nucleo_operator.py @@ -6,6 +6,7 @@ from collections import defaultdict from runner.operator.operator import Operator +from runner.operator.helper import pair_samples from runner.run.objects.run_creator_object import RunCreator from file_system.repository.file_repository import FileRepository from django.conf import settings @@ -42,10 +43,37 @@ def group_by_sample_id(samples): return sample_pairs -def group_by_run(samples): - samples.sort(key=lambda s: s["path"].split("/")[-1]) - fastqs = zip(samples[::2], samples[1::2]) - return list(fastqs) +def pair_samples(fastqs): + """ + pair sample fastqs based on the delivery directory. + + Parameters: + fastqs (list): A list of sample fastq files. 
def pair_samples(fastqs):
    """
    Pair a sample's R1/R2 fastqs based on the delivery directory.

    Two fastqs are considered mates when their paths are identical up to the
    last "_R" marker. Sorting on file names alone is not enough, because file
    names are not unique across delivery directories.

    Parameters:
        fastqs (list): A list of sample fastq files. Each item is a dict with
            a "path" string and a "metadata" dict that stores the read
            designation ("R1" or "R2") under the "R" key.

    Returns:
        list: A list of (R1, R2) tuples containing paired fastqs for a sample,
            ordered by the first appearance of each pair in `fastqs`. A fastq
            with no partner sharing its path prefix is left out of the result.

    Raises:
        Exception: If the fastqs sharing a path prefix are not exactly one R1
            and one R2 (e.g. two R1 files, or more than two files).
    """
    expected_pair = {"R1", "R2"}

    # Bucket the fastqs by path prefix in a single pass. Dicts preserve
    # insertion order, so pairs come out in the order their first member
    # was seen.
    groups = {}
    for fastq in fastqs:
        prefix = fastq["path"].rpartition("_R")[0]
        groups.setdefault(prefix, []).append(fastq)

    sample_pairs = []
    for group in groups.values():
        if len(group) < 2:
            # No partner was delivered alongside this fastq; nothing to pair.
            continue
        reads = {fastq["metadata"]["R"] for fastq in group}
        # Require exactly one R1 and one R2. A subset check is not sufficient:
        # {"R1"} is a subset of {"R1", "R2"}, which would pair two R1 files.
        if len(group) != 2 or reads != expected_pair:
            metadata = group[0]["metadata"]
            # Fall back to the path so a missing sample name cannot mask the
            # pairing error with a KeyError.
            sample_name = metadata.get("cmoSampleName", group[0]["path"])
            raise Exception(f"Improper pairing for: {sample_name}")
        first, second = group
        # Keep ordering consistent: R1 always comes first in the tuple.
        if first["metadata"]["R"] == "R1":
            sample_pairs.append((first, second))
        else:
            sample_pairs.append((second, first))
    return sample_pairs