diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs index 35dbc4876c189..882be1f8b4f63 100644 --- a/crates/polars-io/src/cloud/object_store_setup.rs +++ b/crates/polars-io/src/cloud/object_store_setup.rs @@ -2,10 +2,21 @@ use std::sync::Arc; use object_store::local::LocalFileSystem; use object_store::ObjectStore; +use once_cell::sync::Lazy; use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult}; +use polars_utils::aliases::PlHashMap; +use tokio::sync::RwLock; +use url::Url; use super::{parse_url, CloudLocation, CloudOptions, CloudType}; +/// Object stores must be cached. Every object-store will do DNS lookups and +/// get rate limited when querying the DNS (can take up to 5s). +/// Another reason is that connection pools should be shared as much as possible. +#[allow(clippy::type_complexity)] +static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, Arc<dyn ObjectStore>>>> = + Lazy::new(Default::default); + type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>; #[allow(dead_code)] @@ -16,6 +27,16 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult { ); } +/// Get the key of a url for object store registration. +/// The credential info is removed from the key. +fn url_to_key(url: &Url) -> String { + format!( + "{}://{}", + url.scheme(), + &url[url::Position::BeforeHost..url::Position::AfterPort], + ) +} + /// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store. 
pub async fn build_object_store( url: &str, @@ -28,6 +49,16 @@ pub async fn build_object_store( let parsed = parse_url(url).map_err(to_compute_err)?; let cloud_location = CloudLocation::from_url(&parsed)?; + let key = url_to_key(&parsed); + let mut allow_cache = true; + + { + let cache = OBJECT_STORE_CACHE.read().await; + if let Some(store) = cache.get(&key) { + return Ok((cloud_location, store.clone())); + } + } + #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default(); @@ -83,5 +114,13 @@ pub async fn build_object_store( return err_missing_feature("http", &cloud_location.scheme); }, }?; + if allow_cache { + let mut cache = OBJECT_STORE_CACHE.write().await; + // Clear the cache if we surpass a certain amount of buckets. + if cache.len() > 32 { + cache.clear() + } + cache.insert(key, store.clone()); + } Ok((cloud_location, store)) -} +} \ No newline at end of file