Skip to content

Commit

Permalink
Refactor away macro usage in timestamp array decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Sep 22, 2024
1 parent 51571b4 commit cfb955b
Showing 1 changed file with 93 additions and 117 deletions.
210 changes: 93 additions & 117 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,79 +33,46 @@ use arrow::datatypes::{
use chrono::offset::TimeZone;
use chrono::TimeDelta;
use chrono_tz::{Tz, UTC};
use snafu::ensure;

use super::{ArrayBatchDecoder, DecimalArrayDecoder, PrimitiveArrayDecoder};
use crate::error::UnsupportedTypeVariantSnafu;

const NANOSECONDS_IN_SECOND: i128 = 1_000_000_000;
const NANOSECOND_DIGITS: i8 = 9;

/// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit
/// to create a $decoder_type with that type as type parameter and $iter/$present as value
/// parameters, then applies $f to it and $tz.
///
/// $f has to be generic so it cannot be a closure.
macro_rules! decoder_for_time_unit {
($column: expr, $time_unit:expr, $seconds_since_unix_epoch:expr, $stripe:expr, $tz:expr, $f:expr,) => {{
let column = $column;
let stripe = $stripe;
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;
/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;

fn get_timestamp_decoder<T: ArrowTimestampType + Send>(
column: &Column,
stripe: &Stripe,
seconds_since_unix_epoch: i64,
is_instant: bool,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_unsigned_rle_reader(column, secondary);
let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_unsigned_rle_reader(column, secondary);

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

match $time_unit {
TimeUnit::Second => {
let iter = Box::new(TimestampIterator::<TimestampSecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampSecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Millisecond => {
let iter = Box::new(TimestampIterator::<TimestampMillisecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampMillisecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Microsecond => {
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampMicrosecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Nanosecond => {
let iter = Box::new(TimestampIterator::<TimestampNanosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampNanosecondType>::new(iter, present),
$tz,
)))
}
let iter = Box::new(TimestampIterator::<T, _>::new(
seconds_since_unix_epoch,
data,
secondary,
));
let inner = PrimitiveArrayDecoder::<T>::new(iter, present);
if is_instant {
Ok(Box::new(TimestampInstantArrayDecoder(inner)))
} else {
match stripe.writer_tz() {
Some(writer_tz) => Ok(Box::new(TimestampOffsetArrayDecoder { inner, writer_tz })),
None => Ok(Box::new(inner)),
}
}};
}
}

fn decimal128_decoder(
Expand Down Expand Up @@ -158,10 +125,6 @@ fn decimal128_decoder(
))
}

/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;

/// Decodes a TIMESTAMP column stripe into batches of Timestamp{Nano,Micro,Milli,}secondArrays
/// with no timezone. Will convert timestamps from writer timezone to UTC if a writer timezone
/// is specified for the stripe.
Expand All @@ -187,37 +150,38 @@ pub fn new_timestamp_decoder(
};

match field_type {
ArrowDataType::Timestamp(time_unit, None) => match stripe.writer_tz() {
Some(writer_tz) => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz,
}
}
decoder_for_time_unit!(
column,
time_unit,
seconds_since_unix_epoch,
stripe,
writer_tz,
f,
)
}
None => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
}

decoder_for_time_unit!(column, time_unit, seconds_since_unix_epoch, stripe, (), f,)
}
},
ArrowDataType::Timestamp(TimeUnit::Second, None) => {
get_timestamp_decoder::<TimestampSecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
}
ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {
get_timestamp_decoder::<TimestampMillisecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
}
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
get_timestamp_decoder::<TimestampMicrosecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
}
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) => {
get_timestamp_decoder::<TimestampNanosecondType>(
column,
stripe,
seconds_since_unix_epoch,
false,
)
}
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Ok(Box::new(decimal128_decoder(
column,
Expand All @@ -242,31 +206,43 @@ pub fn new_timestamp_instant_decoder(
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
match field_type {
ArrowDataType::Timestamp(time_unit, Some(tz)) => {
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
}

decoder_for_time_unit!(
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ArrowDataType::Timestamp(TimeUnit::Second, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampSecondType>(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
}
ArrowDataType::Timestamp(TimeUnit::Millisecond, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampMillisecondType>(
column,
time_unit,
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
}
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampMicrosecondType>(
column,
stripe,
(),
f,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
}
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) if tz.as_ref() == "UTC" => {
get_timestamp_decoder::<TimestampNanosecondType>(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
true,
)
}
ArrowDataType::Timestamp(_, Some(_)) => UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps",
}
.fail(),
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Ok(Box::new(decimal128_decoder(
column,
Expand Down

0 comments on commit cfb955b

Please sign in to comment.