Skip to content

Commit

Permalink
chore: simplify Tuple::deserialize_from & ScalaExpression::eval &…
Browse files Browse the repository at this point in the history
… `HepGraph::node_iter` & `HepOptimizer::apply_batch`
  • Loading branch information
KKould committed Dec 12, 2024
1 parent 80ae6e6 commit 1f8a3f2
Show file tree
Hide file tree
Showing 32 changed files with 181 additions and 297 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "fnck_sql"
version = "0.0.8"
version = "0.0.9"
edition = "2021"
authors = ["Kould <[email protected]>", "Xwg <[email protected]>"]
description = "SQL as a Function for Rust"
Expand Down Expand Up @@ -41,6 +41,7 @@ chrono = { version = "0.4" }
comfy-table = { version = "7" }
csv = { version = "1" }
dirs = { version = "5" }
fixedbitset = { version = "0.4" }
itertools = { version = "0.12" }
ordered-float = { version = "4" }
paste = { version = "1" }
Expand Down
3 changes: 1 addition & 2 deletions src/catalog/column.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::catalog::TableName;
use crate::errors::DatabaseError;
use crate::expression::ScalarExpression;
use crate::types::tuple::EMPTY_TUPLE;
use crate::types::value::DataValue;
use crate::types::{ColumnId, LogicalType};
use fnck_sql_serde_macros::ReferenceSerialization;
Expand Down Expand Up @@ -170,7 +169,7 @@ impl ColumnCatalog {
self.desc
.default
.as_ref()
.map(|expr| expr.eval(&EMPTY_TUPLE, &[]))
.map(|expr| expr.eval(None))
.transpose()
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/dml/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Update {
}
for (i, column) in input_schema.iter().enumerate() {
if let Some(expr) = exprs_map.get(&column.id()) {
tuple.values[i] = throw!(expr.eval(&tuple, &input_schema));
tuple.values[i] = throw!(expr.eval(Some((&tuple, &input_schema))));
}
}
tuple.clear_id();
Expand Down
4 changes: 2 additions & 2 deletions src/execution/dql/aggregate/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashAggExecutor {
if args.len() > 1 {
throw!(Err(DatabaseError::UnsupportedStmt("currently aggregate functions only support a single Column as a parameter".to_string())))
}
values.push(throw!(args[0].eval(&tuple, &schema_ref)));
values.push(throw!(args[0].eval(Some((&tuple, &schema_ref)))));
} else {
unreachable!()
}
}
let group_keys: Vec<DataValue> = throw!(groupby_exprs
.iter()
.map(|expr| expr.eval(&tuple, &schema_ref))
.map(|expr| expr.eval(Some((&tuple, &schema_ref))))
.try_collect());

