Skip to content

Commit

Permalink
feat(execution-engine)!: refactor streams [fixes VM-255] (#621)
Browse files Browse the repository at this point in the history
Refactored stream and stream generation a lot, it introduces the following changes:
- no generation in data anymore, AquaVM relies on generation inside data to stay valid and places value accordingly to it
- stream is internally divided into previous, current, and new values, before, it was one array for all of them
- recursive streams cursors are refactored and rely on new generation values instead
- the Generation enum was refactored and now contains the source of the generation
  • Loading branch information
mikevoronov authored Aug 3, 2023
1 parent 3843da5 commit eca52b7
Show file tree
Hide file tree
Showing 51 changed files with 1,542 additions and 1,195 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions air/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ serde_json = "1.0.95"
concat-idents = "1.1.4"
maplit = "1.0.2"
non-empty-vec = "0.2.3"
typed-index-collections = "3.1.0"
log = "0.4.17"
once_cell = "1.17.1"
thiserror = "1.0.40"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use super::Iterable;
use super::IterableItem;
use crate::execution_step::boxed_value::CanonStream;
use crate::execution_step::boxed_value::TracePosOperate;
use crate::foldable_next;
use crate::foldable_prev;

Expand Down
9 changes: 8 additions & 1 deletion air/src/execution_step/boxed_value/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,24 @@ mod stream;
mod stream_map;
mod utils;

pub type Stream = stream::Stream<ValueAggregate>;

pub(crate) use canon_stream::*;
pub(crate) use iterable::*;
pub(crate) use jvaluable::*;
pub(crate) use scalar::CanonResultAggregate;
pub(crate) use scalar::LiteralAggregate;
pub(crate) use scalar::ScalarRef;
pub(crate) use scalar::ServiceResultAggregate;
pub(crate) use scalar::TracePosOperate;
pub(crate) use scalar::ValueAggregate;

pub(crate) use stream::Generation;
pub(crate) use stream::Stream;
pub(crate) use stream::IterableValue;
pub(crate) use stream::RecursiveCursorState;
pub(crate) use stream::RecursiveStreamCursor;
pub(crate) use stream_map::StreamMap;

pub(crate) use utils::populate_tetraplet_with_lambda;

use super::ExecutionResult;
41 changes: 25 additions & 16 deletions air/src/execution_step/boxed_value/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum ValueAggregate {
provenance_cid: Rc<CID<CanonResultCidAggregate>>,
},
}

pub(crate) enum ScalarRef<'i> {
Value(&'i ValueAggregate),
IterableValue(&'i FoldState<'i>),
Expand Down Expand Up @@ -176,7 +177,29 @@ impl ValueAggregate {
}
}

pub fn get_trace_pos(&self) -> TracePos {
pub fn get_provenance(&self) -> Provenance {
match self {
ValueAggregate::Literal(_) => Provenance::Literal,
ValueAggregate::ServiceResult {
result: _,
provenance_cid: cid,
} => Provenance::ServiceResult { cid: cid.clone() },
ValueAggregate::Canon {
result: _,
provenance_cid: cid,
} => Provenance::Canon { cid: cid.clone() },
}
}
}

pub trait TracePosOperate {
fn get_trace_pos(&self) -> TracePos;

fn set_trace_pos(&mut self, pos: TracePos);
}

impl TracePosOperate for ValueAggregate {
fn get_trace_pos(&self) -> TracePos {
match self {
ValueAggregate::Literal(literal) => literal.trace_pos,
ValueAggregate::ServiceResult {
Expand All @@ -190,7 +213,7 @@ impl ValueAggregate {
}
}

pub fn set_trace_pos(&mut self, trace_pos: TracePos) {
fn set_trace_pos(&mut self, trace_pos: TracePos) {
let trace_pos_ref = match self {
ValueAggregate::Literal(literal) => &mut literal.trace_pos,
ValueAggregate::ServiceResult {
Expand All @@ -204,20 +227,6 @@ impl ValueAggregate {
};
*trace_pos_ref = trace_pos;
}

pub fn get_provenance(&self) -> Provenance {
match self {
ValueAggregate::Literal(_) => Provenance::Literal,
ValueAggregate::ServiceResult {
result: _,
provenance_cid: cid,
} => Provenance::ServiceResult { cid: cid.clone() },
ValueAggregate::Canon {
result: _,
provenance_cid: cid,
} => Provenance::Canon { cid: cid.clone() },
}
}
}

use std::fmt;
Expand Down
Loading

0 comments on commit eca52b7

Please sign in to comment.