Skip to content

Commit

Permalink
feat(execution-engine)!: canon stream map support [fixes VM-301] (#648)
Browse files Browse the repository at this point in the history
* feat(execution-engine)!: canon stream map support [fixes VM-301]
	Canon stream map is a CRDT-like map structure that contains a
	canonicalized stream map and allows indexed access to its
	contents. ATM CSM supports both strings and int64 keys.
  • Loading branch information
raftedproc authored Sep 7, 2023
1 parent bc4cc68 commit b4cbf8f
Show file tree
Hide file tree
Showing 54 changed files with 4,142 additions and 1,150 deletions.
15 changes: 15 additions & 0 deletions air/src/execution_step/errors/uncatchable_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
use super::Stream;
use crate::execution_step::Generation;
use crate::execution_step::STREAM_MAX_SIZE;
use crate::CanonStreamMapError;
use crate::StreamMapError;
use crate::StreamMapKeyError;
use crate::ToErrorCode;

use air_interpreter_cid::CidCalculationError;
Expand Down Expand Up @@ -102,6 +105,18 @@ pub enum UncatchableError {
/// Stream size estimate goes over a hardcoded limit.
#[error("stream size goes over the allowed limit of {STREAM_MAX_SIZE}")]
StreamSizeLimitExceeded,

/// CanonStreamMapKey related errors.
#[error(transparent)]
StreamMapKeyError(#[from] StreamMapKeyError),

/// Stream map related errors.
#[error(transparent)]
StreamMapError(#[from] StreamMapError),

/// CanonStreamMap related errors.
#[error(transparent)]
CanonStreamMapError(#[from] CanonStreamMapError),
}

impl ToErrorCode for UncatchableError {
Expand Down
32 changes: 32 additions & 0 deletions air/src/execution_step/execution_context/scalar_variables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
mod values_sparse_matrix;

use crate::execution_step::errors_prelude::*;
use crate::execution_step::value_types::CanonStreamMapWithProvenance;
use crate::execution_step::value_types::CanonStreamWithProvenance;
use crate::execution_step::value_types::ScalarRef;
use crate::execution_step::ExecutionResult;
Expand Down Expand Up @@ -88,6 +89,8 @@ pub(crate) struct Scalars<'i> {

pub(crate) canon_streams: ValuesSparseMatrix<CanonStreamWithProvenance>,

pub(crate) canon_maps: ValuesSparseMatrix<CanonStreamMapWithProvenance<'i>>,

pub(crate) iterable_variables: HashMap<String, FoldState<'i>>,
}

Expand All @@ -96,6 +99,7 @@ impl<'i> Scalars<'i> {
Self {
non_iterable_variables: ValuesSparseMatrix::new(),
canon_streams: ValuesSparseMatrix::new(),
canon_maps: ValuesSparseMatrix::new(),
iterable_variables: HashMap::new(),
}
}
Expand All @@ -116,6 +120,16 @@ impl<'i> Scalars<'i> {
self.canon_streams.set_value(name, value)
}

/// Returns true if there was a previous value for the provided key on the same
/// fold block.
pub(crate) fn set_canon_map_value<'k: 'i>(
&mut self,
name: impl Into<String>,
value: CanonStreamMapWithProvenance<'k>,
) -> ExecutionResult<bool> {
self.canon_maps.set_value(name, value)
}

pub(crate) fn set_iterable_value(
&mut self,
name: impl Into<String>,
Expand Down Expand Up @@ -158,6 +172,12 @@ impl<'i> Scalars<'i> {
.ok_or_else(|| CatchableError::VariableWasNotInitializedAfterNew(name.to_string()).into())
}

pub(crate) fn get_canon_map(&'i self, name: &str) -> ExecutionResult<&'i CanonStreamMapWithProvenance<'i>> {
self.canon_maps
.get_value(name)?
.ok_or_else(|| CatchableError::VariableWasNotInitializedAfterNew(name.to_string()).into())
}

pub(crate) fn get_value(&'i self, name: &str) -> ExecutionResult<ScalarRef<'i>> {
let value = self.get_non_iterable_scalar(name);
let iterable_value_with_prov = self.iterable_variables.get(name);
Expand All @@ -179,23 +199,27 @@ impl<'i> Scalars<'i> {
pub(crate) fn meet_fold_start(&mut self) {
self.non_iterable_variables.meet_fold_start();
self.canon_streams.meet_fold_start();
self.canon_maps.meet_fold_start();
}

// meet next before recursion
pub(crate) fn meet_next_before(&mut self) {
self.non_iterable_variables.meet_next_before();
self.canon_streams.meet_next_before();
self.canon_maps.meet_next_before();
}

// meet next after recursion
pub(crate) fn meet_next_after(&mut self) {
self.non_iterable_variables.meet_next_after();
self.canon_streams.meet_next_after();
self.canon_maps.meet_next_after();
}

pub(crate) fn meet_fold_end(&mut self) {
self.non_iterable_variables.meet_fold_end();
self.canon_streams.meet_fold_end();
self.canon_maps.meet_fold_end();
}

pub(crate) fn meet_new_start_scalar(&mut self, scalar_name: String) {
Expand All @@ -206,13 +230,21 @@ impl<'i> Scalars<'i> {
self.canon_streams.meet_new_start(canon_stream_name);
}

pub(crate) fn meet_new_start_canon_stream_map(&mut self, canon_stream_map_name: String) {
self.canon_maps.meet_new_start(canon_stream_map_name);
}

pub(crate) fn meet_new_end_scalar(&mut self, scalar_name: &str) -> ExecutionResult<()> {
self.non_iterable_variables.meet_new_end(scalar_name)
}

pub(crate) fn meet_new_end_canon_stream(&mut self, canon_name: &str) -> ExecutionResult<()> {
self.canon_streams.meet_new_end(canon_name)
}

pub(crate) fn meet_new_end_canon_stream_map(&mut self, canon_stream_map_name: &str) -> ExecutionResult<()> {
self.canon_maps.meet_new_end(canon_stream_map_name)
}
}

impl Default for Scalars<'_> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ pub enum StreamMapError {
UnsupportedMapKeyType { variable_name: String },
}

/// CanonStreamMap related errors.
#[derive(Debug, Clone, ThisError)]
pub enum CanonStreamMapError {
#[error("can not find JValue to produce scalar from")]
NoDataToProduceScalar,
}

#[derive(Debug, Clone, ThisError)]
pub enum StreamMapKeyError {
#[error("the value must be an object with key and value fields")]
NotAnObject,

#[error("there must be a \"value\" field in kvpair object")]
ValueFieldIsAbsent,

#[error("unsupported kvpair object or map key type")]
UnsupportedKVPairObjectOrMapKeyType,
}

pub fn unsupported_map_key_type(variable_name: &str) -> StreamMapError {
StreamMapError::UnsupportedMapKeyType {
variable_name: variable_name.to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,66 @@
* limitations under the License.
*/

use crate::execution_step::execution_context::stream_maps_variables::errors::unsupported_map_key_type;
use crate::execution_step::ValueAggregate;
use crate::CatchableError;
use crate::ExecutionError;
use crate::JValue;

use serde::Serialize;
use std::borrow::Cow;
use std::fmt::Display;
use std::fmt::Formatter;

#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) static KEY_FIELD_NAME: &str = "key";

// TODO refactor the keys so that integer and string
// value domains overlap would become impossible or less harmful.
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
pub(crate) enum StreamMapKey<'value> {
Str(Cow<'value, str>),
U64(u64),
I64(i64),
}

impl<'value> StreamMapKey<'value> {
pub(crate) fn from_value(value: JValue, map_name: &str) -> Result<Self, ExecutionError> {
pub fn from_value(value: JValue) -> Option<Self> {
match value {
JValue::String(s) => Ok(StreamMapKey::Str(Cow::Owned(s))),
JValue::Number(n) if n.is_i64() => Ok(StreamMapKey::I64(n.as_i64().unwrap())),
JValue::Number(n) if n.is_u64() => Ok(StreamMapKey::U64(n.as_u64().unwrap())),
_ => Err(CatchableError::StreamMapError(unsupported_map_key_type(map_name)).into()),
JValue::String(s) => Some(StreamMapKey::Str(Cow::Owned(s))),
JValue::Number(n) if n.is_i64() => Some(StreamMapKey::I64(n.as_i64().unwrap())),
JValue::Number(n) if n.is_u64() => Some(StreamMapKey::U64(n.as_u64().unwrap())),
_ => None,
}
}

pub(crate) fn from_kvpair(value: &'value ValueAggregate) -> Option<Self> {
let object = value.get_result().as_object()?;
let key = object.get("key")?;
match key {
pub fn from_value_ref(value: &'value JValue) -> Option<Self> {
match value {
JValue::String(s) => Some(StreamMapKey::Str(Cow::Borrowed(s.as_str()))),
JValue::Number(n) if n.is_i64() => Some(StreamMapKey::I64(n.as_i64().unwrap())),
JValue::Number(n) if n.is_u64() => Some(StreamMapKey::U64(n.as_u64().unwrap())),
_ => None,
}
}

pub(crate) fn from_kvpair_owned(value: &ValueAggregate) -> Option<Self> {
let object = value.get_result().as_object()?;
let key = object.get(KEY_FIELD_NAME)?.clone();
StreamMapKey::from_value(key)
}

pub(crate) fn from_kvpair(value: &'value ValueAggregate) -> Option<Self> {
let object = value.get_result().as_object()?;
let key = object.get(KEY_FIELD_NAME)?;
StreamMapKey::from_value_ref(key)
}

pub(crate) fn into_owned(self) -> StreamMapKey<'static> {
match self {
StreamMapKey::Str(s) => {
let s = s.to_string();
StreamMapKey::Str(Cow::Owned(s))
}
StreamMapKey::U64(n) => StreamMapKey::U64(n),
StreamMapKey::I64(n) => StreamMapKey::I64(n),
}
}
}

impl From<i64> for StreamMapKey<'_> {
Expand All @@ -64,12 +88,27 @@ impl From<u64> for StreamMapKey<'_> {
}
}

// TODO unify all types.
// This conversion is used to cast from numeric lambda accessor that leverages u32
// however larpop parser grammar uses i64 for numeric keys inserting into a stream map.
impl From<u32> for StreamMapKey<'_> {
fn from(value: u32) -> Self {
StreamMapKey::I64(value.into())
}
}

impl<'value> From<&'value str> for StreamMapKey<'value> {
fn from(value: &'value str) -> Self {
StreamMapKey::Str(Cow::Borrowed(value))
}
}

impl From<String> for StreamMapKey<'static> {
fn from(value: String) -> Self {
StreamMapKey::Str(Cow::Owned(value))
}
}

impl<'value> Serialize for StreamMapKey<'value> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand All @@ -82,3 +121,15 @@ impl<'value> Serialize for StreamMapKey<'value> {
}
}
}

// This trait impl proposefully prints numbers the same way as strings
// to use it in map-to-scalar cast.
impl<'value> Display for StreamMapKey<'value> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
StreamMapKey::Str(s) => write!(f, "{}", s),
StreamMapKey::U64(n) => write!(f, "{}", n),
StreamMapKey::I64(n) => write!(f, "{}", n),
}
}
}
43 changes: 43 additions & 0 deletions air/src/execution_step/instructions/ap/apply_to_arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub(crate) fn apply_to_arg(
ScalarWithLambda(scalar) => apply_scalar_wl(scalar, exec_ctx, trace_ctx),
CanonStream(canon_stream) => apply_canon_stream(canon_stream, exec_ctx, trace_ctx),
CanonStreamWithLambda(canon_stream) => apply_canon_stream_wl(canon_stream, exec_ctx, trace_ctx),
CanonStreamMap(canon_stream_map) => apply_canon_stream_map(canon_stream_map, exec_ctx, trace_ctx),
CanonStreamMapWithLambda(canon_stream_map) => apply_canon_stream_map_wl(canon_stream_map, exec_ctx, trace_ctx),
}?;

Ok(result)
Expand Down Expand Up @@ -180,3 +182,44 @@ fn apply_canon_stream_wl(
let result = ValueAggregate::new(result.into_owned().into(), tetraplet.into(), position, provenance);
Ok(result)
}

fn apply_canon_stream_map(
ast_canon_stream_map: &ast::CanonStreamMap<'_>,
exec_ctx: &ExecutionCtx<'_>,
trace_ctx: &TraceHandler,
) -> ExecutionResult<ValueAggregate> {
use crate::execution_step::value_types::JValuable;

let canon_stream_map = exec_ctx.scalars.get_canon_map(ast_canon_stream_map.name)?;
let value = JValuable::as_jvalue(&&canon_stream_map.canon_stream_map).into_owned();
let tetraplet = canon_stream_map.tetraplet();
let position = trace_ctx.trace_pos().map_err(UncatchableError::from)?;
let value = CanonResultAggregate::new(
Rc::new(value),
tetraplet.peer_pk.as_str().into(),
&tetraplet.json_path,
position,
);
let result = ValueAggregate::from_canon_result(value, canon_stream_map.cid.clone());
Ok(result)
}

fn apply_canon_stream_map_wl(
ast_canon_stream_map: &ast::CanonStreamMapWithLambda<'_>,
exec_ctx: &ExecutionCtx<'_>,
trace_ctx: &TraceHandler,
) -> ExecutionResult<ValueAggregate> {
use crate::execution_step::value_types::JValuable;

let canon_stream_map = exec_ctx.scalars.get_canon_map(ast_canon_stream_map.name)?;
let cid = canon_stream_map.cid.clone();
let lambda = &ast_canon_stream_map.lambda;
let canon_stream_map = &canon_stream_map.canon_stream_map;

let (result, tetraplet, provenance) =
JValuable::apply_lambda_with_tetraplets(&canon_stream_map, lambda, exec_ctx, &Provenance::canon(cid))?;
let position = trace_ctx.trace_pos().map_err(UncatchableError::from)?;

let result = ValueAggregate::new(result.into_owned().into(), tetraplet.into(), position, provenance);
Ok(result)
}
25 changes: 10 additions & 15 deletions air/src/execution_step/instructions/ap_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use crate::ExecutionError;

use air_interpreter_data::ApResult;
use air_parser::ast::ApMap;
use air_parser::ast::ApMapKey;
use air_parser::ast::StreamMap;
use air_parser::ast::StreamMapKeyClause;
use air_trace_handler::merger::MergerApResult;

impl<'i> super::ExecutableInstruction<'i> for ApMap<'i> {
Expand All @@ -44,7 +44,7 @@ impl<'i> super::ExecutableInstruction<'i> for ApMap<'i> {
let result = apply_to_arg(&self.value, exec_ctx, trace_ctx, true)?;

let merger_ap_result = to_merger_ap_map_result(&self, trace_ctx)?;
let key = resolve_if_needed(&self.key, exec_ctx, self.map.name)?;
let key = resolve_key_if_needed(&self.key, exec_ctx, self.map.name)?;
populate_context(key, &self.map, &merger_ap_result, result, exec_ctx)?;
trace_ctx.meet_ap_end(ApResult::stub());

Expand All @@ -68,22 +68,17 @@ fn populate_context<'ctx>(
exec_ctx.stream_maps.add_stream_map_value(key, value_descriptor)
}

