Skip to content

Commit

Permalink
feat: added key and upload_id to Multipart uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
KernelFreeze committed Oct 22, 2023
1 parent a71f413 commit 0a54c9e
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 107 deletions.
4 changes: 2 additions & 2 deletions lib/worker/src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use wasm_bindgen::JsCast;
use crate::body::wasm::WasmStreamBody;
use crate::body::HttpBody;
use crate::error::WorkerError;
use crate::futures::SendJsFuture;
use crate::futures::future_from_promise;

type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, WorkerError>;

Expand Down Expand Up @@ -90,7 +90,7 @@ impl Body {
buf: Result<js_sys::Promise, wasm_bindgen::JsValue>,
) -> Bytes {
// Unwrapping only panics when the body has already been accessed before
let fut = SendJsFuture::from(buf.unwrap());
let fut = future_from_promise(buf.unwrap());
let buf = js_sys::Uint8Array::new(&fut.await.unwrap());
buf.to_vec().into()
}
Expand Down
10 changes: 5 additions & 5 deletions lib/worker/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde::Serialize;
use wasm_bindgen::JsCast;

use crate::body::Body;
use crate::futures::SendJsFuture;
use crate::futures::future_from_promise;
use crate::http::{request, response};
use crate::prelude::WorkerError;
use crate::result::Result;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Cache {
let global: web_sys::WorkerGlobalScope = js_sys::global().unchecked_into();
let cache = global.caches().unwrap().open(&name);

SendJsFuture::from(cache)
future_from_promise(cache)
};

// unwrap is safe because this promise never rejects
Expand Down Expand Up @@ -90,7 +90,7 @@ impl Cache {
CacheKey::Request(req) => self.0.put_with_request(&req, &res.into().0),
};

SendJsFuture::from(promise)
future_from_promise(promise)
};

fut.await.map_err(WorkerError::from_promise_err)?;
Expand Down Expand Up @@ -135,7 +135,7 @@ impl Cache {
CacheKey::Request(req) => self.0.match_with_request_and_options(&req, &options),
};

SendJsFuture::from(promise)
future_from_promise(promise)
};

// `match` returns either a response or undefined
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Cache {
CacheKey::Request(req) => self.0.delete_with_request_and_options(&req, &options),
};

SendJsFuture::from(promise)
future_from_promise(promise)
};

let result = fut.await.map_err(WorkerError::from_promise_err)?;
Expand Down
118 changes: 63 additions & 55 deletions lib/worker/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::body::Body;
use crate::date::Date;
use crate::env::Env;
use crate::error::WorkerError;
use crate::futures::SendJsFuture;
use crate::futures::future_from_promise;
use crate::http::{request, response};
use crate::result::Result;

Expand All @@ -54,7 +54,7 @@ impl Stub {
let req = request::into_web_sys_request(req);
let promise = self.inner.fetch_with_request(&req);

SendJsFuture::from(promise)
future_from_promise(promise)
};

