Skip to content

Commit

Permalink
revert removal of cache as turned off for http
Browse files Browse the repository at this point in the history
  • Loading branch information
Boruch Chalk committed Mar 25, 2024
1 parent de572d5 commit 2c357ce
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,21 @@ use std::sync::Arc;

use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use once_cell::sync::Lazy;
use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
use polars_utils::aliases::PlHashMap;
use tokio::sync::RwLock;
use url::Url;

use super::{parse_url, CloudLocation, CloudOptions, CloudType};

/// Object stores must be cached. Every object-store will do DNS lookups and
/// get rate limited when querying the DNS (can take up to 5s).
/// Other reasons are connection pools that must be shared between as much as possible.
#[allow(clippy::type_complexity)]
static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, Arc<dyn ObjectStore>>>> =
Lazy::new(Default::default);

type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;

#[allow(dead_code)]
Expand All @@ -16,6 +27,16 @@ fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
);
}

/// Get the key of a url for object store registration.
/// The credential info will be removed
fn url_to_key(url: &Url) -> String {
format!(
"{}://{}",
url.scheme(),
&url[url::Position::BeforeHost..url::Position::AfterPort],
)
}

/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(
url: &str,
Expand All @@ -28,6 +49,16 @@ pub async fn build_object_store(
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;

let key = url_to_key(&parsed);
let mut allow_cache = true;

{
let cache = OBJECT_STORE_CACHE.read().await;
if let Some(store) = cache.get(&key) {
return Ok((cloud_location, store.clone()));
}
}

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default();

Expand Down Expand Up @@ -83,5 +114,13 @@ pub async fn build_object_store(
return err_missing_feature("http", &cloud_location.scheme);
},
}?;
if allow_cache {
let mut cache = OBJECT_STORE_CACHE.write().await;
// Clear the cache if we surpass a certain amount of buckets.
if cache.len() > 32 {
cache.clear()
}
cache.insert(key, store.clone());
}
Ok((cloud_location, store))
}
}

0 comments on commit 2c357ce

Please sign in to comment.