Skip to content

Commit

Permalink
refactor(cubesql): Use arrow interval packing (#8471)
Browse files Browse the repository at this point in the history
Bump arrow-datafusion to f99263552906b5b9fb22d679cb466876057d95e9
Use to_parts in date +- interval
Use IntervalDayTimeType::make_value in one_day rewrite
Use make_value and to_parts in Decomposed*
  • Loading branch information
mcheshkov authored Jul 19, 2024
1 parent 59d74fe commit b5075a4
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 63 deletions.
16 changes: 8 additions & 8 deletions packages/cubejs-backend-native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions rust/cubesql/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/cubesql/cubesql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ homepage = "https://cube.dev"

[dependencies]
arc-swap = "1"
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "8d98b8587052b35888d29d70ba94618bd913ad39", default-features = false, features = ["regex_expressions", "unicode_expressions"] }
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "f99263552906b5b9fb22d679cb466876057d95e9", default-features = false, features = ["regex_expressions", "unicode_expressions"] }
anyhow = "1.0"
thiserror = "1.0.50"
cubeclient = { path = "../cubeclient" }
Expand Down
52 changes: 35 additions & 17 deletions rust/cubesql/cubesql/src/compile/engine/udf/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use datafusion::{
},
compute::{cast, concat},
datatypes::{
DataType, Date32Type, Field, Float64Type, Int32Type, Int64Type, IntervalDayTimeType,
IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit,
TimestampNanosecondType, UInt32Type,
ArrowPrimitiveType, DataType, Date32Type, Field, Float64Type, Int32Type, Int64Type,
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType,
TimeUnit, TimestampNanosecondType, UInt32Type,
},
},
error::{DataFusionError, Result},
Expand Down Expand Up @@ -46,6 +46,9 @@ use crate::{
sql::SessionState,
};

type IntervalDayTime = <IntervalDayTimeType as ArrowPrimitiveType>::Native;
type IntervalMonthDayNano = <IntervalMonthDayNanoType as ArrowPrimitiveType>::Native;

pub type ReturnTypeFunction = Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
pub type ScalarFunctionImplementation =
Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
Expand Down Expand Up @@ -1424,10 +1427,12 @@ fn date_addsub_year_month(t: NaiveDateTime, i: i32, is_add: bool) -> Result<Naiv
};
}