let res = fut
Expand All @@ -70,7 +70,7 @@ impl Stub {
pub async fn fetch_with_str(&self, url: &str) -> Result<http::Response<Body>> {
let fut = {
let promise = self.inner.fetch_with_str(url);
SendJsFuture::from(promise)
future_from_promise(promise)
};

let res = fut
Expand Down Expand Up @@ -289,7 +289,7 @@ impl Storage {
/// Returns [Err] if the key does not exist.
pub async fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Result<T> {
let promise = self.inner.get(key).map_err(WorkerError::from_js_err)?;
let fut = SendJsFuture::from(promise);
let fut = future_from_promise(promise);
fut.await
.and_then(|val| {
if val.is_undefined() {
Expand All @@ -312,7 +312,7 @@ impl Storage {
.collect(),
)
.map_err(WorkerError::from_js_err)?;
SendJsFuture::from(keys)
future_from_promise(keys)
};

let keys = fut.await.map_err(WorkerError::from_promise_err)?;
Expand All @@ -325,7 +325,7 @@ impl Storage {
.inner
.put(key, serde_wasm_bindgen::to_value(&value)?)
.map_err(WorkerError::from_js_err)?;
let fut = SendJsFuture::from(promise);
let fut = future_from_promise(promise);
fut.await.map_err(WorkerError::from_promise_err).map(|_| ())
}

Expand All @@ -337,7 +337,7 @@ impl Storage {
if !values.is_object() {
return Err(WorkerError::MustPassInStructType);
}
SendJsFuture::from(
future_from_promise(
self.inner
.put_multiple(values)
.map_err(WorkerError::from_js_err)?,
Expand All @@ -349,11 +349,11 @@ impl Storage {
/// Deletes the key and associated value. Returns true if the key existed or
/// false if it didn't.
pub async fn delete(&mut self, key: &str) -> Result<bool> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete(key)
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| {
jsv.as_bool()
Expand All @@ -365,15 +365,15 @@ impl Storage {
/// Deletes the provided keys and their associated values. Returns a count
/// of the number of key-value pairs deleted.
pub async fn delete_multiple(&mut self, keys: Vec<impl Deref<Target = str>>) -> Result<usize> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete_multiple(
keys.into_iter()
.map(|key| JsValue::from(key.deref()))
.collect(),
)
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| {
jsv.as_f64()
Expand All @@ -388,11 +388,11 @@ impl Storage {
/// the operation is still in flight, it may be that only a subset of
/// the data is properly deleted.
pub async fn delete_all(&mut self) -> Result<()> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete_all()
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await.map(|_| ()).map_err(WorkerError::from_promise_err)
}

Expand All @@ -404,7 +404,11 @@ impl Storage {
/// loaded into the Durable Object's memory, potentially hitting its [limit](https://developers.cloudflare.com/workers/platform/limits#durable-objects-limits).
/// If that is a concern, use the alternate `list_with_options()` method.
pub async fn list(&self) -> Result<Map> {
let fut: SendJsFuture = self.inner.list().map_err(WorkerError::from_js_err)?.into();
let fut = self
.inner
.list()
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(WorkerError::from_promise_err)
Expand All @@ -413,11 +417,11 @@ impl Storage {
/// Returns keys associated with the current Durable Object according to the
/// parameters in the provided options object.
pub async fn list_with_options(&self, opts: ListOptions<'_>) -> Result<Map> {
let fut: SendJsFuture = self
let fut = self
.inner
.list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(WorkerError::from_promise_err)
Expand All @@ -428,22 +432,22 @@ impl Storage {
/// if it has failed and any retry has not begun. If no alarm is set,
/// `get_alarm()` returns `None`.
pub async fn get_alarm(&self) -> Result<Option<i64>> {
let fut: SendJsFuture = self
let fut = self
.inner
.get_alarm(JsValue::NULL.into())
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.map(|jsv| jsv.as_f64().map(|f| f as i64))
.map_err(WorkerError::from_promise_err)
}

pub async fn get_alarm_with_options(&self, options: GetAlarmOptions) -> Result<Option<i64>> {
let fut: SendJsFuture = self
let fut = self
.inner
.get_alarm(serde_wasm_bindgen::to_value(&options)?.into())
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.map(|jsv| jsv.as_f64().map(|f| f as i64))
.map_err(WorkerError::from_promise_err)
Expand All @@ -459,45 +463,45 @@ impl Storage {
/// the set time, but can be delayed by up to a minute
/// due to maintenance or failures while failover takes place.
pub async fn set_alarm(&self, scheduled_time: impl Into<ScheduledTime>) -> Result<()> {
let fut: SendJsFuture = self
let fut = self
.inner
.set_alarm(scheduled_time.into().schedule(), JsValue::NULL.into())
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await.map(|_| ()).map_err(WorkerError::from_promise_err)
}

pub async fn set_alarm_with_options(
&self, scheduled_time: impl Into<ScheduledTime>, options: SetAlarmOptions,
) -> Result<()> {
let fut: SendJsFuture = self
let fut = self
.inner
.set_alarm(
scheduled_time.into().schedule(),
serde_wasm_bindgen::to_value(&options)?.into(),
)
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await.map(|_| ()).map_err(WorkerError::from_promise_err)
}

/// Deletes the alarm if one exists. Does not cancel the alarm handler if it
/// is currently executing.
pub async fn delete_alarm(&self) -> Result<()> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete_alarm(JsValue::NULL.into())
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await.map(|_| ()).map_err(WorkerError::from_promise_err)
}

pub async fn delete_alarm_with_options(&self, options: SetAlarmOptions) -> Result<()> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete_alarm(serde_wasm_bindgen::to_value(&options)?.into())
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await.map(|_| ()).map_err(WorkerError::from_promise_err)
}
}
Expand All @@ -511,7 +515,7 @@ struct Transaction {
impl Transaction {
async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<T> {
let promise = self.inner.get(key).map_err(WorkerError::from_js_err)?;
let fut = SendJsFuture::from(promise);
let fut = future_from_promise(promise);

fut.await
.and_then(|val| {
Expand All @@ -534,7 +538,7 @@ impl Transaction {
.collect(),
)
.map_err(WorkerError::from_js_err)?;
SendJsFuture::from(keys)
future_from_promise(keys)
};

let keys = fut.await.map_err(WorkerError::from_promise_err)?;
Expand All @@ -546,7 +550,7 @@ impl Transaction {
.inner
.put(key, serde_wasm_bindgen::to_value(&value)?)
.map_err(WorkerError::from_js_err)?;
let fut = SendJsFuture::from(promise);
let fut = future_from_promise(promise);
fut.await.map_err(WorkerError::from_promise_err).map(|_| ())
}

Expand All @@ -562,18 +566,18 @@ impl Transaction {
.inner
.put_multiple(values)
.map_err(WorkerError::from_js_err)?;
SendJsFuture::from(promise)
future_from_promise(promise)
};

fut.await.map_err(WorkerError::from_promise_err).map(|_| ())
}

async fn delete(&mut self, key: &str) -> Result<bool> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete(key)
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| {
jsv.as_bool()
Expand All @@ -583,15 +587,15 @@ impl Transaction {
}

async fn delete_multiple(&mut self, keys: Vec<impl Deref<Target = str>>) -> Result<usize> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete_multiple(
keys.into_iter()
.map(|key| JsValue::from(key.deref()))
.collect(),
)
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| {
jsv.as_f64()
Expand All @@ -602,27 +606,31 @@ impl Transaction {
}

async fn delete_all(&mut self) -> Result<()> {
let fut: SendJsFuture = self
let fut = self
.inner
.delete_all()
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await.map(|_| ()).map_err(WorkerError::from_promise_err)
}

async fn list(&self) -> Result<Map> {
let fut: SendJsFuture = self.inner.list().map_err(WorkerError::from_js_err)?.into();
let fut = self
.inner
.list()
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(WorkerError::from_promise_err)
}

async fn list_with_options(&self, opts: ListOptions<'_>) -> Result<Map> {
let fut: SendJsFuture = self
let fut = self
.inner
.list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())
.map_err(WorkerError::from_js_err)?
.into();
.map(future_from_promise)
.map_err(WorkerError::from_js_err)?;
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(WorkerError::from_promise_err)
Expand Down
Loading

0 comments on commit 0a54c9e

Please sign in to comment.