Skip to content

Commit

Permalink
feat: new scan performance objective in compressor
Browse files Browse the repository at this point in the history
  • Loading branch information
lwwmanning committed Nov 18, 2024
1 parent 18986c2 commit 39cdcf3
Show file tree
Hide file tree
Showing 25 changed files with 258 additions and 67 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/benches/compress_noci.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
mod tokio_runtime;

use core::cell::LazyCell;
use core::str::FromStr;
use core::sync::atomic::{AtomicBool, Ordering};
use std::cell::LazyCell;
use std::io::Cursor;
use std::path::Path;
use std::sync::Arc;
Expand Down
4 changes: 1 addition & 3 deletions bench-vortex/benches/compressor_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ fn strings(c: &mut Criterion) {

let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(1_000_000, 0.00005));
let (codes, values) = dict_encode_varbinview(&varbinview_arr);
group.throughput(Throughput::Bytes(
varbinview_arr.clone().into_array().nbytes() as u64,
));
group.throughput(Throughput::Bytes(varbinview_arr.clone().into_array().nbytes() as u64));
group.bench_function("dict_decode_varbinview", |b| {
b.iter_batched(
|| DictArray::try_new(codes.clone().into_array(), values.clone().into_array()).unwrap(),
Expand Down
4 changes: 2 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the

>>> cvtx = vortex.compress(vtx)
>>> cvtx.nbytes
13963
13196
>>> cvtx.nbytes / vtx.nbytes
0.099...
0.093...

Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in
cache and RAM.
Expand Down
2 changes: 1 addition & 1 deletion encodings/dict/benches/dict_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn gen_varbin_words(len: usize, uniqueness: f64) -> Vec<String> {
let uniq_cnt = (len as f64 * uniqueness) as usize;
let dict: Vec<String> = (0..uniq_cnt)
.map(|_| {
(&mut rng)
(&mut rng)
.sample_iter(&Alphanumeric)
.take(16)
.map(char::from)
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub trait Statistics {
fn compute_all(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
let mut stats_set = self.to_set();
for stat in stats {
if let Some(s) = self.compute(*stat) {
if let Some(s) = self.compute(*stat) {
stats_set.set(*stat, s)
}
}
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use vortex_alp::{
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_dtype::PType;
Expand All @@ -26,6 +26,10 @@ impl EncodingCompressor for ALPCompressor {
constants::ALP_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ALP_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/alp_rd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use vortex_alp::{match_each_alp_float_ptype, ALPRDEncoding, RDEncoder as ALPRDEn
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData, IntoArrayVariant};
use vortex_dtype::PType;
Expand Down Expand Up @@ -33,6 +33,10 @@ impl EncodingCompressor for ALPRDCompressor {
constants::ALP_RD_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ALP_RD_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
8 changes: 8 additions & 0 deletions vortex-sampling-compressor/src/compressors/bitpacked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl EncodingCompressor for BitPackedCompressor {
}
}

fn decompression_gib_per_second(&self) -> f64 {
if self.allow_patches {
constants::BITPACKED_WITH_PATCHES_GIB_PER_S
} else {
constants::BITPACKED_NO_PATCHES_GIB_PER_S
}
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{Chunked, ChunkedArray};
use vortex_array::compress::compute_precompression_stats;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData};
use vortex_error::{vortex_bail, VortexResult};

Expand Down Expand Up @@ -40,6 +40,10 @@ impl EncodingCompressor for ChunkedCompressor {
constants::CHUNKED_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::CHUNKED_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
array.is_encoding(Chunked::ID).then_some(self)
}
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl EncodingCompressor for ConstantCompressor {
constants::CONSTANT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::CONSTANT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
(!array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false))
.then_some(self as &dyn EncodingCompressor)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::TemporalArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData};
use vortex_datetime_dtype::TemporalMetadata;
use vortex_datetime_parts::{
Expand All @@ -24,6 +24,10 @@ impl EncodingCompressor for DateTimePartsCompressor {
constants::DATE_TIME_PARTS_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::DATE_TIME_PARTS_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
if let Ok(temporal_array) = TemporalArray::try_from(array) {
match temporal_array.temporal_metadata() {
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_error::VortexResult;
Expand All @@ -22,6 +22,10 @@ impl EncodingCompressor for DeltaCompressor {
constants::DELTA_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::DELTA_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl EncodingCompressor for DictCompressor {
constants::DICT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::DICT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
if array.encoding().id() != Primitive::ID
&& array.encoding().id() != VarBin::ID
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl EncodingCompressor for FoRCompressor {
constants::FOR_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::FOR_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use fsst::Compressor;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{VarBin, VarBinArray, VarBinView};
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayDef, IntoArrayData};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
Expand Down Expand Up @@ -37,6 +37,10 @@ impl EncodingCompressor for FSSTCompressor {
constants::FSST_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::FSST_GIB_PER_S
}

fn can_compress(&self, array: &vortex_array::ArrayData) -> Option<&dyn EncodingCompressor> {
// FSST arrays must have DType::Utf8.
//
Expand Down
49 changes: 49 additions & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use itertools::Itertools;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::{ArrayStatistics, Statistics};
Expand Down Expand Up @@ -33,6 +34,8 @@ pub trait EncodingCompressor: Sync + Send + Debug {

fn cost(&self) -> u8;

fn decompression_gib_per_second(&self) -> f64;

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor>;

fn compress<'a>(
Expand Down Expand Up @@ -233,6 +236,52 @@ impl<'a> CompressedArray<'a> {
pub fn nbytes(&self) -> usize {
self.array.nbytes()
}

pub fn decompression_time_ms(&self, assumed_compression_ratio: f64) -> f64 {
const MS_PER_SEC: f64 = 1000.0;
const BYTES_PER_GB: f64 = 1_073_741_824.0;

// recursively compute the time to decompress all children
let children_time_ms = self
.path()
.iter()
.map(|c| {
c.children
.iter()
.zip_eq(self.array.children().iter())
.map(|(child_tree, child_array)| {
CompressedArray::compressed(
child_array.clone(),
child_tree.clone(),
None::<&dyn Statistics>,
)
.decompression_time_ms(assumed_compression_ratio)
})
.sum::<f64>()
})
.sum::<f64>();

// get or estimate the output size from decompressing `self`
let uncompressed_size = self
.array()
.statistics()
.compute_uncompressed_size_in_bytes()
.map(|size| size as f64)
.unwrap_or_else(|| self.nbytes() as f64 * assumed_compression_ratio);

// get the decompression speed of this compressor
let decompression_gib_per_sec = self
.path()
.as_ref()
.map(|c| c.compressor().decompression_gib_per_second())
.unwrap_or(500.0);

// compute the time to decompress `self`
let self_time_ms =
(MS_PER_SEC / decompression_gib_per_sec) * uncompressed_size / BYTES_PER_GB;

self_time_ms + children_time_ms
}
}

impl AsRef<ArrayData> for CompressedArray<'_> {
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/roaring_bool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::Bool;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData, IntoArrayVariant};
use vortex_dtype::DType;
use vortex_dtype::Nullability::NonNullable;
Expand All @@ -23,6 +23,10 @@ impl EncodingCompressor for RoaringBoolCompressor {
constants::ROARING_BOOL_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ROARING_BOOL_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support bool arrays
if array.encoding().id() != Bool::ID {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/roaring_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ impl EncodingCompressor for RoaringIntCompressor {
constants::ROARING_INT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ROARING_INT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support non-nullable uint arrays
if !array.dtype().is_unsigned_int() || array.dtype().is_nullable() {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/runend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl EncodingCompressor for RunEndCompressor {
constants::RUN_END_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::RUN_END_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
if array.encoding().id() != Primitive::ID {
return None;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/sparse.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{Sparse, SparseArray, SparseEncoding};
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_error::VortexResult;

Expand All @@ -20,6 +20,10 @@ impl EncodingCompressor for SparseCompressor {
constants::SPARSE_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::SPARSE_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
array.is_encoding(Sparse::ID).then_some(self)
}
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{Struct, StructArray};
use vortex_array::compress::compute_precompression_stats;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::StructArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_error::VortexResult;
Expand All @@ -23,6 +23,10 @@ impl EncodingCompressor for StructCompressor {
constants::STRUCT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::STRUCT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
StructArray::try_from(array)
.ok()
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl EncodingCompressor for ZigZagCompressor {
constants::ZIGZAG_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ZIGZAG_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
Loading

0 comments on commit 39cdcf3

Please sign in to comment.