Skip to content

Commit

Permalink
chore: make IndexIter stateful to enhance readability
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Dec 4, 2024
1 parent 60526e1 commit 47b107a
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 103 deletions.
24 changes: 21 additions & 3 deletions src/expression/range_detacher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1922,15 +1922,33 @@ mod test {
Range::Eq(DataValue::Int32(Some(7))),
Range::Eq(DataValue::Int32(Some(10))),
]);
println!("{:#?}", range);
assert_eq!(
range,
Some(Range::Scope {
min: Bound::Included(DataValue::Tuple(Some((
vec![
DataValue::Int32(Some(7)),
DataValue::Int32(Some(10)),
DataValue::Int32(Some(2))
],
false
)))),
max: Bound::Excluded(DataValue::Tuple(Some((
vec![DataValue::Int32(Some(7)), DataValue::Int32(Some(10))],
true
)))),
})
);
let Range::Scope {
min: Bound::Included(min),
max: Bound::Excluded(max),
} = range.unwrap()
else {
unreachable!()
};
let evaluator = TupleLtBinaryEvaluator;
println!("{:#?}", evaluator.binary_eval(&min, &max));
assert_eq!(
TupleLtBinaryEvaluator.binary_eval(&min, &max),
DataValue::Boolean(Some(true))
)
}
}
2 changes: 1 addition & 1 deletion src/optimizer/rule/normalization/pushdown_predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl PushPredicateIntoScan {
match range {
Range::Eq(DataValue::Tuple(Some((values, _)))) => {
let min =
Bound::Included(DataValue::Tuple(Some((values.clone(), false))));
Bound::Excluded(DataValue::Tuple(Some((values.clone(), false))));
let max = Bound::Excluded(DataValue::Tuple(Some((values, true))));

Range::Scope { min, max }
Expand Down
184 changes: 100 additions & 84 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ use crate::types::{ColumnId, LogicalType};
use crate::utils::lru::SharedLruCache;
use bytes::Bytes;
use itertools::Itertools;
use std::collections::{Bound, VecDeque};
use std::collections::Bound;
use std::io::Cursor;
use std::mem;
use std::ops::SubAssign;
use std::sync::Arc;
use std::vec::IntoIter;
use ulid::Generator;

pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>;
Expand Down Expand Up @@ -127,8 +128,8 @@ pub trait Transaction: Sized {
tx: self,
},
inner,
ranges: VecDeque::from(ranges),
scope_iter: None,
ranges: ranges.into_iter(),
state: IndexIterState::Init,
})
}

Expand Down Expand Up @@ -965,8 +966,14 @@ pub struct IndexIter<'a, T: Transaction> {
params: IndexImplParams<'a, T>,
inner: IndexImplEnum,
// for buffering data
ranges: VecDeque<Range>,
scope_iter: Option<T::IterType<'a>>,
ranges: IntoIter<Range>,
state: IndexIterState<'a, T>,
}

pub enum IndexIterState<'a, T: Transaction + 'a> {
Init,
Range(T::IterType<'a>),
Over,
}

impl<'a, T: Transaction + 'a> IndexIter<'a, T> {
Expand All @@ -985,102 +992,111 @@ impl<'a, T: Transaction + 'a> IndexIter<'a, T> {
num.sub_assign(1);
}
}

fn is_empty(&self) -> bool {
self.scope_iter.is_none() && self.ranges.is_empty()
}
}

