Skip to content

Commit

Permalink
feat(cubesql): CubeScan - don't clone strings for non stream response (
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr authored Aug 26, 2024
1 parent 5ca76db commit df364be
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 45 deletions.
3 changes: 2 additions & 1 deletion packages/cubejs-backend-native/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use cubesql::compile::engine::df::scan::{
transform_response, FieldValue, MemberField, RecordBatch, SchemaRef, ValueObject,
};
use std::borrow::Cow;

use std::cell::RefCell;
use std::future::Future;
Expand Down Expand Up @@ -211,7 +212,7 @@ impl ValueObject for JsValueObject<'_> {
CubeError::user(format!("Can't get '{}' field value: {}", field_name, e))
})?;
if let Ok(s) = value.downcast::<JsString, _>(&mut self.cx) {
Ok(FieldValue::String(s.value(&mut self.cx)))
Ok(FieldValue::String(Cow::Owned(s.value(&mut self.cx))))
} else if let Ok(n) = value.downcast::<JsNumber, _>(&mut self.cx) {
Ok(FieldValue::Number(n.value(&mut self.cx)))
} else if let Ok(b) = value.downcast::<JsBoolean, _>(&mut self.cx) {
Expand Down
101 changes: 57 additions & 44 deletions rust/cubesql/cubesql/src/compile/engine/df/scan.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
use std::{
any::Any,
fmt,
sync::Arc,
task::{Context, Poll},
};

use async_trait::async_trait;
use cubeclient::models::{V1LoadRequestQuery, V1LoadResult, V1LoadResultAnnotation};
pub use datafusion::{
Expand All @@ -27,6 +20,13 @@ pub use datafusion::{
};
use futures::Stream;
use log::warn;
use std::{
any::Any,
borrow::Cow,
fmt,
sync::Arc,
task::{Context, Poll},
};

use crate::{
compile::{
Expand Down Expand Up @@ -446,8 +446,12 @@ struct CubeScanExecutionPlan {
}

#[derive(Debug)]
pub enum FieldValue {
String(String),
pub enum FieldValue<'a> {
// Why Cow?
// We use N-API via Neon (only for streaming), which doesn't allow us to build string reference,
// because V8 uses UTF-16 It allocates/converts a new strings while doing JsString.value()
// @see v8 WriteUtf8 for more details. Cow::Owned is used for this variant
String(Cow<'a, str>),
Number(f64),
Bool(bool),
Null,
Expand Down Expand Up @@ -475,31 +479,26 @@ impl ValueObject for JsonValueObject {
Ok(self.rows.len())
}

fn get<'a>(
&'a mut self,
fn get(
&mut self,
index: usize,
field_name: &str,
) -> std::result::Result<FieldValue, CubeError> {
let option = self.rows[index].as_object_mut();
let as_object = if let Some(as_object) = option {
as_object
} else {
let Some(as_object) = self.rows[index].as_object() else {
return Err(CubeError::user(format!(
"Unexpected response from Cube, row is not an object: {:?}",
self.rows[index]
)));
};
let value = as_object
.get(field_name)
.unwrap_or(&Value::Null)
// TODO expose strings as references to avoid clonning
.clone();

let value = as_object.get(field_name).unwrap_or(&Value::Null);

Ok(match value {
Value::String(s) => FieldValue::String(s),
Value::String(s) => FieldValue::String(Cow::Borrowed(s)),
Value::Number(n) => FieldValue::Number(n.as_f64().ok_or(
DataFusionError::Execution(format!("Can't convert {:?} to float", n)),
)?),
Value::Bool(b) => FieldValue::Bool(b),
Value::Bool(b) => FieldValue::Bool(*b),
Value::Null => FieldValue::Null,
x => {
return Err(CubeError::user(format!(
Expand Down Expand Up @@ -851,12 +850,15 @@ async fn load_data(
.map(|v| v.iter().filter(|d| d.granularity.is_some()).count())
.unwrap_or(0)
== 0;

let result = if no_members_query {
let limit = request.limit.unwrap_or(1);
let mut data = Vec::new();

for _ in 0..limit {
data.push(serde_json::Value::Null)
}

V1LoadResult::new(
V1LoadResultAnnotation {
measures: json!(Vec::<serde_json::Value>::new()),
Expand All @@ -881,9 +883,9 @@ async fn load_data(

data
} else {
return Err(ArrowError::ComputeError(format!(
"Unable to extract result from Cube.js response",
)));
return Err(ArrowError::ComputeError(
"Unable to extract results from response: results is empty".to_string(),
));
}
};

Expand Down Expand Up @@ -1011,7 +1013,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Number(number), builder) => builder.append_value(number.round() as i64)?,
(FieldValue::String(s), builder) => match s.parse::<i64>() {
(FieldValue::String(s), builder) => match s.parse::<i64>() {
Ok(v) => builder.append_value(v)?,
Err(error) => {
warn!(
Expand Down Expand Up @@ -1086,7 +1088,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Bool(v), builder) => builder.append_value(v)?,
(FieldValue::String(v), builder) => match v.as_str() {
(FieldValue::String(v), builder) => match v.as_ref() {
"true" | "1" => builder.append_value(true)?,
"false" | "0" => builder.append_value(false)?,
_ => {
Expand All @@ -1109,12 +1111,12 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::String(s), builder) => {
let timestamp = NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.f")
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%d %H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.fZ"))
let timestamp = NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.f")
.or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%d %H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.fZ"))
.or_else(|_| {
NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%d").map(|date| {
NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%d").map(|date| {
date.and_hms_opt(0, 0, 0).unwrap()
})
})
Expand Down Expand Up @@ -1145,12 +1147,12 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::String(s), builder) => {
let timestamp = NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.f")
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%d %H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.fZ"))
let timestamp = NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.f")
.or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%d %H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_ref(), "%Y-%m-%dT%H:%M:%S%.fZ"))
.or_else(|_| {
NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%d").map(|date| {
NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%d").map(|date| {
date.and_hms_opt(0, 0, 0).unwrap()
})
})
Expand Down Expand Up @@ -1181,10 +1183,10 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::String(s), builder) => {
let date = NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%d")
let date = NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%d")
// FIXME: temporary solution for cases when expected type is Date32
// but underlying data is a Timestamp
.or_else(|_| NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%dT00:00:00.000"))
.or_else(|_| NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%dT00:00:00.000"))
.map_err(|e| {
DataFusionError::Execution(format!(
"Can't parse date: '{}': {}",
Expand Down Expand Up @@ -1395,11 +1397,11 @@ mod tests {
"timeDimensions": []
},
"data": [
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderDate": null},
{"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderDate": "2022-01-01 00:00:00.000"},
{"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderDate": "2023-01-01 00:00:00.000"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null}
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": "City 1"},
{"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderDate": "2022-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 2"},
{"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderDate": "2023-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 3"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 4"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": null}
]
}
"#;
Expand Down Expand Up @@ -1456,6 +1458,8 @@ mod tests {

#[tokio::test]
async fn test_df_cube_scan_execute() {
assert_eq!(std::mem::size_of::<FieldValue>(), 24);

let schema = Arc::new(Schema::new(vec![
Field::new("KibanaSampleDataEcommerce.count", DataType::Utf8, false),
Field::new("KibanaSampleDataEcommerce.count", DataType::Utf8, false),
Expand All @@ -1475,6 +1479,7 @@ mod tests {
DataType::Boolean,
false,
),
Field::new("KibanaSampleDataEcommerce.city", DataType::Utf8, false),
]));

let scan_node = CubeScanExecutionPlan {
Expand All @@ -1498,6 +1503,7 @@ mod tests {
dimensions: Some(vec![
"KibanaSampleDataEcommerce.isBool".to_string(),
"KibanaSampleDataEcommerce.orderDate".to_string(),
"KibanaSampleDataEcommerce.city".to_string(),
]),
segments: None,
time_dimensions: None,
Expand Down Expand Up @@ -1577,6 +1583,13 @@ mod tests {
Some(false)
])) as ArrayRef,
Arc::new(BooleanArray::from(vec![None, None, None, None, None,])) as ArrayRef,
Arc::new(StringArray::from(vec![
Some("City 1"),
Some("City 2"),
Some("City 3"),
Some("City 4"),
None
])) as ArrayRef,
],
)
.unwrap()
Expand Down

0 comments on commit df364be

Please sign in to comment.