Skip to content

Commit

Permalink
feat: Epoch extraction from intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Jun 14, 2024
1 parent dac2467 commit e61a28a
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 47 deletions.
145 changes: 114 additions & 31 deletions datafusion/cube_ext/src/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

use arrow::array::{Array, Float64Array, Int32Array, Int32Builder, PrimitiveArray};
use arrow::compute::kernels::arity::unary;
use arrow::datatypes::{ArrowNumericType, ArrowTemporalType, DataType, TimeUnit};
use arrow::datatypes::{
ArrowNumericType, ArrowPrimitiveType, ArrowTemporalType, DataType, Date32Type,
Date64Type, Float64Type, IntervalDayTimeType, IntervalMonthDayNanoType,
IntervalYearMonthType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow::error::{ArrowError, Result};

use chrono::format::strftime::StrftimeItems;
Expand Down Expand Up @@ -145,39 +150,117 @@ where
Ok(b.finish())
}

fn postgres_months_epoch(n: i32) -> f64 {
let years = n / 12;
let remainder = n % 12;
// Note that this arithmetic produces exact integer calculations with no floating point error.
let seconds_in_a_day = 86400_f64;
(years as f64) * (seconds_in_a_day * 365.25)
+ (remainder as f64) * (seconds_in_a_day * 30.0)
}

pub trait Epochable: ArrowPrimitiveType + Sized {
fn get_epoch(array: &PrimitiveArray<Self>) -> PrimitiveArray<Float64Type>;
}

impl Epochable for TimestampSecondType {
fn get_epoch(
array: &PrimitiveArray<TimestampSecondType>,
) -> PrimitiveArray<Float64Type> {
unary(array, |n| n as f64)
}
}

impl Epochable for TimestampMillisecondType {
fn get_epoch(
array: &PrimitiveArray<TimestampMillisecondType>,
) -> PrimitiveArray<Float64Type> {
unary(array, |n| n as f64 / 1_000.0)
}
}

impl Epochable for TimestampMicrosecondType {
fn get_epoch(
array: &PrimitiveArray<TimestampMicrosecondType>,
) -> PrimitiveArray<Float64Type> {
unary(array, |n| n as f64 / 1_000_000.0)
}
}

impl Epochable for TimestampNanosecondType {
fn get_epoch(
array: &PrimitiveArray<TimestampNanosecondType>,
) -> PrimitiveArray<Float64Type> {
unary(array, |n| n as f64 / 1_000_000_000.0)
}
}

impl Epochable for Date32Type {
fn get_epoch(array: &PrimitiveArray<Date32Type>) -> PrimitiveArray<Float64Type> {
unary(array, |n| {
let seconds_in_a_day = 86400.0;
n as f64 * seconds_in_a_day
})
}
}

impl Epochable for Date64Type {
fn get_epoch(array: &PrimitiveArray<Date64Type>) -> PrimitiveArray<Float64Type> {
unary(array, |n| n as f64 / 1_000.0)
}
}

impl Epochable for IntervalYearMonthType {
fn get_epoch(
array: &PrimitiveArray<IntervalYearMonthType>,
) -> PrimitiveArray<Float64Type> {
unary(array, postgres_months_epoch)
}
}

impl Epochable for IntervalDayTimeType {
fn get_epoch(
array: &PrimitiveArray<IntervalDayTimeType>,
) -> PrimitiveArray<Float64Type> {
unary(array, |n| {
// Implemented based on scalar_timestamp_add_interval_day_time
let sign = n.signum();
let n = n.abs();
// i64::MIN would work okay in release mode after the bitmask
// in the days variable (but TODO: what is Postgres' exact behavior?)

let days: i64 = (n >> 32) & 0xFFFF_FFFF;
let millis: i64 = n & 0xFFFF_FFFF;

let seconds_in_a_day = 86400.0;
sign as f64 * ((days as f64) * seconds_in_a_day + (millis as f64) / 1_000.0)
})
}
}

impl Epochable for IntervalMonthDayNanoType {
fn get_epoch(
array: &PrimitiveArray<IntervalMonthDayNanoType>,
) -> PrimitiveArray<Float64Type> {
unary(array, |n| {
let seconds_in_a_day = 86400_f64;
let n: i128 = n;
let month = (n >> 96) & 0xFFFF_FFFF;
let day = (n >> 64) & 0xFFFF_FFFF;
let nano = n & 0xFFFF_FFFF_FFFF_FFFF;
let month_epoch: f64 = postgres_months_epoch(month as i32);
month_epoch
+ (day as f64) * seconds_in_a_day
+ (nano as f64) / 1_000_000_000.0
})
}
}

pub fn epoch<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
T: Epochable,
{
let b = match array.data_type() {
DataType::Timestamp(tu, _) => {
let scale = match tu {
TimeUnit::Second => 1,
TimeUnit::Millisecond => 1_000,
TimeUnit::Microsecond => 1_000_000,
TimeUnit::Nanosecond => 1_000_000_000,
} as f64;
unary(array, |n| {
let n: i64 = n.into();
n as f64 / scale
})
}
DataType::Date32 => {
let seconds_in_a_day = 86400_f64;
unary(array, |n| {
let n: i64 = n.into();
n as f64 * seconds_in_a_day
})
}
DataType::Date64 => unary(array, |n| {
let n: i64 = n.into();
n as f64 / 1_000_f64
}),
_ => {
return_compute_error_with!("Can not convert {:?} to epoch", array.data_type())
}
};
let b = Epochable::get_epoch(array);
Ok(b)
}

