Skip to content

Commit

Permalink
feat(native): Add datetime types support to generate_series() UDTF (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
KSDaemon authored Jul 25, 2024
1 parent e0ac7bf commit 99b3c65
Show file tree
Hide file tree
Showing 13 changed files with 644 additions and 284 deletions.
143 changes: 138 additions & 5 deletions rust/cubesql/cubesql/src/compile/engine/udf/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use datafusion::{
arrow::{
array::{
new_null_array, Array, ArrayBuilder, ArrayRef, BooleanArray, BooleanBuilder,
Float64Array, Float64Builder, GenericStringArray, Int32Builder, Int64Array,
Int64Builder, IntervalDayTimeBuilder, ListArray, ListBuilder, PrimitiveArray,
PrimitiveBuilder, StringArray, StringBuilder, StructBuilder, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt32Builder, UInt64Builder,
Date32Array, Float64Array, Float64Builder, GenericStringArray, Int32Builder,
Int64Array, Int64Builder, IntervalDayTimeBuilder, IntervalMonthDayNanoArray, ListArray,
ListBuilder, PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
StructBuilder, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt32Builder, UInt64Builder,
},
compute::{cast, concat},
datatypes::{
Expand Down Expand Up @@ -2313,6 +2313,86 @@ macro_rules! generate_series_udtf {
}};
}

macro_rules! generate_series_helper_date32 {
($CURRENT:ident, $STEP:ident, $PRIMITIVE_TYPE: ident) => {
let current_dt = NaiveDateTime::from_timestamp_opt(($CURRENT as i64) * 86400, 0)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"Cannot convert date to NaiveDateTime: {}",
$CURRENT
))
})?;
let res = date_addsub_month_day_nano(current_dt, $STEP, true)?;
$CURRENT = (res.timestamp() / 86400) as $PRIMITIVE_TYPE;
};
}

macro_rules! generate_series_helper_timestamp {
($CURRENT:ident, $STEP:ident, $PRIMITIVE_TYPE: ident) => {
let current_dt = NaiveDateTime::from_timestamp_opt(
($CURRENT as i64) / 1_000_000_000,
($CURRENT % 1_000_000_000) as u32,
)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"Cannot convert timestamp to NaiveDateTime: {}",
$CURRENT
))
})?;
let res = date_addsub_month_day_nano(current_dt, $STEP, true)?;
$CURRENT = res.timestamp_nanos_opt().unwrap() as $PRIMITIVE_TYPE;
};
}

macro_rules! generate_series_non_primitive_udtf {
($ARGS:expr, $TYPE: ident, $PRIMITIVE_TYPE: ident, $HANDLE_MACRO:ident) => {{
let mut section_sizes: Vec<usize> = Vec::new();
let l_arr = &$ARGS[0].as_any().downcast_ref::<PrimitiveArray<$TYPE>>();

if l_arr.is_some() {
let l_arr = l_arr.unwrap();
let r_arr = downcast_primitive_arg!($ARGS[1], "right", $TYPE);
let step_arr = IntervalMonthDayNanoArray::from_value(
IntervalMonthDayNanoType::make_value(0, 1, 0), // 1 day as default
1,
);
let step_arr = if $ARGS.len() > 2 {
downcast_primitive_arg!($ARGS[2], "step", IntervalMonthDayNanoType)
} else {
&step_arr
};

let mut builder = PrimitiveBuilder::<$TYPE>::new(1);
for (i, (start, end)) in l_arr.iter().zip(r_arr.iter()).enumerate() {
let step = if step_arr.len() > i {
step_arr.value(i)
} else {
step_arr.value(0)
};

if let (Some(start), Some(end)) = (start, end) {
let mut section_size: i64 = 0;
if start <= end && step > 0 {
let mut current = start;
loop {
if current > end {
break;
}
builder.append_value(current).unwrap();

section_size += 1;
$HANDLE_MACRO!(current, step, $PRIMITIVE_TYPE);
}
}
section_sizes.push(section_size as usize);
}
}

return Ok((Arc::new(builder.finish()) as ArrayRef, section_sizes));
}
}};
}

pub fn create_generate_series_udtf() -> TableUDF {
let fun = make_table_function(move |args: &[ArrayRef]| {
assert!(args.len() == 2 || args.len() == 3);
Expand All @@ -2321,6 +2401,24 @@ pub fn create_generate_series_udtf() -> TableUDF {
generate_series_udtf!(args, Int64Type, i64)
} else if args[0].as_any().downcast_ref::<Float64Array>().is_some() {
generate_series_udtf!(args, Float64Type, f64)
} else if args[0].as_any().downcast_ref::<Date32Array>().is_some() {
generate_series_non_primitive_udtf!(
args,
Date32Type,
i32,
generate_series_helper_date32
)
} else if args[0]
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.is_some()
{
generate_series_non_primitive_udtf!(
args,
TimestampNanosecondType,
i64,
generate_series_helper_timestamp
)
}

Err(DataFusionError::Execution(format!("Unsupported type")))
Expand All @@ -2346,6 +2444,41 @@ pub fn create_generate_series_udtf() -> TableUDF {
DataType::Float64,
DataType::Float64,
]),
TypeSignature::Exact(vec![DataType::Date32, DataType::Date32]),
TypeSignature::Exact(vec![
DataType::Date32,
DataType::Date32,
DataType::Interval(IntervalUnit::MonthDayNano),
]),
TypeSignature::Exact(vec![
DataType::Date32,
DataType::Date32,
DataType::Interval(IntervalUnit::YearMonth),
]),
TypeSignature::Exact(vec![
DataType::Date32,
DataType::Date32,
DataType::Interval(IntervalUnit::DayTime),
]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Interval(IntervalUnit::MonthDayNano),
]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Interval(IntervalUnit::YearMonth),
]),
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Interval(IntervalUnit::DayTime),
]),
],
Volatility::Immutable,
),
Expand Down
Loading

0 comments on commit 99b3c65

Please sign in to comment.