Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new "scan throughput" objective in sampling compressor #1068

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench-vortex/benches/compress_noci.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
mod tokio_runtime;

use core::cell::LazyCell;
use core::str::FromStr;
use core::sync::atomic::{AtomicBool, Ordering};
use std::cell::LazyCell;
use std::io::Cursor;
use std::path::Path;
use std::sync::Arc;
Expand Down
4 changes: 2 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the

>>> cvtx = vortex.compress(vtx)
>>> cvtx.nbytes
13963
13196
>>> cvtx.nbytes / vtx.nbytes
0.099...
0.093...

Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in
cache and RAM.
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use vortex_alp::{
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_dtype::PType;
Expand All @@ -26,6 +26,10 @@ impl EncodingCompressor for ALPCompressor {
constants::ALP_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ALP_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/alp_rd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use vortex_alp::{match_each_alp_float_ptype, ALPRDEncoding, RDEncoder as ALPRDEn
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData, IntoArrayVariant};
use vortex_dtype::PType;
Expand Down Expand Up @@ -33,6 +33,10 @@ impl EncodingCompressor for ALPRDCompressor {
constants::ALP_RD_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ALP_RD_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
8 changes: 8 additions & 0 deletions vortex-sampling-compressor/src/compressors/bitpacked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl EncodingCompressor for BitPackedCompressor {
}
}

fn decompression_gib_per_second(&self) -> f64 {
if self.allow_patches {
constants::BITPACKED_WITH_PATCHES_GIB_PER_S
} else {
constants::BITPACKED_NO_PATCHES_GIB_PER_S
}
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{Chunked, ChunkedArray};
use vortex_array::compress::compute_precompression_stats;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData};
use vortex_error::{vortex_bail, VortexResult};

Expand Down Expand Up @@ -40,6 +40,10 @@ impl EncodingCompressor for ChunkedCompressor {
constants::CHUNKED_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::CHUNKED_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
array.is_encoding(Chunked::ID).then_some(self)
}
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl EncodingCompressor for ConstantCompressor {
constants::CONSTANT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::CONSTANT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
(!array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false))
.then_some(self as &dyn EncodingCompressor)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::TemporalArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData};
use vortex_datetime_dtype::TemporalMetadata;
use vortex_datetime_parts::{
Expand All @@ -24,6 +24,10 @@ impl EncodingCompressor for DateTimePartsCompressor {
constants::DATE_TIME_PARTS_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::DATE_TIME_PARTS_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
if let Ok(temporal_array) = TemporalArray::try_from(array) {
match temporal_array.temporal_metadata() {
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_error::VortexResult;
Expand All @@ -22,6 +22,10 @@ impl EncodingCompressor for DeltaCompressor {
constants::DELTA_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::DELTA_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl EncodingCompressor for DictCompressor {
constants::DICT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::DICT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
if array.encoding().id() != Primitive::ID
&& array.encoding().id() != VarBin::ID
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl EncodingCompressor for FoRCompressor {
constants::FOR_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::FOR_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use fsst::Compressor;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{VarBin, VarBinArray, VarBinView};
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayDef, IntoArrayData};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
Expand Down Expand Up @@ -37,6 +37,10 @@ impl EncodingCompressor for FSSTCompressor {
constants::FSST_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::FSST_GIB_PER_S
}

fn can_compress(&self, array: &vortex_array::ArrayData) -> Option<&dyn EncodingCompressor> {
// FSST arrays must have DType::Utf8.
//
Expand Down
49 changes: 49 additions & 0 deletions vortex-sampling-compressor/src/compressors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use itertools::Itertools;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::{ArrayStatistics, Statistics};
Expand Down Expand Up @@ -33,6 +34,8 @@ pub trait EncodingCompressor: Sync + Send + Debug {

fn cost(&self) -> u8;

fn decompression_gib_per_second(&self) -> f64;

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor>;

fn compress<'a>(
Expand Down Expand Up @@ -233,6 +236,52 @@ impl<'a> CompressedArray<'a> {
pub fn nbytes(&self) -> usize {
self.array.nbytes()
}

pub fn decompression_time_ms(&self, assumed_compression_ratio: f64) -> f64 {
const MS_PER_SEC: f64 = 1000.0;
const BYTES_PER_GB: f64 = 1_073_741_824.0;
lwwmanning marked this conversation as resolved.
Show resolved Hide resolved

// recursively compute the time to decompress all children
let children_time_ms = self
.path()
.iter()
.map(|c| {
c.children
.iter()
.zip_eq(self.array.children().iter())
.map(|(child_tree, child_array)| {
CompressedArray::compressed(
child_array.clone(),
child_tree.clone(),
None::<&dyn Statistics>,
)
.decompression_time_ms(assumed_compression_ratio)
})
.sum::<f64>()
})
.sum::<f64>();

// get or estimate the output size from decompressing `self`
let uncompressed_size = self
.array()
.statistics()
.compute_uncompressed_size_in_bytes()
.map(|size| size as f64)
.unwrap_or_else(|| self.nbytes() as f64 * assumed_compression_ratio);

// get the decompression speed of this compressor
let decompression_gib_per_sec = self
.path()
.as_ref()
.map(|c| c.compressor().decompression_gib_per_second())
.unwrap_or(500.0);

// compute the time to decompress `self`
let self_time_ms =
(MS_PER_SEC / decompression_gib_per_sec) * uncompressed_size / BYTES_PER_GB;

self_time_ms + children_time_ms
}
}

impl AsRef<ArrayData> for CompressedArray<'_> {
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/roaring_bool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::Bool;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData, IntoArrayVariant};
use vortex_dtype::DType;
use vortex_dtype::Nullability::NonNullable;
Expand All @@ -23,6 +23,10 @@ impl EncodingCompressor for RoaringBoolCompressor {
constants::ROARING_BOOL_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ROARING_BOOL_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support bool arrays
if array.encoding().id() != Bool::ID {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/roaring_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ impl EncodingCompressor for RoaringIntCompressor {
constants::ROARING_INT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ROARING_INT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support non-nullable uint arrays
if !array.dtype().is_unsigned_int() || array.dtype().is_nullable() {
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/runend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl EncodingCompressor for RunEndCompressor {
constants::RUN_END_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::RUN_END_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
if array.encoding().id() != Primitive::ID {
return None;
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/sparse.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{Sparse, SparseArray, SparseEncoding};
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_error::VortexResult;

Expand All @@ -20,6 +20,10 @@ impl EncodingCompressor for SparseCompressor {
constants::SPARSE_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::SPARSE_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
array.is_encoding(Sparse::ID).then_some(self)
}
Expand Down
6 changes: 5 additions & 1 deletion vortex-sampling-compressor/src/compressors/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{Struct, StructArray};
use vortex_array::compress::compute_precompression_stats;
use vortex_array::encoding::EncodingRef;
use vortex_array::stats::ArrayStatistics as _;
use vortex_array::stats::ArrayStatistics;
use vortex_array::variants::StructArrayTrait;
use vortex_array::{ArrayData, ArrayDef, IntoArrayData};
use vortex_error::VortexResult;
Expand All @@ -23,6 +23,10 @@ impl EncodingCompressor for StructCompressor {
constants::STRUCT_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::STRUCT_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
StructArray::try_from(array)
.ok()
Expand Down
4 changes: 4 additions & 0 deletions vortex-sampling-compressor/src/compressors/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl EncodingCompressor for ZigZagCompressor {
constants::ZIGZAG_COST
}

fn decompression_gib_per_second(&self) -> f64 {
constants::ZIGZAG_GIB_PER_S
}

fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> {
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;
Expand Down
Loading
Loading