Expand Down
121 changes: 105 additions & 16 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

//! DateTime expressions

use arrow::array::{Int64Array, IntervalDayTimeArray, IntervalYearMonthArray};
use arrow::array::{
Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray,
};
use arrow::compute::cast;
use arrow::{
array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
Expand All @@ -33,7 +35,7 @@ use arrow::{
TimestampNanosecondArray, TimestampSecondArray,
},
compute::kernels::temporal,
datatypes::TimeUnit,
datatypes::{IntervalUnit, TimeUnit},
temporal_conversions::timestamp_ns_to_datetime,
};
use chrono::prelude::*;
Expand Down Expand Up @@ -482,7 +484,7 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
}

macro_rules! extract_date_part {
($ARRAY: expr, $FN:expr, $RT: expr) => {
($ARRAY: expr, $PART_STRING: expr, $FN: expr, $RT: expr) => {
match $ARRAY.data_type() {
DataType::Date32 => {
let array = $ARRAY.as_any().downcast_ref::<Date32Array>().unwrap();
Expand Down Expand Up @@ -523,8 +525,80 @@ macro_rules! extract_date_part {
}
},
datatype => Err(DataFusionError::Internal(format!(
"Extract does not support datatype {:?}",
datatype
"Extract with date part '{}' does not support datatype {:?}",
$PART_STRING, datatype
))),
}
};
}

macro_rules! extract_date_part_from_date_or_interval {
($ARRAY: expr, $PART_STRING: expr, $FN: expr, $RT: expr) => {
match $ARRAY.data_type() {
DataType::Date32 => {
let array = $ARRAY.as_any().downcast_ref::<Date32Array>().unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
DataType::Date64 => {
let array = $ARRAY.as_any().downcast_ref::<Date64Array>().unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
DataType::Timestamp(time_unit, None) => match time_unit {
TimeUnit::Second => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
TimeUnit::Millisecond => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
TimeUnit::Microsecond => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
TimeUnit::Nanosecond => {
let array = $ARRAY
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
},
DataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
let array = $ARRAY
.as_any()
.downcast_ref::<IntervalYearMonthArray>()
.unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
IntervalUnit::DayTime => {
let array = $ARRAY
.as_any()
.downcast_ref::<IntervalDayTimeArray>()
.unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
IntervalUnit::MonthDayNano => {
let array = $ARRAY
.as_any()
.downcast_ref::<IntervalMonthDayNanoArray>()
.unwrap();
Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?)
}
},
datatype => Err(DataFusionError::Internal(format!(
"Extract with date part '{}' does not support datatype {:?}",
$PART_STRING, datatype
))),
}
};
Expand Down Expand Up @@ -555,18 +629,33 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
};

let arr = match date_part.to_lowercase().as_str() {
"doy" => extract_date_part!(array, cube_ext::temporal::doy, DataType::Int32),
"dow" => extract_date_part!(array, cube_ext::temporal::dow, DataType::Int32),
"year" => extract_date_part!(array, temporal::year, DataType::Int32),
"quarter" => extract_date_part!(array, temporal::quarter, DataType::Int32),
"month" => extract_date_part!(array, temporal::month, DataType::Int32),
"week" => extract_date_part!(array, temporal::week, DataType::Int32),
"day" => extract_date_part!(array, temporal::day, DataType::Int32),
"hour" => extract_date_part!(array, temporal::hour, DataType::Int32),
"minute" => extract_date_part!(array, temporal::minute, DataType::Int32),
"second" => extract_date_part!(array, temporal::second, DataType::Int32),
"doy" => {
extract_date_part!(array, date_part, cube_ext::temporal::doy, DataType::Int32)
}
"dow" => {
extract_date_part!(array, date_part, cube_ext::temporal::dow, DataType::Int32)
}
"year" => extract_date_part!(array, date_part, temporal::year, DataType::Int32),
"quarter" => {
extract_date_part!(array, date_part, temporal::quarter, DataType::Int32)
}
"month" => extract_date_part!(array, date_part, temporal::month, DataType::Int32),
"week" => extract_date_part!(array, date_part, temporal::week, DataType::Int32),
"day" => extract_date_part!(array, date_part, temporal::day, DataType::Int32),
"hour" => extract_date_part!(array, date_part, temporal::hour, DataType::Int32),
"minute" => {
extract_date_part!(array, date_part, temporal::minute, DataType::Int32)
}
"second" => {
extract_date_part!(array, date_part, temporal::second, DataType::Int32)
}
"epoch" => {
extract_date_part!(array, cube_ext::temporal::epoch, DataType::Float64)
extract_date_part_from_date_or_interval!(
array,
date_part,
cube_ext::temporal::epoch,
DataType::Float64
)
}
_ => Err(DataFusionError::Execution(format!(
"Date part '{}' not supported",
Expand Down

0 comments on commit e61a28a

Please sign in to comment.