Skip to content

Commit

Permalink
Add bundle support for FeatureInputs.
Browse files Browse the repository at this point in the history
  • Loading branch information
cmnbroad committed Aug 20, 2024
1 parent 9f2fbb5 commit c2b6beb
Show file tree
Hide file tree
Showing 13 changed files with 1,098 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ private StandardArgumentDefinitions(){}
// Argument-name constants (mostly short names) for standard arguments.
public static final String INTERVALS_SHORT_NAME = "L";
public static final String COMPARISON_SHORT_NAME = "comp";
// No distinct short form here: the short name is defined to be the long name.
public static final String READ_INDEX_SHORT_NAME = READ_INDEX_LONG_NAME;
// Long/short name pairs for the primary and secondary inputs.
// NOTE(review): the long name "secondaryI" looks asymmetric next to "primary" - confirm it is intentional.
public static final String PRIMARY_INPUT_LONG_NAME = "primary";
public static final String PRIMARY_INPUT_SHORT_NAME = "PI";
public static final String SECONDARY_INPUT_LONG_NAME = "secondaryI";
public static final String SECONDARY_INPUT_SHORT_NAME = "SI";
public static final String LENIENT_SHORT_NAME = "LE";
public static final String READ_VALIDATION_STRINGENCY_SHORT_NAME = "VS";
public static final String SAMPLE_ALIAS_SHORT_NAME = "ALIAS";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.broadinstitute.hellbender.engine;

