From 47b107a86e780e6f657f638034a484ec1d2d634c Mon Sep 17 00:00:00 2001 From: Kould Date: Thu, 5 Dec 2024 02:37:40 +0800 Subject: [PATCH] chore: make `IndexIter` stateful to enhance readability --- src/expression/range_detacher.rs | 24 ++- .../rule/normalization/pushdown_predicates.rs | 2 +- src/storage/mod.rs | 184 ++++++++++-------- src/storage/rocksdb.rs | 12 +- src/types/value.rs | 13 +- tpcc/src/main.rs | 4 +- 6 files changed, 136 insertions(+), 103 deletions(-) diff --git a/src/expression/range_detacher.rs b/src/expression/range_detacher.rs index 135f8620..9c77c719 100644 --- a/src/expression/range_detacher.rs +++ b/src/expression/range_detacher.rs @@ -1922,7 +1922,23 @@ mod test { Range::Eq(DataValue::Int32(Some(7))), Range::Eq(DataValue::Int32(Some(10))), ]); - println!("{:#?}", range); + assert_eq!( + range, + Some(Range::Scope { + min: Bound::Included(DataValue::Tuple(Some(( + vec![ + DataValue::Int32(Some(7)), + DataValue::Int32(Some(10)), + DataValue::Int32(Some(2)) + ], + false + )))), + max: Bound::Excluded(DataValue::Tuple(Some(( + vec![DataValue::Int32(Some(7)), DataValue::Int32(Some(10))], + true + )))), + }) + ); let Range::Scope { min: Bound::Included(min), max: Bound::Excluded(max), @@ -1930,7 +1946,9 @@ mod test { else { unreachable!() }; - let evaluator = TupleLtBinaryEvaluator; - println!("{:#?}", evaluator.binary_eval(&min, &max)); + assert_eq!( + TupleLtBinaryEvaluator.binary_eval(&min, &max), + DataValue::Boolean(Some(true)) + ) } } diff --git a/src/optimizer/rule/normalization/pushdown_predicates.rs b/src/optimizer/rule/normalization/pushdown_predicates.rs index b6334776..1fcef798 100644 --- a/src/optimizer/rule/normalization/pushdown_predicates.rs +++ b/src/optimizer/rule/normalization/pushdown_predicates.rs @@ -271,7 +271,7 @@ impl PushPredicateIntoScan { match range { Range::Eq(DataValue::Tuple(Some((values, _)))) => { let min = - Bound::Included(DataValue::Tuple(Some((values.clone(), false)))); + Bound::Excluded(DataValue::Tuple(Some((values.clone(), false)))); let max = Bound::Excluded(DataValue::Tuple(Some((values, true)))); Range::Scope { min, max } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8d697568..a9b560a3 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -16,11 +16,12 @@ use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; use bytes::Bytes; use itertools::Itertools; -use std::collections::{Bound, VecDeque}; +use std::collections::Bound; use std::io::Cursor; use std::mem; use std::ops::SubAssign; use std::sync::Arc; +use std::vec::IntoIter; use ulid::Generator; pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>; @@ -127,8 +128,8 @@ pub trait Transaction: Sized { tx: self, }, inner, - ranges: VecDeque::from(ranges), - scope_iter: None, + ranges: ranges.into_iter(), + state: IndexIterState::Init, }) } @@ -965,8 +966,14 @@ pub struct IndexIter<'a, T: Transaction> { params: IndexImplParams<'a, T>, inner: IndexImplEnum, // for buffering data - ranges: VecDeque, - scope_iter: Option>, + ranges: IntoIter, + state: IndexIterState<'a, T>, +} + +pub enum IndexIterState<'a, T: Transaction + 'a> { + Init, + Range(T::IterType<'a>), + Over, } impl<'a, T: Transaction + 'a> IndexIter<'a, T> { @@ -985,102 +992,111 @@ impl<'a, T: Transaction + 'a> IndexIter<'a, T> { num.sub_assign(1); } } - - fn is_empty(&self) -> bool { - self.scope_iter.is_none() && self.ranges.is_empty() - } } /// expression -> index value -> tuple impl Iter for IndexIter<'_, T> { fn next_tuple(&mut self) -> Result, DatabaseError> { - if matches!(self.limit, Some(0)) || self.is_empty() { - self.scope_iter = None; - self.ranges.clear(); + if matches!(self.limit, Some(0)) { + self.state = IndexIterState::Over; return Ok(None); } - if let Some(iter) = &mut self.scope_iter { - while let Some((_, bytes)) = iter.try_next()? { - if Self::offset_move(&mut self.offset) { - continue; - } - Self::limit_sub(&mut self.limit); - let tuple = self - .inner - .index_lookup(&bytes, &mut self.id_builder, &self.params)?; - - return Ok(Some(tuple)); - } - self.scope_iter = None; - } - - if let Some(binary) = self.ranges.pop_front() { - match binary { - Range::Scope { min, max } => { - let table_name = self.params.table_name; - let index_meta = &self.params.index_meta; - let bound_encode = |bound: Bound| -> Result<_, DatabaseError> { - match bound { - Bound::Included(mut val) => { - val = self.params.try_cast(val)?; - - Ok(Bound::Included(self.inner.bound_key(&self.params, &val)?)) - } - Bound::Excluded(mut val) => { - val = self.params.try_cast(val)?; - - Ok(Bound::Excluded(self.inner.bound_key(&self.params, &val)?)) - } - Bound::Unbounded => Ok(Bound::Unbounded), - } + loop { + match &mut self.state { + IndexIterState::Init => { + let Some(binary) = self.ranges.next() else { + self.state = IndexIterState::Over; + continue; }; - let (bound_min, bound_max) = - if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) { - TableCodec::tuple_bound(table_name) - } else { - TableCodec::index_bound(table_name, &index_meta.id)? - }; - let check_bound = |value: &mut Bound>, bound: Vec| { - if matches!(value, Bound::Unbounded) { - let _ = mem::replace(value, Bound::Included(bound)); + match binary { + Range::Scope { min, max } => { + let table_name = self.params.table_name; + let index_meta = &self.params.index_meta; + let bound_encode = + |bound: Bound| -> Result<_, DatabaseError> { + match bound { + Bound::Included(mut val) => { + val = self.params.try_cast(val)?; + + Ok(Bound::Included( + self.inner.bound_key(&self.params, &val)?, + )) + } + Bound::Excluded(mut val) => { + val = self.params.try_cast(val)?; + + Ok(Bound::Excluded( + self.inner.bound_key(&self.params, &val)?, + )) + } + Bound::Unbounded => Ok(Bound::Unbounded), + } + }; + let (bound_min, bound_max) = + if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) { + TableCodec::tuple_bound(table_name) + } else { + TableCodec::index_bound(table_name, &index_meta.id)? + }; + let check_bound = |value: &mut Bound>, bound: Vec| { + if matches!(value, Bound::Unbounded) { + let _ = mem::replace(value, Bound::Included(bound)); + } + }; + + let mut encode_min = bound_encode(min)?; + check_bound(&mut encode_min, bound_min); + + let mut encode_max = bound_encode(max)?; + check_bound(&mut encode_max, bound_max); + + let iter = self.params.tx.range( + encode_min.as_ref().map(Vec::as_slice), + encode_max.as_ref().map(Vec::as_slice), + )?; + self.state = IndexIterState::Range(iter); } - }; - - let mut encode_min = bound_encode(min)?; - check_bound(&mut encode_min, bound_min); - - let mut encode_max = bound_encode(max)?; - check_bound(&mut encode_max, bound_max); - - let iter = self.params.tx.range( - encode_min.as_ref().map(Vec::as_slice), - encode_max.as_ref().map(Vec::as_slice), - )?; - self.scope_iter = Some(iter); - } - Range::Eq(mut val) => { - val = self.params.try_cast(val)?; - - match self - .inner - .eq_to_res(&val, &mut self.id_builder, &self.params)? - { - IndexResult::Tuple(tuple) => { - if Self::offset_move(&mut self.offset) { - return self.next_tuple(); + Range::Eq(mut val) => { + val = self.params.try_cast(val)?; + + match self + .inner + .eq_to_res(&val, &mut self.id_builder, &self.params)? + { + IndexResult::Tuple(tuple) => { + if Self::offset_move(&mut self.offset) { + continue; + } + Self::limit_sub(&mut self.limit); + return Ok(tuple); + } + IndexResult::Scope(iter) => { + self.state = IndexIterState::Range(iter); + } } - Self::limit_sub(&mut self.limit); - return Ok(tuple); } - IndexResult::Scope(iter) => self.scope_iter = Some(iter), + _ => (), + } + } + IndexIterState::Range(iter) => { + while let Some((_, bytes)) = iter.try_next()? { + if Self::offset_move(&mut self.offset) { + continue; + } + Self::limit_sub(&mut self.limit); + let tuple = + self.inner + .index_lookup(&bytes, &mut self.id_builder, &self.params)?; + + return Ok(Some(tuple)); } + self.state = IndexIterState::Init; } - _ => (), + IndexIterState::Over => return Ok(None), } } - self.next_tuple() } } diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 7bd51e32..552142da 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -139,7 +139,8 @@ mod test { use crate::expression::range_detacher::Range; use crate::storage::rocksdb::RocksStorage; use crate::storage::{ - IndexImplEnum, IndexImplParams, IndexIter, Iter, PrimaryKeyIndexImpl, Storage, Transaction, + IndexImplEnum, IndexImplParams, IndexIter, IndexIterState, Iter, PrimaryKeyIndexImpl, + Storage, Transaction, }; use crate::types::index::{IndexMeta, IndexType}; use crate::types::tuple::Tuple; @@ -148,7 +149,7 @@ mod test { use crate::types::LogicalType; use crate::utils::lru::SharedLruCache; use itertools::Itertools; - use std::collections::{Bound, VecDeque}; + use std::collections::Bound; use std::hash::RandomState; use std::sync::Arc; use tempfile::TempDir; @@ -267,14 +268,15 @@ mod test { table_types: table.types(), tx: &transaction, }, - ranges: VecDeque::from(vec![ + ranges: vec![ Range::Eq(DataValue::Int32(Some(0))), Range::Scope { min: Bound::Included(DataValue::Int32(Some(2))), max: Bound::Included(DataValue::Int32(Some(4))), }, - ]), - scope_iter: None, + ] + .into_iter(), + state: IndexIterState::Init, inner: IndexImplEnum::PrimaryKey(PrimaryKeyIndexImpl), }; let mut result = Vec::new(); diff --git a/src/types/value.rs b/src/types/value.rs index 77fa2fe0..78ff424a 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -823,19 +823,16 @@ impl DataValue { DataValue::Null => (), DataValue::Decimal(Some(_v)) => todo!(), DataValue::Tuple(Some((values, is_upper))) => { - for v in values.iter() { + let last = values.len() - 1; + + for (i, v) in values.iter().enumerate() { v.memcomparable_encode(b)?; - if v.is_null() && *is_upper { + if (v.is_null() || i == last) && *is_upper { b.push(BOUND_MAX_TAG); } else { b.push(BOUND_MIN_TAG); } } - if *is_upper && !values.is_empty() { - if let Some(v) = b.last_mut() { - *v = BOUND_MAX_TAG - } - } } value => { if !value.is_null() { @@ -1688,7 +1685,7 @@ impl fmt::Debug for DataValue { DataValue::Tuple(_) => { write!(f, "Tuple({}", self)?; if matches!(self, DataValue::Tuple(Some((_, true)))) { - write!(f, " [is_upper]")?; + write!(f, " [is upper]")?; } write!(f, ")") } diff --git a/tpcc/src/main.rs b/tpcc/src/main.rs index c539dfcb..4d753364 100644 --- a/tpcc/src/main.rs +++ b/tpcc/src/main.rs @@ -10,10 +10,10 @@ use clap::Parser; use fnck_sql::db::{DBTransaction, DataBaseBuilder, Statement}; use fnck_sql::errors::DatabaseError; use fnck_sql::storage::Storage; +use fnck_sql::types::tuple::create_table; use rand::prelude::ThreadRng; use rand::Rng; use std::time::{Duration, Instant}; -use fnck_sql::types::tuple::create_table; mod delivery; mod load; @@ -320,7 +320,7 @@ pub enum TpccError { } #[test] -fn debug_tpcc() -> Result<(), DatabaseError> { +fn explain_tpcc() -> Result<(), DatabaseError> { let database = DataBaseBuilder::path("./fnck_sql_tpcc").build()?; let mut tx = database.new_transaction()?;