Skip to content

Commit

Permalink
Simplify generic usage in timestamp decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Sep 22, 2024
1 parent 6e9f957 commit 3e66b83
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 20 deletions.
17 changes: 8 additions & 9 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use std::sync::Arc;
use crate::{
array_decoder::ArrowDataType,
column::{get_present_vec, Column},
encoding::{get_rle_reader, get_unsigned_rle_reader, timestamp::TimestampIterator},
encoding::{
get_rle_reader, get_unsigned_rle_reader,
timestamp::{TimestampIterator, TimestampNanosecondAsDecimalIterator},
},
error::{MismatchedSchemaSnafu, Result},
proto::stream::Kind,
stripe::Stripe,
Expand Down Expand Up @@ -58,7 +61,7 @@ fn get_inner_timestamp_decoder<T: ArrowTimestampType + Send>(
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let iter = Box::new(TimestampIterator::<T, _>::new(
let iter = Box::new(TimestampIterator::<T>::new(
seconds_since_unix_epoch,
data,
secondary,
Expand Down Expand Up @@ -103,14 +106,11 @@ fn decimal128_decoder(
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let iter = TimestampIterator::<TimestampNanosecondType, i128>::new(
seconds_since_unix_epoch,
data,
secondary,
);
let iter = TimestampNanosecondAsDecimalIterator::new(seconds_since_unix_epoch, data, secondary);

let iter: Box<dyn Iterator<Item = _> + Send> = match writer_tz {
Some(UTC) => Box::new(iter), // Avoid overflow-able operations below
Some(UTC) | None => Box::new(iter),
// Avoid overflowable operations
Some(writer_tz) => Box::new(iter.map(move |ts| {
let ts = ts?;
let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND);
Expand All @@ -127,7 +127,6 @@ fn decimal128_decoder(
Ok((dt.timestamp() as i128) * NANOSECONDS_IN_SECOND
+ (dt.timestamp_subsec_nanos() as i128))
})),
None => Box::new(iter),
};

Ok(DecimalArrayDecoder::new(
Expand Down
84 changes: 73 additions & 11 deletions src/encoding/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use crate::error::{DecodeTimestampSnafu, Result};

const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000;

pub struct TimestampIterator<T: ArrowTimestampType, Item: TryFrom<i128>> {
pub struct TimestampIterator<T: ArrowTimestampType> {
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
_marker: PhantomData<(T, Item)>,
_marker: PhantomData<T>,
}

impl<T: ArrowTimestampType, Item: TryFrom<i128>> TimestampIterator<T, Item> {
impl<T: ArrowTimestampType> TimestampIterator<T> {
pub fn new(
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
Expand All @@ -46,30 +46,72 @@ impl<T: ArrowTimestampType, Item: TryFrom<i128>> TimestampIterator<T, Item> {
}
}

impl<T: ArrowTimestampType, Item: TryFrom<i128>> Iterator for TimestampIterator<T, Item> {
type Item = Result<Item>;
impl<T: ArrowTimestampType> Iterator for TimestampIterator<T> {
type Item = Result<i64>;

fn next(&mut self) -> Option<Self::Item> {
// TODO: throw error for mismatched stream lengths?
let (seconds_since_orc_base, nanoseconds) =
self.data.by_ref().zip(self.secondary.by_ref()).next()?;
decode_timestamp::<T, _>(self.base_from_epoch, seconds_since_orc_base, nanoseconds)
.transpose()
Some(decode_timestamp::<T>(
self.base_from_epoch,
seconds_since_orc_base,
nanoseconds,
))
}
}

/// Arrow TimestampNanosecond type cannot represent the full datetime range of
/// the ORC Timestamp type, so this iterator provides the ability to decode the
/// raw nanoseconds without restricting it to the Arrow TimestampNanosecond range.
///
/// Each item is produced by pulling one value from `data` and one from
/// `secondary` and decoding the pair into nanoseconds since the epoch as an
/// `i128`.
pub struct TimestampNanosecondAsDecimalIterator {
/// Passed unchanged as the `base` argument of every decode call.
/// NOTE(review): presumably the ORC base timestamp expressed as seconds
/// since the Unix epoch; confirm against the caller.
base_from_epoch: i64,
/// Stream supplying the seconds-since-ORC-base component of each timestamp.
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
/// Stream supplying the still-encoded nanosecond component of each
/// timestamp (the low 3 bits carry the truncated trailing-zero count).
secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
}

impl TimestampNanosecondAsDecimalIterator {
    /// Creates an iterator that decodes timestamps from a pair of streams.
    ///
    /// * `base_from_epoch` - value handed to the decoder as its `base` for
    ///   every timestamp.
    /// * `data` - stream of the seconds component of each timestamp.
    /// * `secondary` - stream of the encoded nanosecond component of each
    ///   timestamp.
    ///
    /// Nothing is read from either stream until the iterator is advanced.
    pub fn new(
        base_from_epoch: i64,
        data: Box<dyn Iterator<Item = Result<i64>> + Send>,
        secondary: Box<dyn Iterator<Item = Result<i64>> + Send>,
    ) -> Self {
        Self {
            data,
            secondary,
            base_from_epoch,
        }
    }
}

fn decode_timestamp<T: ArrowTimestampType, Ret: TryFrom<i128>>(
impl Iterator for TimestampNanosecondAsDecimalIterator {
    type Item = Result<i128>;

    /// Decodes the next timestamp as nanoseconds since the epoch.
    ///
    /// Returns `None` as soon as either underlying stream is exhausted. The
    /// seconds stream is always polled first, and the nanoseconds stream is
    /// only polled if the seconds stream produced a value.
    fn next(&mut self) -> Option<Self::Item> {
        // TODO: a length mismatch between the two streams currently ends
        // iteration silently; should it raise an error instead?
        let seconds_part = self.data.next()?;
        let nanos_part = self.secondary.next()?;

        // Stream errors are not handled here: both `Result`s are forwarded
        // to the decoder.
        let decoded = decode_timestamp_as_i128(self.base_from_epoch, seconds_part, nanos_part);
        Some(decoded)
    }
}

fn decode(
base: i64,
seconds_since_orc_base: Result<i64>,
nanoseconds: Result<i64>,
) -> Result<Option<Ret>> {
) -> Result<(i128, i64, u64)> {
let data = seconds_since_orc_base?;
// TODO
// TODO: is this a safe cast?
let mut nanoseconds = nanoseconds? as u64;
// Last 3 bits indicate how many trailing zeros were truncated
let zeros = nanoseconds & 0x7;
nanoseconds >>= 3;
// Multiply by powers of 10 to get back the trailing zeros
// TODO: would it be more efficient to unroll this? (if LLVM doesn't already do so)
if zeros != 0 {
nanoseconds *= 10_u64.pow(zeros as u32 + 1);
}
Expand All @@ -88,6 +130,16 @@ fn decode_timestamp<T: ArrowTimestampType, Ret: TryFrom<i128>>(
// while we encode them as a single i64 of nanoseconds in Arrow.
let nanoseconds_since_epoch =
(seconds as i128 * NANOSECONDS_IN_SECOND as i128) + (nanoseconds as i128);
Ok((nanoseconds_since_epoch, seconds, nanoseconds))
}

fn decode_timestamp<T: ArrowTimestampType>(
base: i64,
seconds_since_orc_base: Result<i64>,
nanoseconds: Result<i64>,
) -> Result<i64> {
let (nanoseconds_since_epoch, seconds, nanoseconds) =
decode(base, seconds_since_orc_base, nanoseconds)?;

let nanoseconds_in_timeunit = match T::UNIT {
TimeUnit::Second => 1_000_000_000,
Expand All @@ -97,6 +149,7 @@ fn decode_timestamp<T: ArrowTimestampType, Ret: TryFrom<i128>>(
};

// Error if loss of precision
// TODO: make this configurable (e.g. can succeed but truncate)
ensure!(
nanoseconds_since_epoch % nanoseconds_in_timeunit == 0,
DecodeTimestampSnafu {
Expand All @@ -118,5 +171,14 @@ fn decode_timestamp<T: ArrowTimestampType, Ret: TryFrom<i128>>(
.fail()
})?;

Ok(Some(num_since_epoch))
Ok(num_since_epoch)
}

/// Decodes one ORC timestamp into nanoseconds since the epoch, widened to
/// `i128`.
///
/// All three arguments are forwarded to `decode`; of the tuple it returns,
/// only the first element (the combined nanoseconds-since-epoch value) is
/// kept. Any error from `decode`, including errors carried in by the two
/// `Result` arguments, is returned unchanged.
fn decode_timestamp_as_i128(
    base: i64,
    seconds_since_orc_base: Result<i64>,
    nanoseconds: Result<i64>,
) -> Result<i128> {
    decode(base, seconds_since_orc_base, nanoseconds)
        .map(|(total_nanos, _seconds, _subsec_nanos)| total_nanos)
}

0 comments on commit 3e66b83

Please sign in to comment.