From 0a54c9e63926c208d28e317f0b4adc413ce15a11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Celeste=20Pel=C3=A1ez?= Date: Sat, 21 Oct 2023 23:17:43 -0500 Subject: [PATCH] feat: added `key` and `upload_id` to Multipart uploads --- lib/worker/src/body/body.rs | 4 +- lib/worker/src/cache.rs | 10 +-- lib/worker/src/durable.rs | 118 +++++++++++++++++++---------------- lib/worker/src/fetch.rs | 4 +- lib/worker/src/fetcher.rs | 4 +- lib/worker/src/futures.rs | 32 ++-------- lib/worker/src/lib.rs | 1 - lib/worker/src/queue.rs | 4 +- lib/worker/src/r2/builder.rs | 10 +-- lib/worker/src/r2/mod.rs | 29 ++++++--- 10 files changed, 109 insertions(+), 107 deletions(-) diff --git a/lib/worker/src/body/body.rs b/lib/worker/src/body/body.rs index 1c3de2e..bd15098 100644 --- a/lib/worker/src/body/body.rs +++ b/lib/worker/src/body/body.rs @@ -11,7 +11,7 @@ use wasm_bindgen::JsCast; use crate::body::wasm::WasmStreamBody; use crate::body::HttpBody; use crate::error::WorkerError; -use crate::futures::SendJsFuture; +use crate::futures::future_from_promise; type BoxBody = http_body::combinators::UnsyncBoxBody; @@ -90,7 +90,7 @@ impl Body { buf: Result, ) -> Bytes { // Unwrapping only panics when the body has already been accessed before - let fut = SendJsFuture::from(buf.unwrap()); + let fut = future_from_promise(buf.unwrap()); let buf = js_sys::Uint8Array::new(&fut.await.unwrap()); buf.to_vec().into() } diff --git a/lib/worker/src/cache.rs b/lib/worker/src/cache.rs index d091146..ec2e058 100644 --- a/lib/worker/src/cache.rs +++ b/lib/worker/src/cache.rs @@ -4,7 +4,7 @@ use serde::Serialize; use wasm_bindgen::JsCast; use crate::body::Body; -use crate::futures::SendJsFuture; +use crate::futures::future_from_promise; use crate::http::{request, response}; use crate::prelude::WorkerError; use crate::result::Result; @@ -60,7 +60,7 @@ impl Cache { let global: web_sys::WorkerGlobalScope = js_sys::global().unchecked_into(); let cache = global.caches().unwrap().open(&name); - SendJsFuture::from(cache) + future_from_promise(cache) }; // unwrap is safe because this promise never rejects @@ -90,7 +90,7 @@ impl Cache { CacheKey::Request(req) => self.0.put_with_request(&req, &res.into().0), }; - SendJsFuture::from(promise) + future_from_promise(promise) }; fut.await.map_err(WorkerError::from_promise_err)?; @@ -135,7 +135,7 @@ impl Cache { CacheKey::Request(req) => self.0.match_with_request_and_options(&req, &options), }; - SendJsFuture::from(promise) + future_from_promise(promise) }; // `match` returns either a response or undefined @@ -167,7 +167,7 @@ impl Cache { CacheKey::Request(req) => self.0.delete_with_request_and_options(&req, &options), }; - SendJsFuture::from(promise) + future_from_promise(promise) }; let result = fut.await.map_err(WorkerError::from_promise_err)?; diff --git a/lib/worker/src/durable.rs b/lib/worker/src/durable.rs index 635948c..6cc7d7d 100644 --- a/lib/worker/src/durable.rs +++ b/lib/worker/src/durable.rs @@ -35,7 +35,7 @@ use crate::body::Body; use crate::date::Date; use crate::env::Env; use crate::error::WorkerError; -use crate::futures::SendJsFuture; +use crate::futures::future_from_promise; use crate::http::{request, response}; use crate::result::Result; @@ -54,7 +54,7 @@ impl Stub { let req = request::into_web_sys_request(req); let promise = self.inner.fetch_with_request(&req); - SendJsFuture::from(promise) + future_from_promise(promise) }; let res = fut @@ -70,7 +70,7 @@ impl Stub { pub async fn fetch_with_str(&self, url: &str) -> Result> { let fut = { let promise = self.inner.fetch_with_str(url); - SendJsFuture::from(promise) + future_from_promise(promise) }; let res = fut @@ -289,7 +289,7 @@ impl Storage { /// Returns [Err] if the key does not exist. pub async fn get(&self, key: &str) -> Result { let promise = self.inner.get(key).map_err(WorkerError::from_js_err)?; - let fut = SendJsFuture::from(promise); + let fut = future_from_promise(promise); fut.await .and_then(|val| { if val.is_undefined() { @@ -312,7 +312,7 @@ impl Storage { .collect(), ) .map_err(WorkerError::from_js_err)?; - SendJsFuture::from(keys) + future_from_promise(keys) }; let keys = fut.await.map_err(WorkerError::from_promise_err)?; @@ -325,7 +325,7 @@ impl Storage { .inner .put(key, serde_wasm_bindgen::to_value(&value)?) .map_err(WorkerError::from_js_err)?; - let fut = SendJsFuture::from(promise); + let fut = future_from_promise(promise); fut.await.map_err(WorkerError::from_promise_err).map(|_| ()) } @@ -337,7 +337,7 @@ impl Storage { if !values.is_object() { return Err(WorkerError::MustPassInStructType); } - SendJsFuture::from( + future_from_promise( self.inner .put_multiple(values) .map_err(WorkerError::from_js_err)?, @@ -349,11 +349,11 @@ impl Storage { /// Deletes the key and associated value. Returns true if the key existed or /// false if it didn't. pub async fn delete(&mut self, key: &str) -> Result { - let fut: SendJsFuture = self + let fut = self .inner .delete(key) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| { jsv.as_bool() @@ -365,15 +365,15 @@ impl Storage { /// Deletes the provided keys and their associated values. Returns a count /// of the number of key-value pairs deleted. pub async fn delete_multiple(&mut self, keys: Vec>) -> Result { - let fut: SendJsFuture = self + let fut = self .inner .delete_multiple( keys.into_iter() .map(|key| JsValue::from(key.deref())) .collect(), ) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| { jsv.as_f64() @@ -388,11 +388,11 @@ impl Storage { /// the operation is still in flight, it may be that only a subset of /// the data is properly deleted. pub async fn delete_all(&mut self) -> Result<()> { - let fut: SendJsFuture = self + let fut = self .inner .delete_all() - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await.map(|_| ()).map_err(WorkerError::from_promise_err) } @@ -404,7 +404,11 @@ impl Storage { /// loaded into the Durable Object's memory, potentially hitting its [limit](https://developers.cloudflare.com/workers/platform/limits#durable-objects-limits). /// If that is a concern, use the alternate `list_with_options()` method. pub async fn list(&self) -> Result { - let fut: SendJsFuture = self.inner.list().map_err(WorkerError::from_js_err)?.into(); + let fut = self + .inner + .list() + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(WorkerError::from_promise_err) @@ -413,11 +417,11 @@ impl Storage { /// Returns keys associated with the current Durable Object according to the /// parameters in the provided options object. pub async fn list_with_options(&self, opts: ListOptions<'_>) -> Result { - let fut: SendJsFuture = self + let fut = self .inner .list_with_options(serde_wasm_bindgen::to_value(&opts)?.into()) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(WorkerError::from_promise_err) @@ -428,22 +432,22 @@ impl Storage { /// if it has failed and any retry has not begun. If no alarm is set, /// `get_alarm()` returns `None`. pub async fn get_alarm(&self) -> Result> { - let fut: SendJsFuture = self + let fut = self .inner .get_alarm(JsValue::NULL.into()) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .map(|jsv| jsv.as_f64().map(|f| f as i64)) .map_err(WorkerError::from_promise_err) } pub async fn get_alarm_with_options(&self, options: GetAlarmOptions) -> Result> { - let fut: SendJsFuture = self + let fut = self .inner .get_alarm(serde_wasm_bindgen::to_value(&options)?.into()) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .map(|jsv| jsv.as_f64().map(|f| f as i64)) .map_err(WorkerError::from_promise_err) @@ -459,45 +463,45 @@ impl Storage { /// the set time, but can be delayed by up to a minute /// due to maintenance or failures while failover takes place. pub async fn set_alarm(&self, scheduled_time: impl Into) -> Result<()> { - let fut: SendJsFuture = self + let fut = self .inner .set_alarm(scheduled_time.into().schedule(), JsValue::NULL.into()) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await.map(|_| ()).map_err(WorkerError::from_promise_err) } pub async fn set_alarm_with_options( &self, scheduled_time: impl Into, options: SetAlarmOptions, ) -> Result<()> { - let fut: SendJsFuture = self + let fut = self .inner .set_alarm( scheduled_time.into().schedule(), serde_wasm_bindgen::to_value(&options)?.into(), ) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await.map(|_| ()).map_err(WorkerError::from_promise_err) } /// Deletes the alarm if one exists. Does not cancel the alarm handler if it /// is currently executing. pub async fn delete_alarm(&self) -> Result<()> { - let fut: SendJsFuture = self + let fut = self .inner .delete_alarm(JsValue::NULL.into()) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await.map(|_| ()).map_err(WorkerError::from_promise_err) } pub async fn delete_alarm_with_options(&self, options: SetAlarmOptions) -> Result<()> { - let fut: SendJsFuture = self + let fut = self .inner .delete_alarm(serde_wasm_bindgen::to_value(&options)?.into()) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await.map(|_| ()).map_err(WorkerError::from_promise_err) } } @@ -511,7 +515,7 @@ struct Transaction { impl Transaction { async fn get(&self, key: &str) -> Result { let promise = self.inner.get(key).map_err(WorkerError::from_js_err)?; - let fut = SendJsFuture::from(promise); + let fut = future_from_promise(promise); fut.await .and_then(|val| { @@ -534,7 +538,7 @@ impl Transaction { .collect(), ) .map_err(WorkerError::from_js_err)?; - SendJsFuture::from(keys) + future_from_promise(keys) }; let keys = fut.await.map_err(WorkerError::from_promise_err)?; @@ -546,7 +550,7 @@ impl Transaction { .inner .put(key, serde_wasm_bindgen::to_value(&value)?) .map_err(WorkerError::from_js_err)?; - let fut = SendJsFuture::from(promise); + let fut = future_from_promise(promise); fut.await.map_err(WorkerError::from_promise_err).map(|_| ()) } @@ -562,18 +566,18 @@ impl Transaction { .inner .put_multiple(values) .map_err(WorkerError::from_js_err)?; - SendJsFuture::from(promise) + future_from_promise(promise) }; fut.await.map_err(WorkerError::from_promise_err).map(|_| ()) } async fn delete(&mut self, key: &str) -> Result { - let fut: SendJsFuture = self + let fut = self .inner .delete(key) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| { jsv.as_bool() @@ -583,15 +587,15 @@ impl Transaction { } async fn delete_multiple(&mut self, keys: Vec>) -> Result { - let fut: SendJsFuture = self + let fut = self .inner .delete_multiple( keys.into_iter() .map(|key| JsValue::from(key.deref())) .collect(), ) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| { jsv.as_f64() @@ -602,27 +606,31 @@ impl Transaction { } async fn delete_all(&mut self) -> Result<()> { - let fut: SendJsFuture = self + let fut = self .inner .delete_all() - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await.map(|_| ()).map_err(WorkerError::from_promise_err) } async fn list(&self) -> Result { - let fut: SendJsFuture = self.inner.list().map_err(WorkerError::from_js_err)?.into(); + let fut = self + .inner + .list() + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(WorkerError::from_promise_err) } async fn list_with_options(&self, opts: ListOptions<'_>) -> Result { - let fut: SendJsFuture = self + let fut = self .inner .list_with_options(serde_wasm_bindgen::to_value(&opts)?.into()) - .map_err(WorkerError::from_js_err)? - .into(); + .map(future_from_promise) + .map_err(WorkerError::from_js_err)?; fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(WorkerError::from_promise_err) diff --git a/lib/worker/src/fetch.rs b/lib/worker/src/fetch.rs index 22db535..91b6d98 100644 --- a/lib/worker/src/fetch.rs +++ b/lib/worker/src/fetch.rs @@ -3,7 +3,7 @@ use web_sys::WorkerGlobalScope; use crate::body::Body; use crate::error::WorkerError; -use crate::futures::SendJsFuture; +use crate::futures::future_from_promise; use crate::http::{request, response}; use crate::result::Result; @@ -37,7 +37,7 @@ pub async fn fetch(req: http::Request>) -> Result); - -impl Future for SendJsFuture { - type Output = ::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.poll_unpin(cx) - } -} - -impl From for SendJsFuture { - fn from(p: Promise) -> Self { - Self(SendWrapper::new(JsFuture::from(p))) - } -} - -#[cfg(test)] -mod tests { - use static_assertions::assert_impl_all; - - use super::*; - - assert_impl_all!(SendJsFuture: Send, Sync, Unpin); +pub(crate) fn future_from_promise( + p: Promise, +) -> Pin> + Send + Sync + 'static>> { + Box::pin(SendWrapper::new(JsFuture::from(p))) } diff --git a/lib/worker/src/lib.rs b/lib/worker/src/lib.rs index 2b25bd6..b872e63 100644 --- a/lib/worker/src/lib.rs +++ b/lib/worker/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(async_fn_in_trait)] #![deny(unsafe_code)] #[cfg(feature = "d1")] diff --git a/lib/worker/src/queue.rs b/lib/worker/src/queue.rs index 6d5dcb3..6491b14 100644 --- a/lib/worker/src/queue.rs +++ b/lib/worker/src/queue.rs @@ -11,7 +11,7 @@ use wasm_bindgen::prelude::*; use crate::date::Date; use crate::error::WorkerError; -use crate::futures::SendJsFuture; +use crate::futures::future_from_promise; use crate::result::Result; static BODY_KEY_STR: &str = "body"; @@ -193,7 +193,7 @@ impl Queue { T: Serialize, { let fut = { let js_value = serde_wasm_bindgen::to_value(message)?; - SendJsFuture::from(self.0.send(js_value)) + future_from_promise(self.0.send(js_value)) }; fut.await.map_err(WorkerError::from_promise_err)?; diff --git a/lib/worker/src/r2/builder.rs b/lib/worker/src/r2/builder.rs index 32cc7d9..507c701 100644 --- a/lib/worker/src/r2/builder.rs +++ b/lib/worker/src/r2/builder.rs @@ -12,7 +12,7 @@ use wasm_bindgen::{JsCast, JsValue}; use super::{Data, MultipartUpload, ObjectInner, R2Object, R2Objects}; use crate::date::Date; use crate::error::WorkerError; -use crate::futures::SendJsFuture; +use crate::futures::future_from_promise; use crate::result::Result; /// Options for configuring the [get](crate::r2::Bucket::get) operation. @@ -51,7 +51,7 @@ impl<'bucket> GetOptionsBuilder<'bucket> { .into(), ); - SendJsFuture::from(get_promise) + future_from_promise(get_promise) }; let value = fut.await.map_err(WorkerError::from_promise_err)?; @@ -214,7 +214,7 @@ impl<'bucket> PutOptionsBuilder<'bucket> { } .into(), ); - SendJsFuture::from(put_promise) + future_from_promise(put_promise) }; let res: EdgeR2Object = fut.await.map_err(WorkerError::from_promise_err)?.into(); @@ -276,7 +276,7 @@ impl<'bucket> CreateMultipartUploadOptionsBuilder<'bucket> { .into(), ); - SendJsFuture::from(create_multipart_upload_promise) + future_from_promise(create_multipart_upload_promise) }; let inner: EdgeR2MutipartUpload = fut.await.map_err(WorkerError::from_promise_err)?.into(); @@ -418,7 +418,7 @@ impl<'bucket> ListOptionsBuilder<'bucket> { .into(), ); - SendJsFuture::from(list_promise) + future_from_promise(list_promise) }; let inner = fut.await.map_err(WorkerError::from_promise_err)?.into(); diff --git a/lib/worker/src/r2/mod.rs b/lib/worker/src/r2/mod.rs index 587e52b..4398b76 100644 --- a/lib/worker/src/r2/mod.rs +++ b/lib/worker/src/r2/mod.rs @@ -14,7 +14,7 @@ use wasm_bindgen::{JsCast, JsValue}; use crate::date::Date; use crate::error::WorkerError; -use crate::futures::SendJsFuture; +use crate::futures::future_from_promise; use crate::result::Result; use crate::streams::{ByteStream, FixedLengthStream}; @@ -29,7 +29,7 @@ impl Bucket { pub async fn head(&self, key: impl Into) -> Result> { let fut = { let head_promise = self.0.head(key.into()); - SendJsFuture::from(head_promise) + future_from_promise(head_promise) }; let value = fut.await.map_err(WorkerError::from_promise_err)?; @@ -80,7 +80,7 @@ impl Bucket { pub async fn delete(&self, key: impl Into) -> Result<()> { let fut = { let delete_promise = self.0.delete(key.into()); - SendJsFuture::from(delete_promise) + future_from_promise(delete_promise) }; fut.await.map_err(WorkerError::from_promise_err)?; @@ -309,7 +309,7 @@ impl<'body> ObjectBody<'body> { } pub async fn bytes(self) -> Result> { - let fut = SendJsFuture::from(self.inner.array_buffer()); + let fut = future_from_promise(self.inner.array_buffer()); let js_buffer = fut.await.map_err(WorkerError::from_promise_err)?; let js_buffer = Uint8Array::new(&js_buffer); @@ -342,6 +342,11 @@ impl UploadedPart { } } +/// [MultipartUpload] represents an in-progress multipart upload. +/// [MultipartUpload] objects are returned from +/// [create_multipart_upload](Bucket::create_multipart_upload) operations and +/// must be passed to the [complete](MultipartUpload::complete) operation to +/// complete the multipart upload. pub struct MultipartUpload { inner: SendWrapper, } @@ -365,7 +370,7 @@ impl MultipartUpload { pub async fn upload_part( &self, part_number: u16, value: impl Into, ) -> Result { - let fut = SendJsFuture::from(self.inner.upload_part(part_number, value.into().into())); + let fut = future_from_promise(self.inner.upload_part(part_number, value.into().into())); let uploaded_part = fut.await.map_err(WorkerError::from_promise_err)?; Ok(UploadedPart { @@ -375,7 +380,7 @@ impl MultipartUpload { /// Aborts the multipart upload. pub async fn abort(&self) -> Result<()> { - let fut = SendJsFuture::from(self.inner.abort()); + let fut = future_from_promise(self.inner.abort()); fut.await.map_err(WorkerError::from_promise_err)?; Ok(()) } @@ -386,7 +391,7 @@ impl MultipartUpload { pub async fn complete( self, uploaded_parts: impl IntoIterator, ) -> Result { - let fut = SendJsFuture::from( + let fut = future_from_promise( self.inner.complete( uploaded_parts .into_iter() @@ -399,6 +404,16 @@ impl MultipartUpload { inner: ObjectInner::Body(SendWrapper::new(object.into())), }) } + + /// Returns the key of the object associated with this multipart upload. + pub fn key(&self) -> String { + self.inner.key() + } + + /// Returns the upload ID of this multipart upload. + pub fn upload_id(&self) -> String { + self.inner.upload_id() + } } /// A series of [Object]s returned by [list](Bucket::list).