From 7ad5bd66d05f70d5694752324aff350dcfb8d5c4 Mon Sep 17 00:00:00 2001 From: Brooks Townsend Date: Mon, 8 Feb 2021 13:11:16 -0500 Subject: [PATCH] Updating in-memory keyvalue to wasmcloud:keyvalue contract (#11) * replaced all references to wascc-codec::keyvalue to interface * addressed clippy warnings, removed unused variables --- keyvalue-provider/Cargo.toml | 12 ++- keyvalue-provider/README.md | 2 +- keyvalue-provider/src/kv.rs | 31 +++--- keyvalue-provider/src/lib.rs | 201 +++++++++++++++++------------------ 4 files changed, 123 insertions(+), 123 deletions(-) diff --git a/keyvalue-provider/Cargo.toml b/keyvalue-provider/Cargo.toml index a33778c9..5b9b0d99 100644 --- a/keyvalue-provider/Cargo.toml +++ b/keyvalue-provider/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "keyvalue" -version = "0.3.0" -authors = ["Kevin Hoffman "] +name = "inmemory-keyvalue" +version = "0.4.0" +authors = ["wasmCloud Team"] edition = "2018" [lib] @@ -12,6 +12,8 @@ crate-type = ["cdylib", "rlib"] static_plugin = [] [dependencies] -wascc-codec = { version = "0.7.0", path = "../../wascc-codec" } +wascc-codec = "0.9.0" log = "0.4.8" -env_logger = "0.7.1" \ No newline at end of file +env_logger = "0.7.1" +actor-keyvalue = { git = "https://github.com/wasmcloud/actor-interfaces", branch = "main"} +actor-core = { git = "https://github.com/wasmcloud/actor-interfaces", branch = "main"} \ No newline at end of file diff --git a/keyvalue-provider/README.md b/keyvalue-provider/README.md index 0da3d938..41768b35 100644 --- a/keyvalue-provider/README.md +++ b/keyvalue-provider/README.md @@ -1,5 +1,5 @@ # Key-Value Capability Provider -This is a tutorial capability provider. It supplies an implementation for the `wascc:keyvalue` capability ID and can be used as a testing alternative for the **Redis** capability provider. The usual caveats should apply - this provider's data only lasts as long as the host runtime, so it should only be used for isolated testing and experimentation, and obviously won't provide distributed cached data in production. +This is a tutorial capability provider. It supplies an implementation for the `wasmcloud:keyvalue` capability ID and can be used as a testing alternative for the **Redis** capability provider. The usual caveats should apply - this provider's data only lasts as long as the host runtime, so it should only be used for isolated testing and experimentation, and obviously won't provide distributed cached data in production. This provider was created using the `new-provider-template` cargo generation template as a starter. diff --git a/keyvalue-provider/src/kv.rs b/keyvalue-provider/src/kv.rs index abe84f87..859eb8e5 100644 --- a/keyvalue-provider/src/kv.rs +++ b/keyvalue-provider/src/kv.rs @@ -21,7 +21,7 @@ impl KeyValueStore { } } - pub fn incr(&mut self, key: &str, value: i32) -> Result> { + pub fn incr(&mut self, key: &str, value: i32) -> Result> { let mut orig = 0; self.items .entry(key.to_string()) @@ -35,16 +35,16 @@ impl KeyValueStore { Ok(orig + value) } - pub fn del(&mut self, key: &str) -> Result<(), Box> { + pub fn del(&mut self, key: &str) -> Result<(), Box> { self.items.remove(key); Ok(()) } - pub fn exists(&self, key: &str) -> Result> { + pub fn exists(&self, key: &str) -> Result> { Ok(self.items.contains_key(key)) } - pub fn get(&self, key: &str) -> Result> { + pub fn get(&self, key: &str) -> Result> { self.items.get(key).map_or_else( || Err("No such key".into()), |v| { @@ -57,7 +57,12 @@ impl KeyValueStore { ) } - pub fn lrange(&self, key: &str, start: i32, stop: i32) -> Result, Box> { + pub fn lrange( + &self, + key: &str, + start: i32, + stop: i32, + ) -> Result, Box> { let start = start.max(0); self.items.get(key).map_or_else( || Ok(vec![]), @@ -72,7 +77,7 @@ impl KeyValueStore { ) } - pub fn lpush(&mut self, key: &str, value: String) -> Result> { + pub fn lpush(&mut self, key: &str, value: String) -> Result> { let mut len = 1; self.items .entry(key.to_string()) @@ -89,7 +94,7 @@ impl KeyValueStore { Ok(len as _) } - pub fn set(&mut self, key: &str, value: String) -> Result<(), Box> { + pub fn set(&mut self, key: &str, value: String) -> Result<(), Box> { self.items .entry(key.to_string()) .and_modify(|v| { @@ -101,7 +106,7 @@ impl KeyValueStore { Ok(()) } - pub fn lrem(&mut self, key: &str, value: String) -> Result> { + pub fn lrem(&mut self, key: &str, value: String) -> Result> { let mut len: i32 = 0; self.items.entry(key.to_string()).and_modify(|v| { if let KeyValueItem::List(ref l) = v { @@ -117,7 +122,7 @@ impl KeyValueStore { Ok(len) } - pub fn sadd(&mut self, key: &str, value: String) -> Result> { + pub fn sadd(&mut self, key: &str, value: String) -> Result> { let mut len: i32 = 1; self.items .entry(key.to_string()) @@ -131,7 +136,7 @@ impl KeyValueStore { Ok(len) } - pub fn srem(&mut self, key: &str, value: String) -> Result> { + pub fn srem(&mut self, key: &str, value: String) -> Result> { let mut len: i32 = 0; self.items .entry(key.to_string()) @@ -145,7 +150,7 @@ impl KeyValueStore { Ok(len) } - pub fn sunion(&self, keys: Vec) -> Result, Box> { + pub fn sunion(&self, keys: Vec) -> Result, Box> { let union = self .items .iter() @@ -165,7 +170,7 @@ impl KeyValueStore { Ok(union.iter().cloned().collect()) } - pub fn sinter(&self, keys: Vec) -> Result, Box> { + pub fn sinter(&self, keys: Vec) -> Result, Box> { let sets: Vec> = self .items .iter() @@ -188,7 +193,7 @@ impl KeyValueStore { Ok(inter.cloned().collect()) } - pub fn smembers(&self, key: String) -> Result, Box> { + pub fn smembers(&self, key: String) -> Result, Box> { self.items.get(&key).map_or_else( || Ok(vec![]), |v| { diff --git a/keyvalue-provider/src/lib.rs b/keyvalue-provider/src/lib.rs index 883b9447..5f0bbd16 100644 --- a/keyvalue-provider/src/lib.rs +++ b/keyvalue-provider/src/lib.rs @@ -1,47 +1,37 @@ #[macro_use] extern crate wascc_codec as codec; - #[macro_use] extern crate log; mod kv; - use crate::kv::KeyValueStore; -use codec::capabilities::{ - CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection, - OP_GET_CAPABILITY_DESCRIPTOR, -}; +use actor_core::CapabilityConfiguration; +use actor_keyvalue::*; +use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher}; use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR}; -use codec::keyvalue; -use codec::keyvalue::*; -use wascc_codec::core::CapabilityConfiguration; -use wascc_codec::{deserialize, serialize}; - use std::error::Error; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; +use wascc_codec::{deserialize, serialize}; #[cfg(not(feature = "static_plugin"))] capability_provider!(KeyvalueProvider, KeyvalueProvider::new); -const CAPABILITY_ID: &str = "wascc:keyvalue"; +#[allow(unused)] +const CAPABILITY_ID: &str = "wasmcloud:keyvalue"; const SYSTEM_ACTOR: &str = "system"; -const VERSION: &str = env!("CARGO_PKG_VERSION"); -const REVISION: u32 = 0; // Increment for each crates publish +#[derive(Clone)] pub struct KeyvalueProvider { - dispatcher: RwLock>, - store: RwLock, + dispatcher: Arc>>, + store: Arc>, } impl Default for KeyvalueProvider { fn default() -> Self { - match env_logger::try_init() { - Ok(_) => {} - Err(_) => {} - }; + if env_logger::try_init().is_ok() {}; KeyvalueProvider { - dispatcher: RwLock::new(Box::new(NullDispatcher::new())), - store: RwLock::new(KeyValueStore::new()), + dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))), + store: Arc::new(RwLock::new(KeyValueStore::new())), } } } @@ -51,17 +41,23 @@ impl KeyvalueProvider { Self::default() } - fn configure(&self, _config: CapabilityConfiguration) -> Result, Box> { + fn configure( + &self, + _config: CapabilityConfiguration, + ) -> Result, Box> { // Do nothing here Ok(vec![]) } - fn remove_actor(&self, _config: CapabilityConfiguration) -> Result, Box> { + fn remove_actor( + &self, + _config: CapabilityConfiguration, + ) -> Result, Box> { // Do nothing here Ok(vec![]) } - fn add(&self, _actor: &str, req: AddRequest) -> Result, Box> { + fn add(&self, _actor: &str, req: AddArgs) -> Result, Box> { let mut store = self.store.write().unwrap(); let res: i32 = store.incr(&req.key, req.value)?; let resp = AddResponse { value: res }; @@ -69,7 +65,7 @@ impl KeyvalueProvider { Ok(serialize(resp)?) } - fn del(&self, _actor: &str, req: DelRequest) -> Result, Box> { + fn del(&self, _actor: &str, req: DelArgs) -> Result, Box> { let mut store = self.store.write().unwrap(); store.del(&req.key)?; let resp = DelResponse { key: req.key }; @@ -77,7 +73,7 @@ impl KeyvalueProvider { Ok(serialize(resp)?) } - fn get(&self, _actor: &str, req: GetRequest) -> Result, Box> { + fn get(&self, _actor: &str, req: GetArgs) -> Result, Box> { let store = self.store.read().unwrap(); if !store.exists(&req.key)? { Ok(serialize(GetResponse { @@ -102,23 +98,35 @@ impl KeyvalueProvider { } } - fn list_clear(&self, actor: &str, req: ListClearRequest) -> Result, Box> { - self.del(actor, DelRequest { key: req.key }) + fn list_clear( + &self, + actor: &str, + req: ClearArgs, + ) -> Result, Box> { + self.del(actor, DelArgs { key: req.key }) } - fn list_range(&self, _actor: &str, req: ListRangeRequest) -> Result, Box> { + fn list_range( + &self, + _actor: &str, + req: RangeArgs, + ) -> Result, Box> { let store = self.store.read().unwrap(); let result: Vec = store.lrange(&req.key, req.start as _, req.stop as _)?; Ok(serialize(ListRangeResponse { values: result })?) } - fn list_push(&self, _actor: &str, req: ListPushRequest) -> Result, Box> { + fn list_push( + &self, + _actor: &str, + req: PushArgs, + ) -> Result, Box> { let mut store = self.store.write().unwrap(); let result: i32 = store.lpush(&req.key, req.value)?; Ok(serialize(ListResponse { new_count: result })?) } - fn set(&self, _actor: &str, req: SetRequest) -> Result, Box> { + fn set(&self, _actor: &str, req: SetArgs) -> Result, Box> { let mut store = self.store.write().unwrap(); store.set(&req.key, req.value.clone())?; Ok(serialize(SetResponse { value: req.value })?) @@ -127,26 +135,38 @@ impl KeyvalueProvider { fn list_del_item( &self, _actor: &str, - req: ListDelItemRequest, - ) -> Result, Box> { + req: ListItemDeleteArgs, + ) -> Result, Box> { let mut store = self.store.write().unwrap(); let result: i32 = store.lrem(&req.key, req.value)?; Ok(serialize(ListResponse { new_count: result })?) } - fn set_add(&self, _actor: &str, req: SetAddRequest) -> Result, Box> { + fn set_add( + &self, + _actor: &str, + req: SetAddArgs, + ) -> Result, Box> { let mut store = self.store.write().unwrap(); let result: i32 = store.sadd(&req.key, req.value)?; Ok(serialize(SetOperationResponse { new_count: result })?) } - fn set_remove(&self, _actor: &str, req: SetRemoveRequest) -> Result, Box> { + fn set_remove( + &self, + _actor: &str, + req: SetRemoveArgs, + ) -> Result, Box> { let mut store = self.store.write().unwrap(); let result: i32 = store.srem(&req.key, req.value)?; Ok(serialize(SetOperationResponse { new_count: result })?) } - fn set_union(&self, _actor: &str, req: SetUnionRequest) -> Result, Box> { + fn set_union( + &self, + _actor: &str, + req: SetUnionArgs, + ) -> Result, Box> { let store = self.store.read().unwrap(); let result: Vec = store.sunion(req.keys)?; Ok(serialize(SetQueryResponse { values: result })?) @@ -155,20 +175,28 @@ impl KeyvalueProvider { fn set_intersect( &self, _actor: &str, - req: SetIntersectionRequest, - ) -> Result, Box> { + req: SetIntersectionArgs, + ) -> Result, Box> { let store = self.store.read().unwrap(); let result: Vec = store.sinter(req.keys)?; Ok(serialize(SetQueryResponse { values: result })?) } - fn set_query(&self, _actor: &str, req: SetQueryRequest) -> Result, Box> { + fn set_query( + &self, + _actor: &str, + req: SetQueryArgs, + ) -> Result, Box> { let store = self.store.read().unwrap(); let result: Vec = store.smembers(req.key)?; Ok(serialize(SetQueryResponse { values: result })?) } - fn exists(&self, _actor: &str, req: KeyExistsQuery) -> Result, Box> { + fn exists( + &self, + _actor: &str, + req: KeyExistsArgs, + ) -> Result, Box> { let store = self.store.read().unwrap(); let result: bool = store.exists(&req.key)?; Ok(serialize(GetResponse { @@ -176,57 +204,15 @@ impl KeyvalueProvider { exists: result, })?) } - - fn get_descriptor(&self) -> Result, Box> { - use OperationDirection::ToProvider; - Ok(serialize( - CapabilityDescriptor::builder() - .id(CAPABILITY_ID) - .name("waSCC Default Key-Value Provider (In-Memory)") - .long_description( - "A key-value store capability provider built on in-process hash maps", - ) - .version(VERSION) - .revision(REVISION) - .with_operation(OP_ADD, ToProvider, "Performs an atomic addition operation") - .with_operation(OP_DEL, ToProvider, "Deletes a key from the store") - .with_operation(OP_GET, ToProvider, "Gets the raw value for a key") - .with_operation(OP_CLEAR, ToProvider, "Clears a list") - .with_operation( - OP_RANGE, - ToProvider, - "Selects items from a list within a range", - ) - .with_operation(OP_PUSH, ToProvider, "Pushes a new item onto a list") - .with_operation(OP_SET, ToProvider, "Sets the value of a key") - .with_operation(OP_LIST_DEL, ToProvider, "Deletes an item from a list") - .with_operation(OP_SET_ADD, ToProvider, "Adds an item to a set") - .with_operation(OP_SET_REMOVE, ToProvider, "Remove an item from a set") - .with_operation( - OP_SET_UNION, - ToProvider, - "Returns the union of multiple sets", - ) - .with_operation( - OP_SET_INTERSECT, - ToProvider, - "Returns the intersection of multiple sets", - ) - .with_operation(OP_SET_QUERY, ToProvider, "Queries a set") - .with_operation( - OP_KEY_EXISTS, - ToProvider, - "Returns a boolean indicating if a key exists", - ) - .build(), - )?) - } } impl CapabilityProvider for KeyvalueProvider { // Invoked by the runtime host to give this provider plugin the ability to communicate // with actors - fn configure_dispatch(&self, dispatcher: Box) -> Result<(), Box> { + fn configure_dispatch( + &self, + dispatcher: Box, + ) -> Result<(), Box> { trace!("Dispatcher received."); let mut lock = self.dispatcher.write().unwrap(); *lock = dispatcher; @@ -236,28 +222,35 @@ impl CapabilityProvider for KeyvalueProvider { // Invoked by host runtime to allow an actor to make use of the capability // All providers MUST handle the "configure" message, even if no work will be done - fn handle_call(&self, actor: &str, op: &str, msg: &[u8]) -> Result, Box> { + fn handle_call( + &self, + actor: &str, + op: &str, + msg: &[u8], + ) -> Result, Box> { trace!("Received host call from {}, operation - {}", actor, op); match op { OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?), OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.remove_actor(deserialize(msg)?), - OP_GET_CAPABILITY_DESCRIPTOR if actor == SYSTEM_ACTOR => self.get_descriptor(), - keyvalue::OP_ADD => self.add(actor, deserialize(msg)?), - keyvalue::OP_DEL => self.del(actor, deserialize(msg)?), - keyvalue::OP_GET => self.get(actor, deserialize(msg)?), - keyvalue::OP_CLEAR => self.list_clear(actor, deserialize(msg)?), - keyvalue::OP_RANGE => self.list_range(actor, deserialize(msg)?), - keyvalue::OP_PUSH => self.list_push(actor, deserialize(msg)?), - keyvalue::OP_SET => self.set(actor, deserialize(msg)?), - keyvalue::OP_LIST_DEL => self.list_del_item(actor, deserialize(msg)?), - keyvalue::OP_SET_ADD => self.set_add(actor, deserialize(msg)?), - keyvalue::OP_SET_REMOVE => self.set_remove(actor, deserialize(msg)?), - keyvalue::OP_SET_UNION => self.set_union(actor, deserialize(msg)?), - keyvalue::OP_SET_INTERSECT => self.set_intersect(actor, deserialize(msg)?), - keyvalue::OP_SET_QUERY => self.set_query(actor, deserialize(msg)?), - keyvalue::OP_KEY_EXISTS => self.exists(actor, deserialize(msg)?), + OP_ADD => self.add(actor, deserialize(msg)?), + OP_DEL => self.del(actor, deserialize(msg)?), + OP_GET => self.get(actor, deserialize(msg)?), + OP_CLEAR => self.list_clear(actor, deserialize(msg)?), + OP_RANGE => self.list_range(actor, deserialize(msg)?), + OP_PUSH => self.list_push(actor, deserialize(msg)?), + OP_SET => self.set(actor, deserialize(msg)?), + OP_LIST_DEL => self.list_del_item(actor, deserialize(msg)?), + OP_SET_ADD => self.set_add(actor, deserialize(msg)?), + OP_SET_REMOVE => self.set_remove(actor, deserialize(msg)?), + OP_SET_UNION => self.set_union(actor, deserialize(msg)?), + OP_SET_INTERSECT => self.set_intersect(actor, deserialize(msg)?), + OP_SET_QUERY => self.set_query(actor, deserialize(msg)?), + OP_KEY_EXISTS => self.exists(actor, deserialize(msg)?), _ => Err("bad dispatch".into()), } } + + /// No cleanup needed + fn stop(&self) {} }