feat: Support cloud storage in scan_csv (pola-rs#16674)
nameexhaustion authored and Wouittone committed Jun 22, 2024
1 parent f89f72b commit ebd37e5
Showing 30 changed files with 1,700 additions and 63 deletions.
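As context for the diff below, here is a minimal sketch (not taken from this commit) of what the change enables: scanning a CSV directly from object storage. It assumes the `polars` umbrella crate with the `lazy`, `csv`, and `aws`/`cloud` features enabled; the bucket and key are placeholders.

```rust
use polars::prelude::*;

fn scan_remote_csv() -> PolarsResult<DataFrame> {
    // With this commit, the path may be a cloud URI (s3://, gs://, az://, ...);
    // the object is fetched through the new file cache before parsing.
    LazyCsvReader::new("s3://my-bucket/data.csv")
        .finish()?
        .collect()
}
```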
47 changes: 45 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 6 additions & 3 deletions crates/polars-io/Cargo.toml
@@ -20,6 +20,7 @@ ahash = { workspace = true }
arrow = { workspace = true }
async-trait = { version = "0.1.59", optional = true }
atoi_simd = { workspace = true, optional = true }
blake3 = { version = "1.5.1", optional = true }
bytes = { version = "1.3" }
chrono = { workspace = true, optional = true }
chrono-tz = { workspace = true, optional = true }
@@ -38,16 +39,17 @@ regex = { workspace = true }
reqwest = { workspace = true, optional = true }
ryu = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"], optional = true }
serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value", "std"], optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
smartstring = { workspace = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "time", "sync"], optional = true }
tokio = { workspace = true, features = ["fs", "net", "rt-multi-thread", "time", "sync"], optional = true }
tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
url = { workspace = true, optional = true }
zstd = { workspace = true, optional = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
fs4 = { version = "0.8.3", features = ["sync"], optional = true }
home = "0.5.4"

[dev-dependencies]
@@ -107,7 +109,8 @@ async = [
"polars-error/regex",
"polars-parquet?/async",
]
cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde"]
cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde", "file_cache"]
file_cache = ["async", "dep:blake3", "dep:fs4"]
aws = ["object_store/aws", "cloud", "reqwest"]
azure = ["object_store/azure", "cloud"]
gcp = ["object_store/gcp", "cloud"]
30 changes: 29 additions & 1 deletion crates/polars-io/src/cloud/polars_object_store.rs
@@ -2,12 +2,14 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::StreamExt;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use polars_error::{to_compute_err, PolarsResult};
use tokio::io::AsyncWriteExt;

use crate::pl_async::{
tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
self, tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};

/// Polars specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
@@ -52,6 +54,32 @@ impl PolarsObjectStore {
.map_err(to_compute_err)
}

pub async fn download<F: tokio::io::AsyncWrite + std::marker::Unpin>(
&self,
path: &Path,
file: &mut F,
) -> PolarsResult<()> {
tune_with_concurrency_budget(1, || async {
let mut stream = self
.0
.get(path)
.await
.map_err(to_compute_err)?
.into_stream();

let mut len = 0;
while let Some(bytes) = stream.next().await {
let bytes = bytes.map_err(to_compute_err)?;
len += bytes.len();
file.write(bytes.as_ref()).await.map_err(to_compute_err)?;
}

PolarsResult::Ok(pl_async::Size::from(len as u64))
})
.await?;
Ok(())
}

/// Fetch the metadata of the parquet file, do not memoize it.
pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
with_concurrency_budget(1, || self.0.head(path))
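A hedged sketch of how the new `download` helper might be called; the wrapper function, import paths, and local-path handling are assumptions for illustration, not part of this commit (note the `fs` feature enabled for tokio above).

```rust
use object_store::path::Path as ObjectPath;
use polars_error::{to_compute_err, PolarsResult};
use tokio::fs::File;

use crate::cloud::PolarsObjectStore; // import path assumed

// Hypothetical call site: stream an object into a local file. `download` keeps
// the transfer inside the concurrency budget and writes chunks as they arrive,
// so the whole object never has to be buffered in memory.
async fn fetch_to_disk(
    store: &PolarsObjectStore,
    key: &str,
    local: &std::path::Path,
) -> PolarsResult<()> {
    let mut file = File::create(local).await.map_err(to_compute_err)?;
    store.download(&ObjectPath::from(key), &mut file).await?;
    file.sync_all().await.map_err(to_compute_err)?;
    Ok(())
}
```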
22 changes: 19 additions & 3 deletions crates/polars-io/src/csv/read/parser.rs
@@ -3,7 +3,7 @@ use std::path::PathBuf;
use memchr::memchr2_iter;
use num_traits::Pow;
use polars_core::prelude::*;
use polars_core::POOL;
use polars_core::{config, POOL};
use polars_utils::index::Bounded;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;
@@ -12,6 +12,7 @@ use super::buffer::Buffer;
use super::options::{CommentPrefix, NullValuesCompiled};
use super::splitfields::SplitFields;
use super::utils::get_file_chunks;
use crate::prelude::is_cloud_url;
use crate::utils::get_reader_bytes;

/// Read the number of rows without parsing columns
@@ -24,7 +25,22 @@ pub fn count_rows(
eol_char: u8,
has_header: bool,
) -> PolarsResult<usize> {
let mut reader = polars_utils::open_file(path)?;
let mut reader = if is_cloud_url(path) || config::force_async() {
#[cfg(feature = "cloud")]
{
crate::file_cache::FILE_CACHE
.get_entry(path.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()?
}
#[cfg(not(feature = "cloud"))]
{
panic!("required feature `cloud` is not enabled")
}
} else {
polars_utils::open_file(path)?
};
let reader_bytes = get_reader_bytes(&mut reader)?;
const MIN_ROWS_PER_THREAD: usize = 1024;
let max_threads = POOL.current_num_threads();
Expand All @@ -44,7 +60,7 @@ pub fn count_rows(
})
.unwrap_or(1);

let file_chunks = get_file_chunks(
let file_chunks: Vec<(usize, usize)> = get_file_chunks(
&reader_bytes,
n_threads,
None,
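For completeness, `config::force_async()` in the branch above consults the `POLARS_FORCE_ASYNC` environment variable, so the cached code path can be exercised even for plain local files; a tiny illustrative snippet (not from this commit):

```rust
// Illustrative only: route local CSV paths through the cloud/file-cache branch
// added above, e.g. when testing. The cache entry itself is created during
// schema inference, which is why `count_rows` can assume it already exists.
std::env::set_var("POLARS_FORCE_ASYNC", "1");
```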
151 changes: 151 additions & 0 deletions crates/polars-io/src/file_cache/cache.rs
@@ -0,0 +1,151 @@
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use once_cell::sync::Lazy;
use polars_core::config;
use polars_error::PolarsResult;
use polars_utils::aliases::PlHashMap;

use super::entry::{FileCacheEntry, DATA_PREFIX, METADATA_PREFIX};
use super::eviction::EvictionManager;
use super::file_fetcher::FileFetcher;
use super::utils::FILE_CACHE_PREFIX;
use crate::prelude::is_cloud_url;

pub static FILE_CACHE: Lazy<FileCache> = Lazy::new(|| {
let prefix = FILE_CACHE_PREFIX.as_ref();
let prefix = Arc::<Path>::from(prefix);

if config::verbose() {
eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
}

EvictionManager {
prefix: prefix.clone(),
files_to_remove: None,
limit_since_last_access: Duration::from_secs(
std::env::var("POLARS_FILE_CACHE_TTL")
.map(|x| x.parse::<u64>().expect("integer"))
.unwrap_or(60 * 60),
),
}
.run_in_background();

FileCache::new(prefix)
});

pub struct FileCache {
prefix: Arc<Path>,
entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
}

impl FileCache {
fn new(prefix: Arc<Path>) -> Self {
let path = &prefix
.as_ref()
.join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());
let _ = std::fs::create_dir_all(path);
assert!(
path.is_dir(),
"failed to create file cache metadata directory: {}",
path.to_str().unwrap(),
);

let path = &prefix
.as_ref()
.join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());
let _ = std::fs::create_dir_all(path);
assert!(
path.is_dir(),
"failed to create file cache data directory: {}",
path.to_str().unwrap(),
);

Self {
prefix,
entries: Default::default(),
}
}

/// If `uri` is a local path, it must be an absolute path.
pub fn init_entry<F: Fn() -> PolarsResult<Arc<dyn FileFetcher>>>(
&self,
uri: Arc<str>,
get_file_fetcher: F,
) -> PolarsResult<Arc<FileCacheEntry>> {
let verbose = config::verbose();

#[cfg(debug_assertions)]
{
// Local paths must be absolute or else the cache would be wrong.
if !crate::utils::is_cloud_url(uri.as_ref()) {
let path = Path::new(uri.as_ref());
assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
}
}

{
let entries = self.entries.read().unwrap();

if let Some(entry) = entries.get(uri.as_ref()) {
if verbose {
eprintln!(
"[file_cache] init_entry: return existing entry for uri = {}",
uri.clone()
);
}
return Ok(entry.clone());
}
}

let uri_hash = blake3::hash(uri.as_bytes())
.to_hex()
.get(..32)
.unwrap()
.to_string();

{
let mut entries = self.entries.write().unwrap();

// May have been raced
if let Some(entry) = entries.get(uri.as_ref()) {
if verbose {
eprintln!("[file_cache] init_entry: return existing entry for uri = {} (lost init race)", uri.clone());
}
return Ok(entry.clone());
}

if verbose {
eprintln!(
"[file_cache] init_entry: creating new entry for uri = {}, hash = {}",
uri.clone(),
uri_hash.clone()
);
}

let entry = Arc::new(FileCacheEntry::new(
uri.clone(),
uri_hash,
self.prefix.clone(),
get_file_fetcher()?,
));
entries.insert_unique_unchecked(uri, entry.clone());
Ok(entry.clone())
}
}

/// This function can accept relative local paths.
pub fn get_entry(&self, uri: &str) -> Option<Arc<FileCacheEntry>> {
if is_cloud_url(uri) {
self.entries.read().unwrap().get(uri).map(Arc::clone)
} else {
let uri = std::fs::canonicalize(uri).unwrap();
self.entries
.read()
.unwrap()
.get(uri.to_str().unwrap())
.map(Arc::clone)
}
}
}
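To connect this back to the `count_rows` change above, a hedged sketch of the lookup pattern a reader goes through; the module path and the return type of `try_open_assume_latest` are assumed from how the handle is consumed in `parser.rs`.

```rust
use std::fs::File;

use polars_error::PolarsResult;
use polars_io::file_cache::FILE_CACHE; // module path assumed

// Hypothetical helper: fetch the cached local handle for a URI whose entry was
// already created earlier (e.g. by schema inference calling `init_entry`).
fn open_cached(uri: &str) -> PolarsResult<File> {
    let entry = FILE_CACHE
        .get_entry(uri)
        .expect("entry should have been initialized via init_entry");
    // Opens the locally cached copy, assuming it is already up to date.
    entry.try_open_assume_latest()
}
```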