Skip to content

Commit

Permalink
Add support for re-serializing (sharding and reassembling) CRAM conta…
Browse files Browse the repository at this point in the history
…iners to a new stream.
  • Loading branch information
cmnbroad committed May 9, 2022
1 parent f461401 commit b1ded6e
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 5 deletions.
93 changes: 93 additions & 0 deletions src/main/java/htsjdk/samtools/CRAMContainerStreamRewriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package htsjdk.samtools;

import htsjdk.samtools.cram.build.CramIO;
import htsjdk.samtools.cram.structure.Container;
import htsjdk.samtools.cram.structure.CramHeader;
import htsjdk.samtools.util.RuntimeIOException;

import java.io.IOException;
import java.io.OutputStream;

/**
* Rewrite a series of containers to a new stream. The CRAM header and SAMFileHeader containers are automatically
* written to the stream when this class is instantiated. An EOF container is automatically written when
* {@link #finish()} is called.
*/
public class CRAMContainerStreamRewriter {
private final OutputStream outputStream;
private final String outputStreamIdentifier;
private final CramHeader cramHeader;
private final SAMFileHeader samFileHeader;
private final CRAMIndexer cramIndexer;

private long streamOffset = 0L;
private long recordCounter = 0L;

/**
* Create a CRAMContainerStreamWriter for writing a series of containers into an output stream,
* with an optional output index.
*
* @param outputStream where to write the CRAM stream.
* @param samFileHeader {@link SAMFileHeader} to be used. Sort order is determined by the sortOrder property of this arg.
* @param outputStreamIdentifier used for display in error message display
* @param indexer CRAM indexer. Can be null if no index is required.
*/
public CRAMContainerStreamRewriter(
final OutputStream outputStream,
final CramHeader cramHeader,
final SAMFileHeader samFileHeader,
final String outputStreamIdentifier,
final CRAMIndexer indexer) {
this.outputStream = outputStream;
this.cramHeader = cramHeader;
this.samFileHeader = samFileHeader;
this.outputStreamIdentifier = outputStreamIdentifier;
this.cramIndexer = indexer;

//TODO: update the SAMFileHeader with a program group to leave a paper trail?
streamOffset = CramIO.writeCramHeader(cramHeader, outputStream);
streamOffset += Container.writeSAMFileHeaderContainer(cramHeader.getCRAMVersion(), samFileHeader, outputStream);
}

/**
* Writes a container to a stream, updating the (stream-relative) global record counter and byte offsets.
*
* Since this method mutates the values in the container, the container is no longer valid in the context
* of the stream from which it originated.
*
* @param container the container to emit to the stream. the container must conform to the version and sort
* order specified in the CRAM header and SAM header provided to the constructor
* {@link #CRAMContainerStreamRewriter(OutputStream, CramHeader, SAMFileHeader, String, CRAMIndexer)}.
* All the containers serialized to a single stream using this method must have originated from the
* same original context(/stream), obtained via {@link htsjdk.samtools.cram.build.CramContainerIterator}.
*/
public void rewriteContainer(final Container container) {
// update the container and slices with the correct global record counter and byte offsets
// (required for indexing)
container.relocateContainer(recordCounter, streamOffset);

// re-serialize the entire container and slice(s), block by block
streamOffset += container.write(cramHeader.getCRAMVersion(), outputStream);
recordCounter += container.getContainerHeader().getNumberOfRecords();

if (cramIndexer != null) {
cramIndexer.processContainer(container, ValidationStringency.SILENT);
}
}

/**
* Finish writing to the stream. Flushes the record cache and optionally emits an EOF container.
*/
public void finish() {
try {
CramIO.writeCramEOF(cramHeader.getCRAMVersion(), outputStream);
outputStream.flush();
if (cramIndexer != null) {
cramIndexer.finish();
}
} catch (final IOException e) {
throw new RuntimeIOException(String.format("IOException closing stream for %s", outputStreamIdentifier));
}
}

}
1 change: 0 additions & 1 deletion src/main/java/htsjdk/samtools/CRAMIndexer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package htsjdk.samtools;

import htsjdk.samtools.cram.structure.CompressorCache;
import htsjdk.samtools.cram.structure.Container;

