Skip to content

Commit

Permalink
Make present decoding batch based using NullBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Sep 29, 2024
1 parent 790744d commit e8400fb
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 170 deletions.
6 changes: 2 additions & 4 deletions src/array_decoder/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

use std::cmp::Ordering;

use crate::column::get_present_vec;
use crate::encoding::decimal::UnboundedVarintStreamDecoder;
use crate::encoding::{get_rle_reader, PrimitiveValueDecoder};
use crate::proto::stream::Kind;
use crate::stripe::Stripe;
use crate::{column::Column, error::Result};

use super::{ArrayBatchDecoder, DecimalArrayDecoder};
use super::{ArrayBatchDecoder, DecimalArrayDecoder, PresentDecoder};

pub fn new_decimal_decoder(
column: &Column,
Expand All @@ -39,8 +38,7 @@ pub fn new_decimal_decoder(
let scale_iter = stripe.stream_map().get(column, Kind::Secondary);
let scale_iter = get_rle_reader::<i32, _>(column, scale_iter)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
let present = PresentDecoder::from_stripe(stripe, column);

let iter = DecimalScaleRepairDecoder {
varint_iter,
Expand Down
14 changes: 7 additions & 7 deletions src/array_decoder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,25 @@ use arrow::datatypes::{Field, FieldRef};
use snafu::ResultExt;

use crate::array_decoder::derive_present_vec;
use crate::column::{get_present_vec, Column};
use crate::column::Column;
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
use crate::proto::stream::Kind;

use crate::error::{ArrowSnafu, Result};
use crate::stripe::Stripe;

use super::{array_decoder_factory, ArrayBatchDecoder};
use super::{array_decoder_factory, ArrayBatchDecoder, PresentDecoder};

pub struct ListArrayDecoder {
inner: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
present: Option<PresentDecoder>,
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
field: FieldRef,
}

impl ListArrayDecoder {
pub fn new(column: &Column, field: Arc<Field>, stripe: &Stripe) -> Result<Self> {
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
let present = PresentDecoder::from_stripe(stripe, column);

let child = &column.children()[0];
let inner = array_decoder_factory(child, field.clone(), stripe)?;
Expand All @@ -63,9 +62,10 @@ impl ArrayBatchDecoder for ListArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef> {
let present = derive_present_vec(&mut self.present, parent_present, batch_size);
let present =
derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;

let mut lengths = vec![0; batch_size];
if let Some(present) = &present {
Expand Down
17 changes: 8 additions & 9 deletions src/array_decoder/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ use arrow::datatypes::{Field, Fields};
use snafu::ResultExt;

use crate::array_decoder::derive_present_vec;
use crate::column::{get_present_vec, Column};
use crate::column::Column;
use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder};
use crate::error::{ArrowSnafu, Result};
use crate::proto::stream::Kind;
use crate::stripe::Stripe;

use super::{array_decoder_factory, ArrayBatchDecoder};
use super::{array_decoder_factory, ArrayBatchDecoder, PresentDecoder};

pub struct MapArrayDecoder {
keys: Box<dyn ArrayBatchDecoder>,
values: Box<dyn ArrayBatchDecoder>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
present: Option<PresentDecoder>,
lengths: Box<dyn PrimitiveValueDecoder<i64> + Send>,
fields: Fields,
}
Expand All @@ -46,8 +46,7 @@ impl MapArrayDecoder {
values_field: Arc<Field>,
stripe: &Stripe,
) -> Result<Self> {
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
let present = PresentDecoder::from_stripe(stripe, column);

let keys_column = &column.children()[0];
let keys = array_decoder_factory(keys_column, keys_field.clone(), stripe)?;
Expand All @@ -74,9 +73,10 @@ impl ArrayBatchDecoder for MapArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef> {
let present = derive_present_vec(&mut self.present, parent_present, batch_size);
let present =
derive_present_vec(&mut self.present, parent_present, batch_size).transpose()?;

let mut lengths = vec![0; batch_size];
if let Some(present) = &present {
Expand All @@ -94,11 +94,10 @@ impl ArrayBatchDecoder for MapArrayDecoder {
StructArray::try_new(self.fields.clone(), vec![keys_array, values_array], None)
.context(ArrowSnafu)?;
let offsets = OffsetBuffer::from_lengths(lengths.into_iter().map(|l| l as usize));
let null_buffer = present.map(NullBuffer::from);

let field = Arc::new(Field::new_struct("entries", self.fields.clone(), false));
let array =
MapArray::try_new(field, offsets, entries, null_buffer, false).context(ArrowSnafu)?;
MapArray::try_new(field, offsets, entries, present, false).context(ArrowSnafu)?;
let array = Arc::new(array);
Ok(array)
}
Expand Down
Loading

0 comments on commit e8400fb

Please sign in to comment.