Skip to content

Commit

Permalink
fix: use ARROW:schema instead of LogicalType to embed Date64 type
Browse files Browse the repository at this point in the history
  • Loading branch information
dsgibbons committed Sep 27, 2024
1 parent 4034040 commit 6ff08a2
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 88 deletions.
68 changes: 8 additions & 60 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,14 @@ where
// As there is not always a 1:1 mapping between Arrow and Parquet, there
// are datatypes which we must convert explicitly.
// These are:
// - date64: cast int32 to date32, then date32 to date64.
// - decimal: cast int32 to decimal, int64 to decimal
let array = match target_type {
ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
// this is cheap as it internally reinterprets the data
let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
arrow_cast::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
// Apply conversion to all elements regardless of null slots as the conversion
// to `i128` is infallible. This improves performance by avoiding a branch in
Expand Down Expand Up @@ -299,9 +305,9 @@ mod tests {
use crate::util::test_common::rand_gen::make_pages;
use crate::util::InMemoryPageIterator;
use arrow::datatypes::ArrowPrimitiveType;
use arrow_array::{Array, Date32Array, Date64Array, PrimitiveArray};
use arrow_array::{Array, Date32Array, PrimitiveArray};

use arrow::datatypes::DataType::{Date32, Date64, Decimal128};
use arrow::datatypes::DataType::{Date32, Decimal128};
use rand::distributions::uniform::SampleUniform;
use std::collections::VecDeque;

Expand Down Expand Up @@ -539,14 +545,6 @@ mod tests {
arrow::datatypes::Int32Type,
i32
);
test_primitive_array_reader_one_type!(
crate::data_type::Int64Type,
PhysicalType::INT64,
"DATE",
arrow::datatypes::Date64Type,
arrow::datatypes::Int64Type,
i64
);
test_primitive_array_reader_one_type!(
crate::data_type::Int32Type,
PhysicalType::INT32,
Expand Down Expand Up @@ -835,54 +833,4 @@ mod tests {
assert_eq!(array, &data_date_array);
}
}

// Reads a Parquet `INT64` column annotated as `DATE` through
// `PrimitiveArrayReader` and checks that the resulting batch has Arrow type
// `Date64` and equals the first 50 entries of `data`.
#[test]
fn test_primitive_array_reader_date64_type() {
// Parquet schema with a single required `INT64` column (`date1`) annotated as `DATE`
let message_type = "
message test_schema {
REQUIRED INT64 date1 (DATE);
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);

// generate PLAIN-encoded column chunk pages, then create the array reader over them
{
// `data` and `page_lists` are handed to `make_column_chunks` by mutable
// reference; presumably it fills them with the generated values and pages.
let mut data = Vec::new();
let mut page_lists = Vec::new();
// NOTE(review): the numeric arguments look like a per-page value count (100),
// a min/max value range, and a chunk count (2) — confirm against the
// `make_column_chunks` signature, which is not visible here.
make_column_chunks::<Int64Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
-999999999999999999,
999999999999999999,
&mut Vec::new(),
&mut Vec::new(),
&mut data,
&mut page_lists,
true,
2,
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();

// read a batch of 50 values from the reader;
// the resulting Arrow data type is expected to be `Date64`
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Date64);
let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
// build the expected array from the first 50 entries of `data`,
// wrapping each in `Some` (no null slots are expected)
let data_date_array = data[0..50]
.iter()
.copied()
.map(Some)
.collect::<Date64Array>();
assert_eq!(array, &data_date_array);
}
}
}
25 changes: 12 additions & 13 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_id(id)
.build(),
DataType::Date64 => {
let physical_type = match coerce_types {
true => PhysicalType::INT32,
false => PhysicalType::INT64,
};
Type::primitive_type_builder(name, physical_type)
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
.with_id(id)
.build()
if coerce_types {
Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
.with_id(id)
.build()
} else {
Type::primitive_type_builder(name, PhysicalType::INT64)
.with_repetition(repetition)
.with_id(id)
.build()
}
},
DataType::Time32(TimeUnit::Second) => {
// Cannot represent seconds in LogicalType
Expand Down Expand Up @@ -1328,7 +1331,6 @@ mod tests {
OPTIONAL BINARY string (UTF8);
REPEATED BOOLEAN bools;
OPTIONAL INT32 date32 (DATE);
OPTIONAL INT64 date64 (DATE);
OPTIONAL INT32 time_milli (TIME_MILLIS);
OPTIONAL INT64 time_micro (TIME_MICROS);
OPTIONAL INT64 time_nano (TIME(NANOS,false));
Expand Down Expand Up @@ -1370,7 +1372,6 @@ mod tests {
false,
),
Field::new("date32", DataType::Date32, true),
Field::new("date64", DataType::Date64, true),
Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
Expand Down Expand Up @@ -1436,7 +1437,6 @@ mod tests {
}
}
OPTIONAL INT32 date32 (DATE);
OPTIONAL INT64 date64 (DATE);
OPTIONAL INT32 time_milli (TIME(MILLIS,false));
OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
OPTIONAL INT64 time_micro (TIME_MICROS);
Expand Down Expand Up @@ -1490,7 +1490,6 @@ mod tests {
false,
),
Field::new("date32", DataType::Date32, true),
Field::new("date64", DataType::Date64, true),
Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
Field::new("time_milli_utc", DataType::Time32(TimeUnit::Millisecond), true)
.with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
Expand Down
2 changes: 0 additions & 2 deletions parquet/src/arrow/schema/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
true => Ok(DataType::Int64),
false => Ok(DataType::UInt64),
},
(Some(LogicalType::Date), _) => Ok(DataType::Date64),
(Some(LogicalType::Time { unit, .. }), _) => match unit {
ParquetTimeUnit::MILLIS(_) => {
Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
Expand Down Expand Up @@ -229,7 +228,6 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataTy
)),
(None, ConvertedType::INT_64) => Ok(DataType::Int64),
(None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
(None, ConvertedType::DATE) => Ok(DataType::Date64),
(None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
(None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
TimeUnit::Millisecond,
Expand Down
15 changes: 2 additions & 13 deletions parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ impl<'a> PrimitiveTypeBuilder<'a> {
self.check_decimal_precision_scale()?;
}
(LogicalType::Date, PhysicalType::INT32) => {}
(LogicalType::Date, PhysicalType::INT64) => {}
(
LogicalType::Time {
unit: TimeUnit::MILLIS(_),
Expand Down Expand Up @@ -410,18 +409,8 @@ impl<'a> PrimitiveTypeBuilder<'a> {
ConvertedType::DECIMAL => {
self.check_decimal_precision_scale()?;
}
ConvertedType::DATE => {
if !(self.physical_type == PhysicalType::INT32
|| self.physical_type == PhysicalType::INT64)
{
return Err(general_err!(
"{} cannot annotate field '{}' because it is not a INT32 or a INT64 field",
self.converted_type,
self.name
));
}
}
ConvertedType::TIME_MILLIS
ConvertedType::DATE
| ConvertedType::TIME_MILLIS
| ConvertedType::UINT_8
| ConvertedType::UINT_16
| ConvertedType::UINT_32
Expand Down

0 comments on commit 6ff08a2

Please sign in to comment.