feat(cubesql): CubeScan - don't clone strings for non stream response #8633

Merged: 5 commits, Aug 26, 2024

Changes from 2 commits

rust/cubesql/cubesql/src/compile/engine/df/scan.rs
73 changes: 43 additions & 30 deletions rust/cubesql/cubesql/src/compile/engine/df/scan.rs
@@ -446,8 +446,11 @@ struct CubeScanExecutionPlan {
}

#[derive(Debug)]
pub enum FieldValue {
pub enum FieldValue<'a> {
Member: Please add a meaningful name for the lifetime, like 'str or 'string.

Member: I believe it's pretty much the convention to keep it as 'a, at least when there is only one lifetime to care about. A single lifetime is easy enough to follow.

Member (Author): With Cow, 'str is not an option because Cow<'str, str> is a meme.

// Neon doesn't allow us to build string references, because V8 uses UTF-16, while Rust uses UTF-8
String(String),
// Best variant for us with strings, because we don't need to clone the string
StringRef(&'a String),
Member: Why not a reference to str? I don't think a reference to String gets us anything functionally, but it does tie us to a specific allocation pattern.

Member (Author): &str cannot be used because I combine match arms via |, and in Rust all alternatives of an or-pattern must bind to the same type.
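A minimal sketch of the constraint described above, with a simplified FieldValue (names and setup are illustrative, not taken from the PR): both alternatives of an or-pattern must bind s with the same type, which is why ref s on the owned variant is paired with &ref s on the reference variant.

enum FieldValue<'a> {
    String(String),
    StringRef(&'a String),
    Number(f64),
}

fn string_len(value: FieldValue<'_>) -> Option<usize> {
    match value {
        // Both alternatives must bind `s` with the same type: `ref s` on the
        // owned variant and `&ref s` on the reference variant both give &String.
        FieldValue::String(ref s) | FieldValue::StringRef(&ref s) => Some(s.len()),
        FieldValue::Number(_) => None,
    }
}

fn main() {
    let backing = "borrowed".to_string();
    assert_eq!(string_len(FieldValue::String("owned".to_string())), Some(5));
    assert_eq!(string_len(FieldValue::StringRef(&backing)), Some(8));
    assert_eq!(string_len(FieldValue::Number(1.0)), None);
}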

Number(f64),
Bool(bool),
Null,
@@ -475,31 +478,28 @@ impl ValueObject for JsonValueObject {
Ok(self.rows.len())
}

fn get<'a>(
&'a mut self,
fn get(
&mut self,
index: usize,
field_name: &str,
) -> std::result::Result<FieldValue, CubeError> {
let option = self.rows[index].as_object_mut();
let as_object = if let Some(as_object) = option {
let as_object = if let Some(as_object) = self.rows[index].as_object() {
Member: This can be written as let Some(as_object) = ... else { ... }.
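A minimal sketch of the suggested let ... else form (the standalone function and the String error are simplifications; the PR itself uses CubeError::user):

use serde_json::{Map, Value};

// Sketch of the suggested rewrite: bail out early when the row is not an object.
fn get_row_object(rows: &[Value], index: usize) -> Result<&Map<String, Value>, String> {
    let Some(as_object) = rows[index].as_object() else {
        return Err(format!(
            "Unexpected response from Cube, row is not an object: {:?}",
            rows[index]
        ));
    };
    Ok(as_object)
}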

as_object
} else {
return Err(CubeError::user(format!(
"Unexpected response from Cube, row is not an object: {:?}",
self.rows[index]
)));
};
let value = as_object
.get(field_name)
.unwrap_or(&Value::Null)
// TODO expose strings as references to avoid clonning
.clone();

let value = as_object.get(field_name).unwrap_or(&Value::Null);

Ok(match value {
Value::String(s) => FieldValue::String(s),
Value::String(s) => FieldValue::StringRef(s),
Value::Number(n) => FieldValue::Number(n.as_f64().ok_or(
Member (Author): There is an additional boost here: since n is used as a reference, we don't clone the number.
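For illustration, a minimal sketch (simplified signatures, not the PR's macro code) of the point above: matching a borrowed Value binds n as &serde_json::Number, and Number::as_f64 converts it without cloning.

use serde_json::{json, Value};

fn to_f64(value: &Value) -> Option<f64> {
    match value {
        // `n` is a &serde_json::Number here; as_f64 reads it without cloning.
        Value::Number(n) => n.as_f64(),
        _ => None,
    }
}

fn main() {
    assert_eq!(to_f64(&json!(5.05)), Some(5.05));
    assert_eq!(to_f64(&Value::Null), None);
}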

(screenshot attached)

DataFusionError::Execution(format!("Can't convert {:?} to float", n)),
)?),
Value::Bool(b) => FieldValue::Bool(b),
Value::Bool(b) => FieldValue::Bool(*b),
Value::Null => FieldValue::Null,
x => {
return Err(CubeError::user(format!(
@@ -851,12 +851,15 @@ async fn load_data(
.map(|v| v.iter().filter(|d| d.granularity.is_some()).count())
.unwrap_or(0)
== 0;

let result = if no_members_query {
let limit = request.limit.unwrap_or(1);
let mut data = Vec::new();

for _ in 0..limit {
data.push(serde_json::Value::Null)
}

V1LoadResult::new(
V1LoadResultAnnotation {
measures: json!(Vec::<serde_json::Value>::new()),
@@ -881,9 +884,9 @@ async fn load_data(

data
} else {
return Err(ArrowError::ComputeError(format!(
"Unable to extract result from Cube.js response",
)));
return Err(ArrowError::ComputeError(
"Unable to extract result from Cube.js response, empty results".to_string(),
));
}
};

@@ -945,6 +948,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::String(v), builder) => builder.append_value(v)?,
(FieldValue::StringRef(v), builder) => builder.append_value(v)?,
(FieldValue::Bool(v), builder) => builder.append_value(if v { "true" } else { "false" })?,
(FieldValue::Number(v), builder) => builder.append_value(v.to_string())?,
},
@@ -961,7 +965,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Number(number), builder) => builder.append_value(number.round() as i16)?,
(FieldValue::String(s), builder) => match s.parse::<i16>() {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => match s.parse::<i16>() {
Ok(v) => builder.append_value(v)?,
Err(error) => {
warn!(
@@ -986,7 +990,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Number(number), builder) => builder.append_value(number.round() as i32)?,
(FieldValue::String(s), builder) => match s.parse::<i32>() {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => match s.parse::<i32>() {
Ok(v) => builder.append_value(v)?,
Err(error) => {
warn!(
@@ -1011,7 +1015,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Number(number), builder) => builder.append_value(number.round() as i64)?,
(FieldValue::String(s), builder) => match s.parse::<i64>() {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => match s.parse::<i64>() {
Ok(v) => builder.append_value(v)?,
Err(error) => {
warn!(
@@ -1036,7 +1040,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Number(number), builder) => builder.append_value(number as f32)?,
(FieldValue::String(s), builder) => match s.parse::<f32>() {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => match s.parse::<f32>() {
Ok(v) => builder.append_value(v)?,
Err(error) => {
warn!(
@@ -1061,7 +1065,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Number(number), builder) => builder.append_value(number)?,
(FieldValue::String(s), builder) => match s.parse::<f64>() {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => match s.parse::<f64>() {
Ok(v) => builder.append_value(v)?,
Err(error) => {
warn!(
@@ -1086,7 +1090,7 @@ pub fn transform_response<V: ValueObject>(
field_name,
{
(FieldValue::Bool(v), builder) => builder.append_value(v)?,
(FieldValue::String(v), builder) => match v.as_str() {
(FieldValue::String(ref v), builder) | (FieldValue::StringRef(&ref v), builder) => match v.as_str() {
"true" | "1" => builder.append_value(true)?,
"false" | "0" => builder.append_value(false)?,
_ => {
@@ -1108,7 +1112,7 @@ pub fn transform_response<V: ValueObject>(
response,
field_name,
{
(FieldValue::String(s), builder) => {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => {
let timestamp = NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.f")
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%d %H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S"))
@@ -1144,7 +1148,7 @@ pub fn transform_response<V: ValueObject>(
response,
field_name,
{
(FieldValue::String(s), builder) => {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => {
let timestamp = NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S%.f")
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%d %H:%M:%S%.f"))
.or_else(|_| NaiveDateTime::parse_from_str(s.as_str(), "%Y-%m-%dT%H:%M:%S"))
@@ -1180,7 +1184,7 @@ pub fn transform_response<V: ValueObject>(
response,
field_name,
{
(FieldValue::String(s), builder) => {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => {
let date = NaiveDate::parse_from_str(s.as_str(), "%Y-%m-%d")
// FIXME: temporary solution for cases when expected type is Date32
// but underlying data is a Timestamp
@@ -1224,7 +1228,7 @@ pub fn transform_response<V: ValueObject>(
response,
field_name,
{
(FieldValue::String(s), builder) => {
(FieldValue::String(ref s), builder) | (FieldValue::StringRef(&ref s), builder) => {
let mut parts = s.split(".");
match parts.next() {
None => builder.append_null()?,
@@ -1395,11 +1399,11 @@ mod tests {
"timeDimensions": []
},
"data": [
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderDate": null},
{"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderDate": "2022-01-01 00:00:00.000"},
{"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderDate": "2023-01-01 00:00:00.000"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null}
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": "City 1"},
{"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderDate": "2022-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 2"},
{"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderDate": "2023-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 3"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 4"},
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": null}
]
}
"#;
@@ -1475,6 +1479,7 @@ mod tests {
DataType::Boolean,
false,
),
Field::new("KibanaSampleDataEcommerce.city", DataType::Utf8, false),
]));

let scan_node = CubeScanExecutionPlan {
@@ -1498,6 +1503,7 @@
dimensions: Some(vec![
"KibanaSampleDataEcommerce.isBool".to_string(),
"KibanaSampleDataEcommerce.orderDate".to_string(),
"KibanaSampleDataEcommerce.city".to_string(),
]),
segments: None,
time_dimensions: None,
@@ -1577,6 +1583,13 @@
Some(false)
])) as ArrayRef,
Arc::new(BooleanArray::from(vec![None, None, None, None, None,])) as ArrayRef,
Arc::new(StringArray::from(vec![
Some("City 1"),
Some("City 2"),
Some("City 3"),
Some("City 4"),
None
])) as ArrayRef,
],
)
.unwrap()