Multithreaded encoder (#13)
The encoder can run multithreaded by buffering batches of input lines and processing each batch in a worker thread. In favorable circumstances this yields higher throughput than the default single-threaded approach, at the cost of greatly increased memory usage and copying. Activated with -t,--threads.
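A hypothetical invocation (file names are placeholders; bgzip's -@ flag requests bgzip's own compression threads):

bgzip -dc cohort.vcf.gz | spvcf encode --threads $(nproc) | bgzip -@ $(nproc) > cohort.spvcf.gz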
mlin authored Aug 29, 2018
1 parent ed56b3d commit 4b9e325
Showing 7 changed files with 186 additions and 32 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
@@ -11,6 +11,9 @@
"ostream": "cpp",
"stdexcept": "cpp",
"cerrno": "cpp",
"sstream": "cpp"
"sstream": "cpp",
"iosfwd": "cpp",
"mutex": "cpp",
"chrono": "cpp"
}
}
3 changes: 3 additions & 0 deletions README.md
@@ -36,6 +36,7 @@ Reads VCF text from standard input if filename is empty or -
Options:
-o,--output out.spvcf Write to out.spvcf instead of standard output
-p,--period P Ensure checkpoints (full dense rows) at this period or less (default: 1000)
-t,--threads N Use multithreaded encoder with this many worker threads [EXPERIMENTAL]
-q,--quiet Suppress statistics printed to standard error
-h,--help Show this help message
@@ -66,6 +67,8 @@ $ bgzip -dc my.spvcf.gz | ./spvcf decode > my.decoded.vcf

There's also `spvcf squeeze` to apply the QC squeezing transformation to a pVCF, without the sparse quote-encoding.

The multithreaded encoder should be used only if the single-threaded version is a proven bottleneck. It can deliver higher throughput in favorable circumstances, but at the cost of increased memory usage and copying; memory usage scales with both the thread count and the checkpoint period.
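One hypothetical way to check whether it helps (file name is a placeholder; `$(nproc)` supplies the local core count). Roughly `--threads` × `--period` input lines, plus their encoded output, are buffered at any time, so lowering `-p` bounds the extra memory at the cost of more frequent checkpoint rows:

$ time ./spvcf encode -q -o /dev/null my.vcf
$ time ./spvcf encode -q -t $(nproc) -o /dev/null my.vcf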

### Tabix slicing

If the regular `bgzip` and `tabix` utilities are used to block-compress and index a spVCF file, then `spvcf tabix` can take a genomic range slice from it, extracting spVCF which decodes standalone. (The regular `tabix` utility generates the index, but using it to take the slice would yield a broken fragment.) Internally, this entails decoding a small bit of the spVCF, determined by the `spvcf encode --period` option.
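An illustrative sequence (file names and the genomic range are placeholders), mirroring the updated test suite below:

$ bgzip -c my.spvcf > my.spvcf.gz
$ tabix my.spvcf.gz
$ ./spvcf tabix -o slice.spvcf my.spvcf.gz chr21:5143000-5226000
$ ./spvcf decode slice.spvcf > slice.vcf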
10 changes: 8 additions & 2 deletions spvcf.wdl
@@ -2,7 +2,8 @@ task spvcf {
File in_gz
Boolean squeeze = false
Boolean decode = false
String release = "v0.3.0"
Boolean multithread_encode = false
String release = "v0.5.0"

parameter_meta {
in_gz: "stream"
@@ -16,11 +17,16 @@
wget -nv https://github.com/dnanexus-rnd/GLnexus/raw/master/cli/dxapplet/resources/usr/local/bin/bgzip
chmod +x spvcf bgzip

threads_arg=""
if [ "${multithread_encode}" == "true" ]; then
threads_arg="--threads $(nproc)"
fi

nm=$(basename "${in_gz}" .vcf.gz)
nm=$(basename "$nm" .spvcf.gz)
nm="$nm.${if decode then 'vcf.gz' else 'spvcf.gz'}"
mkdir out
pigz -dc "${in_gz}" | ./spvcf ${if decode then 'decode' else 'encode'} ${if squeeze then '-S' else ''} | ./bgzip -@ $(nproc) > out/$nm
pigz -dc "${in_gz}" | ./spvcf ${if decode then 'decode' else 'encode'} ${if squeeze then '-S' else ''} $threads_arg | ./bgzip -@ $(nproc) > out/$nm
}

runtime {
147 changes: 133 additions & 14 deletions src/main.cc
@@ -3,6 +3,10 @@
#include <fstream>
#include <getopt.h>
#include <unistd.h>
#include <assert.h>
#include <deque>
#include <future>
#include <mutex>
#include "spVCF.h"

using namespace std;
@@ -13,6 +17,95 @@ enum class CodecMode {
decode
};

// Run encoder in a multithreaded way by buffering batches of input lines and
// spawning a thread to work on each batch. Below, main_codec has a simpler
// single-threaded default way to run the codec.
spVCF::transcode_stats multithreaded_encode(CodecMode mode, uint64_t checkpoint_period, bool squeeze, size_t thread_count,
istream& input_stream, ostream& output_stream) {
assert(mode != CodecMode::decode);

mutex mu;
// mu protects the following two variables:
deque<future<pair<spVCF::transcode_stats,shared_ptr<vector<string>>>>> output_batches;
bool input_complete = false;

// spawn "sink" task to await output blocks in order and write them to output_stream
future<spVCF::transcode_stats> sink = async(launch::async, [&]() {
spVCF::transcode_stats ans;
while (true) {
future<pair<spVCF::transcode_stats,shared_ptr<vector<string>>>> batch;
{
lock_guard<mutex> lock(mu);
if (output_batches.empty()) {
if (input_complete) {
break;
} else {
using namespace chrono_literals;
this_thread::sleep_for(100us);
continue;
}
}
batch = move(output_batches.front());
output_batches.pop_front();
}
auto rslt = move(batch.get());
for (auto& line : *rslt.second) {
output_stream << line << '\n';
if (!output_stream.good()) {
throw runtime_error("I/O error");
}
}
ans += rslt.first;
}
return ans;
});

// worker task to process a batch of input lines into output_batches
auto worker = [&](shared_ptr<vector<string>> input_batch) {
unique_ptr<spVCF::Transcoder> tc = spVCF::NewEncoder(checkpoint_period, (mode == CodecMode::encode), squeeze);
auto output_lines = make_shared<vector<string>>();
for (auto& input_line : *input_batch) {
output_lines->push_back(tc->ProcessLine(&input_line[0]));
}
return make_pair(tc->Stats(), output_lines);
};

// In driver thread, read batches of lines from input_stream, and spawn a worker
// for each batch. If the number of outstanding tasks hits thread_count, wait
// for some of them to drain.
auto input_batch = make_shared<vector<string>>();
string input_line;
for (; getline(input_stream, input_line); ) {
input_batch->push_back(move(input_line));
assert(input_line.empty());
if (input_batch->size() >= checkpoint_period) {
while (true) {
lock_guard<mutex> lock(mu);
if (output_batches.size() >= thread_count) {
using namespace chrono_literals;
this_thread::sleep_for(100us);
continue;
}
output_batches.push_back(async(launch::async, worker, input_batch));
break;
}
input_batch = make_shared<vector<string>>();
}
}
{
lock_guard<mutex> lock(mu);
if (!input_batch->empty()) {
output_batches.push_back(async(launch::async, worker, input_batch));
}
input_complete = true;
}
if (!input_stream.eof() || input_stream.bad()) {
throw runtime_error("I/O error");
}

return sink.get();
}

void help_codec(CodecMode mode) {
switch (mode) {
case CodecMode::encode:
@@ -23,6 +116,7 @@ void help_codec(CodecMode mode) {
<< "Options:" << endl
<< " -o,--output out.spvcf Write to out.spvcf instead of standard output" << endl
<< " -p,--period P Ensure checkpoints (full dense rows) at this period or less (default: 1000)" << endl
<< " -t,--threads N Use multithreaded encoder with this number of worker threads" << endl
<< " -q,--quiet Suppress statistics printed to standard error" << endl
<< " -h,--help Show this help message" << endl << endl
<< "Lossy transformation to increase compression: " << endl
@@ -38,6 +132,7 @@
<< "Reads VCF text from standard input if filename is empty or -" << endl << endl
<< "Options:" << endl
<< " -o,--output out.vcf Write to out.vcf instead of standard output" << endl
<< " -t,--threads N Use multithreaded encoder with this many worker threads [EXPERIMENTAL]" << endl
<< " -q,--quiet Suppress statistics printed to standard error" << endl
<< " -h,--help Show this help message" << endl << endl;
cout << "Squeezing is a lossy transformation to reduce pVCF size and increase compressibility." << endl
@@ -66,18 +161,20 @@ int main_codec(int argc, char *argv[], CodecMode mode) {
bool quiet = false;
string output_filename;
uint64_t checkpoint_period = 1000;
size_t thread_count = 1;

static struct option long_options[] = {
{"help", no_argument, 0, 'h'},
{"squeeze", no_argument, 0, 'S'},
{"period", required_argument, 0, 'p'},
{"threads", required_argument, 0, 't'},
{"quiet", no_argument, 0, 'q'},
{"output", required_argument, 0, 'o'},
{0, 0, 0, 0}
};

int c;
while (-1 != (c = getopt_long(argc, argv, "hSp:qo:",
while (-1 != (c = getopt_long(argc, argv, "hSp:qo:t:",
long_options, nullptr))) {
switch (c) {
case 'h':
@@ -91,7 +188,7 @@
squeeze = true;
break;
case 'p':
if (mode != CodecMode::encode) {
if (mode == CodecMode::decode) {
help_codec(mode);
return -1;
}
@@ -102,6 +199,18 @@
return -1;
}
break;
case 't':
if (mode == CodecMode::decode) {
help_codec(mode);
return -1;
}
errno=0;
thread_count = strtoull(optarg, nullptr, 10);
if (errno) {
cerr << "spvcf: couldn't parse --threads" << endl;
return -1;
}
break;
case 'q':
quiet = true;
break;
@@ -154,19 +263,30 @@
output_stream->setf(ios_base::unitbuf);

// Encode or decode
unique_ptr<spVCF::Transcoder> tc;
if (mode == CodecMode::decode) {
tc = spVCF::NewDecoder();
} else {
tc = spVCF::NewEncoder(checkpoint_period, (mode == CodecMode::encode), squeeze);
}
string input_line;
for (; getline(*input_stream, input_line); ) {
*output_stream << tc->ProcessLine(&input_line[0]);
*output_stream << '\n';
if (input_stream->fail() || input_stream->bad() || !output_stream->good()) {
spVCF::transcode_stats stats;
if (thread_count <= 1) {
unique_ptr<spVCF::Transcoder> tc;
if (mode == CodecMode::decode) {
tc = spVCF::NewDecoder();
} else {
tc = spVCF::NewEncoder(checkpoint_period, (mode == CodecMode::encode), squeeze);
}
string input_line;
for (; getline(*input_stream, input_line); ) {
*output_stream << tc->ProcessLine(&input_line[0]);
*output_stream << '\n';
if (input_stream->fail() || input_stream->bad() || !output_stream->good()) {
throw runtime_error("I/O error");
}
}
if (input_stream->bad()) {
throw runtime_error("I/O error");
}
stats = tc->Stats();
} else {
assert(mode != CodecMode::decode);
stats = multithreaded_encode(mode, checkpoint_period, squeeze, thread_count,
*input_stream, *output_stream);
}

// Close up
@@ -179,7 +299,6 @@

// Output stats
if (!quiet) {
auto stats = tc->Stats();
cerr.imbue(locale(""));
cerr << "N = " << fixed << stats.N << endl;
cerr << "dense cells = " << fixed << stats.N*stats.lines << endl;
7 changes: 2 additions & 5 deletions src/spVCF.cc
@@ -278,12 +278,9 @@ const char* EncoderImpl::ProcessLine(char* input_line) {
}

// CHECKPOINT -- return a densely-encode row -- if we've switched to a new
// chromosome OR we've hit the specified period OR we've passed half the
// period and this line is mostly dense anyway.
// chromosome OR we've hit the specified period
if (chrom_ != tokens[0] ||
(checkpoint_period_ > 0 &&
(since_checkpoint_ >= checkpoint_period_ ||
(since_checkpoint_*2 >= checkpoint_period_ && sparse_cells*2 >= N)))) {
(checkpoint_period_ > 0 && since_checkpoint_ >= checkpoint_period_)) {
buffer_.Clear();
for (int t = 0; t < tokens.size(); t++) {
if (t > 0) {
11 changes: 11 additions & 0 deletions src/spVCF.h
@@ -16,6 +16,17 @@ struct transcode_stats {

uint64_t squeezed_cells = 0; // cells whose QC measures were dropped
uint64_t checkpoints = 0; // checkpoints (purposely dense rows to aid partial decoding)

void operator+=(const transcode_stats& rhs) {
N = std::max(N, rhs.N);
lines += rhs.lines;
sparse_cells += rhs.sparse_cells;
sparse75_lines += rhs.sparse75_lines;
sparse90_lines += rhs.sparse90_lines;
sparse99_lines += rhs.sparse99_lines;
squeezed_cells += rhs.squeezed_cells;
checkpoints += rhs.checkpoints;
}
};

class Transcoder {
35 changes: 25 additions & 10 deletions test/spVCF.t
@@ -10,16 +10,16 @@ D=/tmp/spVCFTests
rm -rf $D
mkdir -p $D

plan tests 20
plan tests 25

pigz -dc "$HERE/data/small.vcf.gz" > $D/small.vcf
"$EXE" encode -o $D/small.spvcf $D/small.vcf
is "$?" "0" "filename I/O"
is "$(cat $D/small.spvcf | wc -c)" "36986142" "filename I/O output size"
is "$(cat $D/small.spvcf | wc -c)" "36976928" "filename I/O output size"

pigz -dc "$HERE/data/small.vcf.gz" | "$EXE" encode -q > $D/small.spvcf
is "$?" "0" "piped I/O"
is "$(cat $D/small.spvcf | wc -c)" "36986142" "piped I/O output size"
is "$(cat $D/small.spvcf | wc -c)" "36976928" "piped I/O output size"

"$EXE" decode -o $D/small.roundtrip.vcf $D/small.spvcf
is "$?" "0" "decode"
@@ -30,12 +30,12 @@ is "$(cat $D/small.vcf | grep -v ^# | sha256sum)" \
"roundtrip fidelity"
is "$(egrep -o "spVCF_checkpointPOS=[0-9]+" $D/small.spvcf | uniq | cut -f2 -d = | tr '\n' ' ')" \
"5030088 5085728 5142746 5225415 5232998 5243839 5252753 5264502 5274001 " \
"5030088 5142716 5232967 5252665 5273811 " \
"checkpoint positions"
"$EXE" encode -S -p 500 -o $D/small.squeezed.spvcf $D/small.vcf
is "$?" "0" "squeeze"
is "$(cat $D/small.squeezed.spvcf | wc -c)" "17453976" "squeezed output size"
is "$(cat $D/small.squeezed.spvcf | wc -c)" "17447427" "squeezed output size"
"$EXE" decode -q -o $D/small.squeezed.roundtrip.vcf $D/small.squeezed.spvcf
is "$?" "0" "squeezed roundtrip decode"
@@ -50,24 +50,24 @@ is "$(cat $D/small.squeezed_only.vcf | grep -v ^# | sha256sum)" \
"squeeze (only) fidelity"
is "$(egrep -o "spVCF_checkpointPOS=[0-9]+" $D/small.squeezed.spvcf | uniq | cut -f2 -d = | tr '\n' ' ')" \
"5030088 5053371 5085752 5111059 5142907 5219436 5225476 5229291 5233041 5238611 5244009 5248275 5252854 5257548 5265256 5271818 " \
"5030088 5085728 5142746 5225415 5232998 5243839 5252707 5264483 5273919 " \
"squeezed checkpoint positions"
bgzip -c $D/small.squeezed.spvcf > $D/small.squeezed.spvcf.gz
tabix $D/small.squeezed.spvcf.gz
"$EXE" tabix -o $D/small.squeezed.slice.spvcf $D/small.squeezed.spvcf.gz chr21:5143000-5219900
"$EXE" tabix -o $D/small.squeezed.slice.spvcf $D/small.squeezed.spvcf.gz chr21:5143000-5226000
is "$?" "0" "tabix slice"
is "$(egrep -o "spVCF_checkpointPOS=[0-9]+" $D/small.squeezed.slice.spvcf | uniq -c | tr -d ' ' | tr '\n' ' ')" \
"249spVCF_checkpointPOS=5143363 20spVCF_checkpointPOS=5219436 " \
"slice checkpoints"
"497spVCF_checkpointPOS=5143363 28spVCF_checkpointPOS=5225415 " \
"slice checkpoint"
"$EXE" decode $D/small.squeezed.slice.spvcf > $D/small.squeezed.slice.vcf
is "$?" "0" "decode tabix slice"
bgzip -c $D/small.squeezed.roundtrip.vcf > $D/small.squeezed.roundtrip.vcf.gz
tabix $D/small.squeezed.roundtrip.vcf.gz
tabix $D/small.squeezed.roundtrip.vcf.gz chr21:5143000-5219900 > $D/small.squeezed.roundtrip.slice.vcf
tabix $D/small.squeezed.roundtrip.vcf.gz chr21:5143000-5226000 > $D/small.squeezed.roundtrip.slice.vcf
is "$(cat $D/small.squeezed.slice.vcf | grep -v ^# | sha256sum)" \
"$(cat $D/small.squeezed.roundtrip.slice.vcf | grep -v ^# | sha256sum)" \
"slice fidelity"
@@ -77,4 +77,19 @@ is "$(cat $D/small.squeezed.slice_chr21.spvcf | sha256sum)" \
"$(cat $D/small.squeezed.spvcf | sha256sum)" \
"chromosome slice"
pigz -dc "$HERE/data/small.vcf.gz" | "$EXE" encode -t $(nproc) - > $D/small.mt.spvcf
is "$?" "0" "multithreaded encode"
is "$(cat $D/small.mt.spvcf | wc -c)" "36973335" "multithreaded output size"
time "$EXE" decode -o $D/small.mt.roundtrip.vcf $D/small.mt.spvcf
is "$?" "0" "decode from multithreaded"
is "$(cat $D/small.vcf | grep -v ^# | sha256sum)" \
"$(cat $D/small.mt.roundtrip.vcf | grep -v ^# | sha256sum)" \
"multithreaded roundtrip fidelity"
is "$(egrep -o "spVCF_checkpointPOS=[0-9]+" $D/small.mt.spvcf | uniq | cut -f2 -d = | tr '\n' ' ')" \
"5030088 5139057 5232476 5252288 5273680 " \
"multithreaded checkpoint positions"
rm -rf $D
