Skip to content

Commit

Permalink
Move DecimalArrayDecoder to array_decoder/decimal.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Sep 29, 2024
1 parent e8400fb commit 94282a1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 64 deletions.
49 changes: 48 additions & 1 deletion src/array_decoder/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,21 @@
// under the License.

use std::cmp::Ordering;
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::buffer::NullBuffer;
use arrow::datatypes::Decimal128Type;
use snafu::ResultExt;

use crate::encoding::decimal::UnboundedVarintStreamDecoder;
use crate::encoding::{get_rle_reader, PrimitiveValueDecoder};
use crate::error::ArrowSnafu;
use crate::proto::stream::Kind;
use crate::stripe::Stripe;
use crate::{column::Column, error::Result};

use super::{ArrayBatchDecoder, DecimalArrayDecoder, PresentDecoder};
use super::{ArrayBatchDecoder, PresentDecoder, PrimitiveArrayDecoder};

pub fn new_decimal_decoder(
column: &Column,
Expand Down Expand Up @@ -55,6 +62,46 @@ pub fn new_decimal_decoder(
)))
}

/// Wrapper around PrimitiveArrayDecoder to allow specifying the precision and scale
/// of the output decimal array.
pub struct DecimalArrayDecoder {
precision: u8,
scale: i8,
inner: PrimitiveArrayDecoder<Decimal128Type>,
}

impl DecimalArrayDecoder {
pub fn new(
precision: u8,
scale: i8,
iter: Box<dyn PrimitiveValueDecoder<i128> + Send>,
present: Option<PresentDecoder>,
) -> Self {
let inner = PrimitiveArrayDecoder::<Decimal128Type>::new(iter, present);
Self {
precision,
scale,
inner,
}
}
}

impl ArrayBatchDecoder for DecimalArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef> {
let array = self
.inner
.next_primitive_batch(batch_size, parent_present)?
.with_precision_and_scale(self.precision, self.scale)
.context(ArrowSnafu)?;
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}

/// This iter fixes the scales of the varints decoded as scale is specified on a per
/// varint basis, and needs to align with type specified scale
struct DecimalScaleRepairDecoder {
Expand Down
83 changes: 21 additions & 62 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use arrow::array::{ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
use arrow::buffer::NullBuffer;
use arrow::datatypes::ArrowNativeTypeOp;
use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type};
use arrow::datatypes::ArrowPrimitiveType;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::datatypes::{
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
Expand All @@ -34,7 +34,7 @@ use crate::encoding::byte::ByteRleDecoder;
use crate::encoding::float::FloatDecoder;
use crate::encoding::{get_rle_reader, PrimitiveValueDecoder};
use crate::error::{
self, ArrowSnafu, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
self, MismatchedSchemaSnafu, Result, UnexpectedSnafu, UnsupportedTypeVariantSnafu,
};
use crate::proto::stream::Kind;
use crate::schema::DataType;
Expand All @@ -56,6 +56,25 @@ mod struct_decoder;
mod timestamp;
mod union;

pub trait ArrayBatchDecoder: Send {
/// Used as base for decoding ORC columns into Arrow arrays. Provide an input `batch_size`
/// which specifies the upper limit of the number of values returned in the output array.
///
/// If parent nested type (e.g. Struct) indicates a null in it's PRESENT stream,
/// then the child doesn't have a value (similar to other nullability). So we need
/// to take care to insert these null values as Arrow requires the child to hold
/// data in the null slot of the child.
// TODO: encode nullability in generic -> for a given column in a stripe, we will always know
// upfront if we need to bother with nulls or not, so we don't need to keep checking this
// for every invocation of next_batch
// NOTE: null parent may have non-null child, so would still have to account for this
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef>;
}

struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
iter: Box<dyn PrimitiveValueDecoder<T::Native> + Send>,
present: Option<PresentDecoder>,
Expand Down Expand Up @@ -112,47 +131,6 @@ type Float32ArrayDecoder = PrimitiveArrayDecoder<Float32Type>;
type Float64ArrayDecoder = PrimitiveArrayDecoder<Float64Type>;
type DateArrayDecoder = PrimitiveArrayDecoder<Date32Type>; // TODO: does ORC encode as i64 or i32?

/// Wrapper around PrimitiveArrayDecoder to allow specifying the precision and scale
/// of the output decimal array.
// TODO: move into array_decoder/decimal.rs
struct DecimalArrayDecoder {
precision: u8,
scale: i8,
inner: PrimitiveArrayDecoder<Decimal128Type>,
}

impl DecimalArrayDecoder {
pub fn new(
precision: u8,
scale: i8,
iter: Box<dyn PrimitiveValueDecoder<i128> + Send>,
present: Option<PresentDecoder>,
) -> Self {
let inner = PrimitiveArrayDecoder::<Decimal128Type>::new(iter, present);
Self {
precision,
scale,
inner,
}
}
}

impl ArrayBatchDecoder for DecimalArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef> {
let array = self
.inner
.next_primitive_batch(batch_size, parent_present)?
.with_precision_and_scale(self.precision, self.scale)
.context(ArrowSnafu)?;
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}

struct BooleanArrayDecoder {
iter: Box<dyn PrimitiveValueDecoder<bool> + Send>,
present: Option<PresentDecoder>,
Expand Down Expand Up @@ -271,25 +249,6 @@ impl Iterator for NaiveStripeDecoder {
}
}

pub trait ArrayBatchDecoder: Send {
/// Used as base for decoding ORC columns into Arrow arrays. Provide an input `batch_size`
/// which specifies the upper limit of the number of values returned in the output array.
///
/// If parent nested type (e.g. Struct) indicates a null in it's PRESENT stream,
/// then the child doesn't have a value (similar to other nullability). So we need
/// to take care to insert these null values as Arrow requires the child to hold
/// data in the null slot of the child.
// TODO: encode nullability in generic -> for a given column in a stripe, we will always know
// upfront if we need to bother with nulls or not, so we don't need to keep checking this
// for every invocation of next_batch
// NOTE: null parent may have non-null child, so would still have to account for this
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef>;
}

pub fn array_decoder_factory(
column: &Column,
field: Arc<Field>,
Expand Down
4 changes: 3 additions & 1 deletion src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ use chrono::offset::TimeZone;
use chrono::TimeDelta;
use chrono_tz::{Tz, UTC};

use super::{ArrayBatchDecoder, DecimalArrayDecoder, PresentDecoder, PrimitiveArrayDecoder};
use super::{
decimal::DecimalArrayDecoder, ArrayBatchDecoder, PresentDecoder, PrimitiveArrayDecoder,
};
use crate::error::UnsupportedTypeVariantSnafu;

const NANOSECONDS_IN_SECOND: i128 = 1_000_000_000;
Expand Down

0 comments on commit 94282a1

Please sign in to comment.