fn date_addsub_month_day_nano(t: NaiveDateTime, i: i128, is_add: bool) -> Result<NaiveDateTime> {
let month = (i >> (64 + 32)) & 0xFFFFFFFF;
let day = (i >> 64) & 0xFFFFFFFF;
let nano = i & 0xFFFFFFFFFFFFFFFF;
fn date_addsub_month_day_nano(
t: NaiveDateTime,
i: IntervalMonthDayNano,
is_add: bool,
) -> Result<NaiveDateTime> {
let (month, day, nano) = IntervalMonthDayNanoType::to_parts(i);

let result = if month > 0 && is_add || month < 0 && !is_add {
t.checked_add_months(Months::new(month as u32))
Expand All @@ -1442,9 +1447,7 @@ fn date_addsub_month_day_nano(t: NaiveDateTime, i: i128, is_add: bool) -> Result
};

let result = result.and_then(|t| {
t.checked_add_signed(Duration::nanoseconds(
(nano as i64) * (if !is_add { -1 } else { 1 }),
))
t.checked_add_signed(Duration::nanoseconds(nano * (if !is_add { -1 } else { 1 })))
});
result.ok_or_else(|| {
DataFusionError::Execution(format!(
Expand All @@ -1454,15 +1457,30 @@ fn date_addsub_month_day_nano(t: NaiveDateTime, i: i128, is_add: bool) -> Result
})
}

fn date_addsub_day_time(t: NaiveDateTime, interval: i64, is_add: bool) -> Result<NaiveDateTime> {
let i = match is_add {
true => interval,
false => -interval,
fn date_addsub_day_time(
t: NaiveDateTime,
interval: IntervalDayTime,
is_add: bool,
) -> Result<NaiveDateTime> {
let (days, millis) = IntervalDayTimeType::to_parts(interval);

let result = if days > 0 && is_add || days < 0 && !is_add {
t.checked_add_days(Days::new(days as u64))
} else {
t.checked_sub_days(Days::new(days.abs() as u64))
};

let days: i64 = i.signum() * (i.abs() >> 32);
let millis: i64 = i.signum() * ((i.abs() << 32) >> 32);
return Ok(t + chrono::Duration::days(days) + chrono::Duration::milliseconds(millis));
let result = result.and_then(|t| {
t.checked_add_signed(Duration::milliseconds(
millis as i64 * (if !is_add { -1 } else { 1 }),
))
});
result.ok_or_else(|| {
DataFusionError::Execution(format!(
"Failed to add interval: {} day {} ms",
days, millis
))
})
}

fn change_ym(t: NaiveDateTime, y: i32, m: u32) -> Option<NaiveDateTime> {
Expand Down
6 changes: 4 additions & 2 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use cubeclient::models::V1CubeMeta;
use datafusion::{
arrow::{
array::{Date32Array, Date64Array, TimestampNanosecondArray},
datatypes::DataType,
datatypes::{DataType, IntervalDayTimeType},
},
logical_plan::{Column, Expr, Operator},
scalar::ScalarValue,
Expand Down Expand Up @@ -4441,7 +4441,9 @@ impl FilterRules {
subst.insert(
one_day_var,
egraph.add(LogicalPlanLanguage::LiteralExprValue(LiteralExprValue(
ScalarValue::IntervalDayTime(Some(1 << 32)),
ScalarValue::IntervalDayTime(Some(IntervalDayTimeType::make_value(
1, 0,
))),
))),
);
return true;
Expand Down
46 changes: 19 additions & 27 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use chrono::{Datelike, NaiveDateTime, Timelike};
use datafusion::{
arrow::datatypes::{ArrowPrimitiveType, IntervalDayTimeType, IntervalMonthDayNanoType},
error::DataFusionError,
logical_plan::{Expr, Operator},
physical_plan::aggregates::AggregateFunction,
Expand All @@ -20,6 +21,9 @@ use crate::{
CubeError,
};

type IntervalDayTime = <IntervalDayTimeType as ArrowPrimitiveType>::Native;
type IntervalMonthDayNano = <IntervalMonthDayNanoType as ArrowPrimitiveType>::Native;

pub fn parse_granularity_string(granularity: &str, to_normalize: bool) -> Option<String> {
if to_normalize {
match granularity.to_lowercase().as_str() {
Expand Down Expand Up @@ -208,30 +212,26 @@ pub struct DecomposedDayTime {
}

impl DecomposedDayTime {
const _DAY_BITS: i32 = 32;
const MILLIS_BITS: i32 = 32;

const DAY_LABEL: &'static str = "DAY";
const MILLIS_LABEL: &'static str = "MILLISECOND";

pub fn from_raw_interval_value(interval: i64) -> Self {
Self {
days: (interval >> Self::MILLIS_BITS) as i32,
millis: interval as i32,
}
pub fn from_raw_interval_value(interval: IntervalDayTime) -> Self {
let (days, millis) = IntervalDayTimeType::to_parts(interval);

Self { days, millis }
}

pub fn is_single_part(&self) -> bool {
self.days == 0 || self.millis == 0
}

pub fn millis_scalar(&self) -> ScalarValue {
let value = Some(self.millis as i64);
let value = Some(IntervalDayTimeType::make_value(0, self.millis));
ScalarValue::IntervalDayTime(value)
}

pub fn days_scalar(&self) -> ScalarValue {
let value = Some((self.days as i64) << Self::MILLIS_BITS);
let value = Some(IntervalDayTimeType::make_value(self.days, 0));
ScalarValue::IntervalDayTime(value)
}

Expand Down Expand Up @@ -317,26 +317,14 @@ pub struct DecomposedMonthDayNano {
}

impl DecomposedMonthDayNano {
const _MONTHS_MASK: u128 = 0xFFFF_FFFF_0000_0000_0000_0000_0000_0000;
const DAYS_MASK: u128 = 0x0000_0000_FFFF_FFFF_0000_0000_0000_0000;
const NANOS_MASK: u128 = 0x0000_0000_0000_0000_FFFF_FFFF_FFFF_FFFF;
const _MONTHS_BITS: i32 = 32;
const DAYS_BITS: i32 = 32;
const NANOS_BITS: i32 = 64;
const DAYS_OFFSET: i32 = Self::NANOS_BITS;
const MONTHS_OFFSET: i32 = Self::DAYS_OFFSET + Self::DAYS_BITS;

const MONTH: &'static str = "MONTH";
const DAY: &'static str = "DAY";
const MILLIS: &'static str = "MILLISECOND";

const NANOS_IN_MILLI: i64 = 1_000_000;

pub fn from_raw_interval_value(interval: i128) -> Self {
let interval = interval as u128;
let months = (interval >> Self::MONTHS_OFFSET) as i32;
let days = (interval >> Self::DAYS_OFFSET) as i32;
let nanos = interval as i64;
pub fn from_raw_interval_value(interval: IntervalMonthDayNano) -> Self {
let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(interval);
// TODO: precision loss
let millis = nanos / Self::NANOS_IN_MILLI;
DecomposedMonthDayNano {
Expand All @@ -355,17 +343,21 @@ impl DecomposedMonthDayNano {
}

pub fn millis_scalar(&self) -> ScalarValue {
let value = Some(((self.millis * Self::NANOS_IN_MILLI) as u128 & Self::NANOS_MASK) as i128);
let value = Some(IntervalMonthDayNanoType::make_value(
0,
0,
self.millis * Self::NANOS_IN_MILLI,
));
ScalarValue::IntervalMonthDayNano(value)
}

pub fn days_scalar(&self) -> ScalarValue {
let value = Some((((self.days as u128) << Self::DAYS_OFFSET) & Self::DAYS_MASK) as i128);
let value = Some(IntervalMonthDayNanoType::make_value(0, self.days, 0));
ScalarValue::IntervalMonthDayNano(value)
}

pub fn months_scalar(&self) -> ScalarValue {
let value = Some(((self.months as u128) << Self::MONTHS_OFFSET) as i128);
let value = Some(IntervalMonthDayNanoType::make_value(self.months, 0, 0));
ScalarValue::IntervalMonthDayNano(value)
}

Expand Down

0 comments on commit b5075a4

Please sign in to comment.