import htsjdk.beta.io.bundle.Bundle;
import htsjdk.beta.io.bundle.BundleJSON;
import htsjdk.beta.io.bundle.BundleResource;
import htsjdk.beta.io.bundle.BundleResourceType;
import htsjdk.io.IOPath;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.util.IOUtil;
import htsjdk.samtools.util.Locatable;
Expand Down Expand Up @@ -148,7 +153,7 @@ public FeatureDataSource(final File featureFile) {
* generated name, and will look ahead the default number of bases ({@link #DEFAULT_QUERY_LOOKAHEAD_BASES})
* during queries that produce cache misses.
*
* @param featurePath path or URI to source of Features
* @param featurePath path or URI to source of Features (may be a Bundle)
*/
public FeatureDataSource(final String featurePath) {
this(featurePath, null, DEFAULT_QUERY_LOOKAHEAD_BASES, null);
Expand All @@ -159,7 +164,7 @@ public FeatureDataSource(final String featurePath) {
* name. We will look ahead the default number of bases ({@link #DEFAULT_QUERY_LOOKAHEAD_BASES}) during queries
* that produce cache misses.
*
* @param featureFile file containing Features
* @param featureFile file or Bundle containing Features
* @param name logical name for this data source (may be null)
*/
public FeatureDataSource(final File featureFile, final String name) {
Expand All @@ -170,7 +175,7 @@ public FeatureDataSource(final File featureFile, final String name) {
* Creates a FeatureDataSource backed by the provided File and assigns this data source the specified logical
* name. We will look ahead the specified number of bases during queries that produce cache misses.
*
* @param featureFile file containing Features
* @param featureFile file or Bundle containing Features
* @param name logical name for this data source (may be null)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
*/
Expand All @@ -181,7 +186,7 @@ public FeatureDataSource(final File featureFile, final String name, final int qu
/**
* Creates a FeatureDataSource backed by the resource at the provided path.
*
* @param featurePath path to file or GenomicsDB url containing features
* @param featurePath path to file or GenomicsDB url or Bundle containing features
* @param name logical name for this data source (may be null)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
Expand All @@ -195,7 +200,7 @@ public FeatureDataSource(final String featurePath, final String name, final int
* Creates a FeatureDataSource backed by the provided FeatureInput. We will look ahead the specified number of bases
* during queries that produce cache misses.
*
* @param featureInput a FeatureInput specifying a source of Features
* @param featureInput a FeatureInput specifying a source of Features (may be a Bundle)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
* that produce this type of Feature. May be null, which results in an unrestricted search.
Expand All @@ -207,7 +212,7 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook
/**
* Creates a FeatureDataSource backed by the resource at the provided path.
*
* @param featurePath path to file or GenomicsDB url containing features
* @param featurePath path to file or GenomicsDB url or Bundle containing features
* @param name logical name for this data source (may be null)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
Expand All @@ -224,7 +229,7 @@ public FeatureDataSource(final String featurePath, final String name, final int
* Creates a FeatureDataSource backed by the provided FeatureInput. We will look ahead the specified number of bases
* during queries that produce cache misses.
*
* @param featureInput a FeatureInput specifying a source of Features
* @param featureInput a FeatureInput specifying a source of Features (may be a Bundle)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
* that produce this type of Feature. May be null, which results in an unrestricted search.
Expand All @@ -241,7 +246,7 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook
* Creates a FeatureDataSource backed by the provided FeatureInput. We will look ahead the specified number of bases
* during queries that produce cache misses.
*
* @param featureInput a FeatureInput specifying a source of Features
* @param featureInput a FeatureInput specifying a source of Features (may be a Bundle)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
* that produce this type of Feature. May be null, which results in an unrestricted search.
Expand All @@ -259,7 +264,7 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook
* Creates a FeatureDataSource backed by the provided FeatureInput. We will look ahead the specified number of bases
* during queries that produce cache misses.
*
* @param featureInput a FeatureInput specifying a source of Features
* @param featureInput a FeatureInput specifying a source of Features (may be a Bundle)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
* that produce this type of Feature. May be null, which results in an unrestricted search.
Expand All @@ -278,7 +283,7 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook
* Creates a FeatureDataSource backed by the provided FeatureInput. We will look ahead the specified number of bases
* during queries that produce cache misses.
*
* @param featureInput a FeatureInput specifying a source of Features
* @param featureInput a FeatureInput specifying a source of Features (may be a Bundle)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
* that produce this type of Feature. May be null, which results in an unrestricted search.
Expand All @@ -296,7 +301,7 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook
* Creates a FeatureDataSource backed by the provided FeatureInput. We will look ahead the specified number of bases
* during queries that produce cache misses.
*
* @param featureInput a FeatureInput specifying a source of Features
* @param featureInput a FeatureInput specifying a source of Features (may be a Bundle)
* @param queryLookaheadBases look ahead this many bases during queries that produce cache misses
* @param targetFeatureType When searching for a {@link FeatureCodec} for this data source, restrict the search to codecs
* that produce this type of Feature. May be null, which results in an unrestricted search.
Expand Down Expand Up @@ -369,9 +374,26 @@ private static <T extends Feature> FeatureReader<T> getFeatureReader(final Featu
} catch (final ClassCastException e) {
throw new UserException("GenomicsDB inputs can only be used to provide VariantContexts.", e);
}
} else if (featureInput.hasExtension(BundleJSON.BUNDLE_EXTENSION)) {
// the feature input specifies a serialized json bundle file
final Bundle vcfBundle = BundleJSON.toBundle(htsjdk.beta.plugin.IOUtils.getStringFromPath(featureInput), GATKPath::new);
final IOPath vcfPath = vcfBundle.getOrThrow(BundleResourceType.CT_VARIANT_CONTEXTS).getIOPath().get();
// to get the codec we have to use the path of the underlying vcf resource, not the bundle path
final FeatureInput<T> fi = new FeatureInput<T>(vcfPath.getRawInputString(), featureInput.getName());
final FeatureCodec<T, ?> codec = getCodecForFeatureInput(fi, targetFeatureType, setNameOnCodec);
// propagate the bundle path, not the vcf path, to the reader, so that downstream code can retrieve
// the index path from the bundle
return getTribbleFeatureReader(featureInput, codec, cloudWrapper, cloudIndexWrapper);
} else if (featureInput.getParentBundle() != null) {
// the featureInput was created from a bundle list expansion (i.e., MultiVariantWalkers). it has the
// primary resource as the underlying resource path, and the containing bundle attached as the
// "parent bundle". Use the original FI to get the codec, but to get the feature reader, we use
// the FI that contains the bundle path, since the feature reader may require access to the index
final FeatureCodec<T, ?> codec = getCodecForFeatureInput(featureInput, targetFeatureType, setNameOnCodec);
return getTribbleFeatureReader(featureInput, codec, cloudWrapper, cloudIndexWrapper);
} else {
final FeatureCodec<T, ?> codec = getCodecForFeatureInput(featureInput, targetFeatureType, setNameOnCodec);
if ( featureInput.getFeaturePath().toLowerCase().endsWith(BCI_FILE_EXTENSION) ) {
if (featureInput.getFeaturePath().toLowerCase().endsWith(BCI_FILE_EXTENSION)) {
return new Reader(featureInput, codec);
}
return getTribbleFeatureReader(featureInput, codec, cloudWrapper, cloudIndexWrapper);
Expand Down Expand Up @@ -419,18 +441,48 @@ private static <T extends Feature> FeatureReader<T> getFeatureReader(final Featu
private static <T extends Feature> AbstractFeatureReader<T, ?> getTribbleFeatureReader(final FeatureInput<T> featureInput, final FeatureCodec<T, ?> codec, final Function<SeekableByteChannel, SeekableByteChannel> cloudWrapper, final Function<SeekableByteChannel, SeekableByteChannel> cloudIndexWrapper) {
Utils.nonNull(codec);
try {
// Must get the path to the data file from the codec here:
final String absoluteRawPath = featureInput.getRawInputString();

// Instruct the reader factory to not require an index. We will require one ourselves as soon as
// a query by interval is attempted.
final boolean requireIndex = false;

// Only apply the wrappers if the feature input is in a remote location which will benefit from prefetching.
if (BucketUtils.isEligibleForPrefetching(featureInput)) {
return AbstractFeatureReader.getFeatureReader(absoluteRawPath, null, codec, requireIndex, cloudWrapper, cloudIndexWrapper);
if (featureInput.hasExtension(BundleJSON.BUNDLE_EXTENSION)) {
final Bundle vcfBundle = BundleJSON.toBundle(htsjdk.beta.plugin.IOUtils.getStringFromPath(featureInput), GATKPath::new);
final IOPath vcfPath = vcfBundle.getOrThrow(BundleResourceType.CT_VARIANT_CONTEXTS).getIOPath().get();
final Optional<BundleResource> vcfIndexPath = vcfBundle.get(BundleResourceType.CT_VARIANTS_INDEX);
final String rawIndexResourcePath =
vcfIndexPath.isPresent() ? vcfIndexPath.get().getIOPath().get().getRawInputString() : null;

// Only apply the wrappers if the feature input is in a remote location which will benefit from prefetching.
if (BucketUtils.isEligibleForPrefetching(vcfPath)) {
final String absoluteRawPath = vcfPath.getRawInputString();
return AbstractFeatureReader.getFeatureReader(absoluteRawPath, rawIndexResourcePath, codec, requireIndex, cloudWrapper, cloudIndexWrapper);
} else {
return AbstractFeatureReader.getFeatureReader(vcfPath.getRawInputString(), rawIndexResourcePath, codec, requireIndex, Utils.identityFunction(), Utils.identityFunction());
}
} else if (featureInput.getParentBundle() != null) {
final Bundle vcfBundle = featureInput.getParentBundle();
// code path for when a user has specified multiple bundles on the command line, so there is no single
// serialized bundle file to access
final IOPath vcfPath = vcfBundle.getOrThrow(BundleResourceType.CT_VARIANT_CONTEXTS).getIOPath().get();
// Only apply the wrappers if the feature input is in a remote location which will benefit from prefetching.
final Optional<BundleResource> vcfIndexPath = vcfBundle.get(BundleResourceType.CT_VARIANTS_INDEX);
final String rawIndexResourcePath =
vcfIndexPath.isPresent() ? vcfIndexPath.get().getIOPath().get().getRawInputString() : null;
final String absoluteRawPath = vcfPath.getRawInputString();
if (BucketUtils.isEligibleForPrefetching(vcfPath)) {
return AbstractFeatureReader.getFeatureReader(absoluteRawPath, rawIndexResourcePath, codec, requireIndex, cloudWrapper, cloudIndexWrapper);
} else {
return AbstractFeatureReader.getFeatureReader(absoluteRawPath, rawIndexResourcePath, codec, requireIndex, Utils.identityFunction(), Utils.identityFunction());
}
} else {
return AbstractFeatureReader.getFeatureReader(absoluteRawPath, null, codec, requireIndex, Utils.identityFunction(), Utils.identityFunction());
final String absoluteRawPath = featureInput.getRawInputString();

// Only apply the wrappers if the feature input is in a remote location which will benefit from prefetching.
if (BucketUtils.isEligibleForPrefetching(featureInput)) {
return AbstractFeatureReader.getFeatureReader(absoluteRawPath, null, codec, requireIndex, cloudWrapper, cloudIndexWrapper);
} else {
return AbstractFeatureReader.getFeatureReader(absoluteRawPath, null, codec, requireIndex, Utils.identityFunction(), Utils.identityFunction());
}
}
} catch (final TribbleException e) {
throw new GATKException("Error initializing feature reader for path " + featureInput.getFeaturePath(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.broadinstitute.hellbender.engine;

import com.google.common.annotations.VisibleForTesting;
import htsjdk.beta.io.bundle.Bundle;
import htsjdk.tribble.Feature;
import htsjdk.tribble.FeatureCodec;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -52,6 +53,11 @@ public final class FeatureInput<T extends Feature> extends GATKPath implements S
*/
private transient Class<FeatureCodec<T, ?>> featureCodecClass;

/**
* retain any containing bundle in case we need to extract other resources from it
*/
private Bundle parentBundle;

/**
* Delimiter between the logical name and the file name in the --argument_name logical_name:feature_file syntax
*/
Expand Down Expand Up @@ -129,6 +135,34 @@ public FeatureInput(final String rawInputSpecifier, final String name, final Map
setTagAttributes(keyValueMap);
}

/**
* Construct a FeatureInput from a Bundle.
*
* @param primaryResourcePath the path for the primary feature resource for this bundle
* @param featureBundle an existing Bundle object; resources in this bundle MUST be IOPathBundleResources (that is,
* they must be backed by an IOPath, not an in-memory object)
* @param name the tag name for this feature input - may be null
*/
public FeatureInput(
final GATKPath primaryResourcePath,
final Bundle featureBundle,
final String name) {
super(primaryResourcePath);
// retain the containing bundle for later so we can interrogate it for other resources, like the index
this.parentBundle = featureBundle;
if (name != null) {
if (primaryResourcePath.getTag() != null) {
logger.warn(String.format(
"FeatureInput: user-provided tag name %s will be replaced with %s",
primaryResourcePath.getTag(),
name));
}
setTag(name);
}

}


/**
* Remember the FeatureCodec class for this input the first time it is discovered so we can bypass dynamic codec
* discovery when multiple FeatureDataSources are created for the same input.
Expand All @@ -144,6 +178,14 @@ public void setFeatureCodecClass(final Class<FeatureCodec<T, ?>> featureCodecCla
return this.featureCodecClass;
}

/**
* @return the parent bundle for this FeatureInput, if this input was derived from a Bundle. May
* return {@code null}. The returned bundle can be interrogated for companion resources.
*/
public Bundle getParentBundle() {
    // null when this input was not constructed with a bundle
    return this.parentBundle;
}

/**
* creates a name from the given filePath by finding the absolute path of the given input
*/
Expand Down
Loading

0 comments on commit c2b6beb

Please sign in to comment.