From c4cd24db0b258ae0c59931f3e0ce909eb4424fda Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Sat, 4 Jun 2022 03:00:21 +0800 Subject: [PATCH 1/6] Moved ProcBlockNode and ModelNode to their own modules --- crates/runtime/src/zune.rs | 1282 ------------------------- crates/runtime/src/zune/mod.rs | 502 ++++++++++ crates/runtime/src/zune/proc_block.rs | 556 +++++++++++ crates/runtime/src/zune/tflite.rs | 266 +++++ 4 files changed, 1324 insertions(+), 1282 deletions(-) delete mode 100644 crates/runtime/src/zune.rs create mode 100644 crates/runtime/src/zune/mod.rs create mode 100644 crates/runtime/src/zune/proc_block.rs create mode 100644 crates/runtime/src/zune/tflite.rs diff --git a/crates/runtime/src/zune.rs b/crates/runtime/src/zune.rs deleted file mode 100644 index 9141bd32ac..0000000000 --- a/crates/runtime/src/zune.rs +++ /dev/null @@ -1,1282 +0,0 @@ -use std::{ - borrow::Cow, - collections::{HashMap, HashSet}, - io::{Cursor, Read}, - sync::{Arc, Mutex}, -}; - -use anyhow::{anyhow, Context, Error}; -use hotg_rune_compiler::parse::yaml::*; -use hotg_rune_core::TFLITE_MIMETYPE; -use hotg_runecoral::{ - AccelerationBackend, ElementType as RuneCoralElementType, InferenceContext, - Tensor as RuneCoralTensor, TensorDescriptor as RuneCoralTensorDescriptor, - TensorMut as RuneCoralTensorMut, -}; -use indexmap::IndexMap; -use wasmer::{ImportObject, Module, Store}; -use zip; - -pub use self::{proc_block_v1::*, runtime_v1::*}; -use crate::LoadError; - -wit_bindgen_wasmer::export!("../../wit-files/rune/runtime-v1.wit"); -wit_bindgen_wasmer::import!("../../wit-files/rune/proc-block-v1.wit"); - -#[derive(Debug, Default, Clone, wasmer::WasmerEnv)] -struct Runtime { - shared_state: Arc>, -} - -#[derive(Debug, Default)] -struct State { - tensors: Vec>, - tensor_constraints: Vec>, - graph_contexts: HashMap, -} - -struct ModelNode { - context: InferenceContext, - input_tensors: HashSet, - output_tensors: HashSet, - shared_state: Arc>, -} - -struct ProcBlockNode { - node_id: String, - context: ProcBlockV1, - shared_state: Arc>, -} - -pub struct ZuneEngine { - input_nodes: Vec, - output_nodes: Vec, - models: HashMap, - procblocks: HashMap, - pipeline: IndexMap, - processing_order: Vec, - shared_state: Arc>, // resources -} - -impl ZuneEngine { - #[tracing::instrument(skip_all)] - pub fn load(binary: &[u8]) -> Result - where - Self: Sized, - { - let mut archive = zip::ZipArchive::new(Cursor::new(binary)) - .context("Unable to load Zune")?; - - let mut read_zip_resource_by_path = - |path: &str| -> Result, Error> { - let mut requested_file = - archive.by_name(path).with_context(|| { - anyhow!("Unable to find {} in zune", path) - })?; - let mut buffer = Vec::new(); - requested_file.read_to_end(&mut buffer).with_context(|| { - anyhow!("Unable to read {} from zune", path) - })?; - Ok(buffer) - }; - - let runefile = - String::from_utf8(read_zip_resource_by_path("Runefile.yml")?) - .context("Unable to read Runefile")?; - tracing::debug!(length = runefile.len(), "Read the Rune"); - - let parsed_runefile = - Document::parse(&runefile).context("Unable to parse Runefile")?; - let pipeline = &parsed_runefile.to_v1().pipeline; - - let inputs: Vec<_> = pipeline - .iter() - .filter_map(|(k, v)| match v { - Stage::Capability(_) => Some(k.clone()), - _ => None, - }) - .collect(); - - let outputs: Vec<_> = pipeline - .iter() - .filter_map(|(k, v)| match v { - Stage::Out(_) => Some(k.clone()), - _ => None, - }) - .collect(); - - let (tensors, input_tensors, output_tensors, processing_order) = - get_tensors(&inputs, &outputs, &pipeline) - .context(anyhow!("Unable to map out input/output tensors"))?; - - let graph_contexts = pipeline - .iter() - .map(|(k, v)| { - let arguments = v - .args() - .iter() - .map(|(name, argument)| { - (name.clone(), argument.to_string()) - }) - .collect(); - ( - k.clone(), - GraphContext { - arguments, - input_tensors: HashMap::new(), - output_tensors: HashMap::new(), - }, - ) - }) - .collect(); - - let tensor_constraints = tensors.iter().map(|_| None).collect(); - let shared_state = Arc::new(Mutex::new(State { - tensors, - tensor_constraints, - graph_contexts, - })); - - tracing::trace!(?input_tensors, ?output_tensors, "Loaded tensors"); - - let (model_contexts, procblock_contexts) = instantiate_nodes( - pipeline, - read_zip_resource_by_path, - &shared_state, - input_tensors, - output_tensors, - ) - .map_err(LoadError::Other)?; - - tracing::debug!(order=?processing_order, "Determined the execution order"); - - // TODO: Validate and allocate input/output tensors - - Ok(ZuneEngine { - input_nodes: inputs, - output_nodes: outputs, - models: model_contexts, - procblocks: procblock_contexts, - pipeline: pipeline.to_owned(), - processing_order, - shared_state, - }) - } - - #[tracing::instrument(skip_all)] - pub fn predict(&mut self) -> Result<(), Error> { - for stage_name in &self.processing_order { - let _span = - tracing::debug_span!("Running Stage", %stage_name).entered(); - - let stage = self.pipeline.get(stage_name).unwrap(); - match stage { - Stage::Model(_) => { - self.models.get_mut(stage_name).unwrap().run()?; - }, - Stage::Capability(_) | Stage::ProcBlock(_) => { - self.procblocks.get_mut(stage_name).unwrap().run()?; - }, - _ => {}, - } - } - Ok(()) - } - - pub fn input_nodes(&self) -> &'_ Vec { - return &self.input_nodes; - } - - pub fn output_nodes(&self) -> &'_ Vec { - return &self.output_nodes; - } - - pub fn get_input_tensor_names( - &self, - node_name: &str, - ) -> Result, Error> { - let state = self.shared_state.lock().unwrap(); - state - .graph_contexts - .get(node_name) - .and_then(|c| { - let tensor_list: Vec = c - .input_tensors - .iter() - .map(|(k, _)| k.to_string()) - .collect(); - Some(tensor_list) - }) - .ok_or(anyhow!("Unable to get input tensors")) - } - - pub fn get_input_tensor( - &mut self, - node_name: &str, - tensor_name: &str, - ) -> Option { - let state = self.shared_state.lock().unwrap(); - let tensor_constraint = state - .graph_contexts - .get(node_name) - .and_then(|c| c.input_tensors.get(tensor_name)); - - match tensor_constraint { - Some(c) if c.tensor_id.is_some() => { - state.tensors[c.tensor_id.unwrap()].clone() - }, - _ => None, - } - } - - pub fn set_input_tensor( - &mut self, - node_name: &str, - tensor_name: &str, - tensor: &TensorResult, - ) { - let mut state = self.shared_state.lock().unwrap(); - let tensor_id = state.graph_contexts.get(node_name).and_then(|c| { - c.input_tensors - .get(tensor_name) - .and_then(|c| c.tensor_id.clone()) - }); - - match tensor_id { - Some(i) => state.tensors[i] = Some(tensor.clone()), - _ => {}, - } - } - - pub fn get_output_tensor_names( - &self, - node_name: &str, - ) -> Result, Error> { - let state = self.shared_state.lock().unwrap(); - state - .graph_contexts - .get(node_name) - .and_then(|c| { - let tensor_list: Vec = c - .output_tensors - .iter() - .map(|(k, _)| k.to_string()) - .collect(); - Some(tensor_list) - }) - .ok_or(anyhow!("Unable to get input tensors")) - } - - pub fn get_output_tensor( - &mut self, - node_name: &str, - tensor_name: &str, - ) -> Option { - let state = self.shared_state.lock().unwrap(); - let tensor_constraint = state - .graph_contexts - .get(node_name) - .and_then(|c| c.output_tensors.get(tensor_name)); - - match tensor_constraint { - Some(c) if c.tensor_id.is_some() => { - state.tensors[c.tensor_id.unwrap()].clone() - }, - _ => None, - } - } - - // pub fn get_tensor(&self, tensor_id: usize) -> Option<&TensorResult> { - // self.shared_state - // .lock() - // .unwrap() - // .tensors - // .get(tensor_id) - // .unwrap_or(&None) - // .as_ref() - // } - - // pub fn set_tensor(&mut self, tensor_id: usize, tensor: &TensorResult) -> Result<(), Error> { - // self.shared_state - // .lock() - // .unwrap() - // .tensors - // .get_mut(tensor_id) - // .and_then(|t| { t = Some(tensor.clone()); Ok() }) - // .ok() - // } - - pub fn set_output_tensor( - &mut self, - node_name: &str, - tensor_name: &str, - tensor: &TensorResult, - ) { - let mut state = self.shared_state.lock().unwrap(); - let tensor_id = state.graph_contexts.get(node_name).and_then(|c| { - c.output_tensors - .get(tensor_name) - .and_then(|c| c.tensor_id.clone()) - }); - - match tensor_id { - Some(i) => state.tensors[i] = Some(tensor.clone()), - _ => {}, - } - } -} - -impl ModelNode { - #[tracing::instrument( - skip( - node_data, - model_data, - shared_state, - input_tensors, - output_tensors - ), - level = "debug" - )] - fn load( - node_id: &str, - node_data: &ModelStage, - model_data: &[u8], - shared_state: &Arc>, - input_tensors: &HashMap, - output_tensors: &HashMap, - ) -> Result { - // Create Inference Context - let context = InferenceContext::create_context( - TFLITE_MIMETYPE, - &model_data, - AccelerationBackend::NONE, - ) - .with_context(|| { - format!( - "Error Instantiating model from zune for stage: {}", - &node_id - ) - })?; - - let tensor_from_descriptor = - |t: &RuneCoralTensorDescriptor| -> TensorResult { - let element_type = get_element_type(t); - let dimensions = t.shape.iter().map(|&x| x as u32).collect(); - let buffer_size = get_buffer_size(element_type, &dimensions); - - TensorResult { - element_type, - dimensions, - buffer: vec![0; buffer_size], - } - }; - - let tensor_constraint_from_descriptor = - |t: &RuneCoralTensorDescriptor, - tensor_id: usize| - -> TensorConstraint { - let element_type = get_element_type(t); - let dimensions = t.shape.iter().map(|&x| x as usize).collect(); - - TensorConstraint { - tensor_id: Some(tensor_id), - element_type, - dimensions: Dimensions::Fixed(dimensions), - } - }; - - // Returns the list of tensor indices in the State's tensors - let allocate_tensors = |tensor_type: &str, - model_tensors: &mut dyn Iterator< - Item = RuneCoralTensorDescriptor, - >, - pipeline_tensors: &HashMap| - -> Result< - (HashSet, HashMap), - Error, - > { - let mut tensor_indices: HashSet = HashSet::new(); - let mut tensor_constraints: HashMap = - HashMap::new(); - let mut i = 0; - let mut s = shared_state.lock().unwrap(); - - while let Some(model_tensor) = model_tensors.next() { - let tensor_key = key(&node_id, Some(i)); - let tensor_id = - *pipeline_tensors.get(&tensor_key).ok_or_else(|| { - anyhow!( - "Unable to find pipeline_tensor for {} tensor \ - with key {}", - &tensor_type, - &tensor_key - ) - })?; - - let tensor_name = model_tensor.name.to_str().ok(); - let tensor_name = match tensor_name { - Some(tensor_name) if tensor_name.len() > 0 => { - tensor_name.to_string() - }, - _ => format!("{}", i).to_string(), - }; - let tensor_constraint = - tensor_constraint_from_descriptor(&model_tensor, tensor_id); - let model_tensor = tensor_from_descriptor(&model_tensor); - - match s.tensors[tensor_id] { - Some(ref t) - if t.dimensions != model_tensor.dimensions - || t.element_type != model_tensor.element_type => - { - return Err(anyhow!( - "Pipeline tensor for {} with key {} doesn't match \ - model tensor", - &tensor_type, - &tensor_key - )) - }, - Some(_) => {}, - ref mut other => { - *other = Some(model_tensor); - }, - } - - tensor_indices.insert(tensor_id); - //FIXME: 2 tensors share same name (/empty name) - //then tensor_indices.len() != tensor_constraints.len() - tensor_constraints.insert(tensor_name, tensor_constraint); - - i += 1; - } - - Ok((tensor_indices, tensor_constraints)) - }; - - let (input_tensors, input_tensor_constraints) = - allocate_tensors("input", &mut context.inputs(), &input_tensors)?; - - let (output_tensors, output_tensor_constraints) = allocate_tensors( - "output", - &mut context.outputs(), - &output_tensors, - )?; - - let graph_context = GraphContext { - arguments: node_data - .args - .iter() - .map(|(k, v)| (k.clone(), v.to_string())) - .collect(), - input_tensors: input_tensor_constraints, - output_tensors: output_tensor_constraints, - }; - - shared_state - .lock() - .unwrap() - .graph_contexts - .insert(node_id.to_string(), graph_context); - - Ok(ModelNode { - context, - input_tensors, - output_tensors, - shared_state: shared_state.clone(), - }) - } - - #[tracing::instrument(skip_all, level = "debug")] - fn run(&mut self) -> Result<(), Error> { - // We are recreating the input_tensors and output_tensors every time - // before predict because wasm linear memory might have changed - // the locations TODO: There's an optimization that can happen - // here.. but just not yet - let mut inputs: Vec = Vec::new(); - let mut outputs: Vec = Vec::new(); - let mut state = self.shared_state.lock().unwrap(); - - state.tensors.iter_mut().enumerate().for_each(|(i, t)| { - if self.input_tensors.contains(&i) { - let pipeline_tensor = t.as_mut().unwrap(); - unsafe { - inputs.push(RuneCoralTensor { - element_type: get_runecoral_element_type( - &pipeline_tensor.element_type, - ), - shape: Cow::Borrowed(std::slice::from_raw_parts( - pipeline_tensor.dimensions.as_ptr() as *const i32, - pipeline_tensor.dimensions.len(), - )), - buffer: &pipeline_tensor.buffer, - }) - } - } else if self.output_tensors.contains(&i) { - let pipeline_tensor = t.as_mut().unwrap(); - unsafe { - outputs.push(RuneCoralTensorMut { - element_type: get_runecoral_element_type( - &pipeline_tensor.element_type, - ), - shape: Cow::Borrowed(std::slice::from_raw_parts( - pipeline_tensor.dimensions.as_ptr() as *const i32, - pipeline_tensor.dimensions.len(), - )), - buffer: &mut pipeline_tensor.buffer, - }) - } - } else { - // Do nothing - } - }); - - self.context - .infer(&inputs, &mut outputs) - .map_err(|e| anyhow!(e.to_string())) - } -} - -impl ProcBlockNode { - #[tracing::instrument(skip_all, level = "debug", fields(%node_id))] - fn load( - node_id: &str, - wasm: &[u8], - runtime: &Runtime, - input_tensors: &HashMap, - output_tensors: &HashMap, - ) -> Result { - let shared_state = runtime.shared_state.clone(); - let store = Store::default(); - let mut imports = ImportObject::default(); - add_to_imports(&store, &mut imports, runtime.clone()); - - let module = - Module::new(&store, wasm).context("Unable to load the module")?; - let (pb, _) = - ProcBlockV1::instantiate(&store, &module, &mut imports) - .context("Unable to instantiate the WebAssembly module")?; - - let _result = pb.graph(node_id); - - // Assign tensors - // TODO: See if this can be more smart. - // Not bothering with that for now because tensor names are lost in current Runefile format - shared_state - .lock() - .unwrap() - .graph_contexts - .get_mut(node_id) - .and_then(|c| { - c.input_tensors.iter_mut().enumerate().for_each( - |(i, (_, t))| { - input_tensors.get(&key(node_id, Some(i))).and_then( - |&tensor_index| { - Some(t.tensor_id = Some(tensor_index)) - }, - ); - }, - ); - - c.output_tensors.iter_mut().enumerate().for_each( - |(i, (_, t))| { - output_tensors.get(&key(node_id, Some(i))).and_then( - |&tensor_index| { - Some(t.tensor_id = Some(tensor_index)) - }, - ); - }, - ); - Some(()) - }); - - Ok(ProcBlockNode { - node_id: node_id.to_string(), - context: pb, - shared_state: shared_state.clone(), - }) - } - - #[tracing::instrument(skip_all, level = "debug")] - fn run(&mut self) -> Result<(), Error> { - println!("Executing proc block: {:?} ", self.node_id); - // impl stderr for KernelError - self.context - .kernel(&self.node_id) - .map_err(|_| anyhow!("Encountered a Runtime Error"))? - .map_err(|e| match e { - KernelError::Other(s) => anyhow!(s), - KernelError::InvalidArgument(a) => anyhow!( - "Invalid argument for {}: {}", - &self.node_id, - a.name - ), - KernelError::InvalidInput(i) => { - anyhow!("Invalid input for {}: {}", &self.node_id, i.name) - }, - KernelError::MissingContext => anyhow!( - "Unable to retrieve kernel context for {}:", - &self.node_id - ), - }) - } -} - -fn get_buffer_size(element_type: ElementType, dimensions: &Vec) -> usize { - (dimensions.iter().fold(1, |a, &b| a * b) - * get_bytes_per_element(element_type)) as usize -} - -fn get_bytes_per_element(element_type: ElementType) -> u32 { - match element_type { - ElementType::I16 => 2, - ElementType::I32 | ElementType::F32 => 4, - ElementType::I64 | ElementType::F64 => 8, - _ => 1, - } -} - -fn get_element_type(t: &RuneCoralTensorDescriptor) -> ElementType { - match t.element_type { - RuneCoralElementType::UInt8 => ElementType::U8, - RuneCoralElementType::Int8 => ElementType::I8, - RuneCoralElementType::Int16 => ElementType::I16, - RuneCoralElementType::Int32 => ElementType::I32, - RuneCoralElementType::Float32 => ElementType::F32, - RuneCoralElementType::Int64 => ElementType::I64, - RuneCoralElementType::Float64 => ElementType::F64, - RuneCoralElementType::String => ElementType::Utf8, - // TODO: Implement support for all the element types - _ => ElementType::U8, - } -} - -fn get_runecoral_element_type(t: &ElementType) -> RuneCoralElementType { - match t { - ElementType::U8 => RuneCoralElementType::UInt8, - ElementType::I8 => RuneCoralElementType::Int8, - ElementType::I16 => RuneCoralElementType::Int16, - ElementType::I32 => RuneCoralElementType::Int32, - ElementType::F32 => RuneCoralElementType::Float32, - ElementType::I64 => RuneCoralElementType::Int64, - ElementType::F64 => RuneCoralElementType::Float64, - ElementType::Utf8 => RuneCoralElementType::String, - // TODO: Implement support for all the element types - _ => RuneCoralElementType::NoType, - } -} - -fn instantiate_nodes( - pipeline: &IndexMap, - mut read_zip_resource_by_path: impl FnMut(&str) -> Result, Error>, - shared_state: &Arc>, - input_tensors: HashMap, - output_tensors: HashMap, -) -> Result<(HashMap, HashMap), Error> -{ - let mut models: HashMap = HashMap::new(); - let mut procblocks: HashMap = HashMap::new(); - - let runtime = Runtime { - shared_state: shared_state.clone(), - }; - - for item in pipeline { - // Collect each output tensor into tensors - let stage_name = item.0; - match item.1 { - // Models are handled on the host side, so we treat them separately - Stage::Capability(stage) => { - let wasm = - read_zip_resource_by_path(&stage.capability.to_string()) - .context("Unable to load the capability")?; - - procblocks.insert( - stage_name.to_string(), - ProcBlockNode::load( - &stage_name, - &wasm, - &runtime, - &input_tensors, - &output_tensors, - )?, - ); - }, - Stage::Model(stage) => { - // Instantiating the model's inference context here because that - // way model_data gets deallocated once we are done with it - // This way memory usage is under control - let model_data = - read_zip_resource_by_path(&stage.model.to_string()) - .with_context(|| { - anyhow!( - "Unable to read model from zune {}", - stage.model - ) - })?; - - models.insert( - stage_name.to_string(), - ModelNode::load( - &stage_name, - &stage, - &model_data, - &shared_state, - &input_tensors, - &output_tensors, - )?, - ); - }, - Stage::ProcBlock(stage) => { - let wasm = - read_zip_resource_by_path(&stage.proc_block.to_string()) - .context("Unable to load the proc_block")?; - - procblocks.insert( - stage_name.to_string(), - ProcBlockNode::load( - &stage_name, - &wasm, - &runtime, - &input_tensors, - &output_tensors, - )?, - ); - }, - - _ => {}, // Do nothing for capabilities/outputs - } - } - - Ok((models, procblocks)) -} - -fn get_tensors( - inputs: &Vec, - outputs: &Vec, - pipeline: &IndexMap, -) -> Result< - ( - Vec>, - HashMap, - HashMap, - Vec, - ), - Error, -> { - let mut nodes_to_visit = outputs.clone(); - let mut nodes_visited = Vec::new(); - let mut tensors: Vec> = Vec::new(); - let mut output_tensors: HashMap = HashMap::new(); - let mut input_tensors: HashMap = HashMap::new(); - - // For Inputs/Capabilities - We create an input so as to be able to inject inputs - for item in inputs { - tensors.push(None); - input_tensors.insert(key(item, Some(0)), tensors.len() - 1); - output_tensors.insert(key(item, Some(0)), tensors.len() - 1); - } - - // // For Outputs - we allocate all the outputs - // for item in outputs { - // for _ in pipeline.get(item).unwrap().output_types() { - // tensors.push(None); - // output_tensors.insert(key(item, Some(0)), tensors.len() - 1); - // } - // } - - // Do a depth first traversal of the tree structure to determine the order - // of processing/calling predict() Also allocate the output tensors of - // each node along the way - while !nodes_to_visit.is_empty() { - let node = nodes_to_visit.pop().unwrap(); - nodes_visited.push(node.clone()); - - let stage = pipeline.get(&node).unwrap(); - for output_index in 0..stage.output_types().len() { - tensors.push(None); - output_tensors - .insert(key(&node, Some(output_index)), tensors.len() - 1); - } - - for input in stage.inputs() { - if !nodes_to_visit.contains(&input.name) - && !nodes_visited.contains(&input.name) - { - nodes_to_visit.push(input.name.clone()); - } - } - } - - // For each stage in the pipeline, since the inputs have to come from the - // outputs of other stages, simply map to the same tensor - for item in pipeline { - // Collect each output tensor into tensors - let stage_name = item.0; - for i in 0..item.1.inputs().len() { - let input = &item.1.inputs()[i]; - let input_key = key(&input.name, input.index); - let &input_tensor_index = output_tensors.get(&input_key).context( - anyhow!("Invalid input key specified: {}", &input_key), - )?; - input_tensors.insert(key(stage_name, Some(i)), input_tensor_index); - } - } - - nodes_visited.reverse(); - - Ok((tensors, input_tensors, output_tensors, nodes_visited)) -} - -fn key(node_name: &str, tensor_index: Option) -> String { - format!("{}.{}", node_name, tensor_index.or(Some(0)).unwrap()) -} - -#[derive(Debug, Clone)] -pub enum Never {} - -#[derive(Debug, Clone)] -struct Metadata { - description: String, - repository: String, - homepage: String, - tags: Vec, - arguments: Vec, - inputs: Vec, - outputs: Vec, -} - -#[derive(Debug, Clone)] -struct ArgumentMetadata { - description: String, - default_value: String, - // hint: Vec -} - -#[derive(Debug, Clone)] -struct TensorMetadata {} - -#[derive(Debug, Clone)] -enum Dimensions { - Dynamic, - Fixed(Vec), -} - -#[derive(Debug, Clone)] -struct TensorConstraint { - tensor_id: Option, - element_type: ElementType, - dimensions: Dimensions, -} - -#[derive(Debug, Default, Clone)] -struct GraphContext { - arguments: HashMap, - input_tensors: HashMap, - output_tensors: HashMap, -} - -impl runtime_v1::RuntimeV1 for Runtime { - type ArgumentHint = Never; - type ArgumentMetadata = Never; - type KernelContext = String; - type Metadata = Metadata; - type Model = Never; - type TensorHint = Never; - type TensorMetadata = TensorMetadata; - type GraphContext = String; - - fn metadata_new(&mut self, _name: &str, _version: &str) -> Self::Metadata { - todo!() - } - - fn metadata_set_description( - &mut self, - _ctx: &Self::Metadata, - _description: &str, - ) { - todo!() - } - - fn metadata_set_repository(&mut self, _ctx: &Self::Metadata, _url: &str) { - todo!() - } - - fn metadata_set_homepage(&mut self, _ctx: &Self::Metadata, _url: &str) { - todo!() - } - - fn metadata_add_tag(&mut self, _ctx: &Self::Metadata, _tag: &str) { - todo!() - } - - fn metadata_add_argument( - &mut self, - _ctx: &Self::Metadata, - _arg: &Self::ArgumentMetadata, - ) { - todo!() - } - - fn metadata_add_input( - &mut self, - _ctx: &Self::Metadata, - _metadata: &Self::TensorMetadata, - ) { - todo!() - } - - fn metadata_add_output( - &mut self, - _ctx: &Self::Metadata, - _metadata: &Self::TensorMetadata, - ) { - todo!() - } - - fn argument_metadata_new(&mut self, _name: &str) -> Self::ArgumentMetadata { - todo!() - } - - fn argument_metadata_set_description( - &mut self, - _ctx: &Self::ArgumentMetadata, - _description: &str, - ) { - todo!() - } - - fn argument_metadata_set_default_value( - &mut self, - _ctx: &Self::ArgumentMetadata, - _default_value: &str, - ) { - todo!() - } - - fn argument_metadata_add_hint( - &mut self, - _ctx: &Self::ArgumentMetadata, - _hint: &Self::ArgumentHint, - ) { - todo!() - } - - fn tensor_metadata_new(&mut self, _name: &str) -> Self::TensorMetadata { - todo!() - } - - fn tensor_metadata_set_description( - &mut self, - _ctx: &Self::TensorMetadata, - _description: &str, - ) { - todo!() - } - - fn tensor_metadata_add_hint( - &mut self, - _ctx: &Self::TensorMetadata, - _hint: &Self::TensorHint, - ) { - todo!() - } - - fn interpret_as_image(&mut self) -> Self::TensorHint { - todo!() - } - - fn interpret_as_audio(&mut self) -> Self::TensorHint { - todo!() - } - - fn supported_shapes( - &mut self, - _supported_element_types: Vec, - _dimensions: DimensionsParam<'_>, - ) -> Self::TensorHint { - todo!() - } - - fn interpret_as_number_in_range( - &mut self, - _min: &str, - _max: &str, - ) -> Self::ArgumentHint { - todo!() - } - - fn interpret_as_string_in_enum( - &mut self, - _string_enum: Vec<&str>, - ) -> Self::ArgumentHint { - todo!() - } - - fn non_negative_number(&mut self) -> Self::ArgumentHint { - todo!() - } - - fn supported_argument_type( - &mut self, - _hint: ArgumentType, - ) -> Self::ArgumentHint { - todo!() - } - - fn register_node(&mut self, _metadata: &Self::Metadata) { - todo!() - } - - #[tracing::instrument(skip_all, level = "debug")] - fn graph_context_for_node( - &mut self, - node_id: &str, - ) -> Option { - self.shared_state - .lock() - .unwrap() - .graph_contexts - .get(node_id)?; - - Some(node_id.to_string()) - } - - #[tracing::instrument(skip(self, ctx), level = "debug")] - fn graph_context_get_argument( - &mut self, - ctx: &Self::GraphContext, - name: &str, - ) -> Option { - self.shared_state - .lock() - .unwrap() - .graph_contexts - .get(ctx) - .and_then(|c| c.arguments.get(name).and_then(|v| Some(v.clone()))) - } - - #[tracing::instrument(skip(self, ctx), level = "debug")] - fn graph_context_add_input_tensor( - &mut self, - ctx: &Self::GraphContext, - name: &str, - element_type: ElementType, - dimensions: DimensionsParam<'_>, - ) { - self.shared_state - .lock() - .unwrap() - .graph_contexts - .get_mut(ctx) - .and_then(|c| { - c.input_tensors.insert( - name.to_string(), - TensorConstraint { - tensor_id: None, - element_type, - dimensions: match dimensions { - DimensionsParam::Dynamic => Dimensions::Dynamic, - DimensionsParam::Fixed(shape) => Dimensions::Fixed( - shape - .iter() - .map(|&i| i.get() as usize) - .collect(), - ), - }, - }, - ) - }); - } - - #[tracing::instrument(skip(self, ctx), level = "debug")] - fn graph_context_add_output_tensor( - &mut self, - ctx: &Self::GraphContext, - name: &str, - element_type: ElementType, - dimensions: DimensionsParam<'_>, - ) { - self.shared_state - .lock() - .unwrap() - .graph_contexts - .get_mut(ctx) - .and_then(|c| { - c.output_tensors.insert( - name.to_string(), - TensorConstraint { - tensor_id: None, - element_type, - dimensions: match dimensions { - DimensionsParam::Dynamic => Dimensions::Dynamic, - DimensionsParam::Fixed(shape) => Dimensions::Fixed( - shape - .iter() - .map(|&i| i.get() as usize) - .collect(), - ), - }, - }, - ) - }); - } - - #[tracing::instrument(skip_all, level = "debug")] - fn kernel_context_for_node( - &mut self, - node_id: &str, - ) -> Option { - self.shared_state - .lock() - .unwrap() - .graph_contexts - .get(node_id)?; - Some(node_id.to_string()) - } - - #[tracing::instrument(skip(self, ctx), level = "debug")] - fn kernel_context_get_argument( - &mut self, - ctx: &Self::KernelContext, - name: &str, - ) -> Option { - self.shared_state - .lock() - .unwrap() - .graph_contexts - .get(ctx) - .and_then(|c| c.arguments.get(name).and_then(|v| Some(v.clone()))) - } - - #[tracing::instrument(skip(self, ctx), level = "debug")] - fn kernel_context_get_input_tensor( - &mut self, - ctx: &Self::KernelContext, - name: &str, - ) -> Option { - let state = self.shared_state.lock().unwrap(); - - let tensor_id = state - .graph_contexts - .get(ctx) - .and_then(|c| c.input_tensors.get(name).and_then(|v| v.tensor_id)); - - match tensor_id { - Some(i) => state.tensors[i].clone(), - _ => None, - } - } - - #[tracing::instrument(skip(self, ctx, buffer), level = "debug")] - fn kernel_context_set_output_tensor( - &mut self, - ctx: &Self::KernelContext, - name: &str, - TensorParam { - element_type, - buffer, - dimensions, - }: TensorParam<'_>, - ) { - let mut state = self.shared_state.lock().unwrap(); - - let tensor_id = state - .graph_contexts - .get(ctx) - .and_then(|c| c.output_tensors.get(name).and_then(|v| v.tensor_id)); - - let dimensions = dimensions.iter().map(|&i| i.get() as u32).collect(); - - // Todo check tensor constraint - - if tensor_id.is_some() { - state.tensors[tensor_id.unwrap()] = Some(TensorResult { - element_type, - buffer: buffer.to_vec(), - dimensions, - }); - } - } - - fn is_enabled(&mut self, _metadata: LogMetadata) -> bool { - true - } - - fn log( - &mut self, - metadata: LogMetadata, - message: &str, - data: Vec<(&'_ str, LogValue<'_>)>, - ) { - let level = match metadata.level { - LogLevel::Trace => tracing::Level::TRACE, - LogLevel::Debug => tracing::Level::DEBUG, - LogLevel::Info => tracing::Level::INFO, - LogLevel::Warn => tracing::Level::WARN, - LogLevel::Error | LogLevel::Fatal => tracing::Level::ERROR, - }; - - let LogMetadata { - name, - target, - level: _, - file, - line, - module, - } = metadata; - - tracing::event!( - tracing::Level::INFO, - meta.level = %level, - meta.name = %name, - meta.target = target, - meta.file = file, - meta.line = line, - meta.module = module, - ?data, - message, - ); - } - - fn kernel_context_get_global_input( - &mut self, - _ctx: &Self::KernelContext, - _name: &str, - ) -> Option { - todo!() - } - - fn kernel_context_set_global_output( - &mut self, - _ctx: &Self::KernelContext, - _name: &str, - _tensor: TensorParam<'_>, - ) { - todo!() - } - - fn model_load( - &mut self, - _: &str, - _: &[u8], - _: Vec<(&str, &str)>, - ) -> Result { - todo!() - } - - fn model_inputs(&mut self, _: &Self::Model) -> Vec { - todo!() - } - - fn model_outputs(&mut self, _: &Self::Model) -> Vec { - todo!() - } - - fn model_infer( - &mut self, - _: &Self::Model, - _: Vec>, - ) -> Result, ModelInferError> { - todo!() - } -} diff --git a/crates/runtime/src/zune/mod.rs b/crates/runtime/src/zune/mod.rs new file mode 100644 index 0000000000..b0c4402852 --- /dev/null +++ b/crates/runtime/src/zune/mod.rs @@ -0,0 +1,502 @@ +mod proc_block; +#[cfg(feature = "tflite")] +mod tflite; + +use std::{ + collections::HashMap, + io::{Cursor, Read}, + sync::{Arc, Mutex}, +}; + +use anyhow::{anyhow, Context, Error}; +use hotg_rune_compiler::parse::yaml::*; +use indexmap::IndexMap; +use zip; + +pub use self::{proc_block_v1::*, runtime_v1::*}; +use crate::{ + zune::proc_block::{GraphContext, ProcBlockNode, TensorConstraint}, + LoadError, +}; + +wit_bindgen_wasmer::export!("../../../wit-files/rune/runtime-v1.wit"); +wit_bindgen_wasmer::import!("../../../wit-files/rune/proc-block-v1.wit"); + +#[derive(Debug, Default, Clone, wasmer::WasmerEnv)] +struct Runtime { + shared_state: Arc>, +} + +#[derive(Debug, Default)] +pub(crate) struct State { + pub(crate) tensors: Vec>, + pub(crate) tensor_constraints: Vec>, + pub(crate) graph_contexts: HashMap, +} + +pub struct ZuneEngine { + input_nodes: Vec, + output_nodes: Vec, + #[cfg(feature = "tflite")] + models: HashMap, + procblocks: HashMap, + pipeline: IndexMap, + processing_order: Vec, + shared_state: Arc>, // resources +} + +impl ZuneEngine { + #[tracing::instrument(skip_all)] + pub fn load(binary: &[u8]) -> Result + where + Self: Sized, + { + let mut archive = zip::ZipArchive::new(Cursor::new(binary)) + .context("Unable to load Zune")?; + + let mut read_zip_resource_by_path = + |path: &str| -> Result, Error> { + let mut requested_file = + archive.by_name(path).with_context(|| { + anyhow!("Unable to find {} in zune", path) + })?; + let mut buffer = Vec::new(); + requested_file.read_to_end(&mut buffer).with_context(|| { + anyhow!("Unable to read {} from zune", path) + })?; + Ok(buffer) + }; + + let runefile = + String::from_utf8(read_zip_resource_by_path("Runefile.yml")?) + .context("Unable to read Runefile")?; + tracing::debug!(length = runefile.len(), "Read the Rune"); + + let parsed_runefile = + Document::parse(&runefile).context("Unable to parse Runefile")?; + let pipeline = &parsed_runefile.to_v1().pipeline; + + let inputs: Vec<_> = pipeline + .iter() + .filter_map(|(k, v)| match v { + Stage::Capability(_) => Some(k.clone()), + _ => None, + }) + .collect(); + + let outputs: Vec<_> = pipeline + .iter() + .filter_map(|(k, v)| match v { + Stage::Out(_) => Some(k.clone()), + _ => None, + }) + .collect(); + + let (tensors, input_tensors, output_tensors, processing_order) = + get_tensors(&inputs, &outputs, &pipeline) + .context(anyhow!("Unable to map out input/output tensors"))?; + + let graph_contexts = pipeline + .iter() + .map(|(k, v)| { + let arguments = v + .args() + .iter() + .map(|(name, argument)| { + (name.clone(), argument.to_string()) + }) + .collect(); + ( + k.clone(), + GraphContext { + arguments, + input_tensors: HashMap::new(), + output_tensors: HashMap::new(), + }, + ) + }) + .collect(); + + let tensor_constraints = tensors.iter().map(|_| None).collect(); + let shared_state = Arc::new(Mutex::new(State { + tensors, + tensor_constraints, + graph_contexts, + })); + + tracing::trace!(?input_tensors, ?output_tensors, "Loaded tensors"); + + let (model_contexts, procblock_contexts) = instantiate_nodes( + pipeline, + read_zip_resource_by_path, + &shared_state, + input_tensors, + output_tensors, + ) + .map_err(LoadError::Other)?; + + tracing::debug!(order=?processing_order, "Determined the execution order"); + + // TODO: Validate and allocate input/output tensors + + Ok(ZuneEngine { + input_nodes: inputs, + output_nodes: outputs, + models: model_contexts, + procblocks: procblock_contexts, + pipeline: pipeline.to_owned(), + processing_order, + shared_state, + }) + } + + #[tracing::instrument(skip_all)] + pub fn predict(&mut self) -> Result<(), Error> { + for stage_name in &self.processing_order { + let _span = + tracing::debug_span!("Running Stage", %stage_name).entered(); + + let stage = self.pipeline.get(stage_name).unwrap(); + match stage { + #[cfg(feature = "tflite")] + Stage::Model(_) => { + self.models.get_mut(stage_name).unwrap().run()?; + }, + #[cfg(not(feature = "tflite"))] + Stage::Model(_) => { + anyhow::bail!( + "Unable to run \"{}\" because models aren't supported", + stage_name + ); + }, + Stage::Capability(_) | Stage::ProcBlock(_) => { + self.procblocks.get_mut(stage_name).unwrap().run()?; + }, + _ => {}, + } + } + Ok(()) + } + + pub fn input_nodes(&self) -> &'_ Vec { + return &self.input_nodes; + } + + pub fn output_nodes(&self) -> &'_ Vec { + return &self.output_nodes; + } + + pub fn get_input_tensor_names( + &self, + node_name: &str, + ) -> Result, Error> { + let state = self.shared_state.lock().unwrap(); + state + .graph_contexts + .get(node_name) + .and_then(|c| { + let tensor_list: Vec = c + .input_tensors + .iter() + .map(|(k, _)| k.to_string()) + .collect(); + Some(tensor_list) + }) + .ok_or(anyhow!("Unable to get input tensors")) + } + + pub fn get_input_tensor( + &mut self, + node_name: &str, + tensor_name: &str, + ) -> Option { + let state = self.shared_state.lock().unwrap(); + let tensor_constraint = state + .graph_contexts + .get(node_name) + .and_then(|c| c.input_tensors.get(tensor_name)); + + match tensor_constraint { + Some(c) if c.tensor_id.is_some() => { + state.tensors[c.tensor_id.unwrap()].clone() + }, + _ => None, + } + } + + pub fn set_input_tensor( + &mut self, + node_name: &str, + tensor_name: &str, + tensor: &TensorResult, + ) { + let mut state = self.shared_state.lock().unwrap(); + let tensor_id = state.graph_contexts.get(node_name).and_then(|c| { + c.input_tensors + .get(tensor_name) + .and_then(|c| c.tensor_id.clone()) + }); + + match tensor_id { + Some(i) => state.tensors[i] = Some(tensor.clone()), + _ => {}, + } + } + + pub fn get_output_tensor_names( + &self, + node_name: &str, + ) -> Result, Error> { + let state = self.shared_state.lock().unwrap(); + state + .graph_contexts + .get(node_name) + .and_then(|c| { + let tensor_list: Vec = c + .output_tensors + .iter() + .map(|(k, _)| k.to_string()) + .collect(); + Some(tensor_list) + }) + .ok_or(anyhow!("Unable to get input tensors")) + } + + pub fn get_output_tensor( + &mut self, + node_name: &str, + tensor_name: &str, + ) -> Option { + let state = self.shared_state.lock().unwrap(); + let tensor_constraint = state + .graph_contexts + .get(node_name) + .and_then(|c| c.output_tensors.get(tensor_name)); + + match tensor_constraint { + Some(c) if c.tensor_id.is_some() => { + state.tensors[c.tensor_id.unwrap()].clone() + }, + _ => None, + } + } + + // pub fn get_tensor(&self, tensor_id: usize) -> Option<&TensorResult> { + // self.shared_state + // .lock() + // .unwrap() + // .tensors + // .get(tensor_id) + // .unwrap_or(&None) + // .as_ref() + // } + + // pub fn set_tensor(&mut self, tensor_id: usize, tensor: &TensorResult) -> Result<(), Error> { + // self.shared_state + // .lock() + // .unwrap() + // .tensors + // .get_mut(tensor_id) + // .and_then(|t| { t = Some(tensor.clone()); Ok() }) + // .ok() + // } + + pub fn set_output_tensor( + &mut self, + node_name: &str, + tensor_name: &str, + tensor: &TensorResult, + ) { + let mut state = self.shared_state.lock().unwrap(); + let tensor_id = state.graph_contexts.get(node_name).and_then(|c| { + c.output_tensors + .get(tensor_name) + .and_then(|c| c.tensor_id.clone()) + }); + + match tensor_id { + Some(i) => state.tensors[i] = Some(tensor.clone()), + _ => {}, + } + } +} + +fn get_buffer_size(element_type: ElementType, dimensions: &Vec) -> usize { + (dimensions.iter().fold(1, |a, &b| a * b) + * get_bytes_per_element(element_type)) as usize +} + +fn get_bytes_per_element(element_type: ElementType) -> u32 { + match element_type { + ElementType::I16 => 2, + ElementType::I32 | ElementType::F32 => 4, + ElementType::I64 | ElementType::F64 => 8, + _ => 1, + } +} + +fn instantiate_nodes( + pipeline: &IndexMap, + mut read_zip_resource_by_path: impl FnMut(&str) -> Result, Error>, + shared_state: &Arc>, + input_tensors: HashMap, + output_tensors: HashMap, +) -> Result<(HashMap, HashMap), Error> +{ + let mut models: HashMap = HashMap::new(); + let mut procblocks: HashMap = HashMap::new(); + + let runtime = Runtime { + shared_state: shared_state.clone(), + }; + + for item in pipeline { + // Collect each output tensor into tensors + let stage_name = item.0; + match item.1 { + // Models are handled on the host side, so we treat them separately + Stage::Capability(stage) => { + let wasm = + read_zip_resource_by_path(&stage.capability.to_string()) + .context("Unable to load the capability")?; + + procblocks.insert( + stage_name.to_string(), + ProcBlockNode::load( + &stage_name, + &wasm, + &runtime, + &input_tensors, + &output_tensors, + )?, + ); + }, + Stage::Model(stage) => { + // Instantiating the model's inference context here because that + // way model_data gets deallocated once we are done with it + // This way memory usage is under control + let model_data = + read_zip_resource_by_path(&stage.model.to_string()) + .with_context(|| { + anyhow!( + "Unable to read model from zune {}", + stage.model + ) + })?; + + models.insert( + stage_name.to_string(), + ModelNode::load( + &stage_name, + &stage, + &model_data, + &shared_state, + &input_tensors, + &output_tensors, + )?, + ); + }, + Stage::ProcBlock(stage) => { + let wasm = + read_zip_resource_by_path(&stage.proc_block.to_string()) + .context("Unable to load the proc_block")?; + + procblocks.insert( + stage_name.to_string(), + ProcBlockNode::load( + &stage_name, + &wasm, + &runtime, + &input_tensors, + &output_tensors, + )?, + ); + }, + + _ => {}, // Do nothing for capabilities/outputs + } + } + + Ok((models, procblocks)) +} + +fn get_tensors( + inputs: &Vec, + outputs: &Vec, + pipeline: &IndexMap, +) -> Result< + ( + Vec>, + HashMap, + HashMap, + Vec, + ), + Error, +> { + let mut nodes_to_visit = outputs.clone(); + let mut nodes_visited = Vec::new(); + let mut tensors: Vec> = Vec::new(); + let mut output_tensors: HashMap = HashMap::new(); + let mut input_tensors: HashMap = HashMap::new(); + + // For Inputs/Capabilities - We create an input so as to be able to inject inputs + for item in inputs { + tensors.push(None); + input_tensors.insert(key(item, Some(0)), tensors.len() - 1); + output_tensors.insert(key(item, Some(0)), tensors.len() - 1); + } + + // // For Outputs - we allocate all the outputs + // for item in outputs { + // for _ in pipeline.get(item).unwrap().output_types() { + // tensors.push(None); + // output_tensors.insert(key(item, Some(0)), tensors.len() - 1); + // } + // } + + // Do a depth first traversal of the tree structure to determine the order + // of processing/calling predict() Also allocate the output tensors of + // each node along the way + while !nodes_to_visit.is_empty() { + let node = nodes_to_visit.pop().unwrap(); + nodes_visited.push(node.clone()); + + let stage = pipeline.get(&node).unwrap(); + for output_index in 0..stage.output_types().len() { + tensors.push(None); + output_tensors + .insert(key(&node, Some(output_index)), tensors.len() - 1); + } + + for input in stage.inputs() { + if !nodes_to_visit.contains(&input.name) + && !nodes_visited.contains(&input.name) + { + nodes_to_visit.push(input.name.clone()); + } + } + } + + // For each stage in the pipeline, since the inputs have to come from the + // outputs of other stages, simply map to the same tensor + for item in pipeline { + // Collect each output tensor into tensors + let stage_name = item.0; + for i in 0..item.1.inputs().len() { + let input = &item.1.inputs()[i]; + let input_key = key(&input.name, input.index); + let &input_tensor_index = output_tensors.get(&input_key).context( + anyhow!("Invalid input key specified: {}", &input_key), + )?; + input_tensors.insert(key(stage_name, Some(i)), input_tensor_index); + } + } + + nodes_visited.reverse(); + + Ok((tensors, input_tensors, output_tensors, nodes_visited)) +} + +fn key(node_name: &str, tensor_index: Option) -> String { + format!("{}.{}", node_name, tensor_index.or(Some(0)).unwrap()) +} diff --git a/crates/runtime/src/zune/proc_block.rs b/crates/runtime/src/zune/proc_block.rs new file mode 100644 index 0000000000..c6beeac06b --- /dev/null +++ b/crates/runtime/src/zune/proc_block.rs @@ -0,0 +1,556 @@ +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use anyhow::{Context, Error}; +use wasmer::{ImportObject, Module, Store}; + +use crate::zune::{ + key, runtime_v1, ArgumentType, DimensionsParam, ElementType, KernelError, + LogLevel, LogMetadata, LogValue, ModelInferError, ModelLoadError, + ProcBlockV1, Runtime, State, TensorParam, TensorResult, +}; + +pub(crate) struct ProcBlockNode { + node_id: String, + context: ProcBlockV1, + shared_state: Arc>, +} + +impl ProcBlockNode { + #[tracing::instrument(skip_all, level = "debug", fields(%node_id))] + pub(crate) fn load( + node_id: &str, + wasm: &[u8], + runtime: &Runtime, + input_tensors: &HashMap, + output_tensors: &HashMap, + ) -> Result { + let shared_state = runtime.shared_state.clone(); + let store = Store::default(); + let mut imports = ImportObject::default(); + super::add_to_imports(&store, &mut imports, runtime.clone()); + + let module = + Module::new(&store, wasm).context("Unable to load the module")?; + let (pb, _) = + ProcBlockV1::instantiate(&store, &module, &mut imports) + .context("Unable to instantiate the WebAssembly module")?; + + let _result = pb.graph(node_id); + + // Assign tensors + // TODO: See if this can be more smart. + // Not bothering with that for now because tensor names are lost in current Runefile format + shared_state + .lock() + .unwrap() + .graph_contexts + .get_mut(node_id) + .and_then(|c| { + c.input_tensors.iter_mut().enumerate().for_each( + |(i, (_, t))| { + input_tensors.get(&key(node_id, Some(i))).and_then( + |&tensor_index| { + Some(t.tensor_id = Some(tensor_index)) + }, + ); + }, + ); + + c.output_tensors.iter_mut().enumerate().for_each( + |(i, (_, t))| { + output_tensors.get(&key(node_id, Some(i))).and_then( + |&tensor_index| { + Some(t.tensor_id = Some(tensor_index)) + }, + ); + }, + ); + Some(()) + }); + + Ok(ProcBlockNode { + node_id: node_id.to_string(), + context: pb, + shared_state: shared_state.clone(), + }) + } + + #[tracing::instrument(skip_all, level = "debug")] + pub(crate) fn run(&mut self) -> Result<(), Error> { + println!("Executing proc block: {:?} ", self.node_id); + self.context.kernel(&self.node_id)?.map_err(|e| match e { + KernelError::Other(s) => Error::msg(s), + KernelError::InvalidArgument(a) => { + anyhow::anyhow!( + "Invalid argument for {}: {}", + &self.node_id, + a.name + ) + }, + KernelError::InvalidInput(i) => { + anyhow::anyhow!( + "Invalid input for {}: {}", + &self.node_id, + i.name + ) + }, + KernelError::MissingContext => anyhow::anyhow!( + "Unable to retrieve kernel context for {}:", + &self.node_id + ), + }) + } +} + +#[derive(Debug, Clone)] +pub enum Never {} + +#[derive(Debug, Clone)] +struct Metadata { + description: String, + repository: String, + homepage: String, + tags: Vec, + arguments: Vec, + inputs: Vec, + outputs: Vec, +} + +#[derive(Debug, Clone)] +struct ArgumentMetadata { + description: String, + default_value: String, + // hint: Vec +} + +#[derive(Debug, Clone)] +struct TensorMetadata {} + +#[derive(Debug, Clone)] +pub(crate) enum Dimensions { + Dynamic, + Fixed(Vec), +} + +#[derive(Debug, Clone)] +pub(crate) struct TensorConstraint { + pub tensor_id: Option, + pub element_type: ElementType, + pub dimensions: Dimensions, +} + +#[derive(Debug, Default, Clone)] +pub(crate) struct GraphContext { + pub arguments: HashMap, + pub input_tensors: HashMap, + pub output_tensors: HashMap, +} + +impl runtime_v1::RuntimeV1 for Runtime { + type ArgumentHint = Never; + type ArgumentMetadata = Never; + type KernelContext = String; + type Metadata = Metadata; + type Model = Never; + type TensorHint = Never; + type TensorMetadata = TensorMetadata; + type GraphContext = String; + + fn metadata_new(&mut self, _name: &str, _version: &str) -> Self::Metadata { + todo!() + } + + fn metadata_set_description( + &mut self, + _ctx: &Self::Metadata, + _description: &str, + ) { + todo!() + } + + fn metadata_set_repository(&mut self, _ctx: &Self::Metadata, _url: &str) { + todo!() + } + + fn metadata_set_homepage(&mut self, _ctx: &Self::Metadata, _url: &str) { + todo!() + } + + fn metadata_add_tag(&mut self, _ctx: &Self::Metadata, _tag: &str) { + todo!() + } + + fn metadata_add_argument( + &mut self, + _ctx: &Self::Metadata, + _arg: &Self::ArgumentMetadata, + ) { + todo!() + } + + fn metadata_add_input( + &mut self, + _ctx: &Self::Metadata, + _metadata: &Self::TensorMetadata, + ) { + todo!() + } + + fn metadata_add_output( + &mut self, + _ctx: &Self::Metadata, + _metadata: &Self::TensorMetadata, + ) { + todo!() + } + + fn argument_metadata_new(&mut self, _name: &str) -> Self::ArgumentMetadata { + todo!() + } + + fn argument_metadata_set_description( + &mut self, + _ctx: &Self::ArgumentMetadata, + _description: &str, + ) { + todo!() + } + + fn argument_metadata_set_default_value( + &mut self, + _ctx: &Self::ArgumentMetadata, + _default_value: &str, + ) { + todo!() + } + + fn argument_metadata_add_hint( + &mut self, + _ctx: &Self::ArgumentMetadata, + _hint: &Self::ArgumentHint, + ) { + todo!() + } + + fn tensor_metadata_new(&mut self, _name: &str) -> Self::TensorMetadata { + todo!() + } + + fn tensor_metadata_set_description( + &mut self, + _ctx: &Self::TensorMetadata, + _description: &str, + ) { + todo!() + } + + fn tensor_metadata_add_hint( + &mut self, + _ctx: &Self::TensorMetadata, + _hint: &Self::TensorHint, + ) { + todo!() + } + + fn interpret_as_image(&mut self) -> Self::TensorHint { + todo!() + } + + fn interpret_as_audio(&mut self) -> Self::TensorHint { + todo!() + } + + fn supported_shapes( + &mut self, + _supported_element_types: Vec, + _dimensions: DimensionsParam<'_>, + ) -> Self::TensorHint { + todo!() + } + + fn interpret_as_number_in_range( + &mut self, + _min: &str, + _max: &str, + ) -> Self::ArgumentHint { + todo!() + } + + fn interpret_as_string_in_enum( + &mut self, + _string_enum: Vec<&str>, + ) -> Self::ArgumentHint { + todo!() + } + + fn non_negative_number(&mut self) -> Self::ArgumentHint { + todo!() + } + + fn supported_argument_type( + &mut self, + _hint: ArgumentType, + ) -> Self::ArgumentHint { + todo!() + } + + fn register_node(&mut self, _metadata: &Self::Metadata) { + todo!() + } + + #[tracing::instrument(skip_all, level = "debug")] + fn graph_context_for_node( + &mut self, + node_id: &str, + ) -> Option { + self.shared_state + .lock() + .unwrap() + .graph_contexts + .get(node_id)?; + + Some(node_id.to_string()) + } + + #[tracing::instrument(skip(self, ctx), level = "debug")] + fn graph_context_get_argument( + &mut self, + ctx: &Self::GraphContext, + name: &str, + ) -> Option { + self.shared_state + .lock() + .unwrap() + .graph_contexts + .get(ctx) + .and_then(|c| c.arguments.get(name).and_then(|v| Some(v.clone()))) + } + + #[tracing::instrument(skip(self, ctx), level = "debug")] + fn graph_context_add_input_tensor( + &mut self, + ctx: &Self::GraphContext, + name: &str, + element_type: ElementType, + dimensions: DimensionsParam<'_>, + ) { + self.shared_state + .lock() + .unwrap() + .graph_contexts + .get_mut(ctx) + .and_then(|c| { + c.input_tensors.insert( + name.to_string(), + TensorConstraint { + tensor_id: None, + element_type, + dimensions: match dimensions { + DimensionsParam::Dynamic => Dimensions::Dynamic, + DimensionsParam::Fixed(shape) => Dimensions::Fixed( + shape + .iter() + .map(|&i| i.get() as usize) + .collect(), + ), + }, + }, + ) + }); + } + + #[tracing::instrument(skip(self, ctx), level = "debug")] + fn graph_context_add_output_tensor( + &mut self, + ctx: &Self::GraphContext, + name: &str, + element_type: ElementType, + dimensions: DimensionsParam<'_>, + ) { + self.shared_state + .lock() + .unwrap() + .graph_contexts + .get_mut(ctx) + .and_then(|c| { + c.output_tensors.insert( + name.to_string(), + TensorConstraint { + tensor_id: None, + element_type, + dimensions: match dimensions { + DimensionsParam::Dynamic => Dimensions::Dynamic, + DimensionsParam::Fixed(shape) => Dimensions::Fixed( + shape + .iter() + .map(|&i| i.get() as usize) + .collect(), + ), + }, + }, + ) + }); + } + + #[tracing::instrument(skip_all, level = "debug")] + fn kernel_context_for_node( + &mut self, + node_id: &str, + ) -> Option { + self.shared_state + .lock() + .unwrap() + .graph_contexts + .get(node_id)?; + Some(node_id.to_string()) + } + + #[tracing::instrument(skip(self, ctx), level = "debug")] + fn kernel_context_get_argument( + &mut self, + ctx: &Self::KernelContext, + name: &str, + ) -> Option { + self.shared_state + .lock() + .unwrap() + .graph_contexts + .get(ctx) + .and_then(|c| c.arguments.get(name).and_then(|v| Some(v.clone()))) + } + + #[tracing::instrument(skip(self, ctx), level = "debug")] + fn kernel_context_get_input_tensor( + &mut self, + ctx: &Self::KernelContext, + name: &str, + ) -> Option { + let state = self.shared_state.lock().unwrap(); + + let tensor_id = state + .graph_contexts + .get(ctx) + .and_then(|c| c.input_tensors.get(name).and_then(|v| v.tensor_id)); + + match tensor_id { + Some(i) => state.tensors[i].clone(), + _ => None, + } + } + + #[tracing::instrument(skip(self, ctx, buffer), level = "debug")] + fn kernel_context_set_output_tensor( + &mut self, + ctx: &Self::KernelContext, + name: &str, + TensorParam { + element_type, + buffer, + dimensions, + }: TensorParam<'_>, + ) { + let mut state = self.shared_state.lock().unwrap(); + + let tensor_id = state + .graph_contexts + .get(ctx) + .and_then(|c| c.output_tensors.get(name).and_then(|v| v.tensor_id)); + + let dimensions = dimensions.iter().map(|&i| i.get() as u32).collect(); + + // Todo check tensor constraint + + if tensor_id.is_some() { + state.tensors[tensor_id.unwrap()] = Some(TensorResult { + element_type, + buffer: buffer.to_vec(), + dimensions, + }); + } + } + + fn is_enabled(&mut self, _metadata: LogMetadata) -> bool { + true + } + + fn log( + &mut self, + metadata: LogMetadata, + message: &str, + data: Vec<(&'_ str, LogValue<'_>)>, + ) { + let level = match metadata.level { + LogLevel::Trace => tracing::Level::TRACE, + LogLevel::Debug => tracing::Level::DEBUG, + LogLevel::Info => tracing::Level::INFO, + LogLevel::Warn => tracing::Level::WARN, + LogLevel::Error | LogLevel::Fatal => tracing::Level::ERROR, + }; + + let LogMetadata { + name, + target, + level: _, + file, + line, + module, + } = metadata; + + tracing::event!( + tracing::Level::INFO, + meta.level = %level, + meta.name = %name, + meta.target = target, + meta.file = file, + meta.line = line, + meta.module = module, + ?data, + message, + ); + } + + fn kernel_context_get_global_input( + &mut self, + _ctx: &Self::KernelContext, + _name: &str, + ) -> Option { + todo!() + } + + fn kernel_context_set_global_output( + &mut self, + _ctx: &Self::KernelContext, + _name: &str, + _tensor: TensorParam<'_>, + ) { + todo!() + } + + fn model_load( + &mut self, + _: &str, + _: &[u8], + _: Vec<(&str, &str)>, + ) -> Result { + todo!() + } + + fn model_inputs(&mut self, _: &Self::Model) -> Vec { + todo!() + } + + fn model_outputs(&mut self, _: &Self::Model) -> Vec { + todo!() + } + + fn model_infer( + &mut self, + _: &Self::Model, + _: Vec>, + ) -> Result, ModelInferError> { + todo!() + } +} diff --git a/crates/runtime/src/zune/tflite.rs b/crates/runtime/src/zune/tflite.rs new file mode 100644 index 0000000000..bb97527a8c --- /dev/null +++ b/crates/runtime/src/zune/tflite.rs @@ -0,0 +1,266 @@ +use std::{ + borrow::Cow, + collections::{HashMap, HashSet}, + sync::{Arc, Mutex}, +}; + +use anyhow::{Context, Error}; +use hotg_rune_compiler::parse::ModelStage; +use hotg_rune_core::TFLITE_MIMETYPE; +use hotg_runecoral::{ + AccelerationBackend, ElementType as RuneCoralElementType, InferenceContext, + Tensor as RuneCoralTensor, TensorDescriptor as RuneCoralTensorDescriptor, + TensorMut as RuneCoralTensorMut, +}; + +use crate::zune::{ + get_buffer_size, key, proc_block::Dimensions, ElementType, GraphContext, + State, TensorConstraint, TensorResult, +}; + +pub(crate) struct ModelNode { + context: InferenceContext, + input_tensors: HashSet, + output_tensors: HashSet, + shared_state: Arc>, +} + +impl ModelNode { + #[tracing::instrument( + skip( + node_data, + model_data, + shared_state, + input_tensors, + output_tensors + ), + level = "debug" + )] + pub(crate) fn load( + node_id: &str, + node_data: &ModelStage, + model_data: &[u8], + shared_state: &Arc>, + input_tensors: &HashMap, + output_tensors: &HashMap, + ) -> Result { + // Create Inference Context + let context = InferenceContext::create_context( + TFLITE_MIMETYPE, + &model_data, + AccelerationBackend::NONE, + ) + .with_context(|| { + format!( + "Error Instantiating model from zune for stage: {}", + &node_id + ) + })?; + + let tensor_from_descriptor = + |t: &RuneCoralTensorDescriptor| -> TensorResult { + let element_type = get_element_type(t); + let dimensions = t.shape.iter().map(|&x| x as u32).collect(); + let buffer_size = get_buffer_size(element_type, &dimensions); + + TensorResult { + element_type, + dimensions, + buffer: vec![0; buffer_size], + } + }; + + let tensor_constraint_from_descriptor = + |t: &RuneCoralTensorDescriptor, + tensor_id: usize| + -> TensorConstraint { + let element_type = get_element_type(t); + let dimensions = t.shape.iter().map(|&x| x as usize).collect(); + + TensorConstraint { + tensor_id: Some(tensor_id), + element_type, + dimensions: Dimensions::Fixed(dimensions), + } + }; + + // Returns the list of tensor indices in the State's tensors + let allocate_tensors = |tensor_type: &str, + model_tensors: &mut dyn Iterator< + Item = RuneCoralTensorDescriptor, + >, + pipeline_tensors: &HashMap| + -> Result< + (HashSet, HashMap), + Error, + > { + let mut tensor_indices: HashSet = HashSet::new(); + let mut tensor_constraints: HashMap = + HashMap::new(); + let mut i = 0; + let mut s = shared_state.lock().unwrap(); + + while let Some(model_tensor) = model_tensors.next() { + let tensor_key = key(&node_id, Some(i)); + let tensor_id = + *pipeline_tensors.get(&tensor_key).ok_or_else(|| { + anyhow::anyhow!( + "Unable to find pipeline_tensor for {} tensor \ + with key {}", + &tensor_type, + &tensor_key + ) + })?; + + let tensor_name = model_tensor.name.to_str().ok(); + let tensor_name = match tensor_name { + Some(tensor_name) if tensor_name.len() > 0 => { + tensor_name.to_string() + }, + _ => format!("{}", i).to_string(), + }; + let tensor_constraint = + tensor_constraint_from_descriptor(&model_tensor, tensor_id); + let model_tensor = tensor_from_descriptor(&model_tensor); + + match s.tensors[tensor_id] { + Some(ref t) + if t.dimensions != model_tensor.dimensions + || t.element_type != model_tensor.element_type => + { + anyhow::bail!( + "Pipeline tensor for {} with key {} doesn't match \ + model tensor", + &tensor_type, + &tensor_key + ); + }, + Some(_) => {}, + ref mut other => { + *other = Some(model_tensor); + }, + } + + tensor_indices.insert(tensor_id); + //FIXME: 2 tensors share same name (/empty name) + //then tensor_indices.len() != tensor_constraints.len() + tensor_constraints.insert(tensor_name, tensor_constraint); + + i += 1; + } + + Ok((tensor_indices, tensor_constraints)) + }; + + let (input_tensors, input_tensor_constraints) = + allocate_tensors("input", &mut context.inputs(), &input_tensors)?; + + let (output_tensors, output_tensor_constraints) = allocate_tensors( + "output", + &mut context.outputs(), + &output_tensors, + )?; + + let graph_context = GraphContext { + arguments: node_data + .args + .iter() + .map(|(k, v)| (k.clone(), v.to_string())) + .collect(), + input_tensors: input_tensor_constraints, + output_tensors: output_tensor_constraints, + }; + + shared_state + .lock() + .unwrap() + .graph_contexts + .insert(node_id.to_string(), graph_context); + + Ok(ModelNode { + context, + input_tensors, + output_tensors, + shared_state: shared_state.clone(), + }) + } + + #[tracing::instrument(skip_all, level = "debug")] + pub(crate) fn run(&mut self) -> Result<(), Error> { + // We are recreating the input_tensors and output_tensors every time + // before predict because wasm linear memory might have changed + // the locations TODO: There's an optimization that can happen + // here.. but just not yet + let mut inputs: Vec = Vec::new(); + let mut outputs: Vec = Vec::new(); + let mut state = self.shared_state.lock().unwrap(); + + state.tensors.iter_mut().enumerate().for_each(|(i, t)| { + if self.input_tensors.contains(&i) { + let pipeline_tensor = t.as_mut().unwrap(); + unsafe { + inputs.push(RuneCoralTensor { + element_type: get_runecoral_element_type( + &pipeline_tensor.element_type, + ), + shape: Cow::Borrowed(std::slice::from_raw_parts( + pipeline_tensor.dimensions.as_ptr() as *const i32, + pipeline_tensor.dimensions.len(), + )), + buffer: &pipeline_tensor.buffer, + }) + } + } else if self.output_tensors.contains(&i) { + let pipeline_tensor = t.as_mut().unwrap(); + unsafe { + outputs.push(RuneCoralTensorMut { + element_type: get_runecoral_element_type( + &pipeline_tensor.element_type, + ), + shape: Cow::Borrowed(std::slice::from_raw_parts( + pipeline_tensor.dimensions.as_ptr() as *const i32, + pipeline_tensor.dimensions.len(), + )), + buffer: &mut pipeline_tensor.buffer, + }) + } + } else { + // Do nothing + } + }); + + self.context + .infer(&inputs, &mut outputs) + .map_err(Error::from) + } +} + +fn get_element_type(t: &RuneCoralTensorDescriptor) -> ElementType { + match t.element_type { + RuneCoralElementType::UInt8 => ElementType::U8, + RuneCoralElementType::Int8 => ElementType::I8, + RuneCoralElementType::Int16 => ElementType::I16, + RuneCoralElementType::Int32 => ElementType::I32, + RuneCoralElementType::Float32 => ElementType::F32, + RuneCoralElementType::Int64 => ElementType::I64, + RuneCoralElementType::Float64 => ElementType::F64, + RuneCoralElementType::String => ElementType::Utf8, + // TODO: Implement support for all the element types + _ => ElementType::U8, + } +} + +fn get_runecoral_element_type(t: &ElementType) -> RuneCoralElementType { + match t { + ElementType::U8 => RuneCoralElementType::UInt8, + ElementType::I8 => RuneCoralElementType::Int8, + ElementType::I16 => RuneCoralElementType::Int16, + ElementType::I32 => RuneCoralElementType::Int32, + ElementType::F32 => RuneCoralElementType::Float32, + ElementType::I64 => RuneCoralElementType::Int64, + ElementType::F64 => RuneCoralElementType::Float64, + ElementType::Utf8 => RuneCoralElementType::String, + // TODO: Implement support for all the element types + _ => RuneCoralElementType::NoType, + } +} From cf7deedec1aec0bebb332edfccc3adf193076e4e Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Sat, 4 Jun 2022 03:21:25 +0800 Subject: [PATCH 2/6] Put ProcBlockNode and ModelNode behind a trait --- crates/runtime/src/zune/mod.rs | 132 ++++++++++++++------------ crates/runtime/src/zune/proc_block.rs | 10 +- crates/runtime/src/zune/tflite.rs | 6 +- 3 files changed, 79 insertions(+), 69 deletions(-) diff --git a/crates/runtime/src/zune/mod.rs b/crates/runtime/src/zune/mod.rs index b0c4402852..2b0d6ad75a 100644 --- a/crates/runtime/src/zune/mod.rs +++ b/crates/runtime/src/zune/mod.rs @@ -22,8 +22,12 @@ use crate::{ wit_bindgen_wasmer::export!("../../../wit-files/rune/runtime-v1.wit"); wit_bindgen_wasmer::import!("../../../wit-files/rune/proc-block-v1.wit"); +pub(crate) trait Node { + fn run(&mut self) -> Result<(), Error>; +} + #[derive(Debug, Default, Clone, wasmer::WasmerEnv)] -struct Runtime { +pub(crate) struct Runtime { shared_state: Arc>, } @@ -37,10 +41,7 @@ pub(crate) struct State { pub struct ZuneEngine { input_nodes: Vec, output_nodes: Vec, - #[cfg(feature = "tflite")] - models: HashMap, - procblocks: HashMap, - pipeline: IndexMap, + nodes: HashMap>, processing_order: Vec, shared_state: Arc>, // resources } @@ -126,7 +127,7 @@ impl ZuneEngine { tracing::trace!(?input_tensors, ?output_tensors, "Loaded tensors"); - let (model_contexts, procblock_contexts) = instantiate_nodes( + let nodes = instantiate_nodes( pipeline, read_zip_resource_by_path, &shared_state, @@ -142,9 +143,7 @@ impl ZuneEngine { Ok(ZuneEngine { input_nodes: inputs, output_nodes: outputs, - models: model_contexts, - procblocks: procblock_contexts, - pipeline: pipeline.to_owned(), + nodes, processing_order, shared_state, }) @@ -156,24 +155,7 @@ impl ZuneEngine { let _span = tracing::debug_span!("Running Stage", %stage_name).entered(); - let stage = self.pipeline.get(stage_name).unwrap(); - match stage { - #[cfg(feature = "tflite")] - Stage::Model(_) => { - self.models.get_mut(stage_name).unwrap().run()?; - }, - #[cfg(not(feature = "tflite"))] - Stage::Model(_) => { - anyhow::bail!( - "Unable to run \"{}\" because models aren't supported", - stage_name - ); - }, - Stage::Capability(_) | Stage::ProcBlock(_) => { - self.procblocks.get_mut(stage_name).unwrap().run()?; - }, - _ => {}, - } + self.nodes.get_mut(stage_name).unwrap().run()?; } Ok(()) } @@ -341,10 +323,8 @@ fn instantiate_nodes( shared_state: &Arc>, input_tensors: HashMap, output_tensors: HashMap, -) -> Result<(HashMap, HashMap), Error> -{ - let mut models: HashMap = HashMap::new(); - let mut procblocks: HashMap = HashMap::new(); +) -> Result>, Error> { + let mut nodes: HashMap> = HashMap::new(); let runtime = Runtime { shared_state: shared_state.clone(), @@ -360,16 +340,14 @@ fn instantiate_nodes( read_zip_resource_by_path(&stage.capability.to_string()) .context("Unable to load the capability")?; - procblocks.insert( - stage_name.to_string(), - ProcBlockNode::load( - &stage_name, - &wasm, - &runtime, - &input_tensors, - &output_tensors, - )?, - ); + let pb = ProcBlockNode::load( + &stage_name, + &wasm, + &runtime, + &input_tensors, + &output_tensors, + )?; + nodes.insert(stage_name.to_string(), Box::new(pb)); }, Stage::Model(stage) => { // Instantiating the model's inference context here because that @@ -384,40 +362,68 @@ fn instantiate_nodes( ) })?; - models.insert( - stage_name.to_string(), - ModelNode::load( - &stage_name, - &stage, - &model_data, - &shared_state, - &input_tensors, - &output_tensors, - )?, - ); + let model_format = + stage.args.get("model-format").map(|f| f.to_string()); + let node = load_model( + &model_data, + model_format.as_deref(), + stage_name, + stage, + shared_state, + &input_tensors, + &output_tensors, + )?; + nodes.insert(stage_name.to_string(), node); }, Stage::ProcBlock(stage) => { let wasm = read_zip_resource_by_path(&stage.proc_block.to_string()) .context("Unable to load the proc_block")?; - procblocks.insert( - stage_name.to_string(), - ProcBlockNode::load( - &stage_name, - &wasm, - &runtime, - &input_tensors, - &output_tensors, - )?, - ); + let pb = ProcBlockNode::load( + &stage_name, + &wasm, + &runtime, + &input_tensors, + &output_tensors, + )?; + nodes.insert(stage_name.to_string(), Box::new(pb)); }, _ => {}, // Do nothing for capabilities/outputs } } - Ok((models, procblocks)) + Ok(nodes) +} + +fn load_model( + model_data: &[u8], + model_format: Option<&str>, + stage_name: &str, + stage: &ModelStage, + shared_state: &Arc>, + input_tensors: &HashMap, + output_tensors: &HashMap, +) -> Result, Error> { + match model_format { + #[cfg(feature = "tflite")] + Some("tflite") | None => { + let model = tflite::ModelNode::load( + stage_name, + stage, + model_data, + shared_state, + input_tensors, + output_tensors, + )?; + + Ok(Box::new(model)) + }, + #[cfg(not(feature = "tflite"))] + None => anyhow::bail!("Unsupported model format, \"tflite\""), + Some(other) => anyhow::bail!("Unsupported model format, \"{}\"", other), + } } fn get_tensors( diff --git a/crates/runtime/src/zune/proc_block.rs b/crates/runtime/src/zune/proc_block.rs index c6beeac06b..85f97320e6 100644 --- a/crates/runtime/src/zune/proc_block.rs +++ b/crates/runtime/src/zune/proc_block.rs @@ -8,7 +8,7 @@ use wasmer::{ImportObject, Module, Store}; use crate::zune::{ key, runtime_v1, ArgumentType, DimensionsParam, ElementType, KernelError, - LogLevel, LogMetadata, LogValue, ModelInferError, ModelLoadError, + LogLevel, LogMetadata, LogValue, ModelInferError, ModelLoadError, Node, ProcBlockV1, Runtime, State, TensorParam, TensorResult, }; @@ -77,9 +77,11 @@ impl ProcBlockNode { shared_state: shared_state.clone(), }) } +} +impl Node for ProcBlockNode { #[tracing::instrument(skip_all, level = "debug")] - pub(crate) fn run(&mut self) -> Result<(), Error> { + fn run(&mut self) -> Result<(), Error> { println!("Executing proc block: {:?} ", self.node_id); self.context.kernel(&self.node_id)?.map_err(|e| match e { KernelError::Other(s) => Error::msg(s), @@ -109,7 +111,7 @@ impl ProcBlockNode { pub enum Never {} #[derive(Debug, Clone)] -struct Metadata { +pub(crate) struct Metadata { description: String, repository: String, homepage: String, @@ -127,7 +129,7 @@ struct ArgumentMetadata { } #[derive(Debug, Clone)] -struct TensorMetadata {} +pub(crate) struct TensorMetadata {} #[derive(Debug, Clone)] pub(crate) enum Dimensions { diff --git a/crates/runtime/src/zune/tflite.rs b/crates/runtime/src/zune/tflite.rs index bb97527a8c..7ddb94e0c8 100644 --- a/crates/runtime/src/zune/tflite.rs +++ b/crates/runtime/src/zune/tflite.rs @@ -15,7 +15,7 @@ use hotg_runecoral::{ use crate::zune::{ get_buffer_size, key, proc_block::Dimensions, ElementType, GraphContext, - State, TensorConstraint, TensorResult, + Node, State, TensorConstraint, TensorResult, }; pub(crate) struct ModelNode { @@ -184,9 +184,11 @@ impl ModelNode { shared_state: shared_state.clone(), }) } +} +impl Node for ModelNode { #[tracing::instrument(skip_all, level = "debug")] - pub(crate) fn run(&mut self) -> Result<(), Error> { + fn run(&mut self) -> Result<(), Error> { // We are recreating the input_tensors and output_tensors every time // before predict because wasm linear memory might have changed // the locations TODO: There's an optimization that can happen From 58c9137a223580cbcb04803b2f9fe5f8c3be6410 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Sat, 4 Jun 2022 03:41:16 +0800 Subject: [PATCH 3/6] Implemented std::error::Error for KernelError and other minor cleanups --- crates/runtime/src/zune/mod.rs | 119 ++++++++++++++++++++++---- crates/runtime/src/zune/proc_block.rs | 25 +----- 2 files changed, 106 insertions(+), 38 deletions(-) diff --git a/crates/runtime/src/zune/mod.rs b/crates/runtime/src/zune/mod.rs index 2b0d6ad75a..1c68971c02 100644 --- a/crates/runtime/src/zune/mod.rs +++ b/crates/runtime/src/zune/mod.rs @@ -4,6 +4,7 @@ mod tflite; use std::{ collections::HashMap, + fmt::{self, Display, Formatter}, io::{Cursor, Read}, sync::{Arc, Mutex}, }; @@ -63,7 +64,7 @@ impl ZuneEngine { })?; let mut buffer = Vec::new(); requested_file.read_to_end(&mut buffer).with_context(|| { - anyhow!("Unable to read {} from zune", path) + format!("Unable to read {} from zune", path) })?; Ok(buffer) }; @@ -95,7 +96,7 @@ impl ZuneEngine { let (tensors, input_tensors, output_tensors, processing_order) = get_tensors(&inputs, &outputs, &pipeline) - .context(anyhow!("Unable to map out input/output tensors"))?; + .context("Unable to map out input/output tensors")?; let graph_contexts = pipeline .iter() @@ -160,11 +161,11 @@ impl ZuneEngine { Ok(()) } - pub fn input_nodes(&self) -> &'_ Vec { + pub fn input_nodes(&self) -> &[String] { return &self.input_nodes; } - pub fn output_nodes(&self) -> &'_ Vec { + pub fn output_nodes(&self) -> &[String] { return &self.output_nodes; } @@ -184,7 +185,7 @@ impl ZuneEngine { .collect(); Some(tensor_list) }) - .ok_or(anyhow!("Unable to get input tensors")) + .context("Unable to get input tensors") } pub fn get_input_tensor( @@ -241,7 +242,7 @@ impl ZuneEngine { .collect(); Some(tensor_list) }) - .ok_or(anyhow!("Unable to get input tensors")) + .context("Unable to get input tensors") } pub fn get_output_tensor( @@ -356,7 +357,7 @@ fn instantiate_nodes( let model_data = read_zip_resource_by_path(&stage.model.to_string()) .with_context(|| { - anyhow!( + format!( "Unable to read model from zune {}", stage.model ) @@ -366,7 +367,7 @@ fn instantiate_nodes( stage.args.get("model-format").map(|f| f.to_string()); let node = load_model( &model_data, - model_format.as_deref(), + model_format.as_deref().unwrap_or("tflite"), stage_name, stage, shared_state, @@ -399,7 +400,7 @@ fn instantiate_nodes( fn load_model( model_data: &[u8], - model_format: Option<&str>, + model_format: &str, stage_name: &str, stage: &ModelStage, shared_state: &Arc>, @@ -408,7 +409,7 @@ fn load_model( ) -> Result, Error> { match model_format { #[cfg(feature = "tflite")] - Some("tflite") | None => { + "tflite" | hotg_rune_core::TFLITE_MIMETYPE => { let model = tflite::ModelNode::load( stage_name, stage, @@ -420,9 +421,7 @@ fn load_model( Ok(Box::new(model)) }, - #[cfg(not(feature = "tflite"))] - None => anyhow::bail!("Unsupported model format, \"tflite\""), - Some(other) => anyhow::bail!("Unsupported model format, \"{}\"", other), + other => anyhow::bail!("Unsupported model format, \"{}\"", other), } } @@ -491,9 +490,10 @@ fn get_tensors( for i in 0..item.1.inputs().len() { let input = &item.1.inputs()[i]; let input_key = key(&input.name, input.index); - let &input_tensor_index = output_tensors.get(&input_key).context( - anyhow!("Invalid input key specified: {}", &input_key), - )?; + let &input_tensor_index = + output_tensors.get(&input_key).with_context(|| { + format!("Invalid input key specified: {}", &input_key) + })?; input_tensors.insert(key(stage_name, Some(i)), input_tensor_index); } } @@ -506,3 +506,90 @@ fn get_tensors( fn key(node_name: &str, tensor_index: Option) -> String { format!("{}.{}", node_name, tensor_index.or(Some(0)).unwrap()) } + +impl std::error::Error for proc_block_v1::GraphError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + GraphError::InvalidArgument(InvalidArgument { reason, .. }) => { + Some(reason) + }, + _ => None, + } + } +} + +impl Display for proc_block_v1::GraphError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + GraphError::Other(msg) => Display::fmt(msg, f), + GraphError::InvalidArgument(InvalidArgument { name, .. }) => { + write!(f, "The \"{}\" argument is invalid", name) + }, + GraphError::MissingContext => { + write!(f, "Unable to retrieve the graph context") + }, + } + } +} + +impl std::error::Error for proc_block_v1::KernelError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + KernelError::InvalidArgument(InvalidArgument { + reason, .. + }) => Some(reason), + KernelError::InvalidInput(InvalidInput { reason, .. }) => { + Some(reason) + }, + _ => None, + } + } +} + +impl Display for proc_block_v1::KernelError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + KernelError::Other(s) => Display::fmt(s, f), + KernelError::InvalidArgument(InvalidArgument { name, .. }) => { + write!(f, "The \"{}\" argument is invalid", name) + }, + KernelError::InvalidInput(InvalidInput { name, .. }) => { + write!(f, "The \"{}\" input is invalid", name) + }, + KernelError::MissingContext => { + write!(f, "Unable to retrieve the kernel context") + }, + } + } +} + +impl std::error::Error for BadArgumentReason {} + +impl Display for BadArgumentReason { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + BadArgumentReason::Other(msg) => Display::fmt(msg, f), + BadArgumentReason::NotFound => write!(f, "Argument not found"), + BadArgumentReason::InvalidValue(msg) => { + write!(f, "Invalid argument value: {}", msg) + }, + } + } +} + +impl std::error::Error for BadInputReason {} + +impl Display for BadInputReason { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + BadInputReason::Other(msg) => Display::fmt(msg, f), + BadInputReason::NotFound => write!(f, "Input not found"), + BadInputReason::UnsupportedShape => { + write!(f, "Unsupported tensor shape") + }, + BadInputReason::InvalidValue(msg) => { + write!(f, "Invalid argument value: {}", msg) + }, + } + } +} diff --git a/crates/runtime/src/zune/proc_block.rs b/crates/runtime/src/zune/proc_block.rs index 85f97320e6..86c414daaa 100644 --- a/crates/runtime/src/zune/proc_block.rs +++ b/crates/runtime/src/zune/proc_block.rs @@ -82,28 +82,9 @@ impl ProcBlockNode { impl Node for ProcBlockNode { #[tracing::instrument(skip_all, level = "debug")] fn run(&mut self) -> Result<(), Error> { - println!("Executing proc block: {:?} ", self.node_id); - self.context.kernel(&self.node_id)?.map_err(|e| match e { - KernelError::Other(s) => Error::msg(s), - KernelError::InvalidArgument(a) => { - anyhow::anyhow!( - "Invalid argument for {}: {}", - &self.node_id, - a.name - ) - }, - KernelError::InvalidInput(i) => { - anyhow::anyhow!( - "Invalid input for {}: {}", - &self.node_id, - i.name - ) - }, - KernelError::MissingContext => anyhow::anyhow!( - "Unable to retrieve kernel context for {}:", - &self.node_id - ), - }) + self.context.kernel(&self.node_id)??; + + Ok(()) } } From 3ca2b049b4dcffd13a77a2b05529ae3ed49e4dcc Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Sat, 4 Jun 2022 03:45:19 +0800 Subject: [PATCH 4/6] Removing some dead code --- crates/runtime/src/zune/proc_block.rs | 8 +++----- crates/runtime/src/zune/tflite.rs | 11 +++-------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/crates/runtime/src/zune/proc_block.rs b/crates/runtime/src/zune/proc_block.rs index 86c414daaa..982d6fe029 100644 --- a/crates/runtime/src/zune/proc_block.rs +++ b/crates/runtime/src/zune/proc_block.rs @@ -7,15 +7,14 @@ use anyhow::{Context, Error}; use wasmer::{ImportObject, Module, Store}; use crate::zune::{ - key, runtime_v1, ArgumentType, DimensionsParam, ElementType, KernelError, - LogLevel, LogMetadata, LogValue, ModelInferError, ModelLoadError, Node, - ProcBlockV1, Runtime, State, TensorParam, TensorResult, + key, runtime_v1, ArgumentType, DimensionsParam, ElementType, LogLevel, + LogMetadata, LogValue, ModelInferError, ModelLoadError, Node, ProcBlockV1, + Runtime, TensorParam, TensorResult, }; pub(crate) struct ProcBlockNode { node_id: String, context: ProcBlockV1, - shared_state: Arc>, } impl ProcBlockNode { @@ -74,7 +73,6 @@ impl ProcBlockNode { Ok(ProcBlockNode { node_id: node_id.to_string(), context: pb, - shared_state: shared_state.clone(), }) } } diff --git a/crates/runtime/src/zune/tflite.rs b/crates/runtime/src/zune/tflite.rs index 7ddb94e0c8..b2ffe96ecb 100644 --- a/crates/runtime/src/zune/tflite.rs +++ b/crates/runtime/src/zune/tflite.rs @@ -103,13 +103,8 @@ impl ModelNode { while let Some(model_tensor) = model_tensors.next() { let tensor_key = key(&node_id, Some(i)); let tensor_id = - *pipeline_tensors.get(&tensor_key).ok_or_else(|| { - anyhow::anyhow!( - "Unable to find pipeline_tensor for {} tensor \ - with key {}", - &tensor_type, - &tensor_key - ) + *pipeline_tensors.get(&tensor_key).with_context(|| { + format!( "Unable to find pipeline_tensor for {tensor_type} tensor with key {tensor_key}") })?; let tensor_name = model_tensor.name.to_str().ok(); @@ -117,7 +112,7 @@ impl ModelNode { Some(tensor_name) if tensor_name.len() > 0 => { tensor_name.to_string() }, - _ => format!("{}", i).to_string(), + _ => i.to_string(), }; let tensor_constraint = tensor_constraint_from_descriptor(&model_tensor, tensor_id); From d3a3fcf10a04269e137bafe2f2a6863450bc89b7 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Sat, 4 Jun 2022 04:45:18 +0800 Subject: [PATCH 5/6] The "zune" module was never put behind its feature flag --- crates/runtime/src/lib.rs | 1 + crates/runtime/src/models/mod.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 8fe1e6f682..54dbbe7684 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -32,6 +32,7 @@ mod tensor; pub mod builtins; mod outputs; +#[cfg(feature = "zune")] pub mod zune; pub use crate::{ diff --git a/crates/runtime/src/models/mod.rs b/crates/runtime/src/models/mod.rs index 59ec7773ce..f92f7f8ad1 100644 --- a/crates/runtime/src/models/mod.rs +++ b/crates/runtime/src/models/mod.rs @@ -15,7 +15,7 @@ use crate::callbacks::{Model, ModelMetadata}; /// /// Supported formats are: /// - TensorFlow Lite -#[cfg_attr(not(feature = "tflite"), doc("(not supported)"))] +#[cfg_attr(not(feature = "tflite"), doc = "(not supported)")] pub fn default_model_handler( _id: u32, meta: &ModelMetadata<'_>, From 93d939f209c7db02798fcc370b535644ba218078 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Sat, 4 Jun 2022 05:21:57 +0800 Subject: [PATCH 6/6] Updating the path too the wit-files folder --- crates/runtime/src/zune/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/runtime/src/zune/mod.rs b/crates/runtime/src/zune/mod.rs index 1c68971c02..7501538509 100644 --- a/crates/runtime/src/zune/mod.rs +++ b/crates/runtime/src/zune/mod.rs @@ -20,8 +20,8 @@ use crate::{ LoadError, }; -wit_bindgen_wasmer::export!("../../../wit-files/rune/runtime-v1.wit"); -wit_bindgen_wasmer::import!("../../../wit-files/rune/proc-block-v1.wit"); +wit_bindgen_wasmer::export!("../../wit-files/rune/runtime-v1.wit"); +wit_bindgen_wasmer::import!("../../wit-files/rune/proc-block-v1.wit"); pub(crate) trait Node { fn run(&mut self) -> Result<(), Error>;