fn resolve_if_needed<'ctx>(
key: &ApMapKey<'ctx>,
fn resolve_key_if_needed<'ctx>(
key: &StreamMapKeyClause<'ctx>,
exec_ctx: &mut ExecutionCtx<'ctx>,
map_name: &str,
) -> Result<StreamMapKey<'ctx>, ExecutionError> {
use air_parser::ast::Number;

match key {
&ApMapKey::Literal(s) => Ok(s.into()),
ApMapKey::Number(n) => match n {
&Number::Int(i) => Ok(i.into()),
Number::Float(_) => Err(CatchableError::StreamMapError(unsupported_map_key_type(map_name)).into()),
},
ApMapKey::Scalar(s) => resolve(s, exec_ctx, map_name),
ApMapKey::ScalarWithLambda(s) => resolve(s, exec_ctx, map_name),
ApMapKey::CanonStreamWithLambda(c) => resolve(c, exec_ctx, map_name),
&StreamMapKeyClause::Literal(s) => Ok(s.into()),
StreamMapKeyClause::Int(i) => Ok(i.to_owned().into()),
StreamMapKeyClause::Scalar(s) => resolve(s, exec_ctx, map_name),
StreamMapKeyClause::ScalarWithLambda(s) => resolve(s, exec_ctx, map_name),
StreamMapKeyClause::CanonStreamWithLambda(c) => resolve(c, exec_ctx, map_name),
}
}

Expand All @@ -93,5 +88,5 @@ fn resolve<'ctx>(
map_name: &str,
) -> Result<StreamMapKey<'ctx>, ExecutionError> {
let (value, _, _) = resolvable.resolve(exec_ctx)?;
StreamMapKey::from_value(value, map_name)
StreamMapKey::from_value(value).ok_or(CatchableError::StreamMapError(unsupported_map_key_type(map_name)).into())
}
Loading

0 comments on commit b4cbf8f

Please sign in to comment.