Skip to content

Commit

Permalink
EVA-3707 Add processor for excluding structural variants (#202)
Browse files Browse the repository at this point in the history
* add exclude structural variant processor
  • Loading branch information
nitin-ebi authored Nov 15, 2024
1 parent bd81d18 commit 3914949
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class BeanNames {
public static final String ANNOTATION_PARSER_PROCESSOR = "annotation-parser-processor";
public static final String ANNOTATION_COMPOSITE_PROCESSOR = "annotation-composite-processor";
public static final String VARIANT_STATS_PROCESSOR = "variant-stats-processor";
public static final String COMPOSITE_VARIANT_PROCESSOR = "composite-variant-processor";

public static final String GENE_WRITER = "gene-writer";
public static final String ANNOTATION_WRITER = "annotation-writer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
Expand All @@ -33,14 +34,15 @@
import uk.ac.ebi.eva.pipeline.configuration.ChunkSizeCompletionPolicyConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.io.readers.VcfReaderConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.io.writers.VariantWriterConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors.VariantProcessorConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.policies.InvalidVariantSkipPolicyConfiguration;
import uk.ac.ebi.eva.pipeline.jobs.steps.processors.VariantNoAlternateFilterProcessor;
import uk.ac.ebi.eva.pipeline.listeners.SkippedItemListener;
import uk.ac.ebi.eva.pipeline.listeners.StepProgressListener;
import uk.ac.ebi.eva.pipeline.listeners.VariantLoaderStepStatisticsListener;
import uk.ac.ebi.eva.pipeline.parameters.JobOptions;
import uk.ac.ebi.eva.pipeline.policies.InvalidVariantSkipPolicy;

import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.COMPOSITE_VARIANT_PROCESSOR;
import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.LOAD_VARIANTS_STEP;
import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_READER;
import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_WRITER;
Expand All @@ -53,8 +55,8 @@
*/
@Configuration
@EnableBatchProcessing
@Import({VcfReaderConfiguration.class, VariantWriterConfiguration.class, ChunkSizeCompletionPolicyConfiguration.class
, InvalidVariantSkipPolicyConfiguration.class})
@Import({VcfReaderConfiguration.class, VariantProcessorConfiguration.class, VariantWriterConfiguration.class,
ChunkSizeCompletionPolicyConfiguration.class, InvalidVariantSkipPolicyConfiguration.class})
public class LoadVariantsStepConfiguration {

private static final Logger logger = LoggerFactory.getLogger(LoadVariantsStepConfiguration.class);
Expand All @@ -67,6 +69,10 @@ public class LoadVariantsStepConfiguration {
@Qualifier(VARIANT_WRITER)
private ItemWriter<Variant> variantWriter;

@Autowired
@Qualifier(COMPOSITE_VARIANT_PROCESSOR)
private ItemProcessor<Variant, Variant> variantProcessor;

@Autowired
private InvalidVariantSkipPolicy invalidVariantSkipPolicy;

Expand All @@ -78,7 +84,7 @@ public Step loadVariantsStep(StepBuilderFactory stepBuilderFactory, JobOptions j
return stepBuilderFactory.get(LOAD_VARIANTS_STEP)
.<Variant, Variant>chunk(chunkSizeCompletionPolicy)
.reader(reader)
.processor(new VariantNoAlternateFilterProcessor())
.processor(variantProcessor)
.writer(variantWriter)
.faultTolerant()
.skipPolicy(invalidVariantSkipPolicy)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.commons.models.data.Variant;
import uk.ac.ebi.eva.pipeline.jobs.steps.processors.ExcludeStructuralVariantsProcessor;
import uk.ac.ebi.eva.pipeline.jobs.steps.processors.VariantNoAlternateFilterProcessor;

import java.util.Arrays;

import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.COMPOSITE_VARIANT_PROCESSOR;


/**
* Configuration to inject a VariantProcessor as a bean.
*/
@Configuration
public class VariantProcessorConfiguration {
@Bean(COMPOSITE_VARIANT_PROCESSOR)
@StepScope
public ItemProcessor<Variant, Variant> compositeVariantProcessor(
VariantNoAlternateFilterProcessor variantNoAlternateFilterProcessor,
ExcludeStructuralVariantsProcessor excludeStructuralVariantsProcessor) {
CompositeItemProcessor<Variant, Variant> compositeProcessor = new CompositeItemProcessor<>();
compositeProcessor.setDelegates(Arrays.asList(variantNoAlternateFilterProcessor,
excludeStructuralVariantsProcessor));

return compositeProcessor;
}

@Bean
public ExcludeStructuralVariantsProcessor excludeStructuralVariantsProcessor() {
return new ExcludeStructuralVariantsProcessor();
}

@Bean
public VariantNoAlternateFilterProcessor variantNoAlternateFilterProcessor() {
return new VariantNoAlternateFilterProcessor();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
*
* Copyright 2024 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package uk.ac.ebi.eva.pipeline.jobs.steps.processors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import uk.ac.ebi.eva.commons.models.data.Variant;

import java.text.MessageFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Direct implementation of VCF specification grammars from
* <a href="https://github.com/EBIvariation/vcf-validator/blob/master/src/vcf/vcf.ragel">here</a> and
* <a href="https://github.com/EBIvariation/vcf-validator/blob/master/src/vcf/vcf_v43.ragel">here</a>.
*/
public class ExcludeStructuralVariantsProcessor implements ItemProcessor<Variant, Variant> {

private static final Logger logger = LoggerFactory.getLogger(ExcludeStructuralVariantsProcessor.class);

private static String meta_contig_char = "(\\p{Alnum}|[\\p{Punct}&&[^:<>\\[\\]*=,]])";

private static String chromBasicRegEx = MessageFormat.format("([{0}&&[^#]][{0}]*)", meta_contig_char);

private static String chromContigRegEx = String.format("(<%s>)", chromBasicRegEx);

private static String chromosomeRegEx = String.format("(%s|%s)", chromBasicRegEx, chromContigRegEx);

private static String positionRegEx = "([\\p{Digit}]+)";

private static String basesRegEx = "([ACTGNactgn]+)";

private static String altIDRegEx_positive_match = "([\\p{Alnum}|[\\p{Punct}&&[^,<>]]]+)";

private static String altIDRegEx_negative_match = "([\\p{Punct}]+)";

private static String altIDRegEx = String.format("((?!%s)%s)", altIDRegEx_negative_match,
altIDRegEx_positive_match);

private static String stdPrefixRegEx = MessageFormat.format(
"<DEL>|<INS>|<DUP>|<INV>|<CNV>|<DUP:TANDEM>|<DEL:ME:{0}>|<INS:ME:{0}>", "(\\p{Alnum})+");

private static String altIndelRegEx = String.format("(%s|\\*)", stdPrefixRegEx);

private static String altOtherRegEx = String.format("((?!%s)%s)", stdPrefixRegEx,
String.format("<%s>", altIDRegEx));

/**
* See <a href="https://github.com/EBIvariation/vcf-validator/blob/be6cf8e2b35f2260166c1e6ffa1258a985a99ba3/src/vcf/vcf_v43.ragel#L190">VCF specification grammar</a>
*/
private static String altSVRegEx = String.join("|",
String.format("(\\]%s:%s\\]%s)", chromosomeRegEx, positionRegEx,
basesRegEx),
String.format("(\\[%s:%s\\[%s)", chromosomeRegEx, positionRegEx,
basesRegEx),
String.format("(%s\\]%s:%s\\])", basesRegEx, chromosomeRegEx,
positionRegEx),
String.format("(%s\\[%s:%s\\[)", basesRegEx, chromosomeRegEx,
positionRegEx),
String.format("(\\.%s)", basesRegEx),
String.format("(%s\\.)", basesRegEx));

private static String altGVCFRegEx = "(<\\*>)";

/**
* See <a href="https://github.com/EBIvariation/vcf-validator/blob/be6cf8e2b35f2260166c1e6ffa1258a985a99ba3/src/vcf/vcf_v43.ragel#L201">VCF specification grammar</a>
*/
private static String STRUCTURAL_VARIANT_REGEX = String.format("^(%s|%s|%s|%s)$", altIndelRegEx, altSVRegEx,
altGVCFRegEx, altOtherRegEx);

private static final Pattern STRUCTURAL_VARIANT_PATTERN = Pattern.compile(STRUCTURAL_VARIANT_REGEX);

@Override
public Variant process(Variant variant) {
Matcher matcher = STRUCTURAL_VARIANT_PATTERN.matcher(variant.getAlternate());
if (matcher.matches()) {
logger.info("Skipped processing structural variant " + variant);
return null;
}
return variant;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package uk.ac.ebi.eva.pipeline.configuration.jobs.steps;

import com.mongodb.client.model.Filters;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import uk.ac.ebi.eva.pipeline.Application;
import uk.ac.ebi.eva.pipeline.configuration.BeanNames;
import uk.ac.ebi.eva.pipeline.configuration.jobs.GenotypedVcfJobConfiguration;
import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration;
import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration;
import uk.ac.ebi.eva.test.rules.TemporaryMongoRule;
import uk.ac.ebi.eva.utils.EvaJobParameterBuilder;

import static org.junit.Assert.assertEquals;
import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted;
import static uk.ac.ebi.eva.utils.FileUtils.getResource;


@RunWith(SpringRunner.class)
@ActiveProfiles({Application.VARIANT_WRITER_MONGO_PROFILE, Application.VARIANT_ANNOTATION_MONGO_PROFILE})
@TestPropertySource({"classpath:common-configuration.properties", "classpath:test-mongo.properties"})
@ContextConfiguration(classes = {GenotypedVcfJobConfiguration.class, BatchTestConfiguration.class, TemporaryRuleConfiguration.class})
public class GenotypedVcfTestSkipStructuralVariant {
private static final String SMALL_STRUCTURAL_VARIANTS_VCF_FILE = "/input-files/vcf/small_structural_variant.vcf.gz";

private static final String SMALL_STRUCTURAL_VARIANTS_VCF_FILE_REF_ALT_STARTS_WITH_SAME_ALLELE = "/input-files/vcf/small_invalid_variant.vcf.gz";

private static final String COLLECTION_VARIANTS_NAME = "variants";

private static final String databaseName = "test_invalid_variant_db";

@Autowired
@Rule
public TemporaryMongoRule mongoRule;

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

@Before
public void setUp() throws Exception {
mongoRule.getTemporaryDatabase(databaseName).drop();
}

@Test
public void loaderStepShouldSkipStructuralVariants() throws Exception {
// When the execute method in variantsLoad is executed
JobParameters jobParameters = new EvaJobParameterBuilder()
.collectionVariantsName(COLLECTION_VARIANTS_NAME)
.databaseName(databaseName)
.inputStudyId("1")
.inputVcf(getResource(SMALL_STRUCTURAL_VARIANTS_VCF_FILE).getAbsolutePath())
.inputVcfAggregation("NONE")
.inputVcfId("1")
.toJobParameters();

JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.LOAD_VARIANTS_STEP, jobParameters);

//Then variantsLoad step should complete correctly
assertCompleted(jobExecution);

// And the number of documents in the DB should be 1 as all other variants are invalid
assertEquals(1, mongoRule.getCollection(databaseName, COLLECTION_VARIANTS_NAME).count());
assertEquals(1, mongoRule.getCollection(databaseName, COLLECTION_VARIANTS_NAME).countDocuments(Filters.eq("_id", "1_152739_A_G")));
}

/*
* This test case represents a special case of structural variants that should be skipped, but due to a bug makes its
* way into the DB.
*
* The variant has ref as "G" and alt as "G[2:421681[", which means it fits the definition of a structural variant
* and should be skipped by the variant processor that filters out structural variants.
*
* But instead what is currently happening is that after the variant is read, the normalization process in variant
* reader removes the prefix G and the variant is eventually reduced to ref "" and alt "[2:421681[", which does not get
* parsed correctly by the regex in @ExcludeStructuralVariantsProcessor and makes its way into the DB.
*
* Currently, the test case fails therefore we are skipping it for now. It should start passing once we have fixed the
* problem with the normalization in the variant reader.
*/
@Ignore
@Test
public void loaderStepShouldSkipStructuralVariantsWhereRefAndAltStartsWithSameAllele() throws Exception {
// When the execute method in variantsLoad is executed
JobParameters jobParameters = new EvaJobParameterBuilder()
.collectionVariantsName(COLLECTION_VARIANTS_NAME)
.databaseName(databaseName)
.inputStudyId("1")
.inputVcf(getResource(SMALL_STRUCTURAL_VARIANTS_VCF_FILE_REF_ALT_STARTS_WITH_SAME_ALLELE).getAbsolutePath())
.inputVcfAggregation("NONE")
.inputVcfId("1")
.toJobParameters();

JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.LOAD_VARIANTS_STEP, jobParameters);

//Then variantsLoad step should complete correctly
assertCompleted(jobExecution);

assertEquals(0, mongoRule.getCollection(databaseName, COLLECTION_VARIANTS_NAME).count());
}
}
Loading

0 comments on commit 3914949

Please sign in to comment.