/**
Expand Down
25 changes: 24 additions & 1 deletion src/main/java/htsjdk/samtools/cram/structure/Container.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class Container {
private final List<Slice> slices;

// container's byte offset from the start of the containing stream, used for indexing
private final long containerByteOffset;
private long containerByteOffset;

/**
* Create a Container with a {@link ReferenceContext} derived from its {@link Slice}s.
Expand Down Expand Up @@ -190,6 +190,7 @@ public int write(final CRAMVersion cramVersion, final OutputStream outputStream)
// landmark 0 = byte length of the compression header
// landmarks after 0 = byte length of the compression header plus all slices before this one
landmarks.add(tempOutputStream.size());
slice.byteOffsetOfContainer = containerByteOffset;
slice.write(cramVersion, tempOutputStream);
}
getContainerHeader().setLandmarks(landmarks);
Expand Down Expand Up @@ -335,6 +336,28 @@ public List<SAMRecord> getSAMRecords(
public CompressionHeader getCompressionHeader() { return compressionHeader; }
public AlignmentContext getAlignmentContext() { return containerHeader.getAlignmentContext(); }
public long getContainerByteOffset() { return containerByteOffset; }

/**
* Update the stream-relative values (global record counter and stream byte offset) for this
* container. For use when re-serializing a container that has been read from an existing stream
* into a new stream. This method mutates the container and it's slices - the container is no
* longer valid in the context of it's original stream.
*
* @param containerRecordCounter the new global record counter for this container
* @param streamByteOffset the new stream byte offset counter for this container
* @return the updated global record counter
*/
public long relocateContainer(final long containerRecordCounter, final long streamByteOffset) {
this.containerByteOffset = streamByteOffset;
this.getContainerHeader().setGlobalRecordCounter(containerRecordCounter);

long sliceRecordCounter = containerRecordCounter;
for (final Slice slice : getSlices()) {
sliceRecordCounter = slice.relocateSlice(sliceRecordCounter, streamByteOffset);
}
return sliceRecordCounter;
}

public List<Slice> getSlices() { return slices; }
public boolean isEOF() {
return containerHeader.isEOF() && (getSlices() == null || getSlices().size() == 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ContainerHeader {
// total length of all blocks in this container (total length of this container, minus the Container Header).
private final AlignmentContext alignmentContext;
private final int recordCount;
private final long globalRecordCounter;
private long globalRecordCounter;
private final long baseCount;
private final int blockCount;

Expand Down Expand Up @@ -249,4 +249,8 @@ public boolean isEOF() {
return v3 || v2;
}

void setGlobalRecordCounter(final long recordCounter) {
this.globalRecordCounter = recordCount;
}

}
20 changes: 18 additions & 2 deletions src/main/java/htsjdk/samtools/cram/structure/Slice.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class Slice {
// Slice header components as defined in the spec
private final AlignmentContext alignmentContext; // ref sequence, alignment start and span
private final int nRecords;
private final long globalRecordCounter;
private long globalRecordCounter;
private final int nSliceBlocks; // includes the core block and external blocks, but not the header block
private List<Integer> contentIDs;
private int embeddedReferenceBlockContentID = EMBEDDED_REFERENCE_ABSENT_CONTENT_ID;
Expand All @@ -78,7 +78,7 @@ public class Slice {

private final CompressionHeader compressionHeader;
private final SliceBlocks sliceBlocks;
private final long byteOffsetOfContainer;
public long byteOffsetOfContainer;

private Block sliceHeaderBlock;

Expand Down Expand Up @@ -518,6 +518,22 @@ public void normalizeCRAMRecords(final List<CRAMCompressionRecord> cramCompressi
}
}

/**
* Update the stream-relative values (global record counter and container stream byte offset) for
* this slice. For use when re-serializing a container that has been read from an existing stream
* into a new stream. This method mutates the container and it's slices - the container is no
* longer valid in the context of it's original stream.
*
* @param sliceRecordCounter the new global record counter for this slice
* @param containerByteOffset the new stream byte offset counter for this slice's enclosing container
* @return the updated global record counter
*/
long relocateSlice(final long sliceRecordCounter, final long containerByteOffset) {
this.byteOffsetOfContainer = containerByteOffset;
this.globalRecordCounter = sliceRecordCounter;
return sliceRecordCounter + getNumberOfRecords();
}

private int getReferenceOffset(final boolean hasEmbeddedReference) {
final ReferenceContext sliceReferenceContext = getAlignmentContext().getReferenceContext();
return sliceReferenceContext.isMappedSingleRef() && hasEmbeddedReference ?
Expand Down
Loading

0 comments on commit b1ded6e

Please sign in to comment.