diff --git a/bench-vortex/benches/compress_noci.rs b/bench-vortex/benches/compress_noci.rs index 5030b74d3..215948b7a 100644 --- a/bench-vortex/benches/compress_noci.rs +++ b/bench-vortex/benches/compress_noci.rs @@ -1,8 +1,8 @@ mod tokio_runtime; +use core::cell::LazyCell; use core::str::FromStr; use core::sync::atomic::{AtomicBool, Ordering}; -use std::cell::LazyCell; use std::io::Cursor; use std::path::Path; use std::sync::Arc; diff --git a/bench-vortex/benches/compressor_throughput.rs b/bench-vortex/benches/compressor_throughput.rs index 834e5ff47..4f30c1353 100644 --- a/bench-vortex/benches/compressor_throughput.rs +++ b/bench-vortex/benches/compressor_throughput.rs @@ -103,9 +103,7 @@ fn strings(c: &mut Criterion) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(1_000_000, 0.00005)); let (codes, values) = dict_encode_varbinview(&varbinview_arr); - group.throughput(Throughput::Bytes( - varbinview_arr.clone().into_array().nbytes() as u64, - )); + group.throughput(Throughput::Bytes(varbinview_arr.clone().into_array().nbytes() as u64)); group.bench_function("dict_decode_varbinview", |b| { b.iter_batched( || DictArray::try_new(codes.clone().into_array(), values.clone().into_array()).unwrap(), diff --git a/docs/quickstart.rst b/docs/quickstart.rst index 1cd5f6781..7ed994dd8 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the >>> cvtx = vortex.compress(vtx) >>> cvtx.nbytes - 13963 + 13196 >>> cvtx.nbytes / vtx.nbytes - 0.099... + 0.093... Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in cache and RAM. 
diff --git a/encodings/dict/benches/dict_compress.rs b/encodings/dict/benches/dict_compress.rs index 068ef22b2..21a5b0782 100644 --- a/encodings/dict/benches/dict_compress.rs +++ b/encodings/dict/benches/dict_compress.rs @@ -22,7 +22,7 @@ fn gen_varbin_words(len: usize, uniqueness: f64) -> Vec { let uniq_cnt = (len as f64 * uniqueness) as usize; let dict: Vec = (0..uniq_cnt) .map(|_| { - (&mut rng) + (&mut rng) .sample_iter(&Alphanumeric) .take(16) .map(char::from) diff --git a/vortex-array/src/stats/mod.rs b/vortex-array/src/stats/mod.rs index e0e0dfe21..77200c023 100644 --- a/vortex-array/src/stats/mod.rs +++ b/vortex-array/src/stats/mod.rs @@ -105,7 +105,7 @@ pub trait Statistics { fn compute_all(&self, stats: &[Stat]) -> VortexResult { let mut stats_set = self.to_set(); for stat in stats { - if let Some(s) = self.compute(*stat) { + if let Some(s) = self.compute(*stat) { stats_set.set(*stat, s) } } diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs index d57e4eade..615e40e49 100644 --- a/vortex-sampling-compressor/src/compressors/alp.rs +++ b/vortex-sampling-compressor/src/compressors/alp.rs @@ -4,7 +4,7 @@ use vortex_alp::{ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayData, ArrayDef, IntoArrayData}; use vortex_dtype::PType; @@ -26,6 +26,10 @@ impl EncodingCompressor for ALPCompressor { constants::ALP_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::ALP_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support primitive arrays let parray = PrimitiveArray::try_from(array).ok()?; diff --git a/vortex-sampling-compressor/src/compressors/alp_rd.rs 
b/vortex-sampling-compressor/src/compressors/alp_rd.rs index dd5944601..13fe5dd40 100644 --- a/vortex-sampling-compressor/src/compressors/alp_rd.rs +++ b/vortex-sampling-compressor/src/compressors/alp_rd.rs @@ -5,7 +5,7 @@ use vortex_alp::{match_each_alp_float_ptype, ALPRDEncoding, RDEncoder as ALPRDEn use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayData, ArrayDef, IntoArrayData, IntoArrayVariant}; use vortex_dtype::PType; @@ -33,6 +33,10 @@ impl EncodingCompressor for ALPRDCompressor { constants::ALP_RD_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::ALP_RD_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support primitive arrays let parray = PrimitiveArray::try_from(array).ok()?; diff --git a/vortex-sampling-compressor/src/compressors/bitpacked.rs b/vortex-sampling-compressor/src/compressors/bitpacked.rs index 7d8b6007e..3b64441d8 100644 --- a/vortex-sampling-compressor/src/compressors/bitpacked.rs +++ b/vortex-sampling-compressor/src/compressors/bitpacked.rs @@ -52,6 +52,14 @@ impl EncodingCompressor for BitPackedCompressor { } } + fn decompression_gib_per_second(&self) -> f64 { + if self.allow_patches { + constants::BITPACKED_WITH_PATCHES_GIB_PER_S + } else { + constants::BITPACKED_NO_PATCHES_GIB_PER_S + } + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support primitive arrays let parray = PrimitiveArray::try_from(array).ok()?; diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index 478a2f083..f7ac9c444 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ 
-6,7 +6,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Chunked, ChunkedArray}; use vortex_array::compress::compute_precompression_stats; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData}; use vortex_error::{vortex_bail, VortexResult}; @@ -40,6 +40,10 @@ impl EncodingCompressor for ChunkedCompressor { constants::CHUNKED_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::CHUNKED_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { array.is_encoding(Chunked::ID).then_some(self) } diff --git a/vortex-sampling-compressor/src/compressors/constant.rs b/vortex-sampling-compressor/src/compressors/constant.rs index 4a2453f5f..655f13258 100644 --- a/vortex-sampling-compressor/src/compressors/constant.rs +++ b/vortex-sampling-compressor/src/compressors/constant.rs @@ -21,6 +21,10 @@ impl EncodingCompressor for ConstantCompressor { constants::CONSTANT_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::CONSTANT_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { (!array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false)) .then_some(self as &dyn EncodingCompressor) diff --git a/vortex-sampling-compressor/src/compressors/date_time_parts.rs b/vortex-sampling-compressor/src/compressors/date_time_parts.rs index 8bf4ed754..5237d7393 100644 --- a/vortex-sampling-compressor/src/compressors/date_time_parts.rs +++ b/vortex-sampling-compressor/src/compressors/date_time_parts.rs @@ -1,7 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::TemporalArray; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::{ArrayDType, ArrayData, ArrayDef, 
IntoArrayData}; use vortex_datetime_dtype::TemporalMetadata; use vortex_datetime_parts::{ @@ -24,6 +24,10 @@ impl EncodingCompressor for DateTimePartsCompressor { constants::DATE_TIME_PARTS_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::DATE_TIME_PARTS_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { if let Ok(temporal_array) = TemporalArray::try_from(array) { match temporal_array.temporal_metadata() { diff --git a/vortex-sampling-compressor/src/compressors/delta.rs b/vortex-sampling-compressor/src/compressors/delta.rs index 57c2818c9..320e7d2a4 100644 --- a/vortex-sampling-compressor/src/compressors/delta.rs +++ b/vortex-sampling-compressor/src/compressors/delta.rs @@ -1,7 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::PrimitiveArray; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ArrayData, ArrayDef, IntoArrayData}; use vortex_error::VortexResult; @@ -22,6 +22,10 @@ impl EncodingCompressor for DeltaCompressor { constants::DELTA_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::DELTA_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support primitive arrays let parray = PrimitiveArray::try_from(array).ok()?; diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index b9a156c24..fa5d4de54 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -26,6 +26,10 @@ impl EncodingCompressor for DictCompressor { constants::DICT_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::DICT_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { if array.encoding().id() != 
Primitive::ID && array.encoding().id() != VarBin::ID diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index 29e32ae88..1d0736af0 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -24,6 +24,10 @@ impl EncodingCompressor for FoRCompressor { constants::FOR_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::FOR_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support primitive arrays let parray = PrimitiveArray::try_from(array).ok()?; diff --git a/vortex-sampling-compressor/src/compressors/fsst.rs b/vortex-sampling-compressor/src/compressors/fsst.rs index baf2b0f97..d3056655c 100644 --- a/vortex-sampling-compressor/src/compressors/fsst.rs +++ b/vortex-sampling-compressor/src/compressors/fsst.rs @@ -6,7 +6,7 @@ use fsst::Compressor; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{VarBin, VarBinArray, VarBinView}; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::{ArrayDType, ArrayDef, IntoArrayData}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; @@ -37,6 +37,10 @@ impl EncodingCompressor for FSSTCompressor { constants::FSST_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::FSST_GIB_PER_S + } + fn can_compress(&self, array: &vortex_array::ArrayData) -> Option<&dyn EncodingCompressor> { // FSST arrays must have DType::Utf8. 
// diff --git a/vortex-sampling-compressor/src/compressors/mod.rs b/vortex-sampling-compressor/src/compressors/mod.rs index ecec7ea21..c2345b88c 100644 --- a/vortex-sampling-compressor/src/compressors/mod.rs +++ b/vortex-sampling-compressor/src/compressors/mod.rs @@ -3,6 +3,7 @@ use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::Arc; +use itertools::Itertools; use vortex_array::aliases::hash_set::HashSet; use vortex_array::encoding::EncodingRef; use vortex_array::stats::{ArrayStatistics, Statistics}; @@ -33,6 +34,8 @@ pub trait EncodingCompressor: Sync + Send + Debug { fn cost(&self) -> u8; + fn decompression_gib_per_second(&self) -> f64; + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor>; fn compress<'a>( @@ -233,6 +236,52 @@ impl<'a> CompressedArray<'a> { pub fn nbytes(&self) -> usize { self.array.nbytes() } + + pub fn decompression_time_ms(&self, assumed_compression_ratio: f64) -> f64 { + const MS_PER_SEC: f64 = 1000.0; + const BYTES_PER_GB: f64 = 1_073_741_824.0; + + // recursively compute the time to decompress all children + let children_time_ms = self + .path() + .iter() + .map(|c| { + c.children + .iter() + .zip_eq(self.array.children().iter()) + .map(|(child_tree, child_array)| { + CompressedArray::compressed( + child_array.clone(), + child_tree.clone(), + None::<&dyn Statistics>, + ) + .decompression_time_ms(assumed_compression_ratio) + }) + .sum::() + }) + .sum::(); + + // get or estimate the output size from decompressing `self` + let uncompressed_size = self + .array() + .statistics() + .compute_uncompressed_size_in_bytes() + .map(|size| size as f64) + .unwrap_or_else(|| self.nbytes() as f64 * assumed_compression_ratio); + + // get the decompression speed of this compressor + let decompression_gib_per_sec = self + .path() + .as_ref() + .map(|c| c.compressor().decompression_gib_per_second()) + .unwrap_or(500.0); + + // compute the time to decompress `self` + let self_time_ms = + 
(MS_PER_SEC / decompression_gib_per_sec) * uncompressed_size / BYTES_PER_GB; + + self_time_ms + children_time_ms + } } impl AsRef for CompressedArray<'_> { diff --git a/vortex-sampling-compressor/src/compressors/roaring_bool.rs b/vortex-sampling-compressor/src/compressors/roaring_bool.rs index 57d70259a..243f37889 100644 --- a/vortex-sampling-compressor/src/compressors/roaring_bool.rs +++ b/vortex-sampling-compressor/src/compressors/roaring_bool.rs @@ -1,7 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::Bool; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoArrayData, IntoArrayVariant}; use vortex_dtype::DType; use vortex_dtype::Nullability::NonNullable; @@ -23,6 +23,10 @@ impl EncodingCompressor for RoaringBoolCompressor { constants::ROARING_BOOL_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::ROARING_BOOL_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support bool arrays if array.encoding().id() != Bool::ID { diff --git a/vortex-sampling-compressor/src/compressors/roaring_int.rs b/vortex-sampling-compressor/src/compressors/roaring_int.rs index 993d26407..677e9e1a3 100644 --- a/vortex-sampling-compressor/src/compressors/roaring_int.rs +++ b/vortex-sampling-compressor/src/compressors/roaring_int.rs @@ -20,6 +20,10 @@ impl EncodingCompressor for RoaringIntCompressor { constants::ROARING_INT_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::ROARING_INT_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support non-nullable uint arrays if !array.dtype().is_unsigned_int() || array.dtype().is_nullable() { diff --git a/vortex-sampling-compressor/src/compressors/runend.rs b/vortex-sampling-compressor/src/compressors/runend.rs index 491e14249..a737fcf57 
100644 --- a/vortex-sampling-compressor/src/compressors/runend.rs +++ b/vortex-sampling-compressor/src/compressors/runend.rs @@ -26,6 +26,10 @@ impl EncodingCompressor for RunEndCompressor { constants::RUN_END_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::RUN_END_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { if array.encoding().id() != Primitive::ID { return None; diff --git a/vortex-sampling-compressor/src/compressors/sparse.rs b/vortex-sampling-compressor/src/compressors/sparse.rs index 03c08efe8..5248c001b 100644 --- a/vortex-sampling-compressor/src/compressors/sparse.rs +++ b/vortex-sampling-compressor/src/compressors/sparse.rs @@ -1,7 +1,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Sparse, SparseArray, SparseEncoding}; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::{ArrayData, ArrayDef, IntoArrayData}; use vortex_error::VortexResult; @@ -20,6 +20,10 @@ impl EncodingCompressor for SparseCompressor { constants::SPARSE_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::SPARSE_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { array.is_encoding(Sparse::ID).then_some(self) } diff --git a/vortex-sampling-compressor/src/compressors/struct_.rs b/vortex-sampling-compressor/src/compressors/struct_.rs index cd948cbc4..cdfd86ef2 100644 --- a/vortex-sampling-compressor/src/compressors/struct_.rs +++ b/vortex-sampling-compressor/src/compressors/struct_.rs @@ -3,7 +3,7 @@ use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{Struct, StructArray}; use vortex_array::compress::compute_precompression_stats; use vortex_array::encoding::EncodingRef; -use vortex_array::stats::ArrayStatistics as _; +use vortex_array::stats::ArrayStatistics; use vortex_array::variants::StructArrayTrait; use 
vortex_array::{ArrayData, ArrayDef, IntoArrayData}; use vortex_error::VortexResult; @@ -23,6 +23,10 @@ impl EncodingCompressor for StructCompressor { constants::STRUCT_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::STRUCT_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { StructArray::try_from(array) .ok() diff --git a/vortex-sampling-compressor/src/compressors/zigzag.rs b/vortex-sampling-compressor/src/compressors/zigzag.rs index 36f57fe7f..b202dfb95 100644 --- a/vortex-sampling-compressor/src/compressors/zigzag.rs +++ b/vortex-sampling-compressor/src/compressors/zigzag.rs @@ -22,6 +22,10 @@ impl EncodingCompressor for ZigZagCompressor { constants::ZIGZAG_COST } + fn decompression_gib_per_second(&self) -> f64 { + constants::ZIGZAG_GIB_PER_S + } + fn can_compress(&self, array: &ArrayData) -> Option<&dyn EncodingCompressor> { // Only support primitive arrays let parray = PrimitiveArray::try_from(array).ok()?; diff --git a/vortex-sampling-compressor/src/constants.rs b/vortex-sampling-compressor/src/constants.rs index c92b13b74..16e984b07 100644 --- a/vortex-sampling-compressor/src/constants.rs +++ b/vortex-sampling-compressor/src/constants.rs @@ -1,26 +1,54 @@ -#![allow(dead_code)] +pub use cost::*; +pub use decompression::*; -// structural pass-throughs have no cost -pub const SPARSE_COST: u8 = 0; -pub const CHUNKED_COST: u8 = 0; -pub const STRUCT_COST: u8 = 0; +mod cost { + pub const DEFAULT_MAX_COST: u8 = 3; -// so fast that we can ignore the cost -pub const BITPACKED_NO_PATCHES_COST: u8 = 0; -pub const BITPACKED_WITH_PATCHES_COST: u8 = 0; -pub const CONSTANT_COST: u8 = 0; -pub const ZIGZAG_COST: u8 = 0; + // structural pass-throughs have no cost + pub const SPARSE_COST: u8 = 0; + pub const STRUCT_COST: u8 = 0; + pub const CHUNKED_COST: u8 = 0; -// "normal" encodings -pub const ALP_COST: u8 = 1; -pub const ALP_RD_COST: u8 = 1; -pub const DATE_TIME_PARTS_COST: u8 = 1; -pub const DICT_COST: u8 = 1; 
-pub const FOR_COST: u8 = 1; -pub const FSST_COST: u8 = 1; -pub const ROARING_BOOL_COST: u8 = 1; -pub const ROARING_INT_COST: u8 = 1; -pub const RUN_END_COST: u8 = 1; + // so fast that we can ignore the cost + pub const BITPACKED_NO_PATCHES_COST: u8 = 0; + pub const BITPACKED_WITH_PATCHES_COST: u8 = 0; + pub const CONSTANT_COST: u8 = 0; + pub const ZIGZAG_COST: u8 = 0; -// "expensive" encodings -pub const DELTA_COST: u8 = 2; + // "normal" encodings + pub const ALP_COST: u8 = 1; + pub const ALP_RD_COST: u8 = 1; + pub const DATE_TIME_PARTS_COST: u8 = 1; + pub const DICT_COST: u8 = 1; + pub const FOR_COST: u8 = 1; + pub const FSST_COST: u8 = 1; + pub const ROARING_BOOL_COST: u8 = 1; + pub const ROARING_INT_COST: u8 = 1; + pub const RUN_END_COST: u8 = 1; + + // "expensive" encodings + pub const DELTA_COST: u8 = 2; +} + +mod decompression { + // structural pass-throughs + pub const SPARSE_GIB_PER_S: f64 = f64::INFINITY; + pub const STRUCT_GIB_PER_S: f64 = f64::INFINITY; + pub const CHUNKED_GIB_PER_S: f64 = f64::INFINITY; + + // benchmarked decompression throughput + pub const ALP_GIB_PER_S: f64 = 10.8; + pub const ALP_RD_GIB_PER_S: f64 = 4.4; + pub const BITPACKED_NO_PATCHES_GIB_PER_S: f64 = 48.2; + pub const BITPACKED_WITH_PATCHES_GIB_PER_S: f64 = 46.6; + pub const CONSTANT_GIB_PER_S: f64 = 200.0; + pub const DATE_TIME_PARTS_GIB_PER_S: f64 = 50.0; // this is a guess + pub const DELTA_GIB_PER_S: f64 = 12.8; + pub const DICT_GIB_PER_S: f64 = 30.0; // ranges from 15-45 depending on data, picked the average + pub const FOR_GIB_PER_S: f64 = 11.3; + pub const FSST_GIB_PER_S: f64 = 6.7; + pub const ROARING_BOOL_GIB_PER_S: f64 = 9.3; + pub const ROARING_INT_GIB_PER_S: f64 = 9.3; + pub const RUN_END_GIB_PER_S: f64 = 10.0; + pub const ZIGZAG_GIB_PER_S: f64 = 30.0; +} diff --git a/vortex-sampling-compressor/src/lib.rs b/vortex-sampling-compressor/src/lib.rs index 79e32ed8c..b973e239e 100644 --- a/vortex-sampling-compressor/src/lib.rs +++ b/vortex-sampling-compressor/src/lib.rs @@ 
-2,7 +2,7 @@ use std::sync::{Arc, LazyLock}; use compressors::bitpacked::BITPACK_WITH_PATCHES; use compressors::fsst::FSSTCompressor; -use compressors::{CompressedArray, CompressionTree}; +use compressors::{CompressedArray, CompressionTree, CompressorRef}; use vortex_alp::{ALPEncoding, ALPRDEncoding}; use vortex_array::encoding::EncodingRef; use vortex_array::Context; @@ -23,7 +23,6 @@ use crate::compressors::r#for::FoRCompressor; use crate::compressors::runend::DEFAULT_RUN_END_COMPRESSOR; use crate::compressors::sparse::SparseCompressor; use crate::compressors::zigzag::ZigZagCompressor; -use crate::compressors::CompressorRef; #[cfg(feature = "arbitrary")] pub mod arbitrary; @@ -32,7 +31,7 @@ mod constants; mod sampling; mod sampling_compressor; -pub use sampling_compressor::SamplingCompressor; +pub use sampling_compressor::*; pub static DEFAULT_COMPRESSORS: LazyLock<[CompressorRef<'static>; 9]> = LazyLock::new(|| { [ @@ -82,14 +81,45 @@ pub static ALL_ENCODINGS_CONTEXT: LazyLock> = LazyLock::new(|| { ])) }); +#[derive(Debug, Clone)] +pub struct ScanPerfConfig { + /// MiB per second of download throughput + mib_per_second: f64, + /// Compression ratio to assume when calculating decompression time + assumed_compression_ratio: f64, +} + +impl ScanPerfConfig { + pub fn download_time_ms(&self, nbytes: u64) -> f64 { + const MS_PER_SEC: f64 = 1000.0; + const BYTES_PER_MIB: f64 = (1 << 20) as f64; + (MS_PER_SEC / self.mib_per_second) * (nbytes as f64 / BYTES_PER_MIB) + } +} + +impl Default for ScanPerfConfig { + fn default() -> Self { + Self { + mib_per_second: 500.0, // 500 MiB/s for object storage + assumed_compression_ratio: 10.0, // 10:1 ratio of uncompressed data size to compressed data size + } + } +} + #[derive(Debug, Clone)] pub enum Objective { MinSize, + ScanPerf(ScanPerfConfig), } impl Objective { pub fn starting_value(&self) -> f64 { - 1.0 + match self { + // if we're minimizing size, we should never choose a worse compression ratio than "uncompressed" + 
Objective::MinSize => 1.0, + // if we're maximizing performance, the units are in milliseconds + Objective::ScanPerf(_) => f64::INFINITY, + } } pub fn evaluate( @@ -107,6 +137,14 @@ impl Objective { match &config.objective { Objective::MinSize => (size_in_bytes as f64) / (base_size_bytes as f64), + Objective::ScanPerf(config) => { + let download_time = config.download_time_ms(size_in_bytes); + let decompression_time = + array.decompression_time_ms(config.assumed_compression_ratio); + + // total scan time is the sum of download and decompression time + download_time + decompression_time + } } } } @@ -141,7 +179,7 @@ impl Default for CompressConfig { // Sample length should always be multiple of 1024 sample_size: 64, sample_count: 16, - max_cost: 3, + max_cost: constants::DEFAULT_MAX_COST, objective: Objective::MinSize, overhead_bytes_per_array: 64, target_block_bytesize: 16 * mib, diff --git a/vortex-sampling-compressor/src/sampling_compressor.rs b/vortex-sampling-compressor/src/sampling_compressor.rs index e2b7deedc..d698bf6e5 100644 --- a/vortex-sampling-compressor/src/sampling_compressor.rs +++ b/vortex-sampling-compressor/src/sampling_compressor.rs @@ -1,8 +1,9 @@ -use std::fmt::{Debug, Display, Formatter}; +use core::fmt::Formatter; +use std::fmt::Display; -use log::{debug, warn}; +use log::{debug, info, warn}; use rand::rngs::StdRng; -use rand::SeedableRng; +use rand::SeedableRng as _; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{ChunkedArray, Constant}; use vortex_array::compress::{ @@ -15,12 +16,13 @@ use vortex_array::validity::Validity; use vortex_array::{ArrayDType, ArrayData, ArrayDef, IntoCanonical}; use vortex_error::{VortexExpect as _, VortexResult}; -use crate::compressors::chunked::DEFAULT_CHUNKED_COMPRESSOR; +use super::compressors::chunked::DEFAULT_CHUNKED_COMPRESSOR; +use super::compressors::struct_::StructCompressor; +use super::{CompressConfig, Objective, DEFAULT_COMPRESSORS}; use 
crate::compressors::constant::ConstantCompressor; -use crate::compressors::struct_::StructCompressor; use crate::compressors::{CompressedArray, CompressionTree, CompressorRef, EncodingCompressor}; use crate::sampling::stratified_slices; -use crate::{CompressConfig, Objective, DEFAULT_COMPRESSORS}; +use crate::ScanPerfConfig; #[derive(Debug, Clone)] pub struct SamplingCompressor<'a> { @@ -135,10 +137,7 @@ impl<'a> SamplingCompressor<'a> { check_statistics_unchanged(arr, compressed.as_ref()); return Ok(compressed); } else { - warn!( - "{} cannot find compressor to compress {} like {}", - self, arr, l - ); + warn!("{} cannot compress {} like {}", self, arr, l); } } @@ -158,7 +157,7 @@ impl<'a> SamplingCompressor<'a> { } } - pub fn compress_array(&self, array: &ArrayData) -> VortexResult> { + pub(crate) fn compress_array(&self, array: &ArrayData) -> VortexResult> { let mut rng = StdRng::seed_from_u64(self.options.rng_seed); if array.encoding().id() == Constant::ID { @@ -258,7 +257,7 @@ impl<'a> SamplingCompressor<'a> { } } -fn find_best_compression<'a>( +pub(crate) fn find_best_compression<'a>( candidates: Vec<&'a dyn EncodingCompressor>, sample: &ArrayData, ctx: &SamplingCompressor<'a>, @@ -267,8 +266,8 @@ fn find_best_compression<'a>( let mut best_objective = ctx.options().objective.starting_value(); let mut best_objective_ratio = 1.0; // for logging - let mut best_ratio = 1.0; - let mut best_ratio_sample = None; + let mut best_compression_ratio = 1.0; + let mut best_compression_ratio_sample = None; for compression in candidates { debug!( @@ -287,17 +286,18 @@ fn find_best_compression<'a>( let objective = Objective::evaluate(&compressed_sample, sample.nbytes(), ctx.options()); // track the compression ratio, just for logging - if ratio < best_ratio { - best_ratio = ratio; + if ratio < best_compression_ratio { + best_compression_ratio = ratio; // if we find one with a better compression ratio but worse objective value, save it // for debug logging later. 
if ratio < best_objective_ratio && objective >= best_objective { - best_ratio_sample = Some(compressed_sample.clone()); + best_compression_ratio_sample = Some(compressed_sample.clone()); } } - if objective < best_objective { + // don't consider anything that compresses to be *larger* than uncompressed + if objective < best_objective && ratio < 1.0 { best_objective = objective; best_objective_ratio = ratio; best = Some(compressed_sample); @@ -309,30 +309,40 @@ fn find_best_compression<'a>( compression.id(), ratio, objective, - best_ratio, + best_compression_ratio, best_objective ); } let best = best.unwrap_or_else(|| CompressedArray::uncompressed(sample.clone())); - if best_ratio < best_objective_ratio && best_ratio_sample.is_some() { + if best_compression_ratio < best_objective_ratio && best_compression_ratio_sample.is_some() { let best_ratio_sample = - best_ratio_sample.vortex_expect("already checked that this Option is Some"); + best_compression_ratio_sample.vortex_expect("already checked that this Option is Some"); debug!( "{} best objective fn value ({}) has ratio {} from {}", ctx, best_objective, - best_ratio, + best_compression_ratio, best.array().tree_display() ); debug!( "{} best ratio ({}) has objective fn value {} from {}", ctx, - best_ratio, + best_compression_ratio, best_objective, best_ratio_sample.array().tree_display() ); } + info!( + "{} best compression ({} bytes, {} objective fn value, {} compression ratio, {} download time, {} decompression time)", + ctx, + best.nbytes(), + best_objective, + best_compression_ratio, + ScanPerfConfig::default().download_time_ms(best.nbytes() as u64), + best.decompression_time_ms(ScanPerfConfig::default().assumed_compression_ratio), + ); + Ok(best) }