From 116e2599917c2dc5f7ed2c5a5d43133093e71904 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 17:46:04 -0400
Subject: [PATCH 01/13] Write out the `list_entry_pages()` stream directly

This reduces the sizes of many Futures by 3520 bytes each.
---
 Cargo.lock               |   1 +
 Cargo.toml               |   1 +
 src/{s3.rs => s3/mod.rs} |  55 +++----------------
 src/s3/streams.rs        | 112 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 121 insertions(+), 48 deletions(-)
 rename src/{s3.rs => s3/mod.rs} (89%)
 create mode 100644 src/s3/streams.rs

diff --git a/Cargo.lock b/Cargo.lock
index fb01fbd..ed689fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -861,6 +861,7 @@ dependencies = [
  "async-trait",
  "aws-config",
  "aws-sdk-s3",
+ "aws-smithy-async",
  "aws-smithy-runtime-api",
  "aws-smithy-types-convert",
  "axum",
diff --git a/Cargo.toml b/Cargo.toml
index 7966710..9a8f8da 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ async-stream = "0.3.5"
 async-trait = "0.1.81"
 aws-config = { version = "1.5.3", features = ["behavior-version-latest"] }
 aws-sdk-s3 = "1.39.0"
+aws-smithy-async = "1.2.1"
 aws-smithy-runtime-api = "1.7.1"
 aws-smithy-types-convert = { version = "0.60.8", features = ["convert-time"] }
 axum = { version = "0.7.5", default-features = false, features = ["http1", "tokio", "tower-log"] }
diff --git a/src/s3.rs b/src/s3/mod.rs
similarity index 89%
rename from src/s3.rs
rename to src/s3/mod.rs
index 7a2adaa..6e9986b 100644
--- a/src/s3.rs
+++ b/src/s3/mod.rs
@@ -1,3 +1,5 @@
+mod streams;
+use self::streams::ListEntryPages;
 use crate::httputil::{self, BuildClientError, HttpError};
 use crate::paths::{ParsePureDirPathError, ParsePurePathError, PureDirPath, PurePath};
 use crate::validstr::TryFromStringError;
@@ -13,6 +15,8 @@ use thiserror::Error;
 use time::OffsetDateTime;
 use url::{Host, Url};
 
+type ListObjectsError = SdkError<ListObjectsV2Error, HttpResponse>;
+
 #[derive(Clone, Debug)]
 pub(crate) struct S3Client {
     inner: Client,
@@ -42,53 +46,8 @@ impl S3Client {
     }
 
     // `key_prefix` may or may not end with `/`; it is used as-is
-    fn list_entry_pages<'a>(
-        &'a self,
-        key_prefix: &'a str,
-    ) -> impl Stream<Item = Result<S3EntryPage, S3Error>> + 'a {
-        try_stream! {
-            let mut stream = self.inner
-                .list_objects_v2()
-                .bucket(&*self.bucket)
-                .prefix(key_prefix)
-                .delimiter("/")
-                .into_paginator()
-                .send();
-            while let Some(r) = stream.next().await {
-                let page = match r {
-                    Ok(page) => page,
-                    Err(source) => Err(
-                        S3Error::ListObjects {
-                            bucket: self.bucket.clone(),
-                            prefix: key_prefix.to_owned(),
-                            source,
-                        }
-                    )?,
-                };
-                let objects = page
-                    .contents
-                    .unwrap_or_default()
-                    .into_iter()
-                    .map(|obj| S3Object::try_from_aws_object(obj, &self.bucket))
-                    .collect::<Result<Vec<_>, _>>()
-                    .map_err(|source| S3Error::BadObject {
-                        bucket: self.bucket.clone(),
-                        prefix: key_prefix.to_owned(),
-                        source,
-                    })?;
-                let folders = page.common_prefixes
-                    .unwrap_or_default()
-                    .into_iter()
-                    .map(S3Folder::try_from)
-                    .collect::<Result<Vec<_>, _>>()
-                    .map_err(|source| S3Error::BadPrefix {
-                        bucket: self.bucket.clone(),
-                        prefix: key_prefix.to_owned(),
-                        source,
-                    })?;
-                yield S3EntryPage {folders, objects};
-            }
-        }
+    fn list_entry_pages<'a>(&'a self, key_prefix: &'a str) -> ListEntryPages<'a> {
+        ListEntryPages::new(self, key_prefix)
     }
 
     pub(crate) fn get_folder_entries<'a>(
         &'a self,
         key_prefix: &'a PureDirPath,
     ) -> impl Stream<Item = Result<S3Entry, S3Error>> + 'a {
@@ -431,7 +390,7 @@ pub(crate) enum S3Error {
     ListObjects {
         bucket: CompactString,
         prefix: String,
-        source: SdkError<ListObjectsV2Error, HttpResponse>,
+        source: ListObjectsError,
     },
     #[error("invalid object found in S3 bucket {bucket:?} under prefix {prefix:?}")]
     BadObject {
diff --git a/src/s3/streams.rs b/src/s3/streams.rs
new file mode 100644
index 0000000..8b8cf4f
--- /dev/null
+++ b/src/s3/streams.rs
@@ -0,0 +1,112 @@
+use super::{
+    ListObjectsError, S3Client, S3EntryPage, S3Error, S3Folder, S3Object, TryFromAwsObjectError,
+    TryFromCommonPrefixError,
+};
+use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output;
+use aws_smithy_async::future::pagination_stream::PaginationStream;
+use futures_util::Stream;
+use smartstring::alias::CompactString;
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
+
+pub(super) struct ListEntryPages<'a> {
+    bucket: &'a CompactString,
+    key_prefix: &'a str,
+    inner: Option<PaginationStream<Result<ListObjectsV2Output, ListObjectsError>>>,
+}
+
+impl<'a> ListEntryPages<'a> {
+    pub(super) fn new(client: &'a S3Client, key_prefix: &'a str) -> Self {
+        ListEntryPages {
+            bucket: &client.bucket,
+            key_prefix,
+            inner: Some(
+                client
+                    .inner
+                    .list_objects_v2()
+                    .bucket(&*client.bucket)
+                    .prefix(key_prefix)
+                    .delimiter("/")
+                    .into_paginator()
+                    .send(),
+            ),
+        }
+    }
+
+    fn die(&mut self, e: S3Error) -> Poll<Option<Result<S3EntryPage, S3Error>>> {
+        self.inner = None;
+        Some(Err(e)).into()
+    }
+
+    fn die_list_objects(
+        &mut self,
+        source: ListObjectsError,
+    ) -> Poll<Option<Result<S3EntryPage, S3Error>>> {
+        self.die(S3Error::ListObjects {
+            bucket: self.bucket.clone(),
+            prefix: self.key_prefix.to_owned(),
+            source,
+        })
+    }
+
+    fn die_bad_object(
+        &mut self,
+        source: TryFromAwsObjectError,
+    ) -> Poll<Option<Result<S3EntryPage, S3Error>>> {
+        self.die(S3Error::BadObject {
+            bucket: self.bucket.clone(),
+            prefix: self.key_prefix.to_owned(),
+            source,
+        })
+    }
+
+    fn die_bad_prefix(
+        &mut self,
+        source: TryFromCommonPrefixError,
+    ) -> Poll<Option<Result<S3EntryPage, S3Error>>> {
+        self.die(S3Error::BadPrefix {
+            bucket: self.bucket.clone(),
+            prefix: self.key_prefix.to_owned(),
+            source,
+        })
+    }
+}
+
+impl Stream for ListEntryPages<'_> {
+    type Item = Result<S3EntryPage, S3Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let Some(inner) = self.inner.as_mut() else {
+            return None.into();
+        };
+        let Some(r) = ready!(inner.poll_next(cx)) else {
+            self.inner = None;
+            return None.into();
+        };
+        let page = match r {
+            Ok(page) => page,
+            Err(source) => return self.die_list_objects(source),
+        };
+        let objects = match page
+            .contents
+            .unwrap_or_default()
+            .into_iter()
+            .map(|obj| S3Object::try_from_aws_object(obj, self.bucket))
+            .collect::<Result<Vec<_>, _>>()
+        {
+            Ok(objects) => objects,
+            Err(source) => return self.die_bad_object(source),
+        };
+        let folders = match page
+            .common_prefixes
+            .unwrap_or_default()
+            .into_iter()
+            .map(S3Folder::try_from)
+            .collect::<Result<Vec<_>, _>>()
+        {
+            Ok(folders) => folders,
+            Err(source) => return self.die_bad_prefix(source),
+        };
+        Some(Ok(S3EntryPage { folders, objects })).into()
+    }
+}

From 70370c0d969a8dfe85445762023a09a091ce00f1 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 18:01:46 -0400
Subject: [PATCH 02/13] No need to pin now

---
 src/s3/mod.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/s3/mod.rs b/src/s3/mod.rs
index 6e9986b..33e7c0f 100644
--- a/src/s3/mod.rs
+++ b/src/s3/mod.rs
@@ -55,8 +55,7 @@ impl S3Client {
         key_prefix: &'a PureDirPath,
     ) -> impl Stream<Item = Result<S3Entry, S3Error>> + 'a {
         try_stream! {
-            let stream = self.list_entry_pages(key_prefix.as_ref());
-            tokio::pin!(stream);
+            let mut stream = self.list_entry_pages(key_prefix.as_ref());
             while let Some(page) = stream.try_next().await? {
                 for entry in page {
                     yield entry;
@@ -70,8 +69,7 @@ impl S3Client {
         let mut surpassed_objects = false;
         let mut surpassed_folders = false;
         let folder_cutoff = format!("{path}/");
-        let stream = self.list_entry_pages(path);
-        tokio::pin!(stream);
+        let mut stream = self.list_entry_pages(path);
         while let Some(page) = stream.try_next().await? {
             if !surpassed_objects {
                 for obj in page.objects {

From 56f090791152a7ff7c40580e84ca6226e7bc0ff3 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 18:14:47 -0400
Subject: [PATCH 03/13] Boilerplate

---
 src/s3/streams.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/s3/streams.rs b/src/s3/streams.rs
index 8b8cf4f..88e8acb 100644
--- a/src/s3/streams.rs
+++ b/src/s3/streams.rs
@@ -9,6 +9,8 @@ use smartstring::alias::CompactString;
 use std::pin::Pin;
 use std::task::{ready, Context, Poll};
 
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
 pub(super) struct ListEntryPages<'a> {
     bucket: &'a CompactString,
     key_prefix: &'a str,

From 57dd15aec12b14bc57806b565d38b1fc42f6781a Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 19:05:29 -0400
Subject: [PATCH 04/13] try_flat_iter_map()

The net savings from this commit are 64 bytes each for various Future
types.  It's not much, but I feel the code also looks cleaner now.
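
As a rough usage sketch of the new combinator (illustrative only: it assumes
the `TryStreamUtil` trait added in src/streamutil.rs below is in scope, and
the page/item types are toy stand-ins rather than dandidav's real ones):

    use crate::streamutil::TryStreamUtil;
    use futures_util::{stream, TryStreamExt};

    async fn demo() -> Result<(), std::io::Error> {
        // A TryStream whose Ok items are whole pages of results...
        let pages = stream::iter(vec![
            Ok::<_, std::io::Error>(vec![1, 2, 3]),
            Ok(vec![4, 5]),
        ]);
        // ...flattened into a TryStream of the individual items:
        let items = pages
            .try_flat_iter_map(|page| page)
            .try_collect::<Vec<i32>>()
            .await?;
        assert_eq!(items, vec![1, 2, 3, 4, 5]);
        Ok(())
    }

Because the closure only has to return something that implements
IntoIterator, an `Option` works there as well, which is what the
`relative_to()` call sites below rely on.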
---
 Cargo.lock        |  1 +
 Cargo.toml        |  1 +
 src/main.rs       |  1 +
 src/s3/mod.rs     | 65 +++++++++++++++++-------------------
 src/s3/streams.rs | 25 +++++++++---------
 src/streamutil.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++
 src/validstr.rs   |  7 +++++
 7 files changed, 112 insertions(+), 53 deletions(-)
 create mode 100644 src/streamutil.rs

diff --git a/Cargo.lock b/Cargo.lock
index ed689fa..0faf73b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -875,6 +875,7 @@ dependencies = [
  "memory-stats",
  "moka",
  "percent-encoding",
+ "pin-project-lite",
  "pretty_assertions",
  "reqwest",
  "reqwest-middleware",
diff --git a/Cargo.toml b/Cargo.toml
index 9a8f8da..f380605 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,6 +34,7 @@ itertools = "0.13.0"
 memory-stats = "1.2.0"
 moka = { version = "0.12.8", features = ["future"] }
 percent-encoding = "2.3.1"
+pin-project-lite = "0.2.14"
 reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls-native-roots"] }
 reqwest-middleware = "0.3.2"
 reqwest-retry = "0.6.0"
diff --git a/src/main.rs b/src/main.rs
index 71ecf49..3a94dbf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -7,6 +7,7 @@ mod dav;
 mod httputil;
 mod paths;
 mod s3;
+mod streamutil;
 mod zarrman;
 use crate::consts::{CSS_CONTENT_TYPE, DEFAULT_API_URL, SERVER_VALUE};
 use crate::dandi::DandiClient;
diff --git a/src/s3/mod.rs b/src/s3/mod.rs
index 33e7c0f..e50d9bd 100644
--- a/src/s3/mod.rs
+++ b/src/s3/mod.rs
@@ -2,13 +2,14 @@ mod streams;
 use self::streams::ListEntryPages;
 use crate::httputil::{self, BuildClientError, HttpError};
 use crate::paths::{ParsePureDirPathError, ParsePurePathError, PureDirPath, PurePath};
+use crate::streamutil::TryStreamUtil;
 use crate::validstr::TryFromStringError;
-use async_stream::try_stream;
 use aws_sdk_s3::{operation::list_objects_v2::ListObjectsV2Error, types::CommonPrefix, Client};
 use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
 use aws_smithy_types_convert::date_time::DateTimeExt;
 use futures_util::{Stream, TryStreamExt};
 use smartstring::alias::CompactString;
+use std::borrow::Borrow;
 use std::cmp::Ordering;
 use std::sync::Arc;
 use thiserror::Error;
@@ -24,7 +25,7 @@ pub(crate) struct S3Client {
 }
 
 impl S3Client {
-    pub(crate) async fn new(bucket: CompactString, region: String) -> S3Client {
+    async fn new(bucket: CompactString, region: String) -> S3Client {
         let config = aws_config::from_env()
             .app_name(
                 aws_config::AppName::new("dandidav")
@@ -46,26 +47,20 @@ impl S3Client {
     }
 
     // `key_prefix` may or may not end with `/`; it is used as-is
-    fn list_entry_pages<'a>(&'a self, key_prefix: &'a str) -> ListEntryPages<'a> {
+    fn list_entry_pages<S: Into<String>>(&self, key_prefix: S) -> ListEntryPages {
         ListEntryPages::new(self, key_prefix)
     }
 
-    pub(crate) fn get_folder_entries<'a>(
-        &'a self,
-        key_prefix: &'a PureDirPath,
-    ) -> impl Stream<Item = Result<S3Entry, S3Error>> + 'a {
-        try_stream! {
-            let mut stream = self.list_entry_pages(key_prefix.as_ref());
-            while let Some(page) = stream.try_next().await? {
-                for entry in page {
-                    yield entry;
-                }
-            }
-        }
+    fn get_folder_entries<P: Borrow<PureDirPath>>(
+        &self,
+        key_prefix: P,
+    ) -> impl Stream<Item = Result<S3Entry, S3Error>> {
+        self.list_entry_pages(key_prefix.borrow())
+            .try_flat_iter_map(|page| page)
     }
 
     // Returns `None` if nothing found at path
-    pub(crate) async fn get_path(&self, path: &PurePath) -> Result<Option<S3Entry>, S3Error> {
+    async fn get_path(&self, path: &PurePath) -> Result<Option<S3Entry>, S3Error> {
         let mut surpassed_objects = false;
         let mut surpassed_folders = false;
         let folder_cutoff = format!("{path}/");
@@ -113,33 +108,21 @@ pub(crate) struct PrefixedS3Client {
 
 impl PrefixedS3Client {
     pub(crate) fn get_root_entries(&self) -> impl Stream<Item = Result<S3Entry, S3Error>> + '_ {
-        let stream = self.inner.get_folder_entries(&self.prefix);
-        try_stream! {
-            tokio::pin!(stream);
-            while let Some(entry) = stream.try_next().await? {
-                if let Some(entry) = entry.relative_to(&self.prefix) {
-                    yield entry;
-                }
-                // TODO: Else: Error? Warn?
-            }
-        }
+        self.inner
+            .get_folder_entries(&self.prefix)
+            .try_flat_iter_map(|entry| entry.relative_to(&self.prefix))
+        // TODO: Do something when relative_to() fails (Error? Warn?)
     }
 
-    pub(crate) fn get_folder_entries<'a>(
-        &'a self,
-        dirpath: &'a PureDirPath,
-    ) -> impl Stream<Item = Result<S3Entry, S3Error>> + 'a {
-        try_stream! {
-            let key_prefix = self.prefix.join_dir(dirpath);
-            let stream = self.inner.get_folder_entries(&key_prefix);
-            tokio::pin!(stream);
-            while let Some(entry) = stream.try_next().await? {
-                if let Some(entry) = entry.relative_to(&self.prefix) {
-                    yield entry;
-                }
-                // TODO: Else: Error? Warn?
-            }
-        }
+    pub(crate) fn get_folder_entries(
+        &self,
+        dirpath: &PureDirPath,
+    ) -> impl Stream<Item = Result<S3Entry, S3Error>> + '_ {
+        let key_prefix = self.prefix.join_dir(dirpath);
+        self.inner
+            .get_folder_entries(key_prefix)
+            .try_flat_iter_map(|entry| entry.relative_to(&self.prefix))
+        // TODO: Do something when relative_to() fails (Error? Warn?)
     }
 
     // Returns `None` if nothing found at path
diff --git a/src/s3/streams.rs b/src/s3/streams.rs
index 88e8acb..c9ba182 100644
--- a/src/s3/streams.rs
+++ b/src/s3/streams.rs
@@ -11,17 +11,18 @@ use std::task::{ready, Context, Poll};
 
 #[derive(Debug)]
 #[must_use = "streams do nothing unless polled"]
-pub(super) struct ListEntryPages<'a> {
-    bucket: &'a CompactString,
-    key_prefix: &'a str,
+pub(super) struct ListEntryPages {
+    bucket: CompactString,
+    key_prefix: String,
     inner: Option<PaginationStream<Result<ListObjectsV2Output, ListObjectsError>>>,
 }
 
-impl<'a> ListEntryPages<'a> {
-    pub(super) fn new(client: &'a S3Client, key_prefix: &'a str) -> Self {
+impl ListEntryPages {
+    pub(super) fn new<S: Into<String>>(client: &S3Client, key_prefix: S) -> Self {
+        let key_prefix = key_prefix.into();
         ListEntryPages {
-            bucket: &client.bucket,
-            key_prefix,
+            bucket: client.bucket.clone(),
+            key_prefix: key_prefix.clone(),
             inner: Some(
                 client
                     .inner
@@ -46,7 +47,7 @@ impl<'a> ListEntryPages<'a> {
     ) -> Poll<Option<Result<S3EntryPage, S3Error>>> {
         self.die(S3Error::ListObjects {
             bucket: self.bucket.clone(),
-            prefix: self.key_prefix.to_owned(),
+            prefix: self.key_prefix.clone(),
             source,
         })
     }
@@ -57,7 +58,7 @@ impl<'a> ListEntryPages<'a> {
     ) -> Poll<Option<Result<S3EntryPage, S3Error>>> {
         self.die(S3Error::BadObject {
             bucket: self.bucket.clone(),
-            prefix: self.key_prefix.to_owned(),
+            prefix: self.key_prefix.clone(),
             source,
         })
     }
@@ -68,13 +69,13 @@ impl<'a> ListEntryPages<'a> {
     ) -> Poll<Option<Result<S3EntryPage, S3Error>>> {
         self.die(S3Error::BadPrefix {
             bucket: self.bucket.clone(),
-            prefix: self.key_prefix.to_owned(),
+            prefix: self.key_prefix.clone(),
             source,
         })
     }
 }
 
-impl Stream for ListEntryPages<'_> {
+impl Stream for ListEntryPages {
     type Item = Result<S3EntryPage, S3Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -93,7 +94,7 @@ impl Stream for ListEntryPages<'_> {
             .contents
             .unwrap_or_default()
             .into_iter()
-            .map(|obj| S3Object::try_from_aws_object(obj, self.bucket))
+            .map(|obj| S3Object::try_from_aws_object(obj, &self.bucket))
             .collect::<Result<Vec<_>, _>>()
         {
             Ok(objects) => objects,
             Err(source) => return self.die_bad_object(source),
diff --git a/src/streamutil.rs b/src/streamutil.rs
new file mode 100644
index 0000000..5503002
--- /dev/null
+++ b/src/streamutil.rs
@@ -0,0 +1,65 @@
+use futures_util::{Stream, TryStream};
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
+
+pub(crate) trait TryStreamUtil: TryStream {
+    fn try_flat_iter_map<F, I>(self, f: F) -> TryFlatIterMap<Self, F, I>
+    where
+        F: FnMut(Self::Ok) -> I,
+        I: IntoIterator,
+        Self: Sized,
+    {
+        TryFlatIterMap::new(self, f)
+    }
+}
+
+impl<S: TryStream> TryStreamUtil for S {}
+
+pin_project! {
+    #[derive(Clone, Debug)]
+    #[must_use = "streams do nothing unless polled"]
+    pub(crate) struct TryFlatIterMap<S, F, I: IntoIterator> {
+        #[pin]
+        inner: S,
+        f: F,
+        iter: Option<I::IntoIter>,
+    }
+}
+
+impl<S, F, I: IntoIterator> TryFlatIterMap<S, F, I> {
+    fn new(inner: S, f: F) -> Self {
+        TryFlatIterMap {
+            inner,
+            f,
+            iter: None,
+        }
+    }
+}
+
+impl<S, F, I> Stream for TryFlatIterMap<S, F, I>
+where
+    S: TryStream,
+    F: FnMut(S::Ok) -> I,
+    I: IntoIterator,
+{
+    type Item = Result<I::Item, S::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            if let Some(iter) = this.iter.as_mut() {
+                if let Some(r) = iter.next() {
+                    return Some(Ok(r)).into();
+                } else {
+                    *this.iter = None;
+                }
+            }
+            match ready!(this.inner.as_mut().try_poll_next(cx)) {
+                Some(Ok(iter)) => *this.iter = Some((this.f)(iter).into_iter()),
+                Some(Err(e)) => return Some(Err(e)).into(),
+                None => return None.into(),
+            }
+        }
+    }
+}
diff --git a/src/validstr.rs b/src/validstr.rs
index 1ab6ae3..e21cc1a 100644
--- a/src/validstr.rs
+++ b/src/validstr.rs
@@ -15,6 +15,13 @@ macro_rules! validstr {
         }
     }
 
+    impl From<&$t> for String {
+        #[allow(clippy::string_to_string)]
+        fn from(value: &$t) -> String {
+            value.0.to_string()
+        }
+    }
+
     impl std::fmt::Debug for $t {
         fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
             write!(f, "{:?}", self.0)

From 50ecf2daae232225835f76ed67debb8f8f2b5a53 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 19:14:42 -0400
Subject: [PATCH 05/13] Add a comment about why list_entry_pages() is
 implemented this way

---
 src/s3/streams.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/s3/streams.rs b/src/s3/streams.rs
index c9ba182..9f685c1 100644
--- a/src/s3/streams.rs
+++ b/src/s3/streams.rs
@@ -9,6 +9,9 @@ use smartstring::alias::CompactString;
 use std::pin::Pin;
 use std::task::{ready, Context, Poll};
 
+// Implementing list_entry_pages() as a manually-implemented Stream instead of
+// via async_stream lets us save about 3500 bytes on dandidav's top-level
+// Futures.
 #[derive(Debug)]
 #[must_use = "streams do nothing unless polled"]
 pub(super) struct ListEntryPages {

From 06de381bb6b504aa71b095a09f6b3300d88dd1c0 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 19:51:27 -0400
Subject: [PATCH 06/13] Make the get_json() Future 'static

---
 src/httputil.rs | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/src/httputil.rs b/src/httputil.rs
index f1af85c..6451f1f 100644
--- a/src/httputil.rs
+++ b/src/httputil.rs
@@ -3,6 +3,7 @@ use reqwest::{Method, Request, Response, StatusCode};
 use reqwest_middleware::{Middleware, Next};
 use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
 use serde::de::DeserializeOwned;
+use std::future::Future;
 use thiserror::Error;
 use tracing::Instrument;
 use url::Url;
@@ -53,12 +54,23 @@ impl Client {
         self.request(Method::GET, url).await
     }
 
-    pub(crate) async fn get_json<T: DeserializeOwned>(&self, url: Url) -> Result<T, HttpError> {
-        self.get(url.clone())
-            .await?
-            .json::<T>()
-            .await
-            .map_err(move |source| HttpError::Deserialize { url, source })
+    pub(crate) fn get_json<T: DeserializeOwned>(
+        &self,
+        url: Url,
+    ) -> impl Future<Output = Result<T, HttpError>> {
+        // Clone the client and move it into an async block (as opposed to just
+        // writing a "normal" async function) so that the resulting Future will
+        // be 'static rather than retaining a reference to &self, thereby
+        // facilitating the Future's use by the Paginate stream.
+        let client = self.clone();
+        async move {
+            client
+                .get(url.clone())
+                .await?
+                .json::<T>()
+                .await
+                .map_err(move |source| HttpError::Deserialize { url, source })
+        }
     }
 }
 

From 2d0e186c99974a95ae9a3f106e7c4d6e16da5b77 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 19:41:25 -0400
Subject: [PATCH 07/13] `Paginate` stream type

This reduces the sizes of many Futures by 4720 bytes each.
---
 src/dandi/mod.rs     | 18 +++-------
 src/dandi/streams.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 86 insertions(+), 14 deletions(-)
 create mode 100644 src/dandi/streams.rs

diff --git a/src/dandi/mod.rs b/src/dandi/mod.rs
index 93440d1..eb250da 100644
--- a/src/dandi/mod.rs
+++ b/src/dandi/mod.rs
@@ -1,7 +1,9 @@
 mod dandiset_id;
+mod streams;
 mod types;
 mod version_id;
 pub(crate) use self::dandiset_id::*;
+use self::streams::Paginate;
 pub(crate) use self::types::*;
 pub(crate) use self::version_id::*;
 use crate::consts::S3CLIENT_CACHE_SIZE;
@@ -51,20 +53,8 @@ impl DandiClient {
         self.inner.get_json(url).await.map_err(Into::into)
     }
 
-    fn paginate<T: DeserializeOwned>(
-        &self,
-        url: Url,
-    ) -> impl Stream<Item = Result<T, DandiError>> + '_ {
-        try_stream! {
-            let mut url = Some(url);
-            while let Some(u) = url {
-                let page = self.inner.get_json::<Page<T>>(u).await?;
-                for r in page.results {
-                    yield r;
-                }
-                url = page.next;
-            }
-        }
+    fn paginate<T: DeserializeOwned + 'static>(&self, url: Url) -> Paginate<T> {
+        Paginate::new(self, url)
     }
 
     async fn get_s3client(&self, loc: S3Location) -> Result<PrefixedS3Client, DandiError> {
diff --git a/src/dandi/streams.rs b/src/dandi/streams.rs
new file mode 100644
index 0000000..7c8bf14
--- /dev/null
+++ b/src/dandi/streams.rs
@@ -0,0 +1,82 @@
+use super::types::Page;
+use super::{DandiClient, DandiError};
+use crate::httputil::{Client, HttpError};
+use futures_util::{future::BoxFuture, FutureExt, Stream};
+use pin_project_lite::pin_project;
+use serde::de::DeserializeOwned;
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
+use url::Url;
+
+pin_project! {
+    // Implementing paginate() as a manually-implemented Stream instead of via
+    // async_stream lets us save about 4700 bytes on dandidav's top-level
+    // Futures.
+    #[must_use = "streams do nothing unless polled"]
+    pub(super) struct Paginate<T> {
+        client: Client,
+        state: PaginateState<T>,
+    }
+}
+
+enum PaginateState<T> {
+    Requesting(BoxFuture<'static, Result<Page<T>, HttpError>>),
+    Yielding {
+        results: std::vec::IntoIter<T>,
+        next: Option<Url>,
+    },
+    Done,
+}
+
+impl<T> Paginate<T> {
+    pub(super) fn new(client: &DandiClient, url: Url) -> Self {
+        Paginate {
+            client: client.inner.clone(),
+            state: PaginateState::Yielding {
+                results: Vec::new().into_iter(),
+                next: Some(url),
+            },
+        }
+    }
+}
+
+impl<T> Stream for Paginate<T>
+where
+    T: DeserializeOwned + 'static,
+{
+    type Item = Result<T, DandiError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        loop {
+            match this.state {
+                PaginateState::Requesting(ref mut fut) => match ready!(fut.as_mut().poll(cx)) {
+                    Ok(page) => {
+                        *this.state = PaginateState::Yielding {
+                            results: page.results.into_iter(),
+                            next: page.next,
+                        }
+                    }
+                    Err(e) => {
+                        *this.state = PaginateState::Done;
+                        return Some(Err(DandiError::from(e))).into();
+                    }
+                },
+                PaginateState::Yielding {
+                    ref mut results,
+                    ref mut next,
+                } => {
+                    if let Some(item) = results.next() {
+                        return Some(Ok(item)).into();
+                    } else if let Some(url) = next.take() {
+                        *this.state =
+                            PaginateState::Requesting(this.client.get_json::<Page<T>>(url).boxed());
+                    } else {
+                        *this.state = PaginateState::Done;
+                    }
+                }
+                PaginateState::Done => return None.into(),
+            }
+        }
+    }
+}

From 51095deda77c0298e0ab59bfb9f50b2453c70445 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 20:09:48 -0400
Subject: [PATCH 08/13] Adjust some return types

---
 src/dandi/mod.rs | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/src/dandi/mod.rs b/src/dandi/mod.rs
index eb250da..e98e7c4 100644
--- a/src/dandi/mod.rs
+++ b/src/dandi/mod.rs
@@ -255,17 +255,11 @@ impl<'a> VersionEndpoint<'a> {
         }
     }
 
-    fn get_folder_entries(
-        &self,
-        path: &AssetFolder,
-    ) -> impl Stream<Item = Result<FolderEntry, DandiError>> + '_ {
+    fn get_folder_entries(&self, path: &AssetFolder) -> Paginate<FolderEntry> {
         self.get_entries_under_path(Some(&path.path))
     }
 
-    fn get_entries_under_path(
-        &self,
-        path: Option<&PureDirPath>,
-    ) -> impl Stream<Item = Result<FolderEntry, DandiError>> + '_ {
+    fn get_entries_under_path(&self, path: Option<&PureDirPath>) -> Paginate<FolderEntry> {
         let mut url = self.client.get_url([
             "dandisets",
             self.dandiset_id.as_ref(),

From 755da87bae3a51e42bb33bb14624a12441cbada2 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 20:12:01 -0400
Subject: [PATCH 09/13] Remove some superfluous pins

---
 src/dandi/mod.rs | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/src/dandi/mod.rs b/src/dandi/mod.rs
index e98e7c4..3ac445e 100644
--- a/src/dandi/mod.rs
+++ b/src/dandi/mod.rs
@@ -238,8 +238,7 @@ impl<'a> VersionEndpoint<'a> {
         &self,
     ) -> impl Stream<Item = Result<DandiResource, DandiError>> + '_ {
         try_stream! {
-            let stream = self.get_entries_under_path(None);
-            tokio::pin!(stream);
+            let mut stream = self.get_entries_under_path(None);
             while let Some(entry) = stream.try_next().await? {
                 match entry {
                     FolderEntry::Folder(subf) => yield DandiResource::Folder(subf),
@@ -288,8 +287,7 @@ impl<'a> VersionEndpoint<'a> {
             .append_pair("metadata", "1")
             .append_pair("order", "path");
         let dirpath = path.to_dir_path();
-        let stream = self.client.paginate::(url.clone());
-        tokio::pin!(stream);
+        let mut stream = self.client.paginate::(url.clone());
         while let Some(asset) = stream.try_next().await? {
             if &asset.path == path {
                 return Ok(AtAssetPath::Asset(asset.try_into_asset(self)?));
@@ -355,8 +353,7 @@ impl<'a> VersionEndpoint<'a> {
         match self.get_resource_with_s3(path).await? {
             DandiResourceWithS3::Folder(folder) => {
                 let mut children = Vec::new();
-                let stream = self.get_folder_entries(&folder);
-                tokio::pin!(stream);
+                let mut stream = self.get_folder_entries(&folder);
                 while let Some(child) = stream.try_next().await? {
                     let child = match child {
                         FolderEntry::Folder(subf) => DandiResource::Folder(subf),
@@ -377,8 +374,7 @@ impl<'a> VersionEndpoint<'a> {
                 let s3 = self.client.get_s3client_for_zarr(&zarr).await?;
                 let mut children = Vec::new();
                 {
-                    let stream = s3.get_root_entries();
-                    tokio::pin!(stream);
+                    let mut stream = s3.get_root_entries();
                     while let Some(child) = stream.try_next().await? {
                         children.push(zarr.make_resource(child));
                     }
@@ -388,8 +384,7 @@ impl<'a> VersionEndpoint<'a> {
             DandiResourceWithS3::ZarrFolder { folder, s3 } => {
                 let mut children = Vec::new();
                 {
-                    let stream = s3.get_folder_entries(&folder.path);
-                    tokio::pin!(stream);
+                    let mut stream = s3.get_folder_entries(&folder.path);
                     while let Some(child) = stream.try_next().await? {
                         children.push(folder.make_resource(child));
                     }

From 32df4e284159b85bf36382f4befb12f81fc8e1ea Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 21:18:37 -0400
Subject: [PATCH 10/13] Rewrite `get_root_children()` using a combinator

This reduces the sizes of various Futures by 808 bytes each.
---
 Cargo.lock       | 23 -----------------------
 Cargo.toml       |  1 -
 src/dandi/mod.rs | 17 +++++++----------
 src/dav/mod.rs   |  6 ++----
 4 files changed, 9 insertions(+), 38 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0faf73b..faa0114 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -88,28 +88,6 @@ dependencies = [
  "pin-project-lite",
 ]
 
-[[package]]
-name = "async-stream"
-version = "0.3.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
-dependencies = [
- "async-stream-impl",
- "futures-core",
- "pin-project-lite",
-]
-
-[[package]]
-name = "async-stream-impl"
-version = "0.3.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
 [[package]]
 name = "async-trait"
 version = "0.1.81"
@@ -857,7 +835,6 @@ version = "0.4.0"
 dependencies = [
  "anyhow",
  "assert_matches",
- "async-stream",
  "async-trait",
  "aws-config",
  "aws-sdk-s3",
diff --git a/Cargo.toml b/Cargo.toml
index f380605..5e8fee5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,6 @@ publish = false
 
 [dependencies]
 anyhow = "1.0.86"
-async-stream = "0.3.5"
 async-trait = "0.1.81"
 aws-config = { version = "1.5.3", features = ["behavior-version-latest"] }
 aws-sdk-s3 = "1.39.0"
diff --git a/src/dandi/mod.rs b/src/dandi/mod.rs
index 3ac445e..98eb63b 100644
--- a/src/dandi/mod.rs
+++ b/src/dandi/mod.rs
@@ -12,7 +12,6 @@ use crate::paths::{ParsePureDirPathError, PureDirPath, PurePath};
 use crate::s3::{
     BucketSpec, GetBucketRegionError, PrefixedS3Client, S3Client, S3Error, S3Location,
 };
-use async_stream::try_stream;
 use futures_util::{Stream, TryStreamExt};
 use moka::future::{Cache, CacheBuilder};
 use serde::de::DeserializeOwned;
@@ -237,21 +236,19 @@ impl<'a> VersionEndpoint<'a> {
     pub(crate) fn get_root_children(
         &self,
     ) -> impl Stream<Item = Result<DandiResource, DandiError>> + '_ {
-        try_stream! {
-            let mut stream = self.get_entries_under_path(None);
-            while let Some(entry) = stream.try_next().await? {
+        self.get_entries_under_path(None)
+            .and_then(move |entry| async move {
                 match entry {
-                    FolderEntry::Folder(subf) => yield DandiResource::Folder(subf),
+                    FolderEntry::Folder(subf) => Ok(DandiResource::Folder(subf)),
                     FolderEntry::Asset { id, path } => match self.get_asset_by_id(&id).await {
-                        Ok(asset) => yield DandiResource::Asset(asset),
+                        Ok(asset) => Ok(DandiResource::Asset(asset)),
                         Err(DandiError::Http(HttpError::NotFound { .. })) => {
-                            Err(DandiError::DisappearingAsset { asset_id: id, path })?;
+                            Err(DandiError::DisappearingAsset { asset_id: id, path })
                         }
-                        Err(e) => Err(e)?,
+                        Err(e) => Err(e),
                     },
                 }
-            }
-        }
+            })
     }
 
     fn get_folder_entries(&self, path: &AssetFolder) -> Paginate<FolderEntry> {
diff --git a/src/dav/mod.rs b/src/dav/mod.rs
index 15a65a4..32d0a39 100644
--- a/src/dav/mod.rs
+++ b/src/dav/mod.rs
@@ -247,8 +247,7 @@ impl DandiDav {
             DavPath::DandisetIndex => {
                 let col = DavCollection::dandiset_index();
                 let mut children = Vec::new();
-                let stream = self.dandi.get_all_dandisets();
-                tokio::pin!(stream);
+                let mut stream = self.dandi.get_all_dandisets();
                 while let Some(ds) = stream.try_next().await? {
                     children.push(DavResource::Collection(ds.into()));
                 }
@@ -282,8 +281,7 @@ impl DandiDav {
                 let col = DavCollection::dandiset_releases(dandiset_id);
                 let mut children = Vec::new();
                 let endpoint = self.dandi.dandiset(dandiset_id.clone());
-                let stream = endpoint.get_all_versions();
-                tokio::pin!(stream);
+                let mut stream = endpoint.get_all_versions();
                 while let Some(v) = stream.try_next().await? {
                     if let VersionId::Published(ref pvid) = v.version {
                         let path = version_path(dandiset_id, &VersionSpec::Published(pvid.clone()));

From 273bcc9b06738b6edf11312847392763436356be Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 21:35:01 -0400
Subject: [PATCH 11/13] `inner_handle_request()` no longer needs to be boxed

---
 src/dav/mod.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/dav/mod.rs b/src/dav/mod.rs
index 32d0a39..003481b 100644
--- a/src/dav/mod.rs
+++ b/src/dav/mod.rs
@@ -43,8 +43,7 @@ impl DandiDav {
         &self,
         req: Request<Body>,
     ) -> Result<Response<Body>, Infallible> {
-        // Box large future:
-        let resp = match Box::pin(self.inner_handle_request(req)).await {
+        let resp = match self.inner_handle_request(req).await {
             Ok(r) => r,
             Err(e) if e.is_404() => {
                 let e = anyhow::Error::from(e);

From 4741067caafb5cbbd5f2c577cf7231fd3d8ce194 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 21:59:55 -0400
Subject: [PATCH 12/13] Rewrite some `try_next()` loops to use
 `try_collect()` instead

This reduces the sizes of various Futures by 2192 bytes each.
---
 src/dandi/mod.rs | 24 ++++++++++--------------
 src/dav/mod.rs   | 22 +++++++++++-----------
 2 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/src/dandi/mod.rs b/src/dandi/mod.rs
index 98eb63b..675a048 100644
--- a/src/dandi/mod.rs
+++ b/src/dandi/mod.rs
@@ -369,23 +369,19 @@ impl<'a> VersionEndpoint<'a> {
             DandiResourceWithS3::Asset(Asset::Blob(r)) => Ok(DandiResourceWithChildren::Blob(r)),
             DandiResourceWithS3::Asset(Asset::Zarr(zarr)) => {
                 let s3 = self.client.get_s3client_for_zarr(&zarr).await?;
-                let mut children = Vec::new();
-                {
-                    let mut stream = s3.get_root_entries();
-                    while let Some(child) = stream.try_next().await? {
-                        children.push(zarr.make_resource(child));
-                    }
-                }
+                let children = s3
+                    .get_root_entries()
+                    .map_ok(|child| zarr.make_resource(child))
+                    .try_collect::<Vec<_>>()
+                    .await?;
                 Ok(DandiResourceWithChildren::Zarr { zarr, children })
             }
             DandiResourceWithS3::ZarrFolder { folder, s3 } => {
-                let mut children = Vec::new();
-                {
-                    let mut stream = s3.get_folder_entries(&folder.path);
-                    while let Some(child) = stream.try_next().await? {
-                        children.push(folder.make_resource(child));
-                    }
-                }
+                let children = s3
+                    .get_folder_entries(&folder.path)
+                    .map_ok(|child| folder.make_resource(child))
+                    .try_collect::<Vec<_>>()
+                    .await?;
                 Ok(DandiResourceWithChildren::ZarrFolder { folder, children })
             }
             DandiResourceWithS3::ZarrEntry(r) => Ok(DandiResourceWithChildren::ZarrEntry(r)),
diff --git a/src/dav/mod.rs b/src/dav/mod.rs
index 003481b..732724b 100644
--- a/src/dav/mod.rs
+++ b/src/dav/mod.rs
@@ -245,11 +245,12 @@ impl DandiDav {
             DavPath::Root => Ok(DavResourceWithChildren::root()),
             DavPath::DandisetIndex => {
                 let col = DavCollection::dandiset_index();
-                let mut children = Vec::new();
-                let mut stream = self.dandi.get_all_dandisets();
-                while let Some(ds) = stream.try_next().await? {
-                    children.push(DavResource::Collection(ds.into()));
-                }
+                let children = self
+                    .dandi
+                    .get_all_dandisets()
+                    .map_ok(|ds| DavResource::Collection(ds.into()))
+                    .try_collect::<Vec<_>>()
+                    .await?;
                 Ok(DavResourceWithChildren::Collection { col, children })
             }
             DavPath::Dandiset { dandiset_id } => {
@@ -296,12 +297,11 @@ impl DandiDav {
                 version,
             } => {
                 let (col, endpoint) = self.get_dandiset_version(dandiset_id, version).await?;
-                let mut children = Vec::new();
-                let stream = endpoint.get_root_children();
-                tokio::pin!(stream);
-                while let Some(res) = stream.try_next().await? {
-                    children.push(DavResource::from(res).under_version_path(dandiset_id, version));
-                }
+                let mut children = endpoint
+                    .get_root_children()
+                    .map_ok(|res| DavResource::from(res).under_version_path(dandiset_id, version))
+                    .try_collect::<Vec<_>>()
+                    .await?;
                 children.push(
                     self.get_dandiset_yaml(dandiset_id, version)
                         .await

From ff44f1ffefda5b810d4a620b6d9481e435f370c9 Mon Sep 17 00:00:00 2001
From: "John T. Wodder II"
Date: Tue, 9 Jul 2024 22:03:47 -0400
Subject: [PATCH 13/13] Update CHANGELOG

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 064f92f..673e934 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+In Development
+--------------
+- Reduced the sizes of a number of streams & futures
+
 v0.4.0 (2024-07-09)
 -------------------
 - Set `Access-Control-Allow-Origin: *` header in all responses
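
One way to sanity-check Future sizes like the ones quoted in these commit
messages is `std::mem::size_of_val` on a not-yet-polled future; a minimal
sketch using only std (`some_async_fn` is a stand-in, not a dandidav
function):

    async fn some_async_fn() -> u32 {
        // Locals that live across an .await are stored in the future itself,
        // so this buffer inflates its size:
        let buf = [0u8; 1024];
        async {}.await;
        u32::from(buf[0])
    }

    fn main() {
        let fut = some_async_fn();
        println!("future size: {} bytes", std::mem::size_of_val(&fut));
    }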