diff --git a/Cargo.lock b/Cargo.lock
index 0d9577f2d237..87d7b627f840 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -155,6 +155,18 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bf7d0a018de4f6aa429b9d33d69edf69072b1c5b1cb8d3e4a5f7ef898fc3eb76"
 
+[[package]]
+name = "arrayref"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545"
+
+[[package]]
+name = "arrayvec"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
+
 [[package]]
 name = "arrow-array"
 version = "51.0.0"
@@ -737,6 +749,19 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "blake3"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52"
+dependencies = [
+ "arrayref",
+ "arrayvec",
+ "cc",
+ "cfg-if",
+ "constant_time_eq",
+]
+
 [[package]]
 name = "block-buffer"
 version = "0.10.4"
@@ -757,9 +782,9 @@ dependencies = [
 
 [[package]]
 name = "brotli"
-version = "6.0.0"
+version = "5.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
+checksum = "19483b140a7ac7174d34b5a581b406c64f84da5409d3e09cf4fff604f9270e67"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
@@ -1016,6 +1041,12 @@ dependencies = [
  "tiny-keccak",
 ]
 
+[[package]]
+name = "constant_time_eq"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
+
 [[package]]
 name = "core-foundation"
 version = "0.9.4"
@@ -1439,6 +1470,16 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fs4"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73969b81e8bc90a3828d913dd3973d80771bfb9d7fbe1a78a79122aad456af15"
+dependencies = [
+ "rustix",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "futures"
 version = "0.3.30"
@@ -2973,11 +3014,13 @@ dependencies = [
  "ahash",
  "async-trait",
  "atoi_simd",
+ "blake3",
  "bytes",
  "chrono",
  "chrono-tz",
  "fast-float",
  "flate2",
+ "fs4",
  "futures",
  "home",
  "itoa",
diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml
index ca188cecad1b..09e56a341c3c 100644
--- a/crates/polars-io/Cargo.toml
+++ b/crates/polars-io/Cargo.toml
@@ -20,6 +20,7 @@ ahash = { workspace = true }
 arrow = { workspace = true }
 async-trait = { version = "0.1.59", optional = true }
 atoi_simd = { workspace = true, optional = true }
+blake3 = { version = "1.5.1", optional = true }
 bytes = { version = "1.3" }
 chrono = { workspace = true, optional = true }
 chrono-tz = { workspace = true, optional = true }
@@ -38,16 +39,17 @@ regex = { workspace = true }
 reqwest = { workspace = true, optional = true }
 ryu = { workspace = true, optional = true }
 serde = { workspace = true, features = ["rc"], optional = true }
-serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
+serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value", "std"], optional = true }
 simd-json = { workspace = true, optional = true }
 simdutf8 = { workspace = true, optional = true }
 smartstring = { workspace = true }
-tokio = { workspace = true, features = ["net", "rt-multi-thread", "time", "sync"], optional = true }
+tokio = { workspace = true, features = ["fs", "net", "rt-multi-thread", "time", "sync"], optional = true }
 tokio-util = { workspace = true, features = ["io", "io-util"], optional = true }
 url = { workspace = true, optional = true }
 zstd = { workspace = true, optional = true }
 
 [target.'cfg(not(target_family = "wasm"))'.dependencies]
+fs4 = { version = "0.8.3", features = ["sync"], optional = true }
 home = "0.5.4"
 
 [dev-dependencies]
@@ -107,7 +109,8 @@ async = [
   "polars-error/regex",
   "polars-parquet?/async",
 ]
-cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde"]
+cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde", "file_cache"]
+file_cache = ["async", "dep:blake3", "dep:fs4"]
 aws = ["object_store/aws", "cloud", "reqwest"]
 azure = ["object_store/azure", "cloud"]
 gcp = ["object_store/gcp", "cloud"]
diff --git a/crates/polars-io/src/cloud/polars_object_store.rs b/crates/polars-io/src/cloud/polars_object_store.rs
index e22b658520c1..f2744432bfa0 100644
--- a/crates/polars-io/src/cloud/polars_object_store.rs
+++ b/crates/polars-io/src/cloud/polars_object_store.rs
@@ -2,12 +2,14 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::Bytes;
+use futures::StreamExt;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 use polars_error::{to_compute_err, PolarsResult};
+use tokio::io::AsyncWriteExt;
 
 use crate::pl_async::{
-    tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
+    self, tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
 };
 
 /// Polars specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
@@ -52,6 +54,32 @@ impl PolarsObjectStore {
             .map_err(to_compute_err)
     }
 
+    pub async fn download<F: tokio::io::AsyncWrite + Unpin>(
+        &self,
+        path: &Path,
+        file: &mut F,
+    ) -> PolarsResult<()> {
+        tune_with_concurrency_budget(1, || async {
+            let mut stream = self
+                .0
+                .get(path)
+                .await
+                .map_err(to_compute_err)?
+                .into_stream();
+
+            let mut len = 0;
+            while let Some(bytes) = stream.next().await {
+                let bytes = bytes.map_err(to_compute_err)?;
+                len += bytes.len();
+                // Use write_all: a bare `write` may write only part of the buffer.
+                file.write_all(bytes.as_ref()).await.map_err(to_compute_err)?;
+            }
+
+            PolarsResult::Ok(pl_async::Size::from(len as u64))
+        })
+        .await?;
+        Ok(())
+    }
+
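For orientation, a minimal usage sketch of the new `download` helper (hedged: it assumes `PolarsObjectStore` is reachable at `polars_io::cloud` as the patch's internal paths suggest, a tokio runtime is running, and both paths are hypothetical):

```rust
use object_store::path::Path;
use polars_error::{to_compute_err, PolarsResult};
use polars_io::cloud::PolarsObjectStore;

// Stream a remote object straight to disk under the shared concurrency budget.
async fn fetch_to_disk(store: &PolarsObjectStore) -> PolarsResult<()> {
    let remote = Path::from("data/example.csv"); // hypothetical object path
    let mut file = tokio::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open("/tmp/example.csv") // hypothetical destination
        .await
        .map_err(to_compute_err)?;

    store.download(&remote, &mut file).await?;
    // tokio files flush lazily on drop, so sync before relying on the bytes.
    file.sync_all().await.map_err(to_compute_err)?;
    Ok(())
}
```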
     /// Fetch the metadata of the parquet file, do not memoize it.
     pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
         with_concurrency_budget(1, || self.0.head(path))
diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs
index eff02dd4f19e..a28af966fa22 100644
--- a/crates/polars-io/src/csv/read/parser.rs
+++ b/crates/polars-io/src/csv/read/parser.rs
@@ -3,7 +3,7 @@ use std::path::PathBuf;
 use memchr::memchr2_iter;
 use num_traits::Pow;
 use polars_core::prelude::*;
-use polars_core::POOL;
+use polars_core::{config, POOL};
 use polars_utils::index::Bounded;
 use polars_utils::slice::GetSaferUnchecked;
 use rayon::prelude::*;
@@ -12,6 +12,7 @@ use super::buffer::Buffer;
 use super::options::{CommentPrefix, NullValuesCompiled};
 use super::splitfields::SplitFields;
 use super::utils::get_file_chunks;
+use crate::prelude::is_cloud_url;
 use crate::utils::get_reader_bytes;
 
 /// Read the number of rows without parsing columns
@@ -24,7 +25,22 @@ pub fn count_rows(
     eol_char: u8,
     has_header: bool,
 ) -> PolarsResult<usize> {
-    let mut reader = polars_utils::open_file(path)?;
+    let mut reader = if is_cloud_url(path) || config::force_async() {
+        #[cfg(feature = "cloud")]
+        {
+            crate::file_cache::FILE_CACHE
+                .get_entry(path.to_str().unwrap())
+                // Safety: This was initialized by schema inference.
+                .unwrap()
+                .try_open_assume_latest()?
+        }
+        #[cfg(not(feature = "cloud"))]
+        {
+            panic!("required feature `cloud` is not enabled")
+        }
+    } else {
+        polars_utils::open_file(path)?
+    };
     let reader_bytes = get_reader_bytes(&mut reader)?;
     const MIN_ROWS_PER_THREAD: usize = 1024;
     let max_threads = POOL.current_num_threads();
@@ -44,7 +60,7 @@
         })
         .unwrap_or(1);
 
-    let file_chunks = get_file_chunks(
+    let file_chunks: Vec<(usize, usize)> = get_file_chunks(
         &reader_bytes,
         n_threads,
         None,
diff --git a/crates/polars-io/src/file_cache/cache.rs b/crates/polars-io/src/file_cache/cache.rs
new file mode 100644
index 000000000000..a1710ed2c0bb
--- /dev/null
+++ b/crates/polars-io/src/file_cache/cache.rs
@@ -0,0 +1,151 @@
+use std::path::Path;
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
+
+use once_cell::sync::Lazy;
+use polars_core::config;
+use polars_error::PolarsResult;
+use polars_utils::aliases::PlHashMap;
+
+use super::entry::{FileCacheEntry, DATA_PREFIX, METADATA_PREFIX};
+use super::eviction::EvictionManager;
+use super::file_fetcher::FileFetcher;
+use super::utils::FILE_CACHE_PREFIX;
+use crate::prelude::is_cloud_url;
+
+pub static FILE_CACHE: Lazy<FileCache> = Lazy::new(|| {
+    let prefix = FILE_CACHE_PREFIX.as_ref();
+    let prefix = Arc::<Path>::from(prefix);
+
+    if config::verbose() {
+        eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
+    }
+
+    EvictionManager {
+        prefix: prefix.clone(),
+        files_to_remove: None,
+        limit_since_last_access: Duration::from_secs(
+            std::env::var("POLARS_FILE_CACHE_TTL")
+                .map(|x| x.parse::<u64>().expect("integer"))
+                .unwrap_or(60 * 60),
+        ),
+    }
+    .run_in_background();
+
+    FileCache::new(prefix)
+});
+
+pub struct FileCache {
+    prefix: Arc<Path>,
+    entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
+}
+
+impl FileCache {
+    fn new(prefix: Arc<Path>) -> Self {
+        let path = &prefix
+            .as_ref()
+            .join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());
+        let _ = std::fs::create_dir_all(path);
+        assert!(
+            path.is_dir(),
+            "failed to create file cache metadata directory: {}",
+            path.to_str().unwrap(),
+        );
+
+        let path = &prefix
+            .as_ref()
+            .join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());
+        let _ = std::fs::create_dir_all(path);
+        assert!(
+            path.is_dir(),
+            "failed to create file cache data directory: {}",
+            path.to_str().unwrap(),
+        );
+
+        Self {
+            prefix,
+            entries: Default::default(),
+        }
+    }
+
+    /// If `uri` is a local path, it must be an absolute path.
+    pub fn init_entry<F: FnOnce() -> PolarsResult<Arc<dyn FileFetcher>>>(
+        &self,
+        uri: Arc<str>,
+        get_file_fetcher: F,
+    ) -> PolarsResult<Arc<FileCacheEntry>> {
+        let verbose = config::verbose();
+
+        #[cfg(debug_assertions)]
+        {
+            // Local paths must be absolute or else the cache would be wrong.
+            if !crate::utils::is_cloud_url(uri.as_ref()) {
+                let path = Path::new(uri.as_ref());
+                assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
+            }
+        }
+
+        {
+            let entries = self.entries.read().unwrap();
+
+            if let Some(entry) = entries.get(uri.as_ref()) {
+                if verbose {
+                    eprintln!(
+                        "[file_cache] init_entry: return existing entry for uri = {}",
+                        uri.clone()
+                    );
+                }
+                return Ok(entry.clone());
+            }
+        }
+
+        let uri_hash = blake3::hash(uri.as_bytes())
+            .to_hex()
+            .get(..32)
+            .unwrap()
+            .to_string();
+
+        {
+            let mut entries = self.entries.write().unwrap();
+
+            // Another thread may have raced us to initialize this entry.
+            if let Some(entry) = entries.get(uri.as_ref()) {
+                if verbose {
+                    eprintln!("[file_cache] init_entry: return existing entry for uri = {} (lost init race)", uri.clone());
+                }
+                return Ok(entry.clone());
+            }
+
+            if verbose {
+                eprintln!(
+                    "[file_cache] init_entry: creating new entry for uri = {}, hash = {}",
+                    uri.clone(),
+                    uri_hash.clone()
+                );
+            }
+
+            let entry = Arc::new(FileCacheEntry::new(
+                uri.clone(),
+                uri_hash,
+                self.prefix.clone(),
+                get_file_fetcher()?,
+            ));
+            entries.insert_unique_unchecked(uri, entry.clone());
+            Ok(entry.clone())
+        }
+    }
+
+    /// This function can accept relative local paths.
+    pub fn get_entry(&self, uri: &str) -> Option<Arc<FileCacheEntry>> {
+        if is_cloud_url(uri) {
+            self.entries.read().unwrap().get(uri).map(Arc::clone)
+        } else {
+            let uri = std::fs::canonicalize(uri).unwrap();
+            self.entries
+                .read()
+                .unwrap()
+                .get(uri.to_str().unwrap())
+                .map(Arc::clone)
+        }
+    }
+}
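To make the keying scheme concrete: entries are addressed by the first 32 hex characters of a BLAKE3 digest of the URI, with metadata under `m/` and data files under `d/`. A standalone sketch (the URI is hypothetical; only the `blake3` crate is needed):

```rust
// How a URI maps onto its cache key.
fn cache_key(uri: &str) -> String {
    blake3::hash(uri.as_bytes())
        .to_hex()
        .get(..32)
        .unwrap()
        .to_string()
}

fn main() {
    let key = cache_key("s3://my-bucket/data.csv"); // hypothetical URI
    assert_eq!(key.len(), 32);
    println!("metadata file: m/{key}");
    println!("data files:    d/{key}<13-hex-digit last-modified>");
}
```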
diff --git a/crates/polars-io/src/file_cache/cache_lock.rs b/crates/polars-io/src/file_cache/cache_lock.rs
new file mode 100644
index 000000000000..892c1e7aeb86
--- /dev/null
+++ b/crates/polars-io/src/file_cache/cache_lock.rs
@@ -0,0 +1,214 @@
+use std::sync::atomic::AtomicBool;
+use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
+use std::time::Duration;
+
+use fs4::FileExt;
+use once_cell::sync::Lazy;
+use polars_core::config;
+
+use super::utils::FILE_CACHE_PREFIX;
+use crate::pl_async;
+
+pub(super) static GLOBAL_FILE_CACHE_LOCK: Lazy<GlobalLock> = Lazy::new(|| {
+    let path = FILE_CACHE_PREFIX.join(".process-lock");
+
+    let file = std::fs::OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(false)
+        .open(path)
+        .map_err(|err| {
+            panic!("failed to open/create global file cache lockfile: {}", err);
+        })
+        .unwrap();
+
+    let at_bool = Arc::new(AtomicBool::new(false));
+    // Holding this access tracker prevents the background task from
+    // unlocking the lock.
+    let access_tracker = AccessTracker(at_bool.clone());
+    let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
+    let notify_lock_acquired_2 = notify_lock_acquired.clone();
+
+    pl_async::get_runtime().spawn(async move {
+        let access_tracker = at_bool;
+        let notify_lock_acquired = notify_lock_acquired_2;
+        let verbose = config::verbose();
+
+        loop {
+            if verbose {
+                eprintln!("file cache background unlock: waiting for acquisition notification");
+            }
+
+            notify_lock_acquired.notified().await;
+
+            if verbose {
+                eprintln!("file cache background unlock: got acquisition notification");
+            }
+
+            loop {
+                if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
+                    if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
+                        if unlocked_by_this_call && verbose {
+                            eprintln!(
+                                "file cache background unlock: unlocked global file cache lockfile"
+                            );
+                        }
+                        break;
+                    }
+                }
+                tokio::time::sleep(Duration::from_secs(3)).await;
+            }
+        }
+    });
+
+    GlobalLock {
+        inner: RwLock::new(GlobalLockData { file, state: None }),
+        access_tracker,
+        notify_lock_acquired,
+    }
+});
+
+pub(super) enum LockedState {
+    Shared,
+    #[allow(dead_code)]
+    Exclusive,
+}
+
+#[allow(dead_code)]
+pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
+pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
+
+pub(super) struct GlobalLockData {
+    file: std::fs::File,
+    state: Option<LockedState>,
+}
+
+pub(super) struct GlobalLock {
+    inner: RwLock<GlobalLockData>,
+    access_tracker: AccessTracker,
+    notify_lock_acquired: Arc<tokio::sync::Notify>,
+}
+
+/// Tracks access to the global lock:
+/// * The inner `bool` is used to delay the background unlock task from unlocking
+///   the global lock until 3 seconds after the last lock attempt.
+/// * The `Arc` ref-count is used as a semaphore that allows us to block exclusive
+///   lock attempts while temporarily releasing the `RwLock`.
+#[derive(Clone)]
+struct AccessTracker(Arc<AtomicBool>);
+
+impl Drop for AccessTracker {
+    fn drop(&mut self) {
+        self.0.store(true, std::sync::atomic::Ordering::Relaxed);
+    }
+}
+
+struct NotifyOnDrop(Arc<tokio::sync::Notify>);
+
+impl Drop for NotifyOnDrop {
+    fn drop(&mut self) {
+        self.0.notify_one();
+    }
+}
+
+impl GlobalLock {
+    fn get_access_tracker(&self) -> AccessTracker {
+        let at = self.access_tracker.clone();
+        at.0.store(true, std::sync::atomic::Ordering::Relaxed);
+        at
+    }
+
+    /// Returns
+    /// * `None` - Could be locked (ambiguous)
+    /// * `Some(true)` - Unlocked (by this function call)
+    /// * `Some(false)` - Unlocked (was not locked)
+    fn try_unlock(&self) -> Option<bool> {
+        if let Ok(mut this) = self.inner.try_write() {
+            if Arc::strong_count(&self.access_tracker.0) <= 2 {
+                return if this.state.take().is_some() {
+                    this.file.unlock().unwrap();
+                    Some(true)
+                } else {
+                    Some(false)
+                };
+            }
+        }
+        None
+    }
+
+    /// Acquire either a shared or exclusive lock. This always returns a read-guard
+    /// to allow for better parallelism within the current process. The tradeoff
+    /// is that we may hold an exclusive lock on the global lockfile for longer
+    /// than we need to (since we don't transition to a shared lock state),
+    /// which blocks other processes.
+    pub(super) fn lock_any(&self) -> GlobalFileCacheGuardAny {
+        let access_tracker = self.get_access_tracker();
+        let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
+
+        {
+            let this = self.inner.read().unwrap();
+
+            if this.state.is_some() {
+                return this;
+            }
+        }
+
+        {
+            let mut this = self.inner.write().unwrap();
+
+            if this.state.is_none() {
+                this.file.lock_shared().unwrap();
+                this.state = Some(LockedState::Shared);
+            }
+        }
+
+        // Safety: Holding the access tracker guard maintains an Arc refcount
+        // > 2, which prevents automatic unlock.
+        debug_assert!(Arc::strong_count(&access_tracker.0) > 2);
+
+        {
+            let this = self.inner.read().unwrap();
+            assert!(
+                this.state.is_some(),
+                "impl error: global file cache lock was unlocked"
+            );
+            this
+        }
+    }
+
+    /// Acquire an exclusive lock on the cache directory. Holding this lock freezes
+    /// all cache operations except for reading from already-opened data files.
+    #[allow(dead_code)]
+    pub(super) fn try_lock_exclusive(&self) -> Option<GlobalFileCacheGuardExclusive> {
+        let access_tracker = self.get_access_tracker();
+
+        if let Ok(mut this) = self.inner.try_write() {
+            if
+            // 3:
+            // * the Lazy
+            // * the global unlock background task
+            // * this function
+            Arc::strong_count(&access_tracker.0) > 3
+            {
+                return None;
+            }
+
+            let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
+
+            if let Some(ref state) = this.state {
+                if matches!(state, LockedState::Exclusive) {
+                    return Some(this);
+                }
+            }
+
+            if this.state.take().is_some() {
+                this.file.unlock().unwrap();
+            }
+
+            if this.file.try_lock_exclusive().is_ok() {
+                this.state = Some(LockedState::Exclusive);
+                return Some(this);
+            }
+        }
+        None
+    }
+}
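The cross-process side of this module reduces to fs4 advisory file locks; a standalone sketch of that primitive under the same shared/exclusive pattern (lockfile path hypothetical):

```rust
use fs4::FileExt;

fn main() -> std::io::Result<()> {
    let file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(false)
        .open("/tmp/.process-lock-demo")?; // hypothetical lockfile

    file.lock_shared()?; // many processes may hold this concurrently
    file.unlock()?;

    if file.try_lock_exclusive().is_ok() {
        // Sole owner across processes; safe to evict or rewrite cache files.
        file.unlock()?;
    }
    Ok(())
}
```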
diff --git a/crates/polars-io/src/file_cache/entry.rs b/crates/polars-io/src/file_cache/entry.rs
new file mode 100644
index 000000000000..775e1764e60b
--- /dev/null
+++ b/crates/polars-io/src/file_cache/entry.rs
@@ -0,0 +1,352 @@
+use std::io::{Seek, SeekFrom};
+use std::ops::DerefMut;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+
+use fs4::FileExt;
+use polars_core::config;
+use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
+use polars_utils::flatten;
+
+use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
+use super::file_fetcher::{FileFetcher, RemoteMetadata};
+use super::file_lock::{FileLock, FileLockAnyGuard};
+use super::metadata::EntryMetadata;
+use super::utils::update_last_accessed;
+
+pub(super) const DATA_PREFIX: u8 = b'd';
+pub(super) const METADATA_PREFIX: u8 = b'm';
+
+struct CachedData {
+    last_modified: u64,
+    metadata: Arc<EntryMetadata>,
+    data_file_path: PathBuf,
+}
+
+struct Inner {
+    uri: Arc<str>,
+    uri_hash: String,
+    path_prefix: Arc<Path>,
+    metadata: FileLock<PathBuf>,
+    cached_data: Option<CachedData>,
+    file_fetcher: Arc<dyn FileFetcher>,
+}
+
+struct EntryData {
+    uri: Arc<str>,
+    inner: Mutex<Inner>,
+}
+
+#[derive(Clone)]
+pub struct FileCacheEntry(Arc<EntryData>);
+
+impl EntryMetadata {
+    fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
+        self.remote_last_modified == remote_metadata.last_modified
+            && self.local_size == remote_metadata.size
+    }
+}
+
+impl Inner {
+    fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
+        let verbose = config::verbose();
+
+        {
+            let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_any();
+            let metadata_file = &mut self.metadata.acquire_shared().unwrap();
+            update_last_accessed(metadata_file);
+
+            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
+                let data_file_path = self.get_cached_data_file_path();
+
+                if metadata.compare_local_state(data_file_path).is_ok() {
+                    if verbose {
+                        eprintln!("[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}", self.uri.clone());
+                    }
+                    return Ok(finish_open(data_file_path, metadata_file));
+                }
+            }
+        }
+
+        if verbose {
+            eprintln!(
+                "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
+                self.uri.clone()
+            );
+        }
+
+        self.try_open_check_latest()
+    }
+
+    fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
+        let verbose = config::verbose();
+        let remote_metadata = &self.file_fetcher.fetch_metadata()?;
+        let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_any();
+
+        {
+            let metadata_file = &mut self.metadata.acquire_shared().unwrap();
+            update_last_accessed(metadata_file);
+
+            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
+                if metadata.matches_remote_metadata(remote_metadata) {
+                    let data_file_path = self.get_cached_data_file_path();
+
+                    if metadata.compare_local_state(data_file_path).is_ok() {
+                        if verbose {
+                            eprintln!("[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}", self.uri.clone());
+                        }
+                        return Ok(finish_open(data_file_path, metadata_file));
+                    }
+                }
+            }
+        }
+
+        let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
+        let metadata = self
+            .try_get_metadata(metadata_file, &cache_guard)
+            .unwrap_or_else(|_| Arc::new(EntryMetadata::new_with_uri(self.uri.clone())));
+
+        if metadata.matches_remote_metadata(remote_metadata) {
+            let data_file_path = self.get_cached_data_file_path();
+
+            if metadata.compare_local_state(data_file_path).is_ok() {
+                if verbose {
+                    eprintln!(
+                        "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
+                        self.uri.clone()
+                    );
+                }
+                return Ok(finish_open(data_file_path, metadata_file));
+            }
+        }
+
+        if verbose {
+            eprintln!(
+                "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_last_modified = {}, remote_size = {}",
+                self.uri.clone(),
+                remote_metadata.last_modified,
+                remote_metadata.size
+            );
+        }
+
+        let data_file_path = &get_data_file_path(
+            self.path_prefix.to_str().unwrap().as_bytes(),
+            self.uri_hash.as_bytes(),
+            remote_metadata.last_modified,
+        );
+        // Remove the file if it exists, since it doesn't match the metadata.
+        // This could be left from an aborted process.
+        let _ = std::fs::remove_file(data_file_path);
+        if !self.file_fetcher.fetches_as_symlink() {
+            let file = std::fs::OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .open(data_file_path)
+                .map_err(PolarsError::from)?;
+            file.lock_exclusive().unwrap();
+            if file.allocate(remote_metadata.size).is_err() {
+                polars_bail!(
+                    ComputeError: "failed to allocate {} bytes to download uri = {}",
+                    remote_metadata.size,
+                    self.uri.as_ref()
+                );
+            }
+        }
+        self.file_fetcher.fetch(data_file_path)?;
+
+        // Don't do this on windows as it will break setting last accessed times.
+        #[cfg(target_family = "unix")]
+        if !self.file_fetcher.fetches_as_symlink() {
+            let mut perms = std::fs::metadata(data_file_path.clone())
+                .unwrap()
+                .permissions();
+            perms.set_readonly(true);
+            std::fs::set_permissions(data_file_path, perms).unwrap();
+        }
+
+        let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
+        let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
+        let local_size = data_file_metadata.len();
+
+        if local_size != remote_metadata.size {
+            polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
+        }
+
+        let mut metadata = Arc::unwrap_or_clone(metadata);
+        metadata.local_last_modified = local_last_modified;
+        metadata.local_size = local_size;
+        metadata.remote_last_modified = remote_metadata.last_modified;
+
+        if let Err(e) = metadata.compare_local_state(data_file_path) {
+            panic!("metadata mismatch after file fetch: {}", e);
+        }
+
+        let data_file = finish_open(data_file_path, metadata_file);
+
+        metadata_file.seek(SeekFrom::Start(0)).unwrap();
+        metadata
+            .try_write(metadata_file.deref_mut())
+            .map_err(to_compute_err)?;
+
+        Ok(data_file)
+    }
+
+    /// Try to read the metadata from disk.
+    fn try_get_metadata<F: FileLockAnyGuard>(
+        &mut self,
+        metadata_file: &mut F,
+        _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
+    ) -> PolarsResult<Arc<EntryMetadata>> {
+        let mut f = || {
+            let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
+
+            if let Some(ref cached) = self.cached_data {
+                if cached.last_modified == last_modified {
+                    return Ok(cached.metadata.clone());
+                }
+            }
+
+            // Ensure cache is unset if read fails
+            self.cached_data = None;
+
+            let metadata = Arc::new(
+                EntryMetadata::try_from_reader(metadata_file.deref_mut())
+                    .map_err(to_compute_err)?,
+            );
+            let data_file_path = get_data_file_path(
+                self.path_prefix.to_str().unwrap().as_bytes(),
+                self.uri_hash.as_bytes(),
+                metadata.remote_last_modified,
+            );
+            self.cached_data = Some(CachedData {
+                last_modified,
+                metadata,
+                data_file_path,
+            });
+
+            Ok(self.cached_data.as_ref().unwrap().metadata.clone())
+        };
+
+        f().map(|v| {
+            // "just in case"
+            // but would be cool if we saw this one day :D
+            if v.uri != self.uri {
+                unimplemented!(
+                    "hash collision: uri1 = {}, uri2 = {}, hash = {}",
+                    v.uri,
+                    self.uri,
+                    self.uri_hash,
+                );
+            }
+            v
+        })
+    }
+
+    /// # Panics
+    /// Panics if `self.cached_data` is `None`.
+    fn get_cached_data_file_path(&self) -> &Path {
+        &self.cached_data.as_ref().unwrap().data_file_path
+    }
+}
+
+impl FileCacheEntry {
+    pub(crate) fn new(
+        uri: Arc<str>,
+        uri_hash: String,
+        path_prefix: Arc<Path>,
+        file_fetcher: Arc<dyn FileFetcher>,
+    ) -> Self {
+        let metadata = FileLock::from(get_metadata_file_path(
+            path_prefix.to_str().unwrap().as_bytes(),
+            uri_hash.as_bytes(),
+        ));
+
+        debug_assert!(
+            Arc::ptr_eq(&uri, file_fetcher.get_uri()),
+            "impl error: entry uri != file_fetcher uri"
+        );
+
+        Self(Arc::new(EntryData {
+            uri: uri.clone(),
+            inner: Mutex::new(Inner {
+                uri,
+                uri_hash,
+                path_prefix,
+                metadata,
+                cached_data: None,
+                file_fetcher,
+            }),
+        }))
+    }
+
+    pub fn uri(&self) -> Arc<str> {
+        self.0.uri.clone()
+    }
+
+    /// Directly returns the cached file if it finds one, without checking whether
+    /// there is a newer version on the remote. This makes no API calls when it
+    /// finds a cached file; otherwise it simply downloads the file.
+    pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
+        self.0.inner.lock().unwrap().try_open_assume_latest()
+    }
+
+    /// Returns the cached file after ensuring it is up to date against the remote.
+    /// This will always perform at least 1 API call for fetching metadata.
+    pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
+        self.0.inner.lock().unwrap().try_open_check_latest()
+    }
+}
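Usage-wise, the two open paths trade freshness for round-trips. A sketch (hedged: assumes the entry was initialized earlier, e.g. by schema inference; the URI is hypothetical):

```rust
use polars_io::file_cache::FILE_CACHE;

fn open_cached() -> polars_error::PolarsResult<()> {
    let entry = FILE_CACHE
        .get_entry("s3://my-bucket/data.csv") // hypothetical URI
        .expect("entry was initialized earlier");

    // Zero remote calls when a valid cached copy exists:
    let _fast = entry.try_open_assume_latest()?;
    // Always at least one metadata call; re-downloads only if the remote changed:
    let _fresh = entry.try_open_check_latest()?;
    Ok(())
}
```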
+fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
+    let file = {
+        #[cfg(not(target_family = "windows"))]
+        {
+            std::fs::OpenOptions::new()
+                .read(true)
+                .open(data_file_path)
+                .unwrap()
+        }
+        // windows requires write access to update the last accessed time
+        #[cfg(target_family = "windows")]
+        {
+            std::fs::OpenOptions::new()
+                .read(true)
+                .write(true)
+                .open(data_file_path)
+                .unwrap()
+        }
+    };
+    update_last_accessed(&file);
+    if file.try_lock_shared().is_err() {
+        panic!(
+            "finish_open: could not acquire shared lock on data file at {}",
+            data_file_path.to_str().unwrap()
+        );
+    }
+    file
+}
+
+/// `[prefix]/d/[uri hash][last modified]`
+fn get_data_file_path(path_prefix: &[u8], uri_hash: &[u8], remote_last_modified: u64) -> PathBuf {
+    let path = flatten(
+        &[
+            path_prefix,
+            &[b'/', DATA_PREFIX, b'/'],
+            uri_hash,
+            format!("{:013x}", remote_last_modified).as_bytes(),
+        ],
+        None,
+    );
+    PathBuf::from(std::str::from_utf8(&path).unwrap())
+}
+
+/// `[prefix]/m/[uri hash]`
+fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
+    let bytes = flatten(
+        &[path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash],
+        None,
+    );
+    let s = std::str::from_utf8(bytes.as_slice()).unwrap();
+    PathBuf::from(s)
+}
diff --git a/crates/polars-io/src/file_cache/eviction.rs b/crates/polars-io/src/file_cache/eviction.rs
new file mode 100644
index 000000000000..a870da6be5f9
--- /dev/null
+++ b/crates/polars-io/src/file_cache/eviction.rs
@@ -0,0 +1,194 @@
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::{Duration, SystemTime};
+
+use fs4::FileExt;
+use polars_core::config;
+use polars_error::PolarsResult;
+
+use super::cache_lock::{GlobalFileCacheGuardExclusive, GLOBAL_FILE_CACHE_LOCK};
+use crate::pl_async;
+
+pub(super) struct EvictionManager {
+    pub(super) prefix: Arc<Path>,
+    pub(super) files_to_remove: Option<(SystemTime, Vec<PathBuf>)>,
+    pub(super) limit_since_last_access: Duration,
+}
+
+impl EvictionManager {
+    pub(super) fn run_in_background(mut self) {
+        let verbose = config::verbose();
+        let sleep_interval = std::cmp::max(self.limit_since_last_access.as_secs() / 4, 60);
+
+        if verbose {
+            eprintln!(
+                "[EvictionManager] creating cache eviction background task with limit_since_last_access = {}, sleep_interval = {}",
+                self.limit_since_last_access.as_secs(),
+                sleep_interval,
+            );
+        }
+
+        pl_async::get_runtime().spawn(async move {
+            // Give some time at startup for other code to run.
+            tokio::time::sleep(Duration::from_secs(3)).await;
+
+            loop {
+                let mut sleep_interval = sleep_interval;
+
+                let this: &'static mut Self = unsafe { std::mem::transmute(&mut self) };
+
+                match tokio::task::spawn_blocking(|| this.update_file_list())
+                    .await
+                    .unwrap()
+                {
+                    Ok(_) if self.files_to_remove.as_ref().unwrap().1.is_empty() => {},
+                    Ok(_) => loop {
+                        if let Some(guard) = GLOBAL_FILE_CACHE_LOCK.try_lock_exclusive() {
+                            if verbose {
+                                eprintln!(
+                                    "[EvictionManager] got exclusive cache lock, evicting {} files",
+                                    self.files_to_remove.as_ref().unwrap().1.len()
+                                );
+                            }
+
+                            tokio::task::block_in_place(|| self.evict_files(&guard));
+                            break;
+                        }
+                        sleep_interval = sleep_interval.saturating_sub(7);
+                        tokio::time::sleep(Duration::from_secs(7)).await;
+                    },
+                    Err(err) => {
+                        if verbose {
+                            eprintln!("[EvictionManager] error updating file list: {}", err);
+                        }
+                    },
+                }
+
+                tokio::time::sleep(Duration::from_secs(sleep_interval)).await;
+            }
+        });
+    }
+
+    fn update_file_list(&mut self) -> PolarsResult<()> {
+        let data_dir = &self.prefix.join("d/");
+        let metadata_dir = &self.prefix.join("m/");
+
+        let data_files_iter = std::fs::read_dir(data_dir).unwrap();
+        let metadata_files_iter = std::fs::read_dir(metadata_dir).unwrap();
+        let mut files_to_remove = Vec::with_capacity(
+            data_files_iter
+                .size_hint()
+                .1
+                .unwrap_or(data_files_iter.size_hint().0)
+                + metadata_files_iter
+                    .size_hint()
+                    .1
+                    .unwrap_or(metadata_files_iter.size_hint().0),
+        );
+
+        let now = SystemTime::now();
+
+        let mut f = |file: std::fs::DirEntry| {
+            let metadata = file.metadata()?;
+
+            if let Ok(since_last_accessed) = now.duration_since(
+                metadata
+                    .accessed()
+                    .unwrap_or_else(|_| metadata.modified().unwrap()),
+            ) {
+                if since_last_accessed >= self.limit_since_last_access {
+                    files_to_remove.push(file.path());
+                }
+            }
+
+            std::io::Result::Ok(())
+        };
+
+        for file in data_files_iter {
+            f(file?)?;
+        }
+
+        for file in metadata_files_iter {
+            f(file?)?;
+        }
+
+        self.files_to_remove = Some((now, files_to_remove));
+
+        Ok(())
+    }
+
+    /// # Panics
+    /// Panics if `self.files_to_remove` is `None`.
+    fn evict_files(&mut self, _guard: &GlobalFileCacheGuardExclusive) {
+        let verbose = config::verbose();
+        let files_to_remove = self.files_to_remove.take().unwrap().1;
+        let now = SystemTime::now();
+
+        for path in &files_to_remove {
+            if !path.exists() {
+                if verbose {
+                    eprintln!(
+                        "[EvictionManager] evict_files: skipping {} (path no longer exists)",
+                        path.to_str().unwrap()
+                    );
+                }
+                continue;
+            }
+
+            let metadata = std::fs::metadata(path).unwrap();
+
+            let since_last_accessed = match now.duration_since(
+                metadata
+                    .accessed()
+                    .unwrap_or_else(|_| metadata.modified().unwrap()),
+            ) {
+                Ok(v) => v,
+                Err(_) => {
+                    if verbose {
+                        eprintln!("[EvictionManager] evict_files: skipping {} (last accessed time was updated)", path.to_str().unwrap());
+                    }
+                    continue;
+                },
+            };
+
+            if since_last_accessed < self.limit_since_last_access {
+                if verbose {
+                    eprintln!(
+                        "[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
+                        path.to_str().unwrap()
+                    );
+                }
+                continue;
+            }
+
+            let file = std::fs::OpenOptions::new().read(true).open(path).unwrap();
+
+            if file.try_lock_exclusive().is_err() {
+                if verbose {
+                    eprintln!(
+                        "[EvictionManager] evict_files: skipping {} (file is locked)",
+                        path.to_str().unwrap()
+                    );
+                }
+                continue;
+            }
+
+            drop(file);
+
+            if let Err(err) = std::fs::remove_file(path) {
+                if verbose {
+                    eprintln!(
+                        "[EvictionManager] evict_files: error removing file: {} ({})",
+                        path.to_str().unwrap(),
+                        err
+                    );
+                }
+            } else {
+                eprintln!(
+                    "[EvictionManager] evict_files: removed file at {}",
+                    path.to_str().unwrap()
+                );
+            }
+        }
+    }
+}
diff --git a/crates/polars-io/src/file_cache/file_fetcher.rs b/crates/polars-io/src/file_cache/file_fetcher.rs
new file mode 100644
index 000000000000..fd8303ecab28
--- /dev/null
+++ b/crates/polars-io/src/file_cache/file_fetcher.rs
@@ -0,0 +1,121 @@
+use std::sync::Arc;
+
+use polars_error::{PolarsError, PolarsResult};
+
+use super::utils::last_modified_u64;
+use crate::cloud::PolarsObjectStore;
+use crate::pl_async;
+
+pub trait FileFetcher: Send + Sync {
+    fn get_uri(&self) -> &Arc<str>;
+    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata>;
+    /// Fetches the object to a `local_path`.
+    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()>;
+    fn fetches_as_symlink(&self) -> bool;
+}
+
+pub struct RemoteMetadata {
+    pub size: u64,
+    pub last_modified: u64,
+}
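To show what the trait demands, here is a hypothetical in-module fetcher that serves a fixed byte payload (the type and its fields are illustrative, not part of the PR):

```rust
// Hypothetical fetcher: a constant payload that never changes "remotely".
struct StaticFetcher {
    uri: std::sync::Arc<str>,
    bytes: &'static [u8],
}

impl FileFetcher for StaticFetcher {
    fn get_uri(&self) -> &std::sync::Arc<str> {
        &self.uri
    }

    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
        Ok(RemoteMetadata {
            size: self.bytes.len() as u64,
            last_modified: 0, // constant payload => the cache never invalidates it
        })
    }

    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
        std::fs::write(local_path, self.bytes).map_err(PolarsError::from)
    }

    fn fetches_as_symlink(&self) -> bool {
        false // we materialize a real file, not a symlink
    }
}
```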
+/// A struct that fetches data from local disk and stores it into the `cache`.
+/// Mostly used for debugging, it only ever gets called if `POLARS_FORCE_ASYNC` is set.
+pub(super) struct LocalFileFetcher {
+    uri: Arc<str>,
+    path: Box<std::path::Path>,
+}
+
+impl LocalFileFetcher {
+    pub(super) fn from_uri(uri: Arc<str>) -> Self {
+        let path = std::path::PathBuf::from(uri.as_ref()).into_boxed_path();
+        debug_assert_eq!(
+            path,
+            std::fs::canonicalize(&path).unwrap().into_boxed_path()
+        );
+
+        Self { uri, path }
+    }
+}
+
+impl FileFetcher for LocalFileFetcher {
+    fn get_uri(&self) -> &Arc<str> {
+        &self.uri
+    }
+
+    fn fetches_as_symlink(&self) -> bool {
+        #[cfg(target_family = "unix")]
+        {
+            true
+        }
+        #[cfg(not(target_family = "unix"))]
+        {
+            false
+        }
+    }
+
+    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
+        let metadata = std::fs::metadata(&self.path).map_err(PolarsError::from)?;
+
+        Ok(RemoteMetadata {
+            size: metadata.len(),
+            last_modified: last_modified_u64(&metadata),
+        })
+    }
+
+    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
+        #[cfg(target_family = "unix")]
+        {
+            std::os::unix::fs::symlink(&self.path, local_path).map_err(PolarsError::from)
+        }
+        #[cfg(not(target_family = "unix"))]
+        {
+            std::fs::copy(&self.path, local_path).map_err(PolarsError::from)?;
+            Ok(())
+        }
+    }
+}
+
+pub(super) struct CloudFileFetcher {
+    pub(super) uri: Arc<str>,
+    pub(super) cloud_path: object_store::path::Path,
+    pub(super) object_store: PolarsObjectStore,
+}
+
+impl FileFetcher for CloudFileFetcher {
+    fn get_uri(&self) -> &Arc<str> {
+        &self.uri
+    }
+
+    fn fetches_as_symlink(&self) -> bool {
+        false
+    }
+
+    fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
+        let metadata = pl_async::get_runtime()
+            .block_on_potential_spawn(self.object_store.head(&self.cloud_path))?;
+
+        Ok(RemoteMetadata {
+            size: metadata.size as u64,
+            last_modified: metadata.last_modified.timestamp_millis() as u64,
+        })
+    }
+
+    fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
+        pl_async::get_runtime().block_on_potential_spawn(async {
+            let file = &mut tokio::fs::OpenOptions::new()
+                .write(true)
+                .truncate(true)
+                .open(local_path)
+                .await
+                .map_err(PolarsError::from)?;
+
+            self.object_store.download(&self.cloud_path, file).await?;
+            // Dropping is delayed for tokio async files so we need to explicitly
+            // flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451).
+            file.sync_all().await.map_err(PolarsError::from)?;
+            PolarsResult::Ok(())
+        })?;
+        Ok(())
+    }
+}
diff --git a/crates/polars-io/src/file_cache/file_lock.rs b/crates/polars-io/src/file_cache/file_lock.rs
new file mode 100644
index 000000000000..7bd305ebf4fa
--- /dev/null
+++ b/crates/polars-io/src/file_cache/file_lock.rs
@@ -0,0 +1,86 @@
+use std::fs::{File, OpenOptions};
+use std::path::Path;
+
+use fs4::FileExt;
+
+/// Note: this creates the file if it does not exist when acquiring locks.
+pub(super) struct FileLock<T: AsRef<Path>>(T);
+pub(super) struct FileLockSharedGuard(File);
+pub(super) struct FileLockExclusiveGuard(File);
+
+/// Trait to specify a file is lock-guarded without needing a particular type of
+/// guard (i.e. shared/exclusive).
+pub(super) trait FileLockAnyGuard:
+    std::ops::Deref<Target = File> + std::ops::DerefMut
+{
+}
+impl FileLockAnyGuard for FileLockSharedGuard {}
+impl FileLockAnyGuard for FileLockExclusiveGuard {}
+
+impl<T: AsRef<Path>> From<T> for FileLock<T> {
+    fn from(path: T) -> Self {
+        Self(path)
+    }
+}
+
+impl<T: AsRef<Path>> FileLock<T> {
+    pub(super) fn acquire_shared(&self) -> Result<FileLockSharedGuard, std::io::Error> {
+        let file = OpenOptions::new()
+            .create(true)
+            .truncate(false)
+            .read(true)
+            .write(true)
+            .open(self.0.as_ref())?;
+        file.lock_shared().map(|_| FileLockSharedGuard(file))
+    }
+
+    pub(super) fn acquire_exclusive(&self) -> Result<FileLockExclusiveGuard, std::io::Error> {
+        let file = OpenOptions::new()
+            .create(true)
+            .truncate(false)
+            .read(true)
+            .write(true)
+            .open(self.0.as_ref())?;
+        file.lock_exclusive().map(|_| FileLockExclusiveGuard(file))
+    }
+}
+
+impl std::ops::Deref for FileLockSharedGuard {
+    type Target = File;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl std::ops::DerefMut for FileLockSharedGuard {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl Drop for FileLockSharedGuard {
+    fn drop(&mut self) {
+        self.0.unlock().unwrap();
+    }
+}
+
+impl std::ops::Deref for FileLockExclusiveGuard {
+    type Target = File;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl std::ops::DerefMut for FileLockExclusiveGuard {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl Drop for FileLockExclusiveGuard {
+    fn drop(&mut self) {
+        self.0.unlock().unwrap();
+    }
+}
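A sketch of the guard semantics from inside the module (path hypothetical): acquiring returns a guard that derefs to the `File` and unlocks on drop, so a scope bounds the critical section.

```rust
use std::io::Write;

fn demo() -> std::io::Result<()> {
    let lock = FileLock::from("/tmp/polars-demo.lock"); // hypothetical path
    {
        let mut guard = lock.acquire_exclusive()?; // blocks other processes
        guard.write_all(b"{}")?; // DerefMut<Target = File>: write through the guard
    } // guard dropped here => file unlocked
    let _shared = lock.acquire_shared()?; // concurrent shared holders are fine
    Ok(())
}
```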
diff --git a/crates/polars-io/src/file_cache/metadata.rs b/crates/polars-io/src/file_cache/metadata.rs
new file mode 100644
index 000000000000..30ae4724d82d
--- /dev/null
+++ b/crates/polars-io/src/file_cache/metadata.rs
@@ -0,0 +1,83 @@
+use std::path::Path;
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug)]
+pub enum LocalCompareError {
+    LastModifiedMismatch { expected: u64, actual: u64 },
+    SizeMismatch { expected: u64, actual: u64 },
+    DataFileReadError(std::io::Error),
+}
+
+pub type LocalCompareResult = Result<(), LocalCompareError>;
+
+/// Metadata written to a file used to track state / synchronize across processes.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub(super) struct EntryMetadata {
+    pub(super) uri: Arc<str>,
+    pub(super) local_last_modified: u64,
+    pub(super) local_size: u64,
+    pub(super) remote_last_modified: u64,
+}
+
+impl std::fmt::Display for LocalCompareError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::LastModifiedMismatch { expected, actual } => write!(
+                f,
+                "last modified time mismatch: expected {}, found {}",
+                expected, actual
+            ),
+            Self::SizeMismatch { expected, actual } => {
+                write!(f, "size mismatch: expected {}, found {}", expected, actual)
+            },
+            Self::DataFileReadError(err) => {
+                write!(f, "failed to read local file metadata: {}", err)
+            },
+        }
+    }
+}
+
+impl EntryMetadata {
+    pub(super) fn new_with_uri(uri: Arc<str>) -> Self {
+        Self {
+            uri,
+            local_last_modified: 0,
+            local_size: 0,
+            remote_last_modified: 0,
+        }
+    }
+
+    pub(super) fn compare_local_state(&self, data_file_path: &Path) -> LocalCompareResult {
+        let metadata = match std::fs::metadata(data_file_path) {
+            Ok(v) => v,
+            Err(e) => return Err(LocalCompareError::DataFileReadError(e)),
+        };
+
+        let local_last_modified = super::utils::last_modified_u64(&metadata);
+        let local_size = metadata.len();
+
+        if local_last_modified != self.local_last_modified {
+            Err(LocalCompareError::LastModifiedMismatch {
+                expected: self.local_last_modified,
+                actual: local_last_modified,
+            })
+        } else if local_size != self.local_size {
+            Err(LocalCompareError::SizeMismatch {
+                expected: self.local_size,
+                actual: local_size,
+            })
+        } else {
+            Ok(())
+        }
+    }
+
+    pub(super) fn try_write<W: std::io::Write>(&self, writer: &mut W) -> serde_json::Result<()> {
+        serde_json::to_writer(writer, self)
+    }
+
+    pub(super) fn try_from_reader<R: std::io::Read>(reader: &mut R) -> serde_json::Result<Self> {
+        serde_json::from_reader(reader)
+    }
+}
diff --git a/crates/polars-io/src/file_cache/mod.rs b/crates/polars-io/src/file_cache/mod.rs
new file mode 100644
index 000000000000..78a81148736a
--- /dev/null
+++ b/crates/polars-io/src/file_cache/mod.rs
@@ -0,0 +1,11 @@
+mod cache;
+mod cache_lock;
+mod entry;
+mod eviction;
+mod file_fetcher;
+mod file_lock;
+mod metadata;
+mod utils;
+pub use cache::FILE_CACHE;
+pub use entry::FileCacheEntry;
+pub use utils::init_entries_from_uri_list;
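For reference, the on-disk metadata file is plain JSON in struct-field order. An in-module sketch with illustrative values:

```rust
let meta = EntryMetadata {
    uri: std::sync::Arc::from("s3://my-bucket/data.csv"), // hypothetical URI
    local_last_modified: 1_714_500_000_000, // millis since the Unix epoch
    local_size: 1_048_576,
    remote_last_modified: 1_714_499_000_000,
};
let mut buf = Vec::new();
meta.try_write(&mut buf).unwrap();
// buf now holds:
// {"uri":"s3://my-bucket/data.csv","local_last_modified":1714500000000,
//  "local_size":1048576,"remote_last_modified":1714499000000}
```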
diff --git a/crates/polars-io/src/file_cache/utils.rs b/crates/polars-io/src/file_cache/utils.rs
new file mode 100644
index 000000000000..84d6ca6556da
--- /dev/null
+++ b/crates/polars-io/src/file_cache/utils.rs
@@ -0,0 +1,110 @@
+use std::path::Path;
+use std::sync::Arc;
+use std::time::UNIX_EPOCH;
+
+use once_cell::sync::Lazy;
+use polars_error::{to_compute_err, PolarsError, PolarsResult};
+
+use super::cache::FILE_CACHE;
+use super::entry::FileCacheEntry;
+use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
+use crate::cloud::{build_object_store, CloudLocation, CloudOptions, PolarsObjectStore};
+use crate::pl_async;
+use crate::prelude::{is_cloud_url, POLARS_TEMP_DIR_BASE_PATH};
+
+pub(super) static FILE_CACHE_PREFIX: Lazy<Box<Path>> = Lazy::new(|| {
+    let path = POLARS_TEMP_DIR_BASE_PATH
+        .join("file-cache/")
+        .into_boxed_path();
+
+    if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
+        if !path.is_dir() {
+            panic!("failed to create file cache directory: {}", err);
+        }
+    }
+
+    path
+});
+
+pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
+    metadata
+        .modified()
+        .unwrap()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_millis() as u64
+}
+
+pub(super) fn update_last_accessed(file: &std::fs::File) {
+    let file_metadata = file.metadata().unwrap();
+
+    if let Err(e) = file.set_times(
+        std::fs::FileTimes::new()
+            .set_modified(file_metadata.modified().unwrap())
+            .set_accessed(std::time::SystemTime::now()),
+    ) {
+        panic!("failed to update file last accessed time: {}", e);
+    }
+}
+
+pub fn init_entries_from_uri_list<A: AsRef<[Arc<str>]>>(
+    uri_list: A,
+    cloud_options: Option<&CloudOptions>,
+) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
+    let uri_list = uri_list.as_ref();
+
+    if uri_list.is_empty() {
+        return Ok(Default::default());
+    }
+
+    let first_uri = uri_list.first().unwrap().as_ref();
+
+    if is_cloud_url(first_uri) {
+        let (_, object_store) = pl_async::get_runtime()
+            .block_on_potential_spawn(build_object_store(first_uri, cloud_options))?;
+        let object_store = PolarsObjectStore::new(object_store);
+
+        uri_list
+            .iter()
+            .map(|uri| {
+                FILE_CACHE.init_entry(uri.clone(), || {
+                    let CloudLocation {
+                        prefix, expansion, ..
+                    } = CloudLocation::new(uri.as_ref()).unwrap();
+
+                    let cloud_path = {
+                        assert!(expansion.is_none(), "path should not contain wildcards");
+                        object_store::path::Path::from_url_path(prefix).map_err(to_compute_err)?
+                    };
+
+                    let object_store = object_store.clone();
+                    let uri = uri.clone();
+
+                    Ok(Arc::new(CloudFileFetcher {
+                        uri,
+                        object_store,
+                        cloud_path,
+                    }))
+                })
+            })
+            .collect::<PolarsResult<Vec<_>>>()
+    } else {
+        uri_list
+            .iter()
+            .map(|uri| {
+                let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {
+                    let msg = Some(format!("{}: {}", err, uri.as_ref()).into());
+                    PolarsError::IO {
+                        error: err.into(),
+                        msg,
+                    }
+                })?;
+                let uri = Arc::<str>::from(uri.to_str().unwrap());
+
+                FILE_CACHE.init_entry(uri.clone(), || {
+                    Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone())))
+                })
+            })
+            .collect::<PolarsResult<Vec<_>>>()
+    }
+}
diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs
index 6e9f4f691e8f..d5fc527b822b 100644
--- a/crates/polars-io/src/lib.rs
+++ b/crates/polars-io/src/lib.rs
@@ -7,6 +7,8 @@ pub mod avro;
 pub mod cloud;
 #[cfg(any(feature = "csv", feature = "json"))]
 pub mod csv;
+#[cfg(feature = "file_cache")]
+pub mod file_cache;
 #[cfg(any(feature = "ipc", feature = "ipc_streaming"))]
 pub mod ipc;
 #[cfg(feature = "json")]
diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs
index 9ff4494250c2..c8b5d4ddad78 100644
--- a/crates/polars-io/src/pl_async.rs
+++ b/crates/polars-io/src/pl_async.rs
@@ -37,6 +37,20 @@ impl GetSize for Result {
     }
 }
 
+pub(crate) struct Size(u64);
+
+impl GetSize for Size {
+    fn size(&self) -> u64 {
+        self.0
+    }
+}
+
+impl From<u64> for Size {
+    fn from(value: u64) -> Self {
+        Self(value)
+    }
+}
+
 enum Optimization {
     Step,
     Accept,
diff --git a/crates/polars-io/src/utils.rs b/crates/polars-io/src/utils.rs
index abee4e91a1a6..fc5afc8a0ac6 100644
--- a/crates/polars-io/src/utils.rs
+++ b/crates/polars-io/src/utils.rs
@@ -11,6 +11,23 @@ use regex::{Regex, RegexBuilder};
 
 use crate::mmap::{MmapBytesReader, ReaderBytes};
 
+pub static POLARS_TEMP_DIR_BASE_PATH: Lazy<Box<Path>> = Lazy::new(|| {
+    let path = std::env::var("POLARS_TEMP_DIR")
+        .map(PathBuf::from)
+        .unwrap_or_else(|_| {
+            PathBuf::from(std::env::temp_dir().to_string_lossy().as_ref()).join("polars/")
+        })
+        .into_boxed_path();
+
+    if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
+        if !path.is_dir() {
+            panic!("failed to create temporary directory: {}", err);
+        }
+    }
+
+    path
+});
+
 pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
     reader: &'a mut R,
 ) -> PolarsResult<ReaderBytes<'a>> {
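Putting the public surface together (hedged: `cloud` feature enabled; URI hypothetical): `init_entries_from_uri_list` builds the fetchers and registers entries, after which `FILE_CACHE.get_entry` lookups succeed.

```rust
use std::sync::Arc;

use polars_io::file_cache::{init_entries_from_uri_list, FILE_CACHE};

fn warm_cache() -> polars_error::PolarsResult<std::fs::File> {
    let uri: Arc<str> = Arc::from("s3://my-bucket/data.csv"); // hypothetical URI
    // Registers (or reuses) one entry per URI; local paths are canonicalized.
    init_entries_from_uri_list(vec![uri.clone()], None)?;
    // Later lookups hit the same global cache.
    FILE_CACHE
        .get_entry(uri.as_ref())
        .unwrap()
        .try_open_check_latest()
}
```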
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
index a163e58efca3..8d9af9c028f5 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
@@ -1,7 +1,7 @@
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use polars_core::config::verbose;
+use polars_core::config;
 use polars_core::utils::{
     accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked,
 };
@@ -42,7 +42,51 @@ impl CsvExec {
             .with_row_index(None)
             .with_path::<&str>(None);
 
-        let verbose = verbose();
+        if self.paths.is_empty() {
+            let out = if let Some(schema) = options_base.schema {
+                DataFrame::from_rows_and_schema(&[], schema.as_ref())?
+            } else {
+                Default::default()
+            };
+            return Ok(out);
+        }
+
+        let verbose = config::verbose();
+        let force_async = config::force_async();
+        let run_async = force_async || is_cloud_url(self.paths.first().unwrap());
+
+        if force_async && verbose {
+            eprintln!("ASYNC READING FORCED");
+        }
+
+        let finish_read =
+            |i: usize, options: CsvReadOptions, predicate: Option<Arc<dyn PhysicalIoExpr>>| {
+                if run_async {
+                    #[cfg(feature = "cloud")]
+                    {
+                        options
+                            .into_reader_with_file_handle(
+                                polars_io::file_cache::FILE_CACHE
+                                    .get_entry(self.paths.get(i).unwrap().to_str().unwrap())
+                                    // Safety: This was initialized by schema inference.
+                                    .unwrap()
+                                    .try_open_assume_latest()?,
+                            )
+                            ._with_predicate(predicate.clone())
+                            .finish()
+                    }
+                    #[cfg(not(feature = "cloud"))]
+                    {
+                        panic!("required feature `cloud` is not enabled")
+                    }
+                } else {
+                    options
+                        .try_into_reader_with_file_path(Some(self.paths.get(i).unwrap().clone()))
+                        .unwrap()
+                        ._with_predicate(predicate.clone())
+                        .finish()
+                }
+            };
 
         let mut df = if n_rows.is_some()
             || (predicate.is_some() && self.file_options.row_index.is_some())
@@ -62,19 +106,15 @@ impl CsvExec {
                 .filter(|_| n_rows.is_none() && self.file_options.row_index.is_none());
 
             for i in 0..self.paths.len() {
-                let path = &self.paths[i];
-
-                let mut df = options_base
+                let opts = options_base
                     .clone()
                     .with_row_index(self.file_options.row_index.clone().map(|mut ri| {
                         ri.offset += n_rows_read as IdxSize;
                         ri
                     }))
-                    .with_n_rows(n_rows.map(|n| n - n_rows_read))
-                    .try_into_reader_with_file_path(Some(path.clone()))
-                    .unwrap()
-                    ._with_predicate(predicate_during_read.clone())
-                    .finish()?;
+                    .with_n_rows(n_rows.map(|n| n - n_rows_read));
+
+                let mut df = finish_read(i, opts, predicate_during_read.clone())?;
 
                 n_rows_read = n_rows_read.saturating_add(df.height());
 
@@ -136,19 +176,14 @@ impl CsvExec {
             }
 
             let dfs = POOL.install(|| {
-                self.paths
-                    .chunks(std::cmp::min(POOL.current_num_threads(), 128))
-                    .map(|paths| {
-                        paths
+                let step = std::cmp::min(POOL.current_num_threads(), 128);
+
+                (0..self.paths.len())
+                    .step_by(step)
+                    .map(|start| {
+                        (start..std::cmp::min(start.saturating_add(step), self.paths.len()))
                             .into_par_iter()
-                            .map(|path| {
-                                options_base
-                                    .clone()
-                                    .try_into_reader_with_file_path(Some(path.clone()))
-                                    .unwrap()
-                                    ._with_predicate(predicate.clone())
-                                    .finish()
-                            })
+                            .map(|i| finish_read(i, options_base.clone(), predicate.clone()))
                             .collect::<PolarsResult<Vec<_>>>()
                     })
                     .collect::<PolarsResult<Vec<_>>>()
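The `POLARS_FORCE_ASYNC` knob makes this cached code path exercisable against purely local files. A sketch (hypothetical local path; the variable must be set before the first scan is executed):

```rust
use polars_lazy::prelude::*;

fn main() -> PolarsResult<()> {
    // Route local CSV scans through the file cache, as if they were remote.
    std::env::set_var("POLARS_FORCE_ASYNC", "1");

    let df = LazyCsvReader::new("/tmp/data.csv") // hypothetical local file
        .finish()?
        .collect()?;
    println!("{df}");
    Ok(())
}
```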
diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs
index 884d1fc82667..754b0cf4d2e6 100644
--- a/crates/polars-lazy/src/physical_plan/planner/lp.rs
+++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -254,7 +254,7 @@ fn create_physical_plan_impl(
 
             match scan_type {
                 #[cfg(feature = "csv")]
-                FileScan::Csv { options } => Ok(Box::new(executors::CsvExec {
+                FileScan::Csv { options, .. } => Ok(Box::new(executors::CsvExec {
                     paths,
                     file_info,
                     options,
diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs
index 785d807492dc..577441e3dee7 100644
--- a/crates/polars-lazy/src/scan/csv.rs
+++ b/crates/polars-lazy/src/scan/csv.rs
@@ -1,6 +1,7 @@
 use std::path::{Path, PathBuf};
 
 use polars_core::prelude::*;
+use polars_io::cloud::CloudOptions;
 use polars_io::csv::read::{
     infer_file_schema, CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues,
 };
@@ -17,6 +18,7 @@ pub struct LazyCsvReader {
     glob: bool,
     cache: bool,
     read_options: CsvReadOptions,
+    cloud_options: Option<CloudOptions>,
 }
 
 #[cfg(feature = "csv")]
@@ -38,6 +40,7 @@ impl LazyCsvReader {
             glob: true,
             cache: true,
             read_options: Default::default(),
+            cloud_options: Default::default(),
         }
     }
 
@@ -203,6 +206,11 @@ impl LazyCsvReader {
         self
     }
 
+    pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
+        self.cloud_options = cloud_options;
+        self
+    }
+
     /// Modify a schema before we run the lazy scanning.
     ///
     /// Important! Run this function latest in the builder!
@@ -276,9 +284,10 @@ impl LazyFileListReader for LazyCsvReader {
             self.paths
         };
 
-        let mut lf: LazyFrame = DslBuilder::scan_csv(paths, self.read_options, self.cache)?
-            .build()
-            .into();
+        let mut lf: LazyFrame =
+            DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)?
+                .build()
+                .into();
         lf.opt_state.file_caching = true;
         Ok(lf)
     }
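And the new builder hook from the Rust side (hedged: default options shown for brevity; real credentials would come from `CloudOptions` fields or environment variables; URI hypothetical):

```rust
use polars_io::cloud::CloudOptions;
use polars_lazy::prelude::*;

fn scan_remote() -> PolarsResult<LazyFrame> {
    let cloud_options = CloudOptions::default(); // would normally carry credentials/retries
    LazyCsvReader::new("s3://my-bucket/data.csv") // hypothetical URI
        .with_cloud_options(Some(cloud_options))
        .finish()
}
```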
diff --git a/crates/polars-parquet/Cargo.toml b/crates/polars-parquet/Cargo.toml
index 03ab5b8c1aa1..b30d86df794b 100644
--- a/crates/polars-parquet/Cargo.toml
+++ b/crates/polars-parquet/Cargo.toml
@@ -31,7 +31,7 @@ streaming-decompression = "0.1"
 
 async-stream = { version = "0.3.3", optional = true }
 
-brotli = { version = "^6.0", optional = true }
+brotli = { version = "^5.0", optional = true }
 flate2 = { workspace = true, optional = true }
 lz4 = { version = "1.24", optional = true }
 lz4_flex = { version = "0.11", optional = true }
diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs
index 000ebdb17f0f..218915a76359 100644
--- a/crates/polars-pipe/src/executors/sources/csv.rs
+++ b/crates/polars-pipe/src/executors/sources/csv.rs
@@ -1,8 +1,9 @@
 use std::fs::File;
 use std::path::PathBuf;
 
-use polars_core::POOL;
+use polars_core::{config, POOL};
 use polars_io::csv::read::{BatchedCsvReader, CsvReadOptions, CsvReader};
+use polars_io::utils::is_cloud_url;
 use polars_plan::global::_set_n_rows_for_scan;
 use polars_plan::prelude::FileScanOptions;
 use polars_utils::iter::EnumerateIdxTrait;
@@ -44,6 +45,14 @@ impl CsvSource {
             return Ok(());
         }
         let path = &self.paths[self.current_path_idx];
+
+        let force_async = config::force_async();
+        let run_async = force_async || is_cloud_url(path);
+
+        if self.current_path_idx == 0 && force_async && self.verbose {
+            eprintln!("ASYNC READING FORCED");
+        }
+
         self.current_path_idx += 1;
 
         let options = self.options.clone().unwrap();
@@ -80,14 +89,33 @@ impl CsvSource {
             eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows")
         }
 
-        let reader: CsvReader<File> = options
+        let options = options
             .with_schema(Some(self.schema.clone()))
             .with_n_rows(n_rows)
             .with_columns(with_columns)
             .with_rechunk(false)
-            .with_row_index(row_index)
-            .with_path(Some(path))
-            .try_into_reader_with_file_path(None)?;
+            .with_row_index(row_index);
+
+        let reader: CsvReader<File> = if run_async {
+            #[cfg(feature = "cloud")]
+            {
+                options.into_reader_with_file_handle(
+                    polars_io::file_cache::FILE_CACHE
+                        .get_entry(path.to_str().unwrap())
+                        // Safety: This was initialized by schema inference.
+                        .unwrap()
+                        .try_open_assume_latest()?,
+                )
+            }
+            #[cfg(not(feature = "cloud"))]
+            {
+                panic!("required feature `cloud` is not enabled")
+            }
+        } else {
+            options
+                .with_path(Some(path))
+                .try_into_reader_with_file_path(None)?
+        };
 
         self.reader = Some(reader);
         let reader = self.reader.as_mut().unwrap();
diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs
index 2832a5c5ae69..e7dcba877d7c 100644
--- a/crates/polars-pipe/src/pipeline/convert.rs
+++ b/crates/polars-pipe/src/pipeline/convert.rs
@@ -99,7 +99,7 @@ where
             }
             match scan_type {
                 #[cfg(feature = "csv")]
-                FileScan::Csv { options } => {
+                FileScan::Csv { options, .. } => {
                     let src = sources::CsvSource::new(
                         paths,
                         file_info.schema,
diff --git a/crates/polars-plan/src/logical_plan/builder_dsl.rs b/crates/polars-plan/src/logical_plan/builder_dsl.rs
index a64356285839..5dd140f5f835 100644
--- a/crates/polars-plan/src/logical_plan/builder_dsl.rs
+++ b/crates/polars-plan/src/logical_plan/builder_dsl.rs
@@ -156,6 +156,7 @@ impl DslBuilder {
         paths: P,
         read_options: CsvReadOptions,
         cache: bool,
+        cloud_options: Option<CloudOptions>,
     ) -> PolarsResult<Self> {
         let paths = paths.into();
 
@@ -182,6 +183,7 @@ impl DslBuilder {
             predicate: None,
             scan_type: FileScan::Csv {
                 options: read_options,
+                cloud_options,
             },
         }
         .into())
diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
index 70c79b8e8dc0..33be0ded29ab 100644
--- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
@@ -116,8 +116,11 @@ pub fn to_alp_impl(
                     file_info
                 },
                 #[cfg(feature = "csv")]
-                FileScan::Csv { options, .. } => {
-                    scans::csv_file_info(&paths, &file_options, options)
+                FileScan::Csv {
+                    options,
+                    cloud_options,
+                } => {
+                    scans::csv_file_info(&paths, &file_options, options, cloud_options.as_ref())
                         .map_err(|e| e.context(failed_here!(csv scan)))?
                 },
                 // FileInfo should be set.
diff --git a/crates/polars-plan/src/logical_plan/conversion/scans.rs b/crates/polars-plan/src/logical_plan/conversion/scans.rs
index 3249b02f983e..1c758f39100c 100644
--- a/crates/polars-plan/src/logical_plan/conversion/scans.rs
+++ b/crates/polars-plan/src/logical_plan/conversion/scans.rs
@@ -1,4 +1,3 @@
-use std::io::Read;
 use std::path::PathBuf;
 
 use either::Either;
@@ -123,10 +122,11 @@ pub(super) fn csv_file_info(
     paths: &[PathBuf],
     file_options: &FileScanOptions,
     csv_options: &mut CsvReadOptions,
+    cloud_options: Option<&polars_io::cloud::CloudOptions>,
 ) -> PolarsResult<FileInfo> {
-    use std::io::Seek;
+    use std::io::{Read, Seek};
 
-    use polars_core::POOL;
+    use polars_core::{config, POOL};
     use polars_io::csv::read::is_compressed;
     use polars_io::csv::read::schema_inference::SchemaInferenceResult;
     use polars_io::utils::get_reader_bytes;
@@ -137,10 +137,47 @@ pub(super) fn csv_file_info(
     // * See if we can do this without downloading the entire file
 
     // prints the error message if paths is empty.
-    get_path(paths)?;
+    let first_path = get_path(paths)?;
+    let run_async = is_cloud_url(first_path) || config::force_async();
 
-    let infer_schema_func = |path| {
-        let mut file = polars_utils::open_file(path)?;
+    let cache_entries = {
+        #[cfg(feature = "cloud")]
+        {
+            if run_async {
+                Some(polars_io::file_cache::init_entries_from_uri_list(
+                    paths
+                        .iter()
+                        .map(|path| Arc::from(path.to_str().unwrap()))
+                        .collect::<Vec<_>>(),
+                    cloud_options,
+                )?)
+            } else {
+                None
+            }
+        }
+        #[cfg(not(feature = "cloud"))]
+        {
+            if run_async {
+                panic!("required feature `cloud` is not enabled")
+            }
+        }
+    };
+
+    let infer_schema_func = |i| {
+        let mut file = if run_async {
+            #[cfg(feature = "cloud")]
+            {
+                let entry: &Arc<polars_io::file_cache::FileCacheEntry> =
+                    cache_entries.as_ref().unwrap().get(i).unwrap();
+                entry.try_open_check_latest()?
+            }
+            #[cfg(not(feature = "cloud"))]
+            {
+                panic!("required feature `cloud` is not enabled")
+            }
+        } else {
+            polars_utils::open_file(paths.get(i).unwrap())?
+        };
 
         let mut magic_nr = [0u8; 4];
         let res_len = file.read(&mut magic_nr)?;
@@ -191,11 +228,9 @@ pub(super) fn csv_file_info(
     };
 
     let si_results = POOL.join(
-        || infer_schema_func(paths.first().unwrap()),
+        || infer_schema_func(0),
         || {
-            paths
-                .get(1..)
-                .unwrap()
+            (1..paths.len())
                 .into_par_iter()
                 .map(infer_schema_func)
                 .reduce(|| Ok(Default::default()), merge_func)
diff --git a/crates/polars-plan/src/logical_plan/file_scan.rs b/crates/polars-plan/src/logical_plan/file_scan.rs
index 94295b1c0db1..4b00e0ecb5b6 100644
--- a/crates/polars-plan/src/logical_plan/file_scan.rs
+++ b/crates/polars-plan/src/logical_plan/file_scan.rs
@@ -15,7 +15,10 @@ use super::*;
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum FileScan {
     #[cfg(feature = "csv")]
-    Csv { options: CsvReadOptions },
+    Csv {
+        options: CsvReadOptions,
+        cloud_options: Option<CloudOptions>,
+    },
     #[cfg(feature = "parquet")]
     Parquet {
         options: ParquetOptions,
@@ -41,7 +44,16 @@ impl PartialEq for FileScan {
     fn eq(&self, other: &Self) -> bool {
         match (self, other) {
             #[cfg(feature = "csv")]
-            (FileScan::Csv { options: l }, FileScan::Csv { options: r }) => l == r,
+            (
+                FileScan::Csv {
+                    options: l,
+                    cloud_options: c_l,
+                },
+                FileScan::Csv {
+                    options: r,
+                    cloud_options: c_r,
+                },
+            ) => l == r && c_l == c_r,
             #[cfg(feature = "parquet")]
             (
                 FileScan::Parquet {
@@ -80,7 +92,13 @@ impl Hash for FileScan {
         std::mem::discriminant(self).hash(state);
         match self {
             #[cfg(feature = "csv")]
-            FileScan::Csv { options } => options.hash(state),
+            FileScan::Csv {
+                options,
+                cloud_options,
+            } => {
+                options.hash(state);
+                cloud_options.hash(state);
+            },
             #[cfg(feature = "parquet")]
             FileScan::Parquet {
                 options,
@@ -88,7 +106,7 @@ impl Hash for FileScan {
                 metadata: _,
             } => {
                 options.hash(state);
-                cloud_options.hash(state)
+                cloud_options.hash(state);
             },
             #[cfg(feature = "ipc")]
             FileScan::Ipc {
diff --git a/crates/polars-plan/src/logical_plan/functions/count.rs b/crates/polars-plan/src/logical_plan/functions/count.rs
index a7072d41b2a1..9042355c2ebc 100644
--- a/crates/polars-plan/src/logical_plan/functions/count.rs
+++ b/crates/polars-plan/src/logical_plan/functions/count.rs
@@ -21,7 +21,10 @@ use super::*;
 pub fn count_rows(paths: &Arc<[PathBuf]>, scan_type: &FileScan) -> PolarsResult<usize> {
     match scan_type {
         #[cfg(feature = "csv")]
-        FileScan::Csv { options } => {
+        FileScan::Csv {
+            options,
+            cloud_options,
+        } => {
             let parse_options = options.get_parse_options();
             let n_rows: PolarsResult<usize> = paths
                 .iter()
diff --git a/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs b/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
index 755790efc2ca..9fbb83805bb8 100644
--- a/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
+++ b/crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
@@ -168,7 +168,7 @@ impl SlicePushDown {
             output_schema,
             mut file_options,
             predicate,
-            scan_type: FileScan::Csv { options },
+            scan_type: FileScan::Csv { options, cloud_options },
         }, Some(state)) if predicate.is_none() && state.offset >= 0 => {
             file_options.n_rows = Some(state.offset as usize + state.len as usize);
@@ -176,7 +176,7 @@ impl SlicePushDown {
                 paths,
                 file_info,
                 output_schema,
-                scan_type: FileScan::Csv { options },
+                scan_type: FileScan::Csv { options, cloud_options },
                 file_options,
                 predicate,
             };
diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py
index d4fd03f7fffd..2930a2a93992 100644
--- a/py-polars/polars/io/csv/functions.py
+++ b/py-polars/polars/io/csv/functions.py
@@ -934,6 +934,8 @@ def scan_csv(
     truncate_ragged_lines: bool = False,
    decimal_comma: bool = False,
     glob: bool = True,
+    storage_options: dict[str, Any] | None = None,
+    retries: int = 0,
 ) -> LazyFrame:
     r"""
     Lazily read from a CSV file or multiple files via glob patterns.
@@ -1030,6 +1032,22 @@ def scan_csv(
         Parse floats using a comma as the decimal separator instead of a period.
     glob
         Expand path given via globbing rules.
+    storage_options
+        Options that indicate how to connect to a cloud provider.
+        If the cloud provider is not supported by Polars, the storage options
+        are passed to `fsspec.open()`.
+
+        The cloud providers currently supported are AWS, GCP, and Azure.
+        See supported keys here:
+
+        * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
+        * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
+        * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+
+        If `storage_options` is not provided, Polars will try to infer the information
+        from environment variables.
+    retries
+        Number of retries if accessing a cloud instance fails.
 
     Returns
     -------
@@ -1150,6 +1168,8 @@ def with_column_names(cols: list[str]) -> list[str]:
         truncate_ragged_lines=truncate_ragged_lines,
         decimal_comma=decimal_comma,
         glob=glob,
+        retries=retries,
+        storage_options=storage_options,
     )
 
 
@@ -1182,6 +1202,8 @@ def _scan_csv_impl(
     truncate_ragged_lines: bool = True,
     decimal_comma: bool = False,
     glob: bool = True,
+    storage_options: dict[str, Any] | None = None,
+    retries: int = 0,
 ) -> LazyFrame:
     dtype_list: list[tuple[str, PolarsDataType]] | None = None
     if schema_overrides is not None:
@@ -1196,6 +1218,12 @@ def _scan_csv_impl(
     else:
         sources = []
 
+    if storage_options:
+        storage_options = list(storage_options.items())  # type: ignore[assignment]
+    else:
+        # Handle empty dict input
+        storage_options = None
+
     pylf = PyLazyFrame.new_from_csv(
         source,
         sources,
@@ -1224,5 +1252,7 @@ def _scan_csv_impl(
         decimal_comma=decimal_comma,
         schema=schema,
         glob=glob,
+        retries=retries,
+        cloud_options=storage_options,
     )
     return wrap_ldf(pylf)
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index 41c50e2de5a9..73dbc4ccdb46 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -150,7 +150,7 @@ impl PyLazyFrame {
     #[pyo3(signature = (path, paths, separator, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype,
         low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
         infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
-        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema
+        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema, cloud_options, retries
     )
     )]
     fn new_from_csv(
@@ -181,6 +181,8 @@ impl PyLazyFrame {
         decimal_comma: bool,
         glob: bool,
         schema: Option<Wrap<Schema>>,
+        cloud_options: Option<Vec<(String, String)>>,
+        retries: usize,
     ) -> PyResult<Self> {
         let null_values = null_values.map(|w| w.0);
         let quote_char = quote_char.map(|s| s.as_bytes()[0]);
@@ -198,6 +200,32 @@ impl PyLazyFrame {
                 .collect::<Schema>()
         });
 
+        #[cfg(feature = "cloud")]
+        let cloud_options = {
+            let first_path = if let Some(path) = &path {
+                path
+            } else {
+                paths
+                    .first()
+                    .ok_or_else(|| PyValueError::new_err("expected a path argument"))?
+            };
+
+            let first_path_url = first_path.to_string_lossy();
+            let mut cloud_options = cloud_options
+                .map(|kv| parse_cloud_options(&first_path_url, kv))
+                .transpose()?;
+            if retries > 0 {
+                cloud_options =
+                    cloud_options
+                        .or_else(|| Some(CloudOptions::default()))
+                        .map(|mut options| {
+                            options.max_retries = retries;
+                            options
+                        });
+            }
+            cloud_options
+        };
+
         let r = if let Some(path) = path.as_ref() {
             LazyCsvReader::new(path)
         } else {
@@ -228,7 +256,8 @@ impl PyLazyFrame {
             .with_truncate_ragged_lines(truncate_ragged_lines)
             .with_decimal_comma(decimal_comma)
             .with_glob(glob)
-            .with_raise_if_empty(raise_if_empty);
+            .with_raise_if_empty(raise_if_empty)
+            .with_cloud_options(cloud_options);
 
         if let Some(lambda) = with_schema_modify {
             let f = |schema: Schema| {