Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement json_get_array UDF #36

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 92 additions & 23 deletions src/common_union.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::{Arc, OnceLock};

use arrow::array::{Array, ArrayRef, BooleanArray, Float64Array, Int64Array, NullArray, StringArray, UnionArray};
use arrow::array::{
Array, ArrayRef, BooleanArray, Float64Array, Int64Array, ListArray, ListBuilder, NullArray, StringArray,
StringBuilder, UnionArray,
};
use arrow::buffer::Buffer;
use arrow_schema::{DataType, Field, UnionFields, UnionMode};
use datafusion_common::ScalarValue;
Expand Down Expand Up @@ -46,7 +49,7 @@ pub(crate) struct JsonUnion {
ints: Vec<Option<i64>>,
floats: Vec<Option<f64>>,
strings: Vec<Option<String>>,
arrays: Vec<Option<String>>,
arrays: Vec<Option<Vec<String>>>,
objects: Vec<Option<String>>,
type_ids: Vec<i8>,
index: usize,
Expand Down Expand Up @@ -93,24 +96,6 @@ impl JsonUnion {
}
}

/// So we can do `collect::<JsonUnion>()`
impl FromIterator<Option<JsonUnionField>> for JsonUnion {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved down to the JsonUnionField definition

fn from_iter<I: IntoIterator<Item = Option<JsonUnionField>>>(iter: I) -> Self {
let inner = iter.into_iter();
let (lower, upper) = inner.size_hint();
let mut union = Self::new(upper.unwrap_or(lower));

for opt_field in inner {
if let Some(union_field) = opt_field {
union.push(union_field);
} else {
union.push_none();
}
}
union
}
}

impl TryFrom<JsonUnion> for UnionArray {
type Error = arrow::error::ArrowError;

Expand All @@ -121,21 +106,50 @@ impl TryFrom<JsonUnion> for UnionArray {
Arc::new(Int64Array::from(value.ints)),
Arc::new(Float64Array::from(value.floats)),
Arc::new(StringArray::from(value.strings)),
Arc::new(StringArray::from(value.arrays)),
Arc::new(StringArray::from(
value
.arrays
.into_iter()
.map(|r| r.map(|e| e.join(",")))
.collect::<Vec<_>>(),
)),
Arc::new(StringArray::from(value.objects)),
];
UnionArray::try_new(union_fields(), Buffer::from_vec(value.type_ids).into(), None, children)
}
}

impl TryFrom<JsonUnion> for ListArray {
type Error = arrow::error::ArrowError;

fn try_from(value: JsonUnion) -> Result<Self, Self::Error> {
let string_builder = StringBuilder::new();
let mut list_builder = ListBuilder::new(string_builder);

for row in value.arrays {
if let Some(row) = row {
for elem in row {
list_builder.values().append_value(elem);
}

list_builder.append(true);
} else {
list_builder.append(false);
}
}

Ok(list_builder.finish())
}
}

#[derive(Debug)]
pub(crate) enum JsonUnionField {
JsonNull,
Bool(bool),
Int(i64),
Float(f64),
Str(String),
Array(String),
Array(Vec<String>),
Object(String),
}

Expand Down Expand Up @@ -193,7 +207,62 @@ impl From<JsonUnionField> for ScalarValue {
JsonUnionField::Bool(b) => Self::Boolean(Some(b)),
JsonUnionField::Int(i) => Self::Int64(Some(i)),
JsonUnionField::Float(f) => Self::Float64(Some(f)),
JsonUnionField::Str(s) | JsonUnionField::Array(s) | JsonUnionField::Object(s) => Self::Utf8(Some(s)),
JsonUnionField::Array(a) => Self::Utf8(Some(a.join(","))),
JsonUnionField::Str(s) | JsonUnionField::Object(s) => Self::Utf8(Some(s)),
}
}
}

/// So we can do `collect::<JsonUnion>()`
impl FromIterator<Option<JsonUnionField>> for JsonUnion {
fn from_iter<I: IntoIterator<Item = Option<JsonUnionField>>>(iter: I) -> Self {
let inner = iter.into_iter();
let (lower, upper) = inner.size_hint();
let mut union = Self::new(upper.unwrap_or(lower));

for opt_field in inner {
if let Some(union_field) = opt_field {
union.push(union_field);
} else {
union.push_none();
}
}
union
}
}

#[derive(Debug)]
pub(crate) struct JsonArrayField(pub(crate) Vec<String>);

impl From<JsonArrayField> for ScalarValue {
fn from(JsonArrayField(elems): JsonArrayField) -> Self {
Self::List(Self::new_list_nullable(
&elems.into_iter().map(|e| Self::Utf8(Some(e))).collect::<Vec<_>>(),
&DataType::Utf8,
))
}
}

impl From<JsonArrayField> for JsonUnionField {
fn from(JsonArrayField(elems): JsonArrayField) -> Self {
JsonUnionField::Array(elems)
}
}

