diff --git a/Cargo.lock b/Cargo.lock index 25d792db42..0e9cf73e06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7093,6 +7093,7 @@ dependencies = [ "spin-resource-table", "spin-world", "tempfile", + "thiserror", "tokio", "toml 0.8.19", "tracing", @@ -7355,6 +7356,7 @@ name = "spin-key-value-azure" version = "2.8.0-pre0" dependencies = [ "anyhow", + "azure_core", "azure_data_cosmos", "azure_identity", "futures", diff --git a/crates/factor-key-value/Cargo.toml b/crates/factor-key-value/Cargo.toml index 1d103ec294..af38d9f8e5 100644 --- a/crates/factor-key-value/Cargo.toml +++ b/crates/factor-key-value/Cargo.toml @@ -16,6 +16,7 @@ spin-world = { path = "../world" } tokio = { workspace = true, features = ["macros", "sync", "rt"] } toml = { workspace = true } tracing = { workspace = true } +thiserror = { workspace = true } [dev-dependencies] spin-factors-test = { path = "../factors-test" } diff --git a/crates/factor-key-value/src/host.rs b/crates/factor-key-value/src/host.rs index b8dbb95968..06acd0bf40 100644 --- a/crates/factor-key-value/src/host.rs +++ b/crates/factor-key-value/src/host.rs @@ -1,7 +1,9 @@ +use super::{Cas, SwapError}; use anyhow::{Context, Result}; use spin_core::{async_trait, wasmtime::component::Resource}; use spin_resource_table::Table; use spin_world::v2::key_value; +use spin_world::wasi::keyvalue as wasi_keyvalue; use std::{collections::HashSet, sync::Arc}; use tracing::{instrument, Level}; @@ -30,12 +32,19 @@ pub trait Store: Sync + Send { async fn delete(&self, key: &str) -> Result<(), Error>; async fn exists(&self, key: &str) -> Result; async fn get_keys(&self) -> Result, Error>; + async fn get_many(&self, keys: Vec) -> Result>)>, Error>; + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error>; + async fn delete_many(&self, keys: Vec) -> Result<(), Error>; + async fn increment(&self, key: String, delta: i64) -> Result; + async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str) + -> Result, Error>; } pub struct KeyValueDispatch { allowed_stores: HashSet, manager: Arc, stores: Table>, + compare_and_swaps: Table>, } impl KeyValueDispatch { @@ -52,16 +61,43 @@ impl KeyValueDispatch { allowed_stores, manager, stores: Table::new(capacity), + compare_and_swaps: Table::new(capacity), } } - pub fn get_store(&self, store: Resource) -> anyhow::Result<&Arc> { + pub fn get_store(&self, store: Resource) -> anyhow::Result<&Arc> { self.stores.get(store.rep()).context("invalid store") } + pub fn get_cas(&self, cas: Resource) -> Result<&Arc> { + self.compare_and_swaps + .get(cas.rep()) + .context("invalid compare and swap") + } + pub fn allowed_stores(&self) -> &HashSet { &self.allowed_stores } + + pub fn get_store_wasi( + &self, + store: Resource, + ) -> Result<&Arc, wasi_keyvalue::store::Error> { + self.stores + .get(store.rep()) + .ok_or(wasi_keyvalue::store::Error::NoSuchStore) + } + + pub fn get_cas_wasi( + &self, + cas: Resource, + ) -> Result<&Arc, wasi_keyvalue::atomics::Error> { + self.compare_and_swaps + .get(cas.rep()) + .ok_or(wasi_keyvalue::atomics::Error::Other( + "compare and swap not found".to_string(), + )) + } } #[async_trait] @@ -141,12 +177,239 @@ impl key_value::HostStore for KeyValueDispatch { } } +fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error { + match e { + Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied, + Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore, + Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()), + Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg), + } +} + +#[async_trait] +impl wasi_keyvalue::store::Host for KeyValueDispatch { + async fn open( + &mut self, + identifier: String, + ) -> Result, wasi_keyvalue::store::Error> { + if self.allowed_stores.contains(&identifier) { + let store = self + .stores + .push(self.manager.get(&identifier).await.map_err(to_wasi_err)?) + .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?; + Ok(Resource::new_own(store)) + } else { + Err(wasi_keyvalue::store::Error::AccessDenied) + } + } + + fn convert_error( + &mut self, + error: spin_world::wasi::keyvalue::store::Error, + ) -> std::result::Result { + Ok(error) + } +} + +use wasi_keyvalue::store::Bucket; +#[async_trait] +impl wasi_keyvalue::store::HostBucket for KeyValueDispatch { + async fn get( + &mut self, + self_: Resource, + key: String, + ) -> Result>, wasi_keyvalue::store::Error> { + let store = self.get_store_wasi(self_)?; + store.get(&key).await.map_err(to_wasi_err) + } + + async fn set( + &mut self, + self_: Resource, + key: String, + value: Vec, + ) -> Result<(), wasi_keyvalue::store::Error> { + let store = self.get_store_wasi(self_)?; + store.set(&key, &value).await.map_err(to_wasi_err) + } + + async fn delete( + &mut self, + self_: Resource, + key: String, + ) -> Result<(), wasi_keyvalue::store::Error> { + let store = self.get_store_wasi(self_)?; + store.delete(&key).await.map_err(to_wasi_err) + } + + async fn exists( + &mut self, + self_: Resource, + key: String, + ) -> Result { + let store = self.get_store_wasi(self_)?; + store.exists(&key).await.map_err(to_wasi_err) + } + + async fn list_keys( + &mut self, + self_: Resource, + cursor: Option, + ) -> Result { + match cursor { + Some(_) => Err(wasi_keyvalue::store::Error::Other( + "list_keys: cursor not supported".to_owned(), + )), + None => { + let store = self.get_store_wasi(self_)?; + let keys = store.get_keys().await.map_err(to_wasi_err)?; + Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None }) + } + } + } + + async fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { + self.stores.remove(rep.rep()); + Ok(()) + } +} + +#[async_trait] +impl wasi_keyvalue::batch::Host for KeyValueDispatch { + #[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))] + async fn get_many( + &mut self, + bucket: Resource, + keys: Vec, + ) -> std::result::Result>)>, wasi_keyvalue::store::Error> { + let store = self.get_store_wasi(bucket)?; + store + .get_many(keys.iter().map(|k| k.to_string()).collect()) + .await + .map_err(to_wasi_err) + } + + #[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))] + async fn set_many( + &mut self, + bucket: Resource, + key_values: Vec<(String, Vec)>, + ) -> std::result::Result<(), wasi_keyvalue::store::Error> { + let store = self.get_store_wasi(bucket)?; + store.set_many(key_values).await.map_err(to_wasi_err) + } + + #[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))] + async fn delete_many( + &mut self, + bucket: Resource, + keys: Vec, + ) -> std::result::Result<(), wasi_keyvalue::store::Error> { + let store = self.get_store_wasi(bucket)?; + store + .delete_many(keys.iter().map(|k| k.to_string()).collect()) + .await + .map_err(to_wasi_err) + } +} + +#[async_trait] +impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch { + async fn new( + &mut self, + bucket: Resource, + key: String, + ) -> Result, wasi_keyvalue::store::Error> { + let bucket_rep = bucket.rep(); + let bucket: Resource = Resource::new_own(bucket_rep); + let store = self.get_store_wasi(bucket)?; + let cas = store + .new_compare_and_swap(bucket_rep, &key) + .await + .map_err(to_wasi_err)?; + self.compare_and_swaps + .push(cas) + .map_err(|()| { + spin_world::wasi::keyvalue::store::Error::Other( + "too many compare_and_swaps opened".to_string(), + ) + }) + .map(Resource::new_own) + } + + async fn current( + &mut self, + cas: Resource, + ) -> Result>, wasi_keyvalue::store::Error> { + let cas = self + .get_cas(cas) + .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?; + cas.current().await.map_err(to_wasi_err) + } + + async fn drop(&mut self, rep: Resource) -> Result<()> { + self.compare_and_swaps.remove(rep.rep()); + Ok(()) + } +} + +#[async_trait] +impl wasi_keyvalue::atomics::Host for KeyValueDispatch { + #[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))] + async fn increment( + &mut self, + bucket: Resource, + key: String, + delta: i64, + ) -> Result { + let store = self.get_store_wasi(bucket)?; + store.increment(key, delta).await.map_err(to_wasi_err) + } + + #[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))] + async fn swap( + &mut self, + cas_res: Resource, + value: Vec, + ) -> Result> { + let cas_rep = cas_res.rep(); + let cas = self + .get_cas(Resource::::new_own(cas_rep)) + .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?; + + match cas.swap(value).await { + Ok(_) => Ok(Ok(())), + Err(err) => match err { + SwapError::CasFailed(_) => { + let bucket = Resource::new_own(cas.bucket_rep().await); + let new_cas = self.new(bucket, cas.key().await).await?; + let new_cas_rep = new_cas.rep(); + self.current(Resource::new_own(new_cas_rep)).await?; + Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own( + new_cas_rep, + )))) + } + SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError( + atomics::Error::Other(msg), + ))), + }, + } + } +} + pub fn log_error(err: impl std::fmt::Debug) -> Error { tracing::warn!("key-value error: {err:?}"); Error::Other(format!("{err:?}")) } +pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError { + tracing::warn!("key-value error: {err:?}"); + SwapError::Other(format!("{err:?}")) +} + use spin_world::v1::key_value::Error as LegacyError; +use spin_world::wasi::keyvalue::atomics; +use spin_world::wasi::keyvalue::atomics::{CasError, HostCas}; fn to_legacy_error(value: key_value::Error) -> LegacyError { match value { diff --git a/crates/factor-key-value/src/lib.rs b/crates/factor-key-value/src/lib.rs index 685766b51c..2d9dbb2d9d 100644 --- a/crates/factor-key-value/src/lib.rs +++ b/crates/factor-key-value/src/lib.rs @@ -15,8 +15,9 @@ use spin_locked_app::MetadataKey; /// Metadata key for key-value stores. pub const KEY_VALUE_STORES_KEY: MetadataKey> = MetadataKey::new("key_value_stores"); -pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager}; +pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager}; pub use runtime_config::RuntimeConfig; +use spin_core::async_trait; pub use util::{CachingStoreManager, DelegatingStoreManager}; /// A factor that provides key-value storage. @@ -40,6 +41,9 @@ impl Factor for KeyValueFactor { fn init(&mut self, mut ctx: InitContext) -> anyhow::Result<()> { ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?; ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?; + ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?; + ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?; + ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?; Ok(()) } @@ -131,6 +135,37 @@ impl AppState { } } +/// `SwapError` are errors that occur during compare and swap operations +#[derive(Debug, thiserror::Error)] +pub enum SwapError { + #[error("{0}")] + CasFailed(String), + + #[error("{0}")] + Other(String), +} + +/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill. +/// +/// `current` is expected to get the current value for the key associated with the CAS operation +/// while also starting what is needed to ensure the value to be replaced will not have mutated +/// between the time of calling `current` and `swap`. For example, a get from a backend store +/// may provide the caller with an etag (a version stamp), which can be used with an if-match +/// header to ensure the version updated is the version that was read (optimistic concurrency). +/// Rather than an etag, one could start a transaction, if supported by the backing store, which +/// would provide atomicity. +/// +/// `swap` is expected to replace the old value with the new value respecting the atomicity of the +/// operation. If there was no key / value with the given key in the store, the `swap` operation +/// should **insert** the key and value, disallowing an update. +#[async_trait] +pub trait Cas: Sync + Send { + async fn current(&self) -> anyhow::Result>, Error>; + async fn swap(&self, value: Vec) -> anyhow::Result<(), SwapError>; + async fn bucket_rep(&self) -> u32; + async fn key(&self) -> String; +} + pub struct InstanceBuilder { /// The store manager for the app. /// diff --git a/crates/factor-key-value/src/util.rs b/crates/factor-key-value/src/util.rs index 57c4968011..dcf8cb4e7e 100644 --- a/crates/factor-key-value/src/util.rs +++ b/crates/factor-key-value/src/util.rs @@ -1,4 +1,4 @@ -use crate::{Error, Store, StoreManager}; +use crate::{Cas, Error, Store, StoreManager, SwapError}; use lru::LruCache; use spin_core::async_trait; use std::{ @@ -92,10 +92,10 @@ impl StoreManager for CachingStoreManager { async fn get(&self, name: &str) -> Result, Error> { Ok(Arc::new(CachingStore { inner: self.inner.get(name).await?, - state: AsyncMutex::new(CachingStoreState { + state: Arc::new(AsyncMutex::new(CachingStoreState { cache: LruCache::new(self.capacity), previous_task: None, - }), + })), })) } @@ -143,7 +143,7 @@ impl CachingStoreState { struct CachingStore { inner: Arc, - state: AsyncMutex, + state: Arc>, } #[async_trait] @@ -237,4 +237,123 @@ impl Store for CachingStore { .into_iter() .collect()) } + + async fn get_many( + &self, + keys: Vec, + ) -> anyhow::Result>)>, Error> { + let mut state = self.state.lock().await; + let mut found: Vec<(String, Option>)> = Vec::new(); + let mut not_found: Vec = Vec::new(); + for key in keys { + match state.cache.get(key.as_str()) { + Some(Some(value)) => found.push((key, Some(value.clone()))), + _ => not_found.push(key), + } + } + + let keys_and_values = self.inner.get_many(not_found).await?; + for (key, value) in keys_and_values { + found.push((key.clone(), value.clone())); + state.cache.put(key, value); + } + + Ok(found) + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> anyhow::Result<(), Error> { + let mut state = self.state.lock().await; + + for (key, value) in key_values.clone() { + state.cache.put(key, Some(value)); + } + + self.inner.set_many(key_values).await + } + + async fn delete_many(&self, keys: Vec) -> anyhow::Result<(), Error> { + let mut state = self.state.lock().await; + + for key in keys.clone() { + state.cache.put(key, None); + } + + self.inner.delete_many(keys).await + } + + async fn increment(&self, key: String, delta: i64) -> anyhow::Result { + let mut state = self.state.lock().await; + let counter = self.inner.increment(key.clone(), delta).await?; + state + .cache + .put(key, Some(i64::to_le_bytes(counter).to_vec())); + Ok(counter) + } + + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> anyhow::Result, Error> { + let inner = self.inner.new_compare_and_swap(bucket_rep, key).await?; + Ok(Arc::new(CompareAndSwap { + bucket_rep, + state: self.state.clone(), + key: key.to_string(), + inner_cas: inner, + })) + } +} + +struct CompareAndSwap { + bucket_rep: u32, + key: String, + state: Arc>, + inner_cas: Arc, +} + +#[async_trait] +impl Cas for CompareAndSwap { + async fn current(&self) -> anyhow::Result>, Error> { + let mut state = self.state.lock().await; + state.flush().await?; + let res = self.inner_cas.current().await; + match res.clone() { + Ok(value) => { + state.cache.put(self.key.clone(), value.clone()); + state.flush().await?; + Ok(value) + } + Err(err) => Err(err), + }?; + res + } + + async fn swap(&self, value: Vec) -> anyhow::Result<(), SwapError> { + let mut state = self.state.lock().await; + state + .flush() + .await + .map_err(|_e| SwapError::Other("failed flushing".to_string()))?; + let res = self.inner_cas.swap(value.clone()).await; + match res { + Ok(()) => { + state.cache.put(self.key.clone(), Some(value)); + state + .flush() + .await + .map_err(|_e| SwapError::Other("failed flushing".to_string()))?; + Ok(()) + } + Err(err) => Err(err), + } + } + + async fn bucket_rep(&self) -> u32 { + self.bucket_rep + } + + async fn key(&self) -> String { + self.key.clone() + } } diff --git a/crates/factor-key-value/tests/factor_test.rs b/crates/factor-key-value/tests/factor_test.rs index 83a67700d7..afd62a03f9 100644 --- a/crates/factor-key-value/tests/factor_test.rs +++ b/crates/factor-key-value/tests/factor_test.rs @@ -1,6 +1,6 @@ use anyhow::bail; use spin_core::async_trait; -use spin_factor_key_value::{KeyValueFactor, RuntimeConfig, Store, StoreManager}; +use spin_factor_key_value::{Cas, KeyValueFactor, RuntimeConfig, Store, StoreManager}; use spin_factors::RuntimeFactors; use spin_factors_test::{toml, TestEnvironment}; use spin_world::v2::key_value::{Error, HostStore}; @@ -140,4 +140,36 @@ impl Store for MockStore { async fn get_keys(&self) -> Result, Error> { todo!() } + + async fn get_many( + &self, + keys: Vec, + ) -> anyhow::Result>)>, Error> { + let _ = keys; + todo!() + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> anyhow::Result<(), Error> { + let _ = key_values; + todo!() + } + + async fn delete_many(&self, keys: Vec) -> anyhow::Result<(), Error> { + let _ = keys; + todo!() + } + + async fn increment(&self, key: String, delta: i64) -> anyhow::Result { + let (_, _) = (key, delta); + todo!() + } + + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> anyhow::Result, Error> { + let (_, _) = (key, bucket_rep); + todo!() + } } diff --git a/crates/key-value-azure/Cargo.toml b/crates/key-value-azure/Cargo.toml index 4697b0f920..b612f3f4a4 100644 --- a/crates/key-value-azure/Cargo.toml +++ b/crates/key-value-azure/Cargo.toml @@ -12,6 +12,7 @@ rust-version.workspace = true anyhow = { workspace = true } azure_data_cosmos = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" } azure_identity = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" } +azure_core = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" } futures = { workspace = true } serde = { workspace = true } spin-core = { path = "../core" } diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index a6538e0747..258934e4df 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; - use anyhow::Result; +use azure_data_cosmos::prelude::Operation; +use azure_data_cosmos::resources::collection::PartitionKey; use azure_data_cosmos::{ prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query}, CosmosEntity, @@ -8,7 +8,8 @@ use azure_data_cosmos::{ use futures::StreamExt; use serde::{Deserialize, Serialize}; use spin_core::async_trait; -use spin_factor_key_value::{log_error, Error, Store, StoreManager}; +use spin_factor_key_value::{log_cas_error, log_error, Cas, Error, Store, StoreManager, SwapError}; +use std::sync::{Arc, Mutex}; pub struct KeyValueAzureCosmos { client: CollectionClient, @@ -111,11 +112,19 @@ impl StoreManager for KeyValueAzureCosmos { } } +#[derive(Clone)] struct AzureCosmosStore { _name: String, client: CollectionClient, } +struct CompareAndSwap { + key: String, + client: CollectionClient, + bucket_rep: u32, + etag: Mutex>, +} + #[async_trait] impl Store for AzureCosmosStore { async fn get(&self, key: &str) -> Result>, Error> { @@ -151,6 +160,160 @@ impl Store for AzureCosmosStore { async fn get_keys(&self) -> Result, Error> { self.get_keys().await } + + async fn get_many(&self, keys: Vec) -> Result>)>, Error> { + let in_clause: String = keys + .into_iter() + .map(|k| format!("'{}'", k)) + .collect::>() + .join(", "); + let stmt = Query::new(format!("SELECT * FROM c WHERE c.id IN ({})", in_clause)); + let query = self + .client + .query_documents(stmt) + .query_cross_partition(true); + + let mut res = Vec::new(); + let mut stream = query.into_stream::(); + while let Some(resp) = stream.next().await { + let resp = resp.map_err(log_error)?; + for (pair, _) in resp.results { + res.push((pair.id, Some(pair.value))); + } + } + Ok(res) + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { + for (key, value) in key_values { + self.set(key.as_ref(), &value).await? + } + Ok(()) + } + + async fn delete_many(&self, keys: Vec) -> Result<(), Error> { + for key in keys { + self.delete(key.as_ref()).await? + } + Ok(()) + } + + async fn increment(&self, key: String, delta: i64) -> Result { + let operations = vec![Operation::incr("/value", delta).map_err(log_error)?]; + let _ = self + .client + .document_client(key.clone(), &key.as_str()) + .map_err(log_error)? + .patch_document(operations) + .await + .map_err(log_error)?; + let pair = self.get_pair(key.as_ref()).await?; + match pair { + Some(p) => Ok(i64::from_le_bytes( + p.value.try_into().expect("incorrect length"), + )), + None => Err(Error::Other( + "increment returned an empty value after patching, which indicates a bug" + .to_string(), + )), + } + } + + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> Result, Error> { + Ok(Arc::new(CompareAndSwap { + key: key.to_string(), + client: self.client.clone(), + etag: Mutex::new(None), + bucket_rep, + })) + } +} + +#[async_trait] +impl Cas for CompareAndSwap { + /// `current` will fetch the current value for the key and store the etag for the record. The + /// etag will be used to perform and optimistic concurrency update using the `if-match` header. + async fn current(&self) -> Result>, Error> { + let mut stream = self + .client + .query_documents(Query::new(format!( + "SELECT * FROM c WHERE c.id='{}'", + self.key + ))) + .query_cross_partition(true) + .max_item_count(1) + .into_stream::(); + + let current_value: Option<(Vec, Option)> = match stream.next().await { + Some(r) => { + let r = r.map_err(log_error)?; + match r.results.first() { + Some((item, Some(attr))) => { + Some((item.clone().value, Some(attr.etag().to_string()))) + } + Some((item, None)) => Some((item.clone().value, None)), + _ => None, + } + } + None => None, + }; + + match current_value { + Some((value, etag)) => { + self.etag.lock().unwrap().clone_from(&etag); + Ok(Some(value)) + } + None => Ok(None), + } + } + + /// `swap` updates the value for the key using the etag saved in the `current` function for + /// optimistic concurrency. + async fn swap(&self, value: Vec) -> Result<(), SwapError> { + let pk = PartitionKey::from(&self.key); + let pair = Pair { + id: self.key.clone(), + value, + }; + + let doc_client = self + .client + .document_client(&self.key, &pk) + .map_err(log_cas_error)?; + + let etag_value = self.etag.lock().unwrap().clone(); + match etag_value { + Some(etag) => { + // attempt to replace the document if the etag matches + doc_client + .replace_document(pair) + .if_match_condition(azure_core::request_options::IfMatchCondition::Match(etag)) + .await + .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) + .map(drop) + } + None => { + // if we have no etag, then we assume the document does not yet exist and must insert; no upserts. + self.client + .create_document(pair) + .await + .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) + .map(drop) + } + } + } + + async fn bucket_rep(&self) -> u32 { + self.bucket_rep + } + + async fn key(&self) -> String { + self.key.clone() + } } impl AzureCosmosStore { diff --git a/crates/key-value-redis/src/store.rs b/crates/key-value-redis/src/store.rs index 8d2950305f..1fbbabc6c0 100644 --- a/crates/key-value-redis/src/store.rs +++ b/crates/key-value-redis/src/store.rs @@ -1,7 +1,8 @@ use anyhow::{Context, Result}; -use redis::{aio::MultiplexedConnection, parse_redis_url, AsyncCommands}; +use redis::{aio::MultiplexedConnection, parse_redis_url, AsyncCommands, Client, RedisError}; use spin_core::async_trait; -use spin_factor_key_value::{log_error, Error, Store, StoreManager}; +use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; +use std::ops::DerefMut; use std::sync::Arc; use tokio::sync::{Mutex, OnceCell}; use url::Url; @@ -28,7 +29,7 @@ impl StoreManager for KeyValueRedis { let connection = self .connection .get_or_try_init(|| async { - redis::Client::open(self.database_url.clone())? + Client::open(self.database_url.clone())? .get_multiplexed_async_connection() .await .map(Mutex::new) @@ -39,6 +40,7 @@ impl StoreManager for KeyValueRedis { Ok(Arc::new(RedisStore { connection: connection.clone(), + database_url: self.database_url.clone(), })) } @@ -54,6 +56,13 @@ impl StoreManager for KeyValueRedis { struct RedisStore { connection: Arc>, + database_url: Url, +} + +struct CompareAndSwap { + key: String, + connection: Arc>, + bucket_rep: u32, } #[async_trait] @@ -98,4 +107,113 @@ impl Store for RedisStore { .await .map_err(log_error) } + + async fn get_many(&self, keys: Vec) -> Result>)>, Error> { + self.connection + .lock() + .await + .keys(keys) + .await + .map_err(log_error) + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { + self.connection + .lock() + .await + .mset(&key_values) + .await + .map_err(log_error) + } + + async fn delete_many(&self, keys: Vec) -> Result<(), Error> { + self.connection + .lock() + .await + .del(keys) + .await + .map_err(log_error) + } + + async fn increment(&self, key: String, delta: i64) -> Result { + self.connection + .lock() + .await + .incr(key, delta) + .await + .map_err(log_error) + } + + /// `new_compare_and_swap` builds a new CAS structure giving it its own connection since Redis + /// transactions are scoped to a connection and any WATCH should be dropped upon the drop of + /// the connection. + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> Result, Error> { + let cx = Client::open(self.database_url.clone()) + .map_err(log_error)? + .get_multiplexed_async_connection() + .await + .map(Mutex::new) + .map(Arc::new) + .map_err(log_error)?; + + Ok(Arc::new(CompareAndSwap { + key: key.to_string(), + connection: cx, + bucket_rep, + })) + } +} + +#[async_trait] +impl Cas for CompareAndSwap { + /// current will initiate a transaction by WATCH'ing a key in Redis, and then returning the + /// current value for the key. + async fn current(&self) -> Result>, Error> { + redis::cmd("WATCH") + .arg(&self.key) + .exec_async(self.connection.lock().await.deref_mut()) + .await + .map_err(log_error)?; + self.connection + .lock() + .await + .get(&self.key) + .await + .map_err(log_error) + } + + /// swap will set the key to the new value only if the key has not changed. Afterward, the + /// transaction will be terminated with an UNWATCH + async fn swap(&self, value: Vec) -> Result<(), SwapError> { + // Create transaction pipeline + let mut transaction = redis::pipe(); + let res: Result<(), RedisError> = transaction + .atomic() + .set(&self.key, value) + .query_async(self.connection.lock().await.deref_mut()) + .await; + + redis::cmd("UNWATCH") + .arg(&self.key) + .exec_async(self.connection.lock().await.deref_mut()) + .await + .map_err(|err| SwapError::CasFailed(format!("{err:?}")))?; + + match res { + Ok(_) => Ok(()), + Err(err) => Err(SwapError::CasFailed(format!("{err:?}"))), + } + } + + async fn bucket_rep(&self) -> u32 { + self.bucket_rep + } + + async fn key(&self) -> String { + self.key.clone() + } } diff --git a/crates/key-value-spin/Cargo.toml b/crates/key-value-spin/Cargo.toml index 84a522f27c..6c431c7ad6 100644 --- a/crates/key-value-spin/Cargo.toml +++ b/crates/key-value-spin/Cargo.toml @@ -6,7 +6,7 @@ edition = { workspace = true } [dependencies] anyhow = { workspace = true } -rusqlite = { version = "0.32", features = ["bundled"] } +rusqlite = { version = "0.32", features = ["bundled", "array"] } serde = { workspace = true } spin-core = { path = "../core" } spin-factor-key-value = { path = "../factor-key-value" } diff --git a/crates/key-value-spin/src/store.rs b/crates/key-value-spin/src/store.rs index e12f5e05c7..f18b60f7b7 100644 --- a/crates/key-value-spin/src/store.rs +++ b/crates/key-value-spin/src/store.rs @@ -1,7 +1,8 @@ use anyhow::Result; -use rusqlite::Connection; +use rusqlite::{named_params, Connection}; use spin_core::async_trait; -use spin_factor_key_value::{log_error, Error, Store, StoreManager}; +use spin_factor_key_value::{log_cas_error, log_error, Cas, Error, Store, StoreManager, SwapError}; +use std::rc::Rc; use std::{ path::PathBuf, sync::OnceLock, @@ -53,6 +54,9 @@ impl KeyValueSqlite { ) .map_err(log_error)?; + // the array module is needed for `rarray` usage in queries. + rusqlite::vtab::array::load_module(&connection).map_err(log_error)?; + Ok(Arc::new(Mutex::new(connection))) } } @@ -156,6 +160,184 @@ impl Store for SqliteStore { .collect() }) } + + async fn get_many(&self, keys: Vec) -> Result>)>, Error> { + task::block_in_place(|| { + let sql_value_keys: Vec = + keys.into_iter().map(rusqlite::types::Value::from).collect(); + let ptr = Rc::new(sql_value_keys); + let row_iter: Vec>), Error>> = self.connection + .lock() + .unwrap() + .prepare_cached("SELECT key, value FROM spin_key_value WHERE store=:name AND key IN rarray(:keys)") + .map_err(log_error)? + .query_map(named_params! {":name": &self.name, ":keys": ptr}, |row| { + <(String, Option>)>::try_from(row) + }) + .map_err(log_error)? + .map(|r: Result<(String, Option>), rusqlite::Error>| r.map_err(log_error)) + .collect(); + + let mut keys_and_values: Vec<(String, Option>)> = Vec::new(); + for row in row_iter { + let res = row.map_err(log_error)?; + keys_and_values.push(res); + } + Ok(keys_and_values) + }) + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { + task::block_in_place(|| { + let mut binding = self.connection.lock().unwrap(); + let tx = binding.transaction().map_err(log_error)?; + for kv in key_values { + tx.prepare_cached( + "INSERT INTO spin_key_value (store, key, value) VALUES ($1, $2, $3) + ON CONFLICT(store, key) DO UPDATE SET value=$3", + ) + .map_err(log_error)? + .execute(rusqlite::params![&self.name, kv.0, kv.1]) + .map_err(log_error) + .map(drop)?; + } + tx.commit().map_err(log_error) + }) + } + + async fn delete_many(&self, keys: Vec) -> Result<(), Error> { + task::block_in_place(|| { + let sql_value_keys: Vec = + keys.into_iter().map(rusqlite::types::Value::from).collect(); + let ptr = Rc::new(sql_value_keys); + self.connection + .lock() + .unwrap() + .prepare_cached( + "DELETE FROM spin_key_value WHERE store=:name AND key IN rarray(:keys)", + ) + .map_err(log_error)? + .execute(named_params! {":name": &self.name, ":keys": ptr}) + .map_err(log_error) + .map(drop) + }) + } + + // The assumption with increment is that if the value for the key does not exist, it will be + // assumed to be zero. In the case that we are unable to unmarshal the value into an i64 an error will be returned. + async fn increment(&self, key: String, delta: i64) -> Result { + task::block_in_place(|| { + let mut binding = self.connection.lock().unwrap(); + + let tx = binding.transaction().map_err(log_error)?; + + let value: Option> = tx + .prepare_cached("SELECT value FROM spin_key_value WHERE store=$1 AND key=$2") + .map_err(log_error)? + .query_map([&self.name, &key], |row| row.get(0)) + .map_err(log_error)? + .next() + .transpose() + .map_err(log_error)?; + + let numeric: i64 = match value { + Some(v) => i64::from_le_bytes(v.try_into().expect("incorrect length")), + None => 0, + }; + + let new_value = numeric + delta; + tx.prepare_cached( + "INSERT INTO spin_key_value (store, key, value) VALUES ($1, $2, $3) + ON CONFLICT(store, key) DO UPDATE SET value=$3", + ) + .map_err(log_error)? + .execute(rusqlite::params![&self.name, key, new_value.to_le_bytes()]) + .map_err(log_error) + .map(drop)?; + + tx.commit().map_err(log_error)?; + Ok(new_value) + }) + } + + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> Result, Error> { + Ok(Arc::new(CompareAndSwap { + name: self.name.clone(), + key: key.to_string(), + connection: self.connection.clone(), + value: Mutex::new(None), + bucket_rep, + })) + } +} + +struct CompareAndSwap { + name: String, + key: String, + value: Mutex>>, + connection: Arc>, + bucket_rep: u32, +} + +#[async_trait] +impl Cas for CompareAndSwap { + async fn current(&self) -> Result>, Error> { + task::block_in_place(|| { + let value: Option> = self + .connection + .lock() + .unwrap() + .prepare_cached("SELECT value FROM spin_key_value WHERE store=$1 AND key=$2") + .map_err(log_error)? + .query_map([&self.name, &self.key], |row| row.get(0)) + .map_err(log_error)? + .next() + .transpose() + .map_err(log_error)?; + + self.value.lock().unwrap().clone_from(&value); + Ok(value.clone()) + }) + } + + async fn swap(&self, value: Vec) -> Result<(), SwapError> { + task::block_in_place(|| { + let old_value = self.value.lock().unwrap(); + let rows_changed = self.connection + .lock() + .unwrap() + .prepare_cached( + "UPDATE spin_key_value SET value=:new_value WHERE store=:name and key=:key and value=:old_value", + ) + .map_err(log_cas_error)? + .execute(named_params! { + ":name": &self.name, + ":key": self.key, + ":old_value": old_value.clone().unwrap(), + ":new_value": value, + }) + .map_err(log_cas_error)?; + + // We expect only 1 row to be updated. If 0, we know that the underlying value has changed. + if rows_changed == 1 { + Ok(()) + } else { + Err(SwapError::CasFailed("failed to update 1 row".to_owned())) + } + }) + } + + async fn bucket_rep(&self) -> u32 { + self.bucket_rep + } + + async fn key(&self) -> String { + self.key.clone() + } } #[cfg(test)] @@ -164,6 +346,9 @@ mod test { use spin_core::wasmtime::component::Resource; use spin_factor_key_value::{DelegatingStoreManager, KeyValueDispatch}; use spin_world::v2::key_value::HostStore; + use spin_world::wasi::keyvalue::atomics::HostCas as wasi_cas_host; + use spin_world::wasi::keyvalue::atomics::{CasError, Host}; + use spin_world::wasi::keyvalue::batch::Host as wasi_batch_host; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn all() -> Result<()> { @@ -248,8 +433,123 @@ mod test { Ok(None) )); - kv.drop(Resource::new_own(rep)).await?; + let keys_and_values: Vec<(String, Vec)> = vec![ + ("bin".to_string(), b"baz".to_vec()), + ("alex".to_string(), b"pat".to_vec()), + ]; + assert!(kv + .set_many(Resource::new_own(rep), keys_and_values.clone()) + .await + .is_ok()); + + let res = kv + .get_many( + Resource::new_own(rep), + keys_and_values + .clone() + .iter() + .map(|key_value| key_value.0.clone()) + .collect(), + ) + .await; + + assert!(res.is_ok(), "failed with {:?}", res.err()); + assert_eq!( + kv.get(Resource::new_own(rep), "bin".to_owned()) + .await?? + .unwrap(), + b"baz".to_vec() + ); + + assert_eq!(kv_incr(&mut kv, rep, 1).await, 1); + assert_eq!(kv_incr(&mut kv, rep, 2).await, 3); + assert_eq!(kv_incr(&mut kv, rep, -10).await, -7); + + let res = kv + .delete_many( + Resource::new_own(rep), + vec!["counter".to_owned(), "bin".to_owned(), "alex".to_owned()], + ) + .await; + assert!(res.is_ok(), "failed with {:?}", res.err()); + assert_eq!(kv.get_keys(Resource::new_own(rep)).await??.len(), 0); + + // Compare and Swap tests + cas_failed(&mut kv, rep).await?; + cas_succeeds(&mut kv, rep).await?; + + HostStore::drop(&mut kv, Resource::new_own(rep)).await?; Ok(()) } + + async fn cas_failed(kv: &mut KeyValueDispatch, rep: u32) -> Result<()> { + let cas_key = "fail".to_owned(); + let cas_orig_value = b"baz".to_vec(); + kv.set( + Resource::new_own(rep), + cas_key.clone(), + cas_orig_value.clone(), + ) + .await??; + let cas = kv.new(Resource::new_own(rep), cas_key.clone()).await?; + let cas_rep = cas.rep(); + let current_val = kv.current(Resource::new_own(cas_rep)).await?.unwrap(); + assert_eq!( + String::from_utf8(cas_orig_value)?, + String::from_utf8(current_val)? + ); + + // change the value midway through a compare_and_set + kv.set(Resource::new_own(rep), cas_key.clone(), b"foo".to_vec()) + .await??; + let cas_final_value = b"This should fail with a CAS error".to_vec(); + let res = kv.swap(Resource::new_own(cas.rep()), cas_final_value).await; + match res { + Ok(_) => panic!("expected a CAS failure"), + Err(err) => { + for cause in err.chain() { + if let Some(cas_err) = cause.downcast_ref::() { + assert!(matches!(cas_err, CasError::CasFailed(_))); + return Ok(()); + } + } + panic!("expected a CAS failure") + } + } + } + + async fn cas_succeeds(kv: &mut KeyValueDispatch, rep: u32) -> Result<()> { + let cas_key = "succeed".to_owned(); + let cas_orig_value = b"baz".to_vec(); + kv.set( + Resource::new_own(rep), + cas_key.clone(), + cas_orig_value.clone(), + ) + .await??; + let cas = kv.new(Resource::new_own(rep), cas_key.clone()).await?; + let cas_rep = cas.rep(); + let current_val = kv.current(Resource::new_own(cas_rep)).await?.unwrap(); + assert_eq!( + String::from_utf8(cas_orig_value)?, + String::from_utf8(current_val)? + ); + let cas_final_value = b"This should update key bar".to_vec(); + let res = kv.swap(Resource::new_own(cas.rep()), cas_final_value).await; + match res { + Ok(_) => Ok(()), + Err(err) => { + panic!("unexpected err: {:?}", err); + } + } + } + + async fn kv_incr(kv: &mut KeyValueDispatch, rep: u32, delta: i64) -> i64 { + let res = kv + .increment(Resource::new_own(rep), "counter".to_owned(), delta) + .await; + assert!(res.is_ok(), "failed with {:?}", res.err()); + res.unwrap() + } } diff --git a/crates/world/src/lib.rs b/crates/world/src/lib.rs index 4aa2ecbd0c..a3ecf810e4 100644 --- a/crates/world/src/lib.rs +++ b/crates/world/src/lib.rs @@ -10,6 +10,7 @@ wasmtime::component::bindgen!({ include fermyon:spin/host; include fermyon:spin/platform@2.0.0; include fermyon:spin/platform@3.0.0; + include wasi:keyvalue/imports@0.2.0-draft2; } "#, path: "../../wit", @@ -31,6 +32,7 @@ wasmtime::component::bindgen!({ "fermyon:spin/variables@2.0.0/error" => v2::variables::Error, "spin:postgres/postgres/error" => spin::postgres::postgres::Error, "wasi:config/store@0.2.0-draft-2024-09-27/error" => wasi::config::store::Error, + "wasi:keyvalue/store/error" => wasi::keyvalue::store::Error, }, trappable_imports: true, }); diff --git a/tests/runtime-tests/tests/wasi-key-value/spin.toml b/tests/runtime-tests/tests/wasi-key-value/spin.toml new file mode 100644 index 0000000000..53cda06cec --- /dev/null +++ b/tests/runtime-tests/tests/wasi-key-value/spin.toml @@ -0,0 +1,14 @@ +spin_manifest_version = 2 + +[application] +name = "wasi-key-value" +authors = ["Fermyon Engineering "] +version = "0.1.0" + +[[trigger.http]] +route = "/" +component = "test" + +[component.test] +source = "%{source=wasi-key-value}" +key_value_stores = ["default"] diff --git a/tests/test-components/components/Cargo.lock b/tests/test-components/components/Cargo.lock index bec40906b2..d4f02054f9 100644 --- a/tests/test-components/components/Cargo.lock +++ b/tests/test-components/components/Cargo.lock @@ -939,6 +939,14 @@ dependencies = [ "wit-bindgen 0.16.0", ] +[[package]] +name = "wasi-key-value" +version = "0.1.0" +dependencies = [ + "helper", + "wit-bindgen 0.16.0", +] + [[package]] name = "wasm-encoder" version = "0.36.2" diff --git a/tests/test-components/components/wasi-key-value/Cargo.toml b/tests/test-components/components/wasi-key-value/Cargo.toml new file mode 100644 index 0000000000..0bd198ce14 --- /dev/null +++ b/tests/test-components/components/wasi-key-value/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "wasi-key-value" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +helper = { path = "../../helper" } +wit-bindgen = "0.16.0" diff --git a/tests/test-components/components/wasi-key-value/README.md b/tests/test-components/components/wasi-key-value/README.md new file mode 100644 index 0000000000..2cff701082 --- /dev/null +++ b/tests/test-components/components/wasi-key-value/README.md @@ -0,0 +1,10 @@ +# Key Value + +Tests the key/value interface. + +## Expectations + +This test component expects the following to be true: +* It is given permission to open a connection to the "default" store. +* It does not have permission to access a store named "forbidden". +* It is empty diff --git a/tests/test-components/components/wasi-key-value/src/lib.rs b/tests/test-components/components/wasi-key-value/src/lib.rs new file mode 100644 index 0000000000..dd435f6133 --- /dev/null +++ b/tests/test-components/components/wasi-key-value/src/lib.rs @@ -0,0 +1,66 @@ +use helper::{ensure_matches, ensure_ok}; + +use bindings::wasi::keyvalue::store::{open, Error, KeyResponse}; +use bindings::wasi::keyvalue::batch as wasi_batch; +use bindings::wasi::keyvalue::atomics as wasi_atomics; + +helper::define_component!(Component); + +impl Component { + fn main() -> Result<(), String> { + ensure_matches!(open("forbidden"), Err(Error::AccessDenied)); + + let store = ensure_ok!(open("default")); + + // Ensure nothing set in `bar` key + ensure_ok!(store.delete("bar")); + ensure_matches!(store.exists("bar"), Ok(false)); + ensure_matches!(store.get("bar"), Ok(None)); + ensure_matches!(keys(&store.list_keys(None)), Ok(&[])); + + // Set `bar` key + ensure_ok!(store.set("bar", b"baz")); + ensure_matches!(store.exists("bar"), Ok(true)); + ensure_matches!(store.get("bar"), Ok(Some(v)) if v == b"baz"); + ensure_matches!(keys(&store.list_keys(None)), Ok([bar]) if bar == "bar"); + ensure_matches!(keys(&store.list_keys(Some("0"))), Err(Error::Other(_))); // "list_keys: cursor not supported" + + // Override `bar` key + ensure_ok!(store.set("bar", b"wow")); + ensure_matches!(store.exists("bar"), Ok(true)); + ensure_matches!(store.get("bar"), Ok(Some(wow)) if wow == b"wow"); + ensure_matches!(keys(&store.list_keys(None)), Ok([bar]) if bar == "bar"); + + // Set another key + ensure_ok!(store.set("qux", b"yay")); + ensure_matches!(keys(&store.list_keys(None)), Ok(c) if c.len() == 2 && c.contains(&"bar".into()) && c.contains(&"qux".into())); + + // Delete everything + ensure_ok!(store.delete("bar")); + ensure_ok!(store.delete("bar")); + ensure_ok!(store.delete("qux")); + ensure_matches!(store.exists("bar"), Ok(false)); + ensure_matches!(store.get("qux"), Ok(None)); + ensure_matches!(keys(&store.list_keys(None)), Ok(&[])); + + ensure_ok!(wasi_batch::set_many(&store, &[("bar".to_string(), b"bin".to_vec()), ("baz".to_string(), b"buzz".to_vec())])); + ensure_ok!(wasi_batch::get_many(&store, &["bar".to_string(), "baz".to_string()])); + ensure_ok!(wasi_batch::delete_many(&store, &["bar".to_string(), "baz".to_string()])); + ensure_matches!(wasi_atomics::increment(&store, "counter", 10), Ok(v) if v == 10); + ensure_matches!(wasi_atomics::increment(&store, "counter", 5), Ok(v) if v == 15); + + // successful compare and swap + ensure_ok!(store.set("bar", b"wow")); + let cas = ensure_ok!(wasi_atomics::Cas::new(&store, "bar")); + ensure_matches!(cas.current(), Ok(Some(v)) if v == b"wow".to_vec()); + ensure_ok!(wasi_atomics::swap(cas, b"swapped")); + ensure_matches!(store.get("bar"), Ok(Some(v)) if v == b"swapped"); + ensure_ok!(store.delete("bar")); + + Ok(()) + } +} + +fn keys(res: &Result) -> Result<&[String], &E> { + res.as_ref().map(|kr| kr.keys.as_slice()) +} diff --git a/wit/deps/keyvalue-2024-10-17/atomic.wit b/wit/deps/keyvalue-2024-10-17/atomic.wit new file mode 100644 index 0000000000..2c3e0d047b --- /dev/null +++ b/wit/deps/keyvalue-2024-10-17/atomic.wit @@ -0,0 +1,46 @@ +/// A keyvalue interface that provides atomic operations. +/// +/// Atomic operations are single, indivisible operations. When a fault causes an atomic operation to +/// fail, it will appear to the invoker of the atomic operation that the action either completed +/// successfully or did nothing at all. +/// +/// Please note that this interface is bare functions that take a reference to a bucket. This is to +/// get around the current lack of a way to "extend" a resource with additional methods inside of +/// wit. Future version of the interface will instead extend these methods on the base `bucket` +/// resource. +interface atomics { + use store.{bucket, error}; + + /// The error returned by a CAS operation + variant cas-error { + /// A store error occurred when performing the operation + store-error(error), + /// The CAS operation failed because the value was too old. This returns a new CAS handle + /// for easy retries. Implementors MUST return a CAS handle that has been updated to the + /// latest version or transaction. + cas-failed(cas), + } + + /// A handle to a CAS (compare-and-swap) operation. + resource cas { + /// Construct a new CAS operation. Implementors can map the underlying functionality + /// (transactions, versions, etc) as desired. + new: static func(bucket: borrow, key: string) -> result; + /// Get the current value of the key (if it exists). This allows for avoiding reads if all + /// that is needed to ensure the atomicity of the operation + current: func() -> result>, error>; + } + + /// Atomically increment the value associated with the key in the store by the given delta. It + /// returns the new value. + /// + /// If the key does not exist in the store, it creates a new key-value pair with the value set + /// to the given delta. + /// + /// If any other error occurs, it returns an `Err(error)`. + increment: func(bucket: borrow, key: string, delta: s64) -> result; + + /// Perform the swap on a CAS operation. This consumes the CAS handle and returns an error if + /// the CAS operation failed. + swap: func(cas: cas, value: list) -> result<_, cas-error>; +} diff --git a/wit/deps/keyvalue-2024-10-17/batch.wit b/wit/deps/keyvalue-2024-10-17/batch.wit new file mode 100644 index 0000000000..6d6e873553 --- /dev/null +++ b/wit/deps/keyvalue-2024-10-17/batch.wit @@ -0,0 +1,63 @@ +/// A keyvalue interface that provides batch operations. +/// +/// A batch operation is an operation that operates on multiple keys at once. +/// +/// Batch operations are useful for reducing network round-trip time. For example, if you want to +/// get the values associated with 100 keys, you can either do 100 get operations or you can do 1 +/// batch get operation. The batch operation is faster because it only needs to make 1 network call +/// instead of 100. +/// +/// A batch operation does not guarantee atomicity, meaning that if the batch operation fails, some +/// of the keys may have been modified and some may not. +/// +/// This interface does has the same consistency guarantees as the `store` interface, meaning that +/// you should be able to "read your writes." +/// +/// Please note that this interface is bare functions that take a reference to a bucket. This is to +/// get around the current lack of a way to "extend" a resource with additional methods inside of +/// wit. Future version of the interface will instead extend these methods on the base `bucket` +/// resource. +interface batch { + use store.{bucket, error}; + + /// Get the key-value pairs associated with the keys in the store. It returns a list of + /// key-value pairs. + /// + /// If any of the keys do not exist in the store, it returns a `none` value for that pair in the + /// list. + /// + /// MAY show an out-of-date value if there are concurrent writes to the store. + /// + /// If any other error occurs, it returns an `Err(error)`. + get-many: func(bucket: borrow, keys: list) -> result>>>, error>; + + /// Set the values associated with the keys in the store. If the key already exists in the + /// store, it overwrites the value. + /// + /// Note that the key-value pairs are not guaranteed to be set in the order they are provided. + /// + /// If any of the keys do not exist in the store, it creates a new key-value pair. + /// + /// If any other error occurs, it returns an `Err(error)`. When an error occurs, it does not + /// rollback the key-value pairs that were already set. Thus, this batch operation does not + /// guarantee atomicity, implying that some key-value pairs could be set while others might + /// fail. + /// + /// Other concurrent operations may also be able to see the partial results. + set-many: func(bucket: borrow, key-values: list>>) -> result<_, error>; + + /// Delete the key-value pairs associated with the keys in the store. + /// + /// Note that the key-value pairs are not guaranteed to be deleted in the order they are + /// provided. + /// + /// If any of the keys do not exist in the store, it skips the key. + /// + /// If any other error occurs, it returns an `Err(error)`. When an error occurs, it does not + /// rollback the key-value pairs that were already deleted. Thus, this batch operation does not + /// guarantee atomicity, implying that some key-value pairs could be deleted while others might + /// fail. + /// + /// Other concurrent operations may also be able to see the partial results. + delete-many: func(bucket: borrow, keys: list) -> result<_, error>; +} diff --git a/wit/deps/keyvalue-2024-10-17/store.wit b/wit/deps/keyvalue-2024-10-17/store.wit new file mode 100644 index 0000000000..c7fef41135 --- /dev/null +++ b/wit/deps/keyvalue-2024-10-17/store.wit @@ -0,0 +1,122 @@ +/// A keyvalue interface that provides eventually consistent key-value operations. +/// +/// Each of these operations acts on a single key-value pair. +/// +/// The value in the key-value pair is defined as a `u8` byte array and the intention is that it is +/// the common denominator for all data types defined by different key-value stores to handle data, +/// ensuring compatibility between different key-value stores. Note: the clients will be expecting +/// serialization/deserialization overhead to be handled by the key-value store. The value could be +/// a serialized object from JSON, HTML or vendor-specific data types like AWS S3 objects. +/// +/// Data consistency in a key value store refers to the guarantee that once a write operation +/// completes, all subsequent read operations will return the value that was written. +/// +/// Any implementation of this interface must have enough consistency to guarantee "reading your +/// writes." In particular, this means that the client should never get a value that is older than +/// the one it wrote, but it MAY get a newer value if one was written around the same time. These +/// guarantees only apply to the same client (which will likely be provided by the host or an +/// external capability of some kind). In this context a "client" is referring to the caller or +/// guest that is consuming this interface. Once a write request is committed by a specific client, +/// all subsequent read requests by the same client will reflect that write or any subsequent +/// writes. Another client running in a different context may or may not immediately see the result +/// due to the replication lag. As an example of all of this, if a value at a given key is A, and +/// the client writes B, then immediately reads, it should get B. If something else writes C in +/// quick succession, then the client may get C. However, a client running in a separate context may +/// still see A or B +interface store { + /// The set of errors which may be raised by functions in this package + variant error { + /// The host does not recognize the store identifier requested. + no-such-store, + + /// The requesting component does not have access to the specified store + /// (which may or may not exist). + access-denied, + + /// Some implementation-specific error has occurred (e.g. I/O) + other(string) + } + + /// A response to a `list-keys` operation. + record key-response { + /// The list of keys returned by the query. + keys: list, + /// The continuation token to use to fetch the next page of keys. If this is `null`, then + /// there are no more keys to fetch. + cursor: option + } + + /// Get the bucket with the specified identifier. + /// + /// `identifier` must refer to a bucket provided by the host. + /// + /// `error::no-such-store` will be raised if the `identifier` is not recognized. + open: func(identifier: string) -> result; + + /// A bucket is a collection of key-value pairs. Each key-value pair is stored as a entry in the + /// bucket, and the bucket itself acts as a collection of all these entries. + /// + /// It is worth noting that the exact terminology for bucket in key-value stores can very + /// depending on the specific implementation. For example: + /// + /// 1. Amazon DynamoDB calls a collection of key-value pairs a table + /// 2. Redis has hashes, sets, and sorted sets as different types of collections + /// 3. Cassandra calls a collection of key-value pairs a column family + /// 4. MongoDB calls a collection of key-value pairs a collection + /// 5. Riak calls a collection of key-value pairs a bucket + /// 6. Memcached calls a collection of key-value pairs a slab + /// 7. Azure Cosmos DB calls a collection of key-value pairs a container + /// + /// In this interface, we use the term `bucket` to refer to a collection of key-value pairs + resource bucket { + /// Get the value associated with the specified `key` + /// + /// The value is returned as an option. If the key-value pair exists in the + /// store, it returns `Ok(value)`. If the key does not exist in the + /// store, it returns `Ok(none)`. + /// + /// If any other error occurs, it returns an `Err(error)`. + get: func(key: string) -> result>, error>; + + /// Set the value associated with the key in the store. If the key already + /// exists in the store, it overwrites the value. + /// + /// If the key does not exist in the store, it creates a new key-value pair. + /// + /// If any other error occurs, it returns an `Err(error)`. + set: func(key: string, value: list) -> result<_, error>; + + /// Delete the key-value pair associated with the key in the store. + /// + /// If the key does not exist in the store, it does nothing. + /// + /// If any other error occurs, it returns an `Err(error)`. + delete: func(key: string) -> result<_, error>; + + /// Check if the key exists in the store. + /// + /// If the key exists in the store, it returns `Ok(true)`. If the key does + /// not exist in the store, it returns `Ok(false)`. + /// + /// If any other error occurs, it returns an `Err(error)`. + exists: func(key: string) -> result; + + /// Get all the keys in the store with an optional cursor (for use in pagination). It + /// returns a list of keys. Please note that for most KeyValue implementations, this is a + /// can be a very expensive operation and so it should be used judiciously. Implementations + /// can return any number of keys in a single response, but they should never attempt to + /// send more data than is reasonable (i.e. on a small edge device, this may only be a few + /// KB, while on a large machine this could be several MB). Any response should also return + /// a cursor that can be used to fetch the next page of keys. See the `key-response` record + /// for more information. + /// + /// Note that the keys are not guaranteed to be returned in any particular order. + /// + /// If the store is empty, it returns an empty list. + /// + /// MAY show an out-of-date list of keys if there are concurrent writes to the store. + /// + /// If any error occurs, it returns an `Err(error)`. + list-keys: func(cursor: option) -> result; + } +} diff --git a/wit/deps/keyvalue-2024-10-17/watch.wit b/wit/deps/keyvalue-2024-10-17/watch.wit new file mode 100644 index 0000000000..9299119624 --- /dev/null +++ b/wit/deps/keyvalue-2024-10-17/watch.wit @@ -0,0 +1,16 @@ +/// A keyvalue interface that provides watch operations. +/// +/// This interface is used to provide event-driven mechanisms to handle +/// keyvalue changes. +interface watcher { + /// A keyvalue interface that provides handle-watch operations. + use store.{bucket}; + + /// Handle the `set` event for the given bucket and key. It includes a reference to the `bucket` + /// that can be used to interact with the store. + on-set: func(bucket: bucket, key: string, value: list); + + /// Handle the `delete` event for the given bucket and key. It includes a reference to the + /// `bucket` that can be used to interact with the store. + on-delete: func(bucket: bucket, key: string); +} diff --git a/wit/deps/keyvalue-2024-10-17/world.wit b/wit/deps/keyvalue-2024-10-17/world.wit new file mode 100644 index 0000000000..64eb4e1225 --- /dev/null +++ b/wit/deps/keyvalue-2024-10-17/world.wit @@ -0,0 +1,26 @@ +package wasi: keyvalue@0.2.0-draft2; + +/// The `wasi:keyvalue/imports` world provides common APIs for interacting with key-value stores. +/// Components targeting this world will be able to do: +/// +/// 1. CRUD (create, read, update, delete) operations on key-value stores. +/// 2. Atomic `increment` and CAS (compare-and-swap) operations. +/// 3. Batch operations that can reduce the number of round trips to the network. +world imports { + /// The `store` capability allows the component to perform eventually consistent operations on + /// the key-value store. + import store; + + /// The `atomic` capability allows the component to perform atomic / `increment` and CAS + /// (compare-and-swap) operations. + import atomics; + + /// The `batch` capability allows the component to perform eventually consistent batch + /// operations that can reduce the number of round trips to the network. + import batch; +} + +world watch-service { + include imports; + export watcher; +} diff --git a/wit/world.wit b/wit/world.wit index 55a369174b..07da9cd4bf 100644 --- a/wit/world.wit +++ b/wit/world.wit @@ -9,6 +9,7 @@ world http-trigger { /// The imports needed for a guest to run on a Spin host world platform { include fermyon:spin/platform@2.0.0; + include wasi:keyvalue/imports@0.2.0-draft2; import spin:postgres/postgres@3.0.0; import wasi:config/store@0.2.0-draft-2024-09-27; }