/// expression -> index value -> tuple
impl<T: Transaction> Iter for IndexIter<'_, T> {
fn next_tuple(&mut self) -> Result<Option<Tuple>, DatabaseError> {
if matches!(self.limit, Some(0)) || self.is_empty() {
self.scope_iter = None;
self.ranges.clear();
if matches!(self.limit, Some(0)) {
self.state = IndexIterState::Over;

return Ok(None);
}

if let Some(iter) = &mut self.scope_iter {
while let Some((_, bytes)) = iter.try_next()? {
if Self::offset_move(&mut self.offset) {
continue;
}
Self::limit_sub(&mut self.limit);
let tuple = self
.inner
.index_lookup(&bytes, &mut self.id_builder, &self.params)?;

return Ok(Some(tuple));
}
self.scope_iter = None;
}

if let Some(binary) = self.ranges.pop_front() {
match binary {
Range::Scope { min, max } => {
let table_name = self.params.table_name;
let index_meta = &self.params.index_meta;
let bound_encode = |bound: Bound<DataValue>| -> Result<_, DatabaseError> {
match bound {
Bound::Included(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Included(self.inner.bound_key(&self.params, &val)?))
}
Bound::Excluded(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Excluded(self.inner.bound_key(&self.params, &val)?))
}
Bound::Unbounded => Ok(Bound::Unbounded),
}
loop {
match &mut self.state {
IndexIterState::Init => {
let Some(binary) = self.ranges.next() else {
self.state = IndexIterState::Over;
continue;
};
let (bound_min, bound_max) =
if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) {
TableCodec::tuple_bound(table_name)
} else {
TableCodec::index_bound(table_name, &index_meta.id)?
};
let check_bound = |value: &mut Bound<Vec<u8>>, bound: Vec<u8>| {
if matches!(value, Bound::Unbounded) {
let _ = mem::replace(value, Bound::Included(bound));
match binary {
Range::Scope { min, max } => {
let table_name = self.params.table_name;
let index_meta = &self.params.index_meta;
let bound_encode =
|bound: Bound<DataValue>| -> Result<_, DatabaseError> {
match bound {
Bound::Included(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Included(
self.inner.bound_key(&self.params, &val)?,
))
}
Bound::Excluded(mut val) => {
val = self.params.try_cast(val)?;

Ok(Bound::Excluded(
self.inner.bound_key(&self.params, &val)?,
))
}
Bound::Unbounded => Ok(Bound::Unbounded),
}
};
let (bound_min, bound_max) =
if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) {
TableCodec::tuple_bound(table_name)
} else {
TableCodec::index_bound(table_name, &index_meta.id)?
};
let check_bound = |value: &mut Bound<Vec<u8>>, bound: Vec<u8>| {
if matches!(value, Bound::Unbounded) {
let _ = mem::replace(value, Bound::Included(bound));
}
};

let mut encode_min = bound_encode(min)?;
check_bound(&mut encode_min, bound_min);

let mut encode_max = bound_encode(max)?;
check_bound(&mut encode_max, bound_max);

let iter = self.params.tx.range(
encode_min.as_ref().map(Vec::as_slice),
encode_max.as_ref().map(Vec::as_slice),
)?;
self.state = IndexIterState::Range(iter);
}
};

let mut encode_min = bound_encode(min)?;
check_bound(&mut encode_min, bound_min);

let mut encode_max = bound_encode(max)?;
check_bound(&mut encode_max, bound_max);

let iter = self.params.tx.range(
encode_min.as_ref().map(Vec::as_slice),
encode_max.as_ref().map(Vec::as_slice),
)?;
self.scope_iter = Some(iter);
}
Range::Eq(mut val) => {
val = self.params.try_cast(val)?;

match self
.inner
.eq_to_res(&val, &mut self.id_builder, &self.params)?
{
IndexResult::Tuple(tuple) => {
if Self::offset_move(&mut self.offset) {
return self.next_tuple();
Range::Eq(mut val) => {
val = self.params.try_cast(val)?;

match self
.inner
.eq_to_res(&val, &mut self.id_builder, &self.params)?
{
IndexResult::Tuple(tuple) => {
if Self::offset_move(&mut self.offset) {
continue;
}
Self::limit_sub(&mut self.limit);
return Ok(tuple);
}
IndexResult::Scope(iter) => {
self.state = IndexIterState::Range(iter);
}
}
Self::limit_sub(&mut self.limit);
return Ok(tuple);
}
IndexResult::Scope(iter) => self.scope_iter = Some(iter),
_ => (),
}
}
IndexIterState::Range(iter) => {
while let Some((_, bytes)) = iter.try_next()? {
if Self::offset_move(&mut self.offset) {
continue;
}
Self::limit_sub(&mut self.limit);
let tuple =
self.inner
.index_lookup(&bytes, &mut self.id_builder, &self.params)?;

return Ok(Some(tuple));
}
self.state = IndexIterState::Init;
}
_ => (),
IndexIterState::Over => return Ok(None),
}
}
self.next_tuple()
}
}

Expand Down
12 changes: 7 additions & 5 deletions src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ mod test {
use crate::expression::range_detacher::Range;
use crate::storage::rocksdb::RocksStorage;
use crate::storage::{
IndexImplEnum, IndexImplParams, IndexIter, Iter, PrimaryKeyIndexImpl, Storage, Transaction,
IndexImplEnum, IndexImplParams, IndexIter, IndexIterState, Iter, PrimaryKeyIndexImpl,
Storage, Transaction,
};
use crate::types::index::{IndexMeta, IndexType};
use crate::types::tuple::Tuple;
Expand All @@ -148,7 +149,7 @@ mod test {
use crate::types::LogicalType;
use crate::utils::lru::SharedLruCache;
use itertools::Itertools;
use std::collections::{Bound, VecDeque};
use std::collections::Bound;
use std::hash::RandomState;
use std::sync::Arc;
use tempfile::TempDir;
Expand Down Expand Up @@ -267,14 +268,15 @@ mod test {
table_types: table.types(),
tx: &transaction,
},
ranges: VecDeque::from(vec![
ranges: vec![
Range::Eq(DataValue::Int32(Some(0))),
Range::Scope {
min: Bound::Included(DataValue::Int32(Some(2))),
max: Bound::Included(DataValue::Int32(Some(4))),
},
]),
scope_iter: None,
]
.into_iter(),
state: IndexIterState::Init,
inner: IndexImplEnum::PrimaryKey(PrimaryKeyIndexImpl),
};
let mut result = Vec::new();
Expand Down
13 changes: 5 additions & 8 deletions src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,19 +823,16 @@ impl DataValue {
DataValue::Null => (),
DataValue::Decimal(Some(_v)) => todo!(),
DataValue::Tuple(Some((values, is_upper))) => {
for v in values.iter() {
let last = values.len() - 1;

for (i, v) in values.iter().enumerate() {
v.memcomparable_encode(b)?;
if v.is_null() && *is_upper {
if (v.is_null() || i == last) && *is_upper {
b.push(BOUND_MAX_TAG);
} else {
b.push(BOUND_MIN_TAG);
}
}
if *is_upper && !values.is_empty() {
if let Some(v) = b.last_mut() {
*v = BOUND_MAX_TAG
}
}
}
value => {
if !value.is_null() {
Expand Down Expand Up @@ -1688,7 +1685,7 @@ impl fmt::Debug for DataValue {
DataValue::Tuple(_) => {
write!(f, "Tuple({}", self)?;
if matches!(self, DataValue::Tuple(Some((_, true)))) {
write!(f, " [is_upper]")?;
write!(f, " [is upper]")?;
}
write!(f, ")")
}
Expand Down
4 changes: 2 additions & 2 deletions tpcc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use clap::Parser;
use fnck_sql::db::{DBTransaction, DataBaseBuilder, Statement};
use fnck_sql::errors::DatabaseError;
use fnck_sql::storage::Storage;
use fnck_sql::types::tuple::create_table;
use rand::prelude::ThreadRng;
use rand::Rng;
use std::time::{Duration, Instant};
use fnck_sql::types::tuple::create_table;

mod delivery;
mod load;
Expand Down Expand Up @@ -320,7 +320,7 @@ pub enum TpccError {
}

#[test]
fn debug_tpcc() -> Result<(), DatabaseError> {
fn explain_tpcc() -> Result<(), DatabaseError> {
let database = DataBaseBuilder::path("./fnck_sql_tpcc").build()?;
let mut tx = database.new_transaction()?;

Expand Down

0 comments on commit 47b107a

Please sign in to comment.