impl FromIterator<Option<JsonArrayField>> for JsonUnion {
fn from_iter<T: IntoIterator<Item = Option<JsonArrayField>>>(iter: T) -> Self {
let inner = iter.into_iter();
let (lower, upper) = inner.size_hint();
let mut union = Self::new(upper.unwrap_or(lower));

for opt_field in inner {
if let Some(array_field) = opt_field {
union.push(array_field.into());
} else {
union.push_none();
}
}

union
}
}
2 changes: 1 addition & 1 deletion src/json_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn build_union(jiter: &mut Jiter, peek: Peek) -> Result<JsonUnionField, GetError
jiter.known_skip(peek)?;
let array_slice = jiter.slice_to_current(start);
let array_string = std::str::from_utf8(array_slice)?;
Ok(JsonUnionField::Array(array_string.to_owned()))
Ok(JsonUnionField::Array(vec![array_string.to_owned()]))
}
Peek::Object => {
let start = jiter.current_index();
Expand Down
95 changes: 95 additions & 0 deletions src/json_get_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::any::Any;
use std::sync::Arc;

use arrow::array::ListArray;
use arrow_schema::{DataType, Field};
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::{Result as DataFusionResult, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use jiter::Peek;

use crate::common::{check_args, get_err, invoke, jiter_json_find, GetError, JsonPath};
use crate::common_macros::make_udf_function;
use crate::common_union::{JsonArrayField, JsonUnion};

make_udf_function!(
JsonGetArray,
json_get_array,
json_data path,
r#"Get an arrow array value from a JSON string by its "path""#
);

#[derive(Debug)]
pub(super) struct JsonGetArray {
signature: Signature,
aliases: [String; 1],
}

impl Default for JsonGetArray {
fn default() -> Self {
Self {
signature: Signature::variadic_any(Volatility::Immutable),
aliases: ["json_get_array".to_string()],
}
}
}

impl ScalarUDFImpl for JsonGetArray {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
self.aliases[0].as_str()
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
check_args(arg_types, self.name()).map(|()| DataType::List(Field::new("item", DataType::Utf8, true).into()))
}

fn invoke(&self, args: &[ColumnarValue]) -> DataFusionResult<ColumnarValue> {
let to_array = |c: JsonUnion| {
let array: ListArray = c.try_into()?;
Ok(Arc::new(array) as ArrayRef)
};

invoke::<JsonUnion, JsonArrayField>(args, jiter_json_get_array, to_array, |i| {
i.map_or_else(|| ScalarValue::Null, Into::into)
})
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}

fn jiter_json_get_array(json_data: Option<&str>, path: &[JsonPath]) -> Result<JsonArrayField, GetError> {
if let Some((mut jiter, peek)) = jiter_json_find(json_data, path) {
match peek {
Peek::Array => {
let mut peek_opt = jiter.known_array()?;
let mut elements = Vec::new();

while let Some(peek) = peek_opt {
let start = jiter.current_index();
jiter.known_skip(peek)?;
let object_slice = jiter.slice_to_current(start);
let object_string = std::str::from_utf8(object_slice)?;

elements.push(object_string.to_owned());

peek_opt = jiter.array_step()?;
}

Ok(JsonArrayField(elements))
}
_ => get_err!(),
}
} else {
get_err!()
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod common_union;
mod json_as_text;
mod json_contains;
mod json_get;
mod json_get_array;
mod json_get_bool;
mod json_get_float;
mod json_get_int;
Expand All @@ -22,6 +23,7 @@ pub mod functions {
pub use crate::json_as_text::json_as_text;
pub use crate::json_contains::json_contains;
pub use crate::json_get::json_get;
pub use crate::json_get_array::json_get_array;
pub use crate::json_get_bool::json_get_bool;
pub use crate::json_get_float::json_get_float;
pub use crate::json_get_int::json_get_int;
Expand All @@ -34,6 +36,7 @@ pub mod udfs {
pub use crate::json_as_text::json_as_text_udf;
pub use crate::json_contains::json_contains_udf;
pub use crate::json_get::json_get_udf;
pub use crate::json_get_array::json_get_array_udf;
pub use crate::json_get_bool::json_get_bool_udf;
pub use crate::json_get_float::json_get_float_udf;
pub use crate::json_get_int::json_get_int_udf;
Expand All @@ -54,6 +57,7 @@ pub mod udfs {
pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
let functions: Vec<Arc<ScalarUDF>> = vec![
json_get::json_get_udf(),
json_get_array::json_get_array_udf(),
json_get_bool::json_get_bool_udf(),
json_get_float::json_get_float_udf(),
json_get_int::json_get_int_udf(),
Expand Down
2 changes: 1 addition & 1 deletion src/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn unnest_json_calls(func: &ScalarFunction) -> Option<Transformed<Expr>> {
fn extract_scalar_function(expr: &Expr) -> Option<&ScalarFunction> {
match expr {
Expr::ScalarFunction(func) => Some(func),
Expr::Alias(alias) => extract_scalar_function(&*alias.expr),
Expr::Alias(alias) => extract_scalar_function(&alias.expr),
_ => None,
}
}
Expand Down
Loading
Loading