let entry = match group_hash_accs.entry(group_keys) {
Expand Down
3 changes: 2 additions & 1 deletion src/execution/dql/aggregate/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for SimpleAggExecutor {
let values: Vec<DataValue> = throw!(agg_calls
.iter()
.map(|expr| match expr {
ScalarExpression::AggCall { args, .. } => args[0].eval(&tuple, &schema),
ScalarExpression::AggCall { args, .. } =>
args[0].eval(Some((&tuple, &schema))),
_ => unreachable!(),
})
.try_collect());
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Filter {
while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
let tuple = throw!(tuple);

if throw!(throw!(predicate.eval(&tuple, &schema)).is_true()) {
if throw!(throw!(predicate.eval(Some((&tuple, &schema)))).is_true()) {
yield Ok(tuple);
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/execution/dql/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
use crate::throw;
use crate::types::tuple::{Schema, Tuple};
use crate::types::value::{DataValue, NULL_VALUE};
use crate::utils::bit_vector::BitVector;
use ahash::{HashMap, HashMapExt};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use std::ops::Coroutine;
use std::ops::CoroutineState;
Expand Down Expand Up @@ -49,7 +49,7 @@ impl HashJoin {
let mut values = Vec::with_capacity(on_keys.len());

for expr in on_keys {
values.push(expr.eval(tuple, schema)?);
values.push(expr.eval(Some((tuple, schema)))?);
}
Ok(values)
}
Expand All @@ -62,7 +62,7 @@ impl HashJoin {
left_schema_len: usize,
) -> Result<Option<Tuple>, DatabaseError> {
if let (Some(expr), false) = (filter, matches!(join_ty, JoinType::Full | JoinType::Cross)) {
match &expr.eval(&tuple, schema)? {
match &expr.eval(Some((&tuple, schema)))? {
DataValue::Boolean(Some(false) | None) => {
let full_schema_len = schema.len();

Expand Down Expand Up @@ -193,7 +193,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin {
if *is_filtered {
continue;
} else {
bits_option = Some(BitVector::new(tuples.len()));
bits_option = Some(FixedBitSet::with_capacity(tuples.len()));
}
}
JoinType::LeftAnti => continue,
Expand All @@ -214,7 +214,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin {
left_schema_len
)) {
if let Some(bits) = bits_option.as_mut() {
bits.set_bit(i, true);
bits.insert(i);
} else {
yield Ok(tuple);
}
Expand All @@ -223,7 +223,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for HashJoin {
if let Some(bits) = bits_option {
let mut cnt = 0;
tuples.retain(|_| {
let res = bits.get_bit(cnt);
let res = bits.contains(cnt);
cnt += 1;
res
});
Expand Down
15 changes: 8 additions & 7 deletions src/execution/dql/join/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
use crate::throw;
use crate::types::tuple::{Schema, SchemaRef, Tuple};
use crate::types::value::{DataValue, NULL_VALUE};
use crate::utils::bit_vector::BitVector;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use std::ops::Coroutine;
use std::ops::CoroutineState;
Expand Down Expand Up @@ -146,7 +146,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {

let right_schema_len = eq_cond.right_schema.len();
let mut left_coroutine = build_read(left_input, cache, transaction);
let mut bitmap: Option<BitVector> = None;
let mut bitmap: Option<FixedBitSet> = None;
let mut first_matches = Vec::new();

while let CoroutineState::Yielded(left_tuple) =
Expand Down Expand Up @@ -177,7 +177,8 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {
}
(Some(filter), true) => {
let new_tuple = Self::merge_tuple(&left_tuple, &right_tuple, &ty);
let value = throw!(filter.eval(&new_tuple, &output_schema_ref));
let value =
throw!(filter.eval(Some((&new_tuple, &output_schema_ref))));
match &value {
DataValue::Boolean(Some(true)) => {
let tuple = match ty {
Expand Down Expand Up @@ -215,7 +216,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {
break;
}
if let Some(bits) = bitmap.as_mut() {
bits.set_bit(right_idx, true);
bits.insert(right_idx);
} else if matches!(ty, JoinType::Full) {
first_matches.push(right_idx);
}
Expand All @@ -227,7 +228,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {
}

if matches!(self.ty, JoinType::Full) && bitmap.is_none() {
bitmap = Some(BitVector::new(right_idx));
bitmap = Some(FixedBitSet::with_capacity(right_idx));
}

// handle no matched tuple case
Expand Down Expand Up @@ -256,15 +257,15 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for NestedLoopJoin {

if matches!(ty, JoinType::Full) {
for idx in first_matches.into_iter() {
bitmap.as_mut().unwrap().set_bit(idx, true);
bitmap.as_mut().unwrap().insert(idx);
}

let mut right_coroutine = build_read(right_input.clone(), cache, transaction);
let mut idx = 0;
while let CoroutineState::Yielded(right_tuple) =
Pin::new(&mut right_coroutine).resume(())
{
if !bitmap.as_ref().unwrap().get_bit(idx) {
if !bitmap.as_ref().unwrap().contains(idx) {
let mut right_tuple: Tuple = throw!(right_tuple);
let mut values = vec![NULL_VALUE.clone(); right_schema_len];
values.append(&mut right_tuple.values);
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dql/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Projection {
let mut values = Vec::with_capacity(exprs.len());

for expr in exprs.iter() {
values.push(expr.eval(tuple, schmea)?);
values.push(expr.eval(Some((tuple, schmea)))?);
}
Ok(values)
}
Expand Down
5 changes: 3 additions & 2 deletions src/execution/dql/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ impl SortBy {
let mut key = BumpBytes::new_in(arena);
let tuple = tuple.as_ref().map(|(_, tuple)| tuple).unwrap();

expr.eval(tuple, schema)?.memcomparable_encode(&mut key)?;
expr.eval(Some((tuple, schema)))?
.memcomparable_encode(&mut key)?;
if !asc {
for byte in key.iter_mut() {
*byte ^= 0xFF;
Expand Down Expand Up @@ -169,7 +170,7 @@ impl SortBy {
debug_assert!(tuple.is_some());

let (_, tuple) = tuple.as_ref().unwrap();
eval_values[x].push(expr.eval(tuple, schema)?);
eval_values[x].push(expr.eval(Some((tuple, schema)))?);
}
}

Expand Down
Loading

0 comments on commit 1f8a3f2

Please sign in to comment.