diff --git a/src/array_decoder/timestamp.rs b/src/array_decoder/timestamp.rs index 6f9115f8..4ee920b0 100644 --- a/src/array_decoder/timestamp.rs +++ b/src/array_decoder/timestamp.rs @@ -110,23 +110,7 @@ fn decimal128_decoder( let iter: Box + Send> = match writer_tz { Some(UTC) | None => Box::new(iter), - // Avoid overflowable operations - Some(writer_tz) => Box::new(iter.map(move |ts| { - let ts = ts?; - let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND); - let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND); - - // The addition may panic, because chrono stores dates in an i32, - // which can be overflowed with an i64 of seconds. - let dt = (writer_tz.timestamp_nanos(0) - + TimeDelta::new(seconds as i64, nanoseconds as u32) - .expect("TimeDelta duration out of bound")) - .naive_local() - .and_utc(); - - Ok((dt.timestamp() as i128) * NANOSECONDS_IN_SECOND - + (dt.timestamp_subsec_nanos() as i128)) - })), + Some(writer_tz) => Box::new(TimestampNanosecondAsDecimalWithTzIterator(iter, writer_tz)), }; Ok(DecimalArrayDecoder::new( @@ -298,3 +282,35 @@ impl ArrayBatchDecoder for TimestampInstantArrayDecoder) -> Result { + let ts = ts?; + let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND); + let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND); + + // The addition may panic, because chrono stores dates in an i32, + // which can be overflowed with an i64 of seconds. + let dt = (self.1.timestamp_nanos(0) + + TimeDelta::new(seconds as i64, nanoseconds as u32) + .expect("TimeDelta duration out of bound")) + .naive_local() + .and_utc(); + + Ok( + (dt.timestamp() as i128) * NANOSECONDS_IN_SECOND + + (dt.timestamp_subsec_nanos() as i128), + ) + } +} + +impl Iterator for TimestampNanosecondAsDecimalWithTzIterator { + type Item = Result; + + fn next(&mut self) -> Option { + let ts = self.0.next()?; + Some(self.next_inner(ts)) + } +}