Skip to content

Commit

Permalink
feat: timestamp subtract date32 support
Browse files: browse the repository at this point in the history
  • Loading branch information
KSDaemon committed Jun 18, 2024
1 parent e61a28a commit e9b19ec
Showing 1 changed file with 56 additions and 8 deletions.
64 changes: 56 additions & 8 deletions datafusion/physical-expr/src/expressions/binary_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ use std::sync::Arc;

use arrow::{
array::{
Array, ArrayRef, Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
IntervalYearMonthArray, TimestampNanosecondArray,
Array, ArrayRef, Date32Array, Int64Array, IntervalDayTimeArray,
IntervalMonthDayNanoArray, IntervalYearMonthArray, TimestampNanosecondArray,
},
datatypes::{DataType, IntervalUnit},
temporal_conversions::timestamp_ns_to_datetime,
datatypes::{
DataType::{self, Date32},
IntervalUnit,
TimeUnit::Nanosecond,
},
temporal_conversions::{date32_to_datetime, timestamp_ns_to_datetime},
};
use chrono::{Datelike, Days, Duration, Months, NaiveDate, NaiveDateTime};

use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Operator;

Expand All @@ -49,6 +52,7 @@ pub fn distinct_types_allowed(
(left_type, right_type),
(Timestamp(Nanosecond, _), Interval(_))
| (Timestamp(Nanosecond, _), Timestamp(Nanosecond, _))
| (Timestamp(Nanosecond, _), Date32)
),
Operator::Multiply => matches!(
(left_type, right_type),
Expand Down Expand Up @@ -105,6 +109,9 @@ pub fn coerce_types_distinct(
Timestamp(Nanosecond, tz.clone()),
Timestamp(Nanosecond, tz2.clone()),
)),
(Timestamp(_, tz), Date32) => {
Some((Timestamp(Nanosecond, tz.clone()), Date32))
}
_ => None,
},
Operator::Multiply => match (lhs_type, rhs_type) {
Expand Down Expand Up @@ -175,6 +182,9 @@ pub fn evaluate_distinct_with_resolved_args(
// TODO: Implement postgres behavior with time zones
Some(timestamp_subtract_timestamp(left, right))
}
(Timestamp(Nanosecond, None), Date32) => {
Some(timestamp_subtract_date(left, right))
}
_ => None,
},
Operator::Multiply => match (left_data_type, right_data_type) {
Expand Down Expand Up @@ -364,6 +374,24 @@ fn timestamp_subtract_timestamp(
Ok(Arc::new(result))
}

/// Element-wise `timestamp - date` over two arrays.
///
/// `left` is downcast to a `TimestampNanosecondArray` and `right` to a
/// `Date32Array`; either downcast panics if the array has a different
/// concrete type. Elements are paired positionally and each pair is handed
/// to `scalar_timestamp_subtract_date`. The per-element results are gathered
/// into an `IntervalMonthDayNanoArray`, and the first element-level error
/// aborts the whole operation.
fn timestamp_subtract_date(
    left: Arc<dyn Array>,
    right: Arc<dyn Array>,
) -> Result<ArrayRef> {
    let timestamps = left
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .unwrap();
    let dates = right.as_any().downcast_ref::<Date32Array>().unwrap();

    // Lazily compute one interval per (timestamp, date) pair.
    let differences = timestamps
        .iter()
        .zip(dates.iter())
        .map(|(ts, date)| scalar_timestamp_subtract_date(ts, date));

    // Collecting into `Result<_>` short-circuits on the first `Err`.
    let intervals: IntervalMonthDayNanoArray = differences.collect::<Result<_>>()?;
    Ok(Arc::new(intervals))
}

fn scalar_timestamp_add_interval_year_month(
timestamp: Option<i64>,
interval: Option<i32>,
Expand Down Expand Up @@ -487,7 +515,29 @@ fn scalar_timestamp_subtract_timestamp(
let datetime_right: NaiveDateTime =
timestamp_ns_to_datetime(timestamp_right.unwrap());
let duration = datetime_left.signed_duration_since(datetime_right);
// TODO: What is Postgres behavior? E.g. if these timestamp values are i64::MAX and i64::MIN,

duration_to_interval_day_nano(duration)

// TODO: How can day, above, in scalar_timestamp_add_interval_month_day_nano, be negative?
}

/// Computes `timestamp_left - timestamp_right` for a single pair of values.
///
/// `timestamp_left` is a nanosecond timestamp value and `timestamp_right`
/// is a `Date32` value (despite its name). If either is `None`, the result
/// is `Ok(None)`. Otherwise both are converted to `NaiveDateTime` and their
/// signed difference is passed to `duration_to_interval_day_nano`, whose
/// result (including any error) is returned unchanged.
fn scalar_timestamp_subtract_date(
    timestamp_left: Option<i64>,
    timestamp_right: Option<i32>,
) -> Result<Option<i128>> {
    // A null on either side yields a null result.
    let (ts_nanos, date_days) = match (timestamp_left, timestamp_right) {
        (Some(ts), Some(date)) => (ts, date),
        _ => return Ok(None),
    };

    let minuend: NaiveDateTime = timestamp_ns_to_datetime(ts_nanos);
    // NOTE(review): presumably the date maps to midnight of that day — confirm
    // against arrow's `date32_to_datetime`.
    let subtrahend: NaiveDateTime = date32_to_datetime(date_days);

    duration_to_interval_day_nano(minuend.signed_duration_since(subtrahend))
}

fn duration_to_interval_day_nano(duration: Duration) -> Result<Option<i128>> {
// TODO: What is Postgres behavior? E.g. if these timestamp values are i64::MIN and i32/i64::MAX,
// we needlessly have a range error.
let nanos: i64 = duration.num_nanoseconds().ok_or_else(|| {
DataFusionError::Execution("Interval value is out of range".to_string())
Expand All @@ -499,8 +549,6 @@ fn scalar_timestamp_subtract_timestamp(
(((days as i128) & 0xFFFF_FFFF) << 64)
| ((nanos_rem as i128) & 0xFFFF_FFFF_FFFF_FFFF),
))

// TODO: How can day, above, in scalar_timestamp_add_interval_month_day_nano, be negative?
}

fn change_ym(t: NaiveDateTime, y: i32, m: u32) -> Result<NaiveDateTime> {
Expand Down

0 comments on commit e9b19ec

Please sign in to comment.