diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8d697568..a9b560a3 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -16,11 +16,12 @@ use crate::types::{ColumnId, LogicalType}; use crate::utils::lru::SharedLruCache; use bytes::Bytes; use itertools::Itertools; -use std::collections::{Bound, VecDeque}; +use std::collections::Bound; use std::io::Cursor; use std::mem; use std::ops::SubAssign; use std::sync::Arc; +use std::vec::IntoIter; use ulid::Generator; pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>; @@ -127,8 +128,8 @@ pub trait Transaction: Sized { tx: self, }, inner, - ranges: VecDeque::from(ranges), - scope_iter: None, + ranges: ranges.into_iter(), + state: IndexIterState::Init, }) } @@ -965,8 +966,14 @@ pub struct IndexIter<'a, T: Transaction> { params: IndexImplParams<'a, T>, inner: IndexImplEnum, // for buffering data - ranges: VecDeque, - scope_iter: Option>, + ranges: IntoIter, + state: IndexIterState<'a, T>, +} + +pub enum IndexIterState<'a, T: Transaction + 'a> { + Init, + Range(T::IterType<'a>), + Over, } impl<'a, T: Transaction + 'a> IndexIter<'a, T> { @@ -985,102 +992,111 @@ impl<'a, T: Transaction + 'a> IndexIter<'a, T> { num.sub_assign(1); } } - - fn is_empty(&self) -> bool { - self.scope_iter.is_none() && self.ranges.is_empty() - } } /// expression -> index value -> tuple impl Iter for IndexIter<'_, T> { fn next_tuple(&mut self) -> Result, DatabaseError> { - if matches!(self.limit, Some(0)) || self.is_empty() { - self.scope_iter = None; - self.ranges.clear(); + if matches!(self.limit, Some(0)) { + self.state = IndexIterState::Over; return Ok(None); } - if let Some(iter) = &mut self.scope_iter { - while let Some((_, bytes)) = iter.try_next()? { - if Self::offset_move(&mut self.offset) { - continue; - } - Self::limit_sub(&mut self.limit); - let tuple = self - .inner - .index_lookup(&bytes, &mut self.id_builder, &self.params)?; - - return Ok(Some(tuple)); - } - self.scope_iter = None; - } - - if let Some(binary) = self.ranges.pop_front() { - match binary { - Range::Scope { min, max } => { - let table_name = self.params.table_name; - let index_meta = &self.params.index_meta; - let bound_encode = |bound: Bound| -> Result<_, DatabaseError> { - match bound { - Bound::Included(mut val) => { - val = self.params.try_cast(val)?; - - Ok(Bound::Included(self.inner.bound_key(&self.params, &val)?)) - } - Bound::Excluded(mut val) => { - val = self.params.try_cast(val)?; - - Ok(Bound::Excluded(self.inner.bound_key(&self.params, &val)?)) - } - Bound::Unbounded => Ok(Bound::Unbounded), - } + loop { + match &mut self.state { + IndexIterState::Init => { + let Some(binary) = self.ranges.next() else { + self.state = IndexIterState::Over; + continue; }; - let (bound_min, bound_max) = - if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) { - TableCodec::tuple_bound(table_name) - } else { - TableCodec::index_bound(table_name, &index_meta.id)? - }; - let check_bound = |value: &mut Bound>, bound: Vec| { - if matches!(value, Bound::Unbounded) { - let _ = mem::replace(value, Bound::Included(bound)); + match binary { + Range::Scope { min, max } => { + let table_name = self.params.table_name; + let index_meta = &self.params.index_meta; + let bound_encode = + |bound: Bound| -> Result<_, DatabaseError> { + match bound { + Bound::Included(mut val) => { + val = self.params.try_cast(val)?; + + Ok(Bound::Included( + self.inner.bound_key(&self.params, &val)?, + )) + } + Bound::Excluded(mut val) => { + val = self.params.try_cast(val)?; + + Ok(Bound::Excluded( + self.inner.bound_key(&self.params, &val)?, + )) + } + Bound::Unbounded => Ok(Bound::Unbounded), + } + }; + let (bound_min, bound_max) = + if matches!(index_meta.ty, IndexType::PrimaryKey { .. }) { + TableCodec::tuple_bound(table_name) + } else { + TableCodec::index_bound(table_name, &index_meta.id)? + }; + let check_bound = |value: &mut Bound>, bound: Vec| { + if matches!(value, Bound::Unbounded) { + let _ = mem::replace(value, Bound::Included(bound)); + } + }; + + let mut encode_min = bound_encode(min)?; + check_bound(&mut encode_min, bound_min); + + let mut encode_max = bound_encode(max)?; + check_bound(&mut encode_max, bound_max); + + let iter = self.params.tx.range( + encode_min.as_ref().map(Vec::as_slice), + encode_max.as_ref().map(Vec::as_slice), + )?; + self.state = IndexIterState::Range(iter); } - }; - - let mut encode_min = bound_encode(min)?; - check_bound(&mut encode_min, bound_min); - - let mut encode_max = bound_encode(max)?; - check_bound(&mut encode_max, bound_max); - - let iter = self.params.tx.range( - encode_min.as_ref().map(Vec::as_slice), - encode_max.as_ref().map(Vec::as_slice), - )?; - self.scope_iter = Some(iter); - } - Range::Eq(mut val) => { - val = self.params.try_cast(val)?; - - match self - .inner - .eq_to_res(&val, &mut self.id_builder, &self.params)? - { - IndexResult::Tuple(tuple) => { - if Self::offset_move(&mut self.offset) { - return self.next_tuple(); + Range::Eq(mut val) => { + val = self.params.try_cast(val)?; + + match self + .inner + .eq_to_res(&val, &mut self.id_builder, &self.params)? + { + IndexResult::Tuple(tuple) => { + if Self::offset_move(&mut self.offset) { + continue; + } + Self::limit_sub(&mut self.limit); + return Ok(tuple); + } + IndexResult::Scope(iter) => { + self.state = IndexIterState::Range(iter); + } } - Self::limit_sub(&mut self.limit); - return Ok(tuple); } - IndexResult::Scope(iter) => self.scope_iter = Some(iter), + _ => (), + } + } + IndexIterState::Range(iter) => { + while let Some((_, bytes)) = iter.try_next()? { + if Self::offset_move(&mut self.offset) { + continue; + } + Self::limit_sub(&mut self.limit); + let tuple = + self.inner + .index_lookup(&bytes, &mut self.id_builder, &self.params)?; + + return Ok(Some(tuple)); } + self.state = IndexIterState::Init; } - _ => (), + IndexIterState::Over => return Ok(None), } } - self.next_tuple() } } diff --git a/tpcc/src/main.rs b/tpcc/src/main.rs index c539dfcb..a4315c9f 100644 --- a/tpcc/src/main.rs +++ b/tpcc/src/main.rs @@ -10,10 +10,10 @@ use clap::Parser; use fnck_sql::db::{DBTransaction, DataBaseBuilder, Statement}; use fnck_sql::errors::DatabaseError; use fnck_sql::storage::Storage; +use fnck_sql::types::tuple::create_table; use rand::prelude::ThreadRng; use rand::Rng; use std::time::{Duration, Instant}; -use fnck_sql::types::tuple::create_table; mod delivery; mod load;