diff --git a/Cargo.toml b/Cargo.toml index 0deca9c48..e440dcf58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ [workspace] members = [ "ballista-cli", - "ballista/cache", "ballista/client", "ballista/core", "ballista/executor", diff --git a/ballista/cache/Cargo.toml b/ballista/cache/Cargo.toml deleted file mode 100644 index be89453e3..000000000 --- a/ballista/cache/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "ballista-cache" -version = "0.11.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = "0.1.64" -futures = "0.3" -hashbrown = "0.14" -hashlink = "0.8.4" -log = "0.4" -parking_lot = "0.12" -tokio = { version = "1.25", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } diff --git a/ballista/cache/src/backend/mod.rs b/ballista/cache/src/backend/mod.rs deleted file mode 100644 index f24119e8e..000000000 --- a/ballista/cache/src/backend/mod.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod policy; - -use crate::backend::policy::CachePolicy; -use std::fmt::Debug; -use std::hash::Hash; - -/// Backend to keep and manage stored entries. -/// -/// A backend might remove entries at any point, e.g. due to memory pressure or expiration. -#[derive(Debug)] -pub struct CacheBackend -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - policy: Box>, -} - -impl CacheBackend -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - pub fn new(policy: impl CachePolicy) -> Self { - Self { - policy: Box::new(policy), - } - } - - /// Get value for given key if it exists. - pub fn get(&mut self, k: &K) -> Option { - self.policy.get(k) - } - - /// Peek value for given key if it exists. - /// - /// In contrast to [`get`](Self::get) this will only return a value if there is a stored value. - /// This will not change the cache contents. - pub fn peek(&mut self, k: &K) -> Option { - self.policy.peek(k) - } - - /// Put value for given key. - /// - /// If a key already exists, its old value will be returned. - pub fn put(&mut self, k: K, v: V) -> Option { - self.policy.put(k, v).0 - } - - /// Remove value for given key. - /// - /// If a key does not exist, none will be returned. - pub fn remove(&mut self, k: &K) -> Option { - self.policy.remove(k) - } -} diff --git a/ballista/cache/src/backend/policy/lru/lru_cache.rs b/ballista/cache/src/backend/policy/lru/lru_cache.rs deleted file mode 100644 index 284a4a287..000000000 --- a/ballista/cache/src/backend/policy/lru/lru_cache.rs +++ /dev/null @@ -1,337 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::backend::policy::lru::ResourceCounter; -use crate::backend::policy::{lru::LruCachePolicy, CachePolicy, CachePolicyPutResult}; -use hashbrown::hash_map::DefaultHashBuilder; -use hashlink::linked_hash_map::{self, IntoIter, Iter, IterMut, LinkedHashMap}; -use std::any::Any; -use std::fmt; -use std::fmt::Debug; -use std::hash::{BuildHasher, Hash}; - -pub struct LruCache -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - map: LinkedHashMap, - resource_counter: Box>, -} - -impl LruCache -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - pub fn with_resource_counter(resource_counter: R) -> Self - where - R: ResourceCounter, - { - LruCache { - map: LinkedHashMap::new(), - resource_counter: Box::new(resource_counter), - } - } -} - -impl LruCache -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - pub fn with_resource_counter_and_hasher( - resource_counter: R, - hash_builder: H, - ) -> Self - where - R: ResourceCounter, - { - LruCache { - map: LinkedHashMap::with_hasher(hash_builder), - resource_counter: Box::new(resource_counter), - } - } - - pub fn len(&self) -> usize { - self.map.len() - } - - pub fn is_empty(&self) -> bool { - self.map.is_empty() - } - - pub fn iter(&self) -> Iter { - self.map.iter() - } - - pub fn iter_mut(&mut self) -> IterMut { - self.map.iter_mut() - } -} - -impl LruCachePolicy for LruCache -where - K: 'static + Clone + Debug + Eq + Hash + Ord + Send, - H: 'static + BuildHasher + Debug + Send, - V: 'static + Clone + Debug + Send, -{ - fn get_lru(&mut self, k: &Self::K) -> Option { - match self.map.raw_entry_mut().from_key(k) { - linked_hash_map::RawEntryMut::Occupied(mut occupied) => { - occupied.to_back(); - Some(occupied.into_mut().clone()) - } - linked_hash_map::RawEntryMut::Vacant(_) => None, - } - } - - fn put_lru( - &mut self, - k: Self::K, - v: Self::V, - ) -> CachePolicyPutResult { - let old_val = self.map.insert(k.clone(), v.clone()); - // Consume resources for (k, v) - self.resource_counter.consume(&k, &v); - // Restore resources for old (k, old_val) - if let Some(old_val) = &old_val { - self.resource_counter.restore(&k, old_val); - } - - let mut popped_entries = vec![]; - while self.resource_counter.exceed_capacity() { - if let Some(entry) = self.pop_lru() { - popped_entries.push(entry); - } - } - (old_val, popped_entries) - } - - fn pop_lru(&mut self) -> Option<(Self::K, Self::V)> { - if let Some(entry) = self.map.pop_front() { - self.resource_counter.restore(&entry.0, &entry.1); - Some(entry) - } else { - None - } - } -} - -impl CachePolicy for LruCache -where - K: 'static + Clone + Debug + Eq + Hash + Ord + Send, - H: 'static + BuildHasher + Debug + Send, - V: 'static + Clone + Debug + Send, -{ - type K = K; - type V = V; - - fn get(&mut self, k: &Self::K) -> Option { - self.get_lru(k) - } - - fn peek(&mut self, k: &Self::K) -> Option { - self.map.get(k).cloned() - } - - fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult { - self.put_lru(k, v) - } - - fn remove(&mut self, k: &Self::K) -> Option { - if let Some(v) = self.map.remove(k) { - self.resource_counter.restore(k, &v); - Some(v) - } else { - None - } - } - - fn pop(&mut self) -> Option<(Self::K, Self::V)> { - self.pop_lru() - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl IntoIterator for LruCache -where - K: 'static + Clone + Debug + Eq + Hash + Ord + Send, - V: 'static + Clone + Debug + Send, -{ - type Item = (K, V); - type IntoIter = IntoIter; - - fn into_iter(self) -> IntoIter { - self.map.into_iter() - } -} - -impl<'a, K, V, H> IntoIterator for &'a LruCache -where - K: 'static + Clone + Debug + Eq + Hash + Ord + Send, - V: 'static + Clone + Debug + Send, -{ - type Item = (&'a K, &'a V); - type IntoIter = Iter<'a, K, V>; - - fn into_iter(self) -> Iter<'a, K, V> { - self.iter() - } -} - -impl<'a, K, V, H> IntoIterator for &'a mut LruCache -where - K: 'static + Clone + Debug + Eq + Hash + Ord + Send, - V: 'static + Clone + Debug + Send, -{ - type Item = (&'a K, &'a mut V); - type IntoIter = IterMut<'a, K, V>; - - fn into_iter(self) -> IterMut<'a, K, V> { - self.iter_mut() - } -} - -impl Debug for LruCache -where - K: 'static + Clone + Debug + Eq + Hash + Ord + Send, - V: 'static + Clone + Debug + Send, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_map().entries(self.iter().rev()).finish() - } -} - -#[cfg(test)] -mod tests { - use crate::backend::policy::lru::lru_cache::LruCache; - use crate::backend::policy::lru::{DefaultResourceCounter, ResourceCounter}; - use crate::backend::policy::CachePolicy; - use hashbrown::HashMap; - - #[test] - fn test_cache_with_lru_policy() { - let mut cache = LruCache::with_resource_counter(DefaultResourceCounter::new(3)); - - cache.put("1".to_string(), "file1".to_string()); - cache.put("2".to_string(), "file2".to_string()); - cache.put("3".to_string(), "file3".to_string()); - assert_eq!(3, cache.len()); - - cache.put("4".to_string(), "file4".to_string()); - assert_eq!(3, cache.len()); - assert!(cache.peek(&"1".to_string()).is_none()); - - assert!(cache.peek(&"2".to_string()).is_some()); - let mut iter = cache.iter(); - assert_eq!("2", iter.next().unwrap().0); - assert_eq!("3", iter.next().unwrap().0); - assert_eq!("4", iter.next().unwrap().0); - - assert!(cache.get(&"2".to_string()).is_some()); - let mut iter = cache.iter(); - assert_eq!("3", iter.next().unwrap().0); - assert_eq!("4", iter.next().unwrap().0); - assert_eq!("2", iter.next().unwrap().0); - - assert_eq!(Some("file4".to_string()), cache.remove(&"4".to_string())); - - assert_eq!("3".to_string(), cache.pop().unwrap().0); - assert_eq!("2".to_string(), cache.pop().unwrap().0); - assert!(cache.pop().is_none()); - } - - #[test] - fn test_cache_with_size_resource_counter() { - let mut cache = - LruCache::with_resource_counter(get_test_size_resource_counter(50)); - - cache.put("1".to_string(), "file1".to_string()); - cache.put("2".to_string(), "file2".to_string()); - cache.put("3".to_string(), "file3".to_string()); - assert_eq!(3, cache.len()); - - cache.put("4".to_string(), "file4".to_string()); - assert_eq!(2, cache.len()); - assert!(cache.peek(&"1".to_string()).is_none()); - assert!(cache.peek(&"2".to_string()).is_none()); - - assert!(cache.peek(&"3".to_string()).is_some()); - let mut iter = cache.iter(); - assert_eq!("3", iter.next().unwrap().0); - assert_eq!("4", iter.next().unwrap().0); - - assert!(cache.get(&"3".to_string()).is_some()); - let mut iter = cache.iter(); - assert_eq!("4", iter.next().unwrap().0); - assert_eq!("3", iter.next().unwrap().0); - - cache.put("5".to_string(), "file5".to_string()); - cache.put("3".to_string(), "file3-bak".to_string()); - cache.put("1".to_string(), "file1".to_string()); - let mut iter = cache.iter(); - assert_eq!("3", iter.next().unwrap().0); - assert_eq!("1", iter.next().unwrap().0); - assert!(iter.next().is_none()); - } - - fn get_test_size_resource_counter(max_size: usize) -> TestSizeResourceCounter { - let mut size_map = HashMap::new(); - size_map.insert(("1".to_string(), "file1".to_string()), 10); - size_map.insert(("2".to_string(), "file2".to_string()), 20); - size_map.insert(("3".to_string(), "file3".to_string()), 15); - size_map.insert(("3".to_string(), "file3-bak".to_string()), 30); - size_map.insert(("4".to_string(), "file4".to_string()), 35); - size_map.insert(("5".to_string(), "file5".to_string()), 25); - - TestSizeResourceCounter { - size_map, - max_size, - current_size: 0, - } - } - - #[derive(Debug)] - struct TestSizeResourceCounter { - size_map: HashMap<(String, String), usize>, - max_size: usize, - current_size: usize, - } - - impl ResourceCounter for TestSizeResourceCounter { - type K = String; - type V = String; - - fn consume(&mut self, k: &Self::K, v: &Self::V) { - let s = self.size_map.get(&(k.clone(), v.clone())).unwrap(); - self.current_size += s; - } - - fn restore(&mut self, k: &Self::K, v: &Self::V) { - let s = self.size_map.get(&(k.clone(), v.clone())).unwrap(); - self.current_size -= s; - } - - fn exceed_capacity(&self) -> bool { - self.current_size > self.max_size - } - } -} diff --git a/ballista/cache/src/backend/policy/lru/mod.rs b/ballista/cache/src/backend/policy/lru/mod.rs deleted file mode 100644 index a9b855696..000000000 --- a/ballista/cache/src/backend/policy/lru/mod.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod lru_cache; - -use crate::backend::policy::CachePolicyPutResult; -use crate::backend::CachePolicy; -use std::fmt::Debug; -use std::hash::Hash; -use std::marker::PhantomData; - -pub trait LruCachePolicy: CachePolicy { - /// Retrieve the value for the given key, - /// marking it as recently used and moving it to the back of the LRU list. - fn get_lru(&mut self, k: &Self::K) -> Option; - - /// Put value for given key. - /// - /// If a key already exists, its old value will be returned. - /// - /// If necessary, will remove the value at the front of the LRU list to make room. - fn put_lru( - &mut self, - k: Self::K, - v: Self::V, - ) -> CachePolicyPutResult; - - /// Remove the least recently used entry and return it. - /// - /// If the `LruCache` is empty this will return None. - fn pop_lru(&mut self) -> Option<(Self::K, Self::V)>; -} - -pub trait ResourceCounter: Debug + Send + 'static { - /// Resource key. - type K: Clone + Eq + Hash + Ord + Debug + Send + 'static; - - /// Resource value. - type V: Clone + Debug + Send + 'static; - - /// Consume resource for a given key-value pair. - fn consume(&mut self, k: &Self::K, v: &Self::V); - - /// Return resource for a given key-value pair. - fn restore(&mut self, k: &Self::K, v: &Self::V); - - /// Check whether the current used resource exceeds the capacity - fn exceed_capacity(&self) -> bool; -} - -#[derive(Debug, Clone, Copy)] -pub struct DefaultResourceCounter -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - max_num: usize, - current_num: usize, - _key_marker: PhantomData, - _value_marker: PhantomData, -} - -impl DefaultResourceCounter -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - pub fn new(capacity: usize) -> Self { - Self { - max_num: capacity, - current_num: 0, - _key_marker: PhantomData, - _value_marker: PhantomData, - } - } -} - -impl ResourceCounter for DefaultResourceCounter -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - type K = K; - type V = V; - - fn consume(&mut self, _k: &Self::K, _v: &Self::V) { - self.current_num += 1; - } - - fn restore(&mut self, _k: &Self::K, _v: &Self::V) { - self.current_num -= 1; - } - - fn exceed_capacity(&self) -> bool { - self.current_num > self.max_num - } -} diff --git a/ballista/cache/src/backend/policy/mod.rs b/ballista/cache/src/backend/policy/mod.rs deleted file mode 100644 index 26f17137b..000000000 --- a/ballista/cache/src/backend/policy/mod.rs +++ /dev/null @@ -1,61 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::any::Any; -use std::fmt::Debug; -use std::hash::Hash; - -pub mod lru; - -pub type CachePolicyPutResult = (Option, Vec<(K, V)>); - -pub trait CachePolicy: Debug + Send + 'static { - /// Cache key. - type K: Clone + Eq + Hash + Ord + Debug + Send + 'static; - - /// Cached value. - type V: Clone + Debug + Send + 'static; - - /// Get value for given key if it exists. - fn get(&mut self, k: &Self::K) -> Option; - - /// Peek value for given key if it exists. - /// - /// In contrast to [`get`](Self::get) this will only return a value if there is a stored value. - /// This will not change the cache entries. - fn peek(&mut self, k: &Self::K) -> Option; - - /// Put value for given key. - /// - /// If a key already exists, its old value will be returned. - /// - /// At the meanwhile, entries popped due to memory pressure will be returned - fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult; - - /// Remove value for given key. - /// - /// If a key does not exist, none will be returned. - fn remove(&mut self, k: &Self::K) -> Option; - - /// Remove an entry from the cache due to memory pressure or expiration. - /// - /// If the cache is empty, none will be returned. - fn pop(&mut self) -> Option<(Self::K, Self::V)>; - - /// Return backend as [`Any`] which can be used to downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; -} diff --git a/ballista/cache/src/lib.rs b/ballista/cache/src/lib.rs deleted file mode 100644 index 1cba039c5..000000000 --- a/ballista/cache/src/lib.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::backend::policy::CachePolicy; -use crate::backend::CacheBackend; -use crate::listener::{ - cache_policy::CachePolicyWithListener, loading_cache::LoadingCacheWithListener, -}; -use crate::loading_cache::{driver::CacheDriver, loader::CacheLoader}; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; - -pub mod backend; -pub mod listener; -pub mod loading_cache; -pub mod metrics; - -pub type DefaultLoadingCache = LoadingCacheWithListener>; -pub type LoadingCacheMetrics = metrics::loading_cache::Metrics; - -pub fn create_loading_cache_with_metrics( - policy: impl CachePolicy, - loader: Arc, -) -> (DefaultLoadingCache, Arc>) -where - K: Clone + Eq + Hash + Debug + Ord + Send + 'static, - V: Clone + Debug + Send + 'static, - L: CacheLoader, -{ - let metrics = Arc::new(metrics::loading_cache::Metrics::new()); - - let policy_with_metrics = CachePolicyWithListener::new(policy, vec![metrics.clone()]); - let cache_backend = CacheBackend::new(policy_with_metrics); - let loading_cache = CacheDriver::new(cache_backend, loader); - let loading_cache_with_metrics = - LoadingCacheWithListener::new(loading_cache, vec![metrics.clone()]); - - (loading_cache_with_metrics, metrics) -} diff --git a/ballista/cache/src/listener/cache_policy.rs b/ballista/cache/src/listener/cache_policy.rs deleted file mode 100644 index 83917ea89..000000000 --- a/ballista/cache/src/listener/cache_policy.rs +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::backend::policy::{CachePolicy, CachePolicyPutResult}; -use std::any::Any; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; - -pub trait CachePolicyListener: Debug + Send + Sync + 'static { - /// Cache key. - type K: Clone + Eq + Hash + Debug + Ord + Send + 'static; - - /// Cache value. - type V: Clone + Debug + Send + 'static; - - fn listen_on_get(&self, k: Self::K, v: Option); - - fn listen_on_peek(&self, k: Self::K, v: Option); - - fn listen_on_put(&self, k: Self::K, v: Self::V, old_v: Option); - - fn listen_on_remove(&self, k: Self::K, v: Option); - - fn listen_on_pop(&self, entry: (Self::K, Self::V)); -} - -#[derive(Debug)] -pub struct CachePolicyWithListener

-where - P: CachePolicy, -{ - inner: P, - listeners: Vec>>, -} - -impl

CachePolicyWithListener

-where - P: CachePolicy, -{ - pub fn new( - inner: P, - listeners: Vec>>, - ) -> Self { - Self { inner, listeners } - } -} - -impl

CachePolicy for CachePolicyWithListener

-where - P: CachePolicy, -{ - type K = P::K; - type V = P::V; - - fn get(&mut self, k: &Self::K) -> Option { - let v = self.inner.get(k); - - // For listeners - self.listeners - .iter() - .for_each(|listener| listener.listen_on_get(k.clone(), v.as_ref().cloned())); - - v - } - - fn peek(&mut self, k: &Self::K) -> Option { - let v = self.inner.peek(k); - - // For listeners - self.listeners - .iter() - .for_each(|listener| listener.listen_on_peek(k.clone(), v.as_ref().cloned())); - - v - } - - fn put(&mut self, k: Self::K, v: Self::V) -> CachePolicyPutResult { - let ret = self.inner.put(k.clone(), v.clone()); - - // For listeners - self.listeners.iter().for_each(|listener| { - listener.listen_on_put(k.clone(), v.clone(), ret.0.as_ref().cloned()); - ret.1 - .iter() - .for_each(|entry| listener.listen_on_pop(entry.clone())); - }); - - ret - } - - fn remove(&mut self, k: &Self::K) -> Option { - let v = self.inner.remove(k); - - // For listeners - self.listeners.iter().for_each(|listener| { - listener.listen_on_remove(k.clone(), v.as_ref().cloned()) - }); - - v - } - - fn pop(&mut self) -> Option<(Self::K, Self::V)> { - let entry = self.inner.pop(); - - // For listeners - if let Some(entry) = &entry { - self.listeners - .iter() - .for_each(|listener| listener.listen_on_pop(entry.clone())); - } - - entry - } - - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/ballista/cache/src/listener/loading_cache.rs b/ballista/cache/src/listener/loading_cache.rs deleted file mode 100644 index e54e7d831..000000000 --- a/ballista/cache/src/listener/loading_cache.rs +++ /dev/null @@ -1,197 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::loading_cache::{CacheGetStatus, LoadingCache}; -use async_trait::async_trait; -use std::fmt::Debug; -use std::hash::Hash; -use std::sync::Arc; - -pub trait LoadingCacheListener: Debug + Send + Sync + 'static { - /// Cache key. - type K: Clone + Eq + Hash + Debug + Ord + Send + 'static; - - /// Cache value. - type V: Clone + Debug + Send + 'static; - - fn listen_on_get_if_present(&self, k: Self::K, v: Option); - - fn listen_on_get(&self, k: Self::K, v: Self::V, status: CacheGetStatus); - - fn listen_on_put(&self, k: Self::K, v: Self::V); - - fn listen_on_invalidate(&self, k: Self::K); - - fn listen_on_get_cancelling(&self, k: Self::K); -} - -#[derive(Debug)] -pub struct LoadingCacheWithListener -where - L: LoadingCache, -{ - inner: L, - listeners: Vec>>, -} - -impl LoadingCacheWithListener -where - L: LoadingCache, -{ - pub fn new( - inner: L, - listeners: Vec>>, - ) -> Self { - Self { inner, listeners } - } -} - -#[async_trait] -impl LoadingCache for LoadingCacheWithListener -where - L: LoadingCache, -{ - type K = L::K; - type V = L::V; - type GetExtra = L::GetExtra; - - fn get_if_present(&self, k: Self::K) -> Option { - let v = self.inner.get_if_present(k.clone()); - self.listen_on_get_if_present(k, v.as_ref().cloned()); - v - } - - async fn get_with_status( - &self, - k: Self::K, - extra: Self::GetExtra, - ) -> (Self::V, CacheGetStatus) { - let mut set_on_drop = SetGetListenerOnDrop::new(self, k.clone()); - let (v, status) = self.inner.get_with_status(k, extra).await; - set_on_drop.get_result = Some((v.clone(), status)); - (v, status) - } - - async fn put(&self, k: Self::K, v: Self::V) { - let k_captured = k.clone(); - let v_captured = v.clone(); - self.inner.put(k_captured, v_captured).await; - self.listen_on_put(k, v); - } - - fn invalidate(&self, k: Self::K) { - self.inner.invalidate(k.clone()); - self.listen_on_invalidate(k); - } -} - -struct SetGetListenerOnDrop<'a, L> -where - L: LoadingCache, -{ - listener: &'a LoadingCacheWithListener, - key: L::K, - get_result: Option<(L::V, CacheGetStatus)>, -} - -impl<'a, L> SetGetListenerOnDrop<'a, L> -where - L: LoadingCache, -{ - fn new(listener: &'a LoadingCacheWithListener, key: L::K) -> Self { - Self { - listener, - key, - get_result: None, - } - } -} - -impl<'a, L> Drop for SetGetListenerOnDrop<'a, L> -where - L: LoadingCache, -{ - fn drop(&mut self) { - if let Some((value, status)) = &self.get_result { - self.listener - .listen_on_get(self.key.clone(), value.clone(), *status) - } else { - self.listener.listen_on_get_cancelling(self.key.clone()); - } - } -} - -impl LoadingCacheListener for LoadingCacheWithListener -where - L: LoadingCache, -{ - type K = L::K; - type V = L::V; - - fn listen_on_get_if_present(&self, k: Self::K, v: Option) { - if self.listeners.len() == 1 { - self.listeners - .get(0) - .unwrap() - .listen_on_get_if_present(k, v); - } else { - self.listeners.iter().for_each(|listener| { - listener.listen_on_get_if_present(k.clone(), v.as_ref().cloned()) - }); - } - } - - fn listen_on_get(&self, k: Self::K, v: Self::V, status: CacheGetStatus) { - if self.listeners.len() == 1 { - self.listeners.get(0).unwrap().listen_on_get(k, v, status); - } else { - self.listeners.iter().for_each(|listener| { - listener.listen_on_get(k.clone(), v.clone(), status) - }); - } - } - - fn listen_on_put(&self, k: Self::K, v: Self::V) { - if self.listeners.len() == 1 { - self.listeners.get(0).unwrap().listen_on_put(k, v); - } else { - self.listeners - .iter() - .for_each(|listener| listener.listen_on_put(k.clone(), v.clone())); - } - } - - fn listen_on_invalidate(&self, k: Self::K) { - if self.listeners.len() == 1 { - self.listeners.get(0).unwrap().listen_on_invalidate(k); - } else { - self.listeners - .iter() - .for_each(|listener| listener.listen_on_invalidate(k.clone())); - } - } - - fn listen_on_get_cancelling(&self, k: Self::K) { - if self.listeners.len() == 1 { - self.listeners.get(0).unwrap().listen_on_get_cancelling(k); - } else { - self.listeners - .iter() - .for_each(|listener| listener.listen_on_get_cancelling(k.clone())); - } - } -} diff --git a/ballista/cache/src/listener/mod.rs b/ballista/cache/src/listener/mod.rs deleted file mode 100644 index 670362e1c..000000000 --- a/ballista/cache/src/listener/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod cache_policy; -pub mod loading_cache; diff --git a/ballista/cache/src/loading_cache/cancellation_safe_future.rs b/ballista/cache/src/loading_cache/cancellation_safe_future.rs deleted file mode 100644 index 730170346..000000000 --- a/ballista/cache/src/loading_cache/cancellation_safe_future.rs +++ /dev/null @@ -1,179 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::{ - future::Future, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use futures::future::BoxFuture; -use parking_lot::Mutex; -use tokio::task::JoinHandle; - -/// Wrapper around a future that cannot be cancelled. -/// -/// When the future is dropped/cancelled, we'll spawn a tokio task to _rescue_ it. -pub struct CancellationSafeFuture -where - F: Future + Send + 'static, - F::Output: Send, -{ - /// Mark if the inner future finished. If not, we must spawn a helper task on drop. - done: bool, - - /// Inner future. - /// - /// Wrapped in an `Option` so we can extract it during drop. Inside that option however we also need a pinned - /// box because once this wrapper is polled, it will be pinned in memory -- even during drop. Now the inner - /// future does not necessarily implement `Unpin`, so we need a heap allocation to pin it in memory even when we - /// move it out of this option. - inner: Option>, - - /// Where to store the join handle on drop. - receiver: Arc>>>, -} - -impl Drop for CancellationSafeFuture -where - F: Future + Send + 'static, - F::Output: Send, -{ - fn drop(&mut self) { - if !self.done { - // acquire lock BEFORE checking the Arc - let mut receiver = self.receiver.lock(); - assert!(receiver.is_none()); - - // The Mutex is owned by the Arc and cannot be moved out of it. So after we acquired the lock we can safely - // check if any external party still has access to the receiver state. If not, we assume there is no - // interest in this future at all (e.g. during shutdown) and will NOT spawn it. - if Arc::strong_count(&self.receiver) > 1 { - let inner = self.inner.take().expect("Double-drop?"); - let handle = tokio::task::spawn(inner); - *receiver = Some(handle); - } - } - } -} - -impl CancellationSafeFuture -where - F: Future + Send, - F::Output: Send, -{ - /// Create new future that is protected from cancellation. - /// - /// If [`CancellationSafeFuture`] is cancelled (i.e. dropped) and there is still some external receiver of the state - /// left, than we will drive the payload (`f`) to completion. Otherwise `f` will be cancelled. - pub fn new(fut: F, receiver: Arc>>>) -> Self { - Self { - done: false, - inner: Some(Box::pin(fut)), - receiver, - } - } -} - -impl Future for CancellationSafeFuture -where - F: Future + Send, - F::Output: Send, -{ - type Output = F::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!(!self.done, "Polling future that already returned"); - - match self.inner.as_mut().expect("not dropped").as_mut().poll(cx) { - Poll::Ready(res) => { - self.done = true; - Poll::Ready(res) - } - Poll::Pending => Poll::Pending, - } - } -} - -#[cfg(test)] -mod tests { - use std::{ - sync::atomic::{AtomicBool, Ordering}, - time::Duration, - }; - - use tokio::sync::Barrier; - - use super::*; - - #[tokio::test] - async fn test_happy_path() { - let done = Arc::new(AtomicBool::new(false)); - let done_captured = Arc::clone(&done); - - let receiver = Default::default(); - let fut = CancellationSafeFuture::new( - async move { - done_captured.store(true, Ordering::SeqCst); - }, - receiver, - ); - - fut.await; - - assert!(done.load(Ordering::SeqCst)); - } - - #[tokio::test] - async fn test_cancel_future() { - let done = Arc::new(Barrier::new(2)); - let done_captured = Arc::clone(&done); - - let receiver = Default::default(); - let fut = CancellationSafeFuture::new( - async move { - done_captured.wait().await; - }, - Arc::clone(&receiver), - ); - - drop(fut); - - tokio::time::timeout(Duration::from_secs(5), done.wait()) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_receiver_gone() { - let done = Arc::new(Barrier::new(2)); - let done_captured = Arc::clone(&done); - - let receiver = Default::default(); - let fut = CancellationSafeFuture::new( - async move { - done_captured.wait().await; - }, - receiver, - ); - - drop(fut); - - assert_eq!(Arc::strong_count(&done), 1); - } -} diff --git a/ballista/cache/src/loading_cache/driver.rs b/ballista/cache/src/loading_cache/driver.rs deleted file mode 100644 index 614c6ead3..000000000 --- a/ballista/cache/src/loading_cache/driver.rs +++ /dev/null @@ -1,573 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Main data structure, see [`CacheDriver`]. - -use crate::backend::CacheBackend; -use crate::loading_cache::{ - cancellation_safe_future::CancellationSafeFuture, - loader::CacheLoader, - {CacheGetStatus, LoadingCache}, -}; -use async_trait::async_trait; -use futures::future::{BoxFuture, Shared}; -use futures::{FutureExt, TryFutureExt}; -use log::debug; -use parking_lot::Mutex; -use std::collections::HashMap; -use std::fmt::Debug; -use std::future::Future; -use std::hash::Hash; -use std::sync::Arc; -use tokio::{ - sync::oneshot::{error::RecvError, Sender}, - task::JoinHandle, -}; - -/// Combine a [`CacheBackend`] and a [`Loader`] into a single [`Cache`] -#[derive(Debug)] -pub struct CacheDriver -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, - L: CacheLoader, -{ - state: Arc>>, - loader: Arc, -} - -impl CacheDriver -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, - L: CacheLoader, -{ - /// Create new, empty cache with given loader function. - pub fn new(backend: CacheBackend, loader: Arc) -> Self { - Self { - state: Arc::new(Mutex::new(CacheState { - cached_entries: backend, - loaders: HashMap::new(), - next_loader_tag: 0, - })), - loader, - } - } -} - -#[async_trait] -impl LoadingCache for CacheDriver -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, - L: CacheLoader, -{ - type K = K; - type V = V; - type GetExtra = L::Extra; - - fn get_if_present(&self, k: Self::K) -> Option { - self.state.lock().cached_entries.get(&k) - } - - async fn get_with_status( - &self, - k: Self::K, - extra: Self::GetExtra, - ) -> (Self::V, CacheGetStatus) { - // place state locking into its own scope so it doesn't leak into the generator (async - // function) - let (fut, receiver, status) = { - let mut state = self.state.lock(); - - // check if the entry has already been cached - if let Some(v) = state.cached_entries.get(&k) { - return (v, CacheGetStatus::Hit); - } - - // check if there is already a running loader for this key - if let Some(loader) = state.loaders.get(&k) { - ( - None, - loader.recv.clone(), - CacheGetStatus::MissAlreadyLoading, - ) - } else { - // generate unique tag - let loader_tag = state.next_loader_tag(); - - // requires new loader - let (fut, loader) = create_value_loader( - self.state.clone(), - self.loader.clone(), - loader_tag, - k.clone(), - extra, - ); - - let receiver = loader.recv.clone(); - state.loaders.insert(k, loader); - - (Some(fut), receiver, CacheGetStatus::Miss) - } - }; - - // try to run the loader future in this very task context to avoid spawning tokio tasks (which adds latency and - // overhead) - if let Some(fut) = fut { - fut.await; - } - - let v = retrieve_from_shared(receiver).await; - - (v, status) - } - - async fn put(&self, k: Self::K, v: Self::V) { - let maybe_join_handle = { - let mut state = self.state.lock(); - - let maybe_recv = if let Some(loader) = state.loaders.remove(&k) { - // it's OK when the receiver side is gone (likely panicked) - loader.set.send(v.clone()).ok(); - - // When we side-load data into the running task, the task does NOT modify the - // backend, so we have to do that. The reason for not letting the task feed the - // side-loaded data back into `cached_entries` is that we would need to drop the - // state lock here before the task could acquire it, leading to a lock gap. - Some(loader.recv) - } else { - None - }; - - state.cached_entries.put(k, v); - - maybe_recv - }; - - // drive running loader (if any) to completion - if let Some(recv) = maybe_join_handle { - // we do not care if the loader died (e.g. due to a panic) - recv.await.ok(); - } - } - - fn invalidate(&self, k: Self::K) { - let mut state = self.state.lock(); - - if state.loaders.remove(&k).is_some() { - debug!("Running loader for key {:?} is removed", k); - } - - state.cached_entries.remove(&k); - } -} - -impl Drop for CacheDriver -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, - L: CacheLoader, -{ - fn drop(&mut self) { - for (_k, loader) in self.state.lock().loaders.drain() { - // It's unlikely that anyone is still using the shared receiver at this point, because - // `Cache::get` borrows the `self`. If it is still in use, aborting the task will - // cancel the contained future which in turn will drop the sender of the oneshot - // channel. The receivers will be notified. - let handle = loader.join_handle.lock(); - if let Some(handle) = handle.as_ref() { - handle.abort(); - } - } - } -} - -fn create_value_loader( - state: Arc>>, - loader: Arc>, - loader_tag: u64, - k: K, - extra: Extra, -) -> ( - CancellationSafeFuture>, - ValueLoader, -) -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, - Extra: Debug + Send + 'static, -{ - let (tx_main, rx_main) = tokio::sync::oneshot::channel(); - let receiver = rx_main - .map_ok(|v| Arc::new(Mutex::new(v))) - .map_err(Arc::new) - .boxed() - .shared(); - let (tx_set, rx_set) = tokio::sync::oneshot::channel(); - - // need to wrap the loader into a `CancellationSafeFuture` so that it doesn't get cancelled when - // this very request is cancelled - let join_handle_receiver = Arc::new(Mutex::new(None)); - let fut = async move { - let loader_fut = async move { - let mut submitter = ResultSubmitter::new(state, k.clone(), loader_tag); - - // execute the loader - // If we panic here then `tx` will be dropped and the receivers will be - // notified. - let v = loader.load(k, extra).await; - - // remove "running" state and store result - let was_running = submitter.submit(v.clone()); - - if !was_running { - // value was side-loaded, so we cannot populate `v`. Instead block this - // execution branch and wait for `rx_set` to deliver the side-loaded - // result. - loop { - tokio::task::yield_now().await; - } - } - - v - }; - - // prefer the side-loader - let v = futures::select_biased! { - maybe_v = rx_set.fuse() => { - match maybe_v { - Ok(v) => { - // data get side-loaded via `Cache::set`. In this case, we do - // NOT modify the state because there would be a lock-gap. The - // `set` function will do that for us instead. - v - } - Err(_) => { - // sender side is gone, very likely the cache is shutting down - debug!( - "Sender for side-loading data into running loader gone.", - ); - return; - } - } - } - v = loader_fut.fuse() => v, - }; - - // broadcast result - // It's OK if the receiver side is gone. This might happen during shutdown - tx_main.send(v).ok(); - }; - let fut = CancellationSafeFuture::new(fut, Arc::clone(&join_handle_receiver)); - - ( - fut, - ValueLoader { - recv: receiver, - set: tx_set, - join_handle: join_handle_receiver, - tag: loader_tag, - }, - ) -} - -/// Inner cache state that is usually guarded by a lock. -/// -/// The state parts must be updated in a consistent manner, i.e. while using the same lock guard. -#[derive(Debug)] -struct CacheState -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - /// Cached entries (i.e. queries completed). - cached_entries: CacheBackend, - - /// Currently value loaders indexed by cache key. - loaders: HashMap>, - - /// Tag used for the next value loader to distinguish loaders for the same key - /// (e.g. when starting, side-loading, starting again) - next_loader_tag: u64, -} - -impl CacheState -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - /// To avoid overflow issue, it will begin from 0. It will rarely happen that - /// two value loaders share the same key and tag while for different purposes - #[inline] - fn next_loader_tag(&mut self) -> u64 { - let ret = self.next_loader_tag; - if self.next_loader_tag != u64::MAX { - self.next_loader_tag += 1; - } else { - self.next_loader_tag = 0; - } - ret - } -} - -/// State for coordinating the execution of a single value loader. -#[derive(Debug)] -struct ValueLoader { - /// A receiver that can await the result. - recv: SharedReceiver, - - /// A sender that enables setting entries while the query is running. - set: Sender, - - /// A handle for the task that is currently loading the value. - /// - /// The handle can be used to abort the loading, e.g. when dropping the cache. - join_handle: Arc>>>, - - /// Tag so that loaders for the same key (e.g. when starting, side-loading, starting again) can - /// be told apart. - tag: u64, -} - -/// A [`tokio::sync::oneshot::Receiver`] that can be cloned. -/// -/// The types are: -/// -/// - `Arc>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time -/// the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared` -/// - `Arc`: Is required because `RecvError` is not `Clone` but `Shared` requires that. -/// - `BoxFuture`: The transformation from `Result` to `Result>, -/// Arc>` results in a kinda messy type and we wanna erase that. -/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places. -type SharedReceiver = - Shared>, Arc>>>; - -/// Retrieve data from shared receiver. -async fn retrieve_from_shared(receiver: SharedReceiver) -> V -where - V: Clone + Send, -{ - receiver - .await - .expect("cache loader panicked, see logs") - .lock() - .clone() -} - -/// Helper to submit results of running queries. -/// -/// Ensures that running loader is removed when dropped (e.g. during panic). -struct ResultSubmitter -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - state: Arc>>, - tag: u64, - k: Option, - v: Option, -} - -impl ResultSubmitter -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - fn new(state: Arc>>, k: K, tag: u64) -> Self { - Self { - state, - tag, - k: Some(k), - v: None, - } - } - - /// Submit value. - /// - /// Returns `true` if this very loader was running. - fn submit(&mut self, v: V) -> bool { - assert!(self.v.is_none()); - self.v = Some(v); - self.finalize() - } - - /// Finalize request. - /// - /// Returns `true` if this very loader was running. - fn finalize(&mut self) -> bool { - let k = self.k.take().expect("finalized twice"); - let mut state = self.state.lock(); - - match state.loaders.get(&k) { - Some(loader) if loader.tag == self.tag => { - state.loaders.remove(&k); - - if let Some(v) = self.v.take() { - // this very loader is in charge of the key, so store in in the - // underlying cache - state.cached_entries.put(k, v); - } - - true - } - _ => { - // This loader is actually not really running any longer but got - // shut down, e.g. due to side loading. Do NOT store the - // generated value in the underlying cache. - - false - } - } - } -} - -impl Drop for ResultSubmitter -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - fn drop(&mut self) { - if self.k.is_some() { - // not finalized yet - self.finalize(); - } - } -} - -#[cfg(test)] -mod tests { - - use crate::backend::policy::lru::lru_cache::LruCache; - use crate::listener::cache_policy::CachePolicyListener; - use crate::{CacheBackend, CacheDriver, CacheLoader, CachePolicyWithListener}; - - use crate::backend::policy::lru::DefaultResourceCounter; - use crate::loading_cache::LoadingCache; - use async_trait::async_trait; - use parking_lot::Mutex; - use std::sync::mpsc::{channel, Sender}; - use std::sync::Arc; - - #[tokio::test] - async fn test_removal_entries() { - let cache_policy = - LruCache::with_resource_counter(DefaultResourceCounter::new(3)); - let loader = TestStringCacheLoader { - prefix: "file".to_string(), - }; - let (sender, receiver) = channel::<(String, String)>(); - let listener = Arc::new(EntryRemovalListener::new(sender)); - let policy_with_listener = - CachePolicyWithListener::new(cache_policy, vec![listener.clone()]); - let cache_backend = CacheBackend::new(policy_with_listener); - let loading_cache = CacheDriver::new(cache_backend, Arc::new(loader)); - - assert_eq!( - "file1".to_string(), - loading_cache.get("1".to_string(), ()).await - ); - assert_eq!( - "file2".to_string(), - loading_cache.get("2".to_string(), ()).await - ); - assert_eq!( - "file3".to_string(), - loading_cache.get("3".to_string(), ()).await - ); - assert_eq!( - "file4".to_string(), - loading_cache.get("4".to_string(), ()).await - ); - assert_eq!(Ok(("1".to_string(), "file1".to_string())), receiver.recv()); - assert!(loading_cache.get_if_present("1".to_string()).is_none()); - - loading_cache - .put("2".to_string(), "file2-bak".to_string()) - .await; - assert_eq!( - "file5".to_string(), - loading_cache.get("5".to_string(), ()).await - ); - assert_eq!(Ok(("3".to_string(), "file3".to_string())), receiver.recv()); - assert!(loading_cache.get_if_present("3".to_string()).is_none()); - assert!(loading_cache.get_if_present("2".to_string()).is_some()); - - loading_cache.invalidate("2".to_string()); - assert_eq!( - Ok(("2".to_string(), "file2-bak".to_string())), - receiver.recv() - ); - assert!(loading_cache.get_if_present("2".to_string()).is_none()); - } - - #[derive(Debug)] - struct EntryRemovalListener { - sender: Arc>>, - } - - impl EntryRemovalListener { - pub fn new(sender: Sender<(String, String)>) -> Self { - Self { - sender: Arc::new(Mutex::new(sender)), - } - } - } - - impl CachePolicyListener for EntryRemovalListener { - type K = String; - type V = String; - - fn listen_on_get(&self, _k: Self::K, _v: Option) { - // Do nothing - } - - fn listen_on_peek(&self, _k: Self::K, _v: Option) { - // Do nothing - } - - fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option) { - // Do nothing - } - - fn listen_on_remove(&self, k: Self::K, v: Option) { - if let Some(v) = v { - self.sender.lock().send((k, v)).unwrap(); - } - } - - fn listen_on_pop(&self, entry: (Self::K, Self::V)) { - self.sender.lock().send(entry).unwrap(); - } - } - - #[derive(Debug)] - struct TestStringCacheLoader { - prefix: String, - } - - #[async_trait] - impl CacheLoader for TestStringCacheLoader { - type K = String; - type V = String; - type Extra = (); - - async fn load(&self, k: Self::K, _extra: Self::Extra) -> Self::V { - format!("{}{k}", self.prefix) - } - } -} diff --git a/ballista/cache/src/loading_cache/loader.rs b/ballista/cache/src/loading_cache/loader.rs deleted file mode 100644 index 8987d88bd..000000000 --- a/ballista/cache/src/loading_cache/loader.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use async_trait::async_trait; -use std::fmt::Debug; -use std::hash::Hash; - -/// Loader for missing [`Cache`](crate::cache::Cache) entries. -#[async_trait] -pub trait CacheLoader: Debug + Send + Sync + 'static { - /// Cache key. - type K: Debug + Hash + Send + 'static; - - /// Cache value. - type V: Debug + Send + 'static; - - /// Extra data needed when loading a missing entry. Specify `()` if not needed. - type Extra: Debug + Send + 'static; - - /// Load value for given key, using the extra data if needed. - async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V; -} - -#[async_trait] -impl CacheLoader for Box> -where - K: Debug + Hash + Send + 'static, - V: Debug + Send + 'static, - Extra: Debug + Send + 'static, -{ - type K = K; - type V = V; - type Extra = Extra; - - async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V { - self.as_ref().load(k, extra).await - } -} diff --git a/ballista/cache/src/loading_cache/mod.rs b/ballista/cache/src/loading_cache/mod.rs deleted file mode 100644 index bf6f03607..000000000 --- a/ballista/cache/src/loading_cache/mod.rs +++ /dev/null @@ -1,113 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -/// It's a fork version from the influxdb_iox::cache_system[https://github.com/influxdata/influxdb_iox]. -/// Later we will propose to influxdb team to make this cache part more general. -mod cancellation_safe_future; -pub mod driver; -pub mod loader; - -use async_trait::async_trait; -use std::fmt::Debug; -use std::hash::Hash; - -/// High-level loading cache interface. -/// -/// Cache entries are manually added using get(Key, GetExtra) or put(Key, Value), -/// and are stored in the cache until either evicted or manually invalidated. -/// -/// # Concurrency -/// -/// Multiple cache requests for different keys can run at the same time. When data is requested for -/// the same key, the underlying loader will only be polled once, even when the requests are made -/// while the loader is still running. -/// -/// # Cancellation -/// -/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will -/// still be cached. -#[async_trait] -pub trait LoadingCache: Debug + Send + Sync + 'static { - /// Cache key. - type K: Clone + Eq + Hash + Debug + Ord + Send + 'static; - - /// Cache value. - type V: Clone + Debug + Send + 'static; - - /// Extra data that is provided during [`GET`](Self::get) but that is NOT part of the cache key. - type GetExtra: Debug + Send + 'static; - - /// Get value from cache. - /// - /// In contrast to [`get`](Self::get) this will only return a value if there is a stored value. - /// This will NOT start a new loading task. - fn get_if_present(&self, k: Self::K) -> Option; - - /// Get value from cache. - /// - /// Note that `extra` is only used if the key is missing from the storage backend - /// and no value loader for this key is running yet. - async fn get(&self, k: Self::K, extra: Self::GetExtra) -> Self::V { - self.get_with_status(k, extra).await.0 - } - - /// Get value from cache and the [status](CacheGetStatus). - /// - /// Note that `extra` is only used if the key is missing from the storage backend - /// and no value loader for this key is running yet. - async fn get_with_status( - &self, - k: Self::K, - extra: Self::GetExtra, - ) -> (Self::V, CacheGetStatus); - - /// Side-load an entry into the cache. If the cache previously contained a value associated with key, - /// the old value is replaced by value. - /// - /// This will also complete a currently running loader for this key. - async fn put(&self, k: Self::K, v: Self::V); - - /// Discard any cached value for the key. - /// - /// This will also interrupt a currently running loader for this key. - fn invalidate(&self, k: Self::K); -} - -/// Status of a [`Cache`] [GET](LoadingCache::get_with_status) request. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum CacheGetStatus { - /// The requested entry was present in the storage backend. - Hit, - - /// The requested entry was NOT present in the storage backend and there's no running value loader. - Miss, - - /// The requested entry was NOT present in the storage backend, but there was already a running value loader for - /// this particular key. - MissAlreadyLoading, -} - -impl CacheGetStatus { - /// Get human and machine readable name. - pub fn name(&self) -> &'static str { - match self { - Self::Hit => "hit", - Self::Miss => "miss", - Self::MissAlreadyLoading => "miss_already_loading", - } - } -} diff --git a/ballista/cache/src/metrics/loading_cache.rs b/ballista/cache/src/metrics/loading_cache.rs deleted file mode 100644 index 92a2e4fc3..000000000 --- a/ballista/cache/src/metrics/loading_cache.rs +++ /dev/null @@ -1,292 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::loading_cache::CacheGetStatus; -use std::fmt::Debug; -use std::hash::Hash; -use std::marker::PhantomData; - -use crate::listener::cache_policy::CachePolicyListener; -use crate::listener::loading_cache::LoadingCacheListener; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; - -/// Struct containing all the metrics -#[derive(Debug)] -pub struct Metrics -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - get_hit_count: U64Counter, - get_miss_count: U64Counter, - get_miss_already_loading_count: U64Counter, - get_cancelled_count: U64Counter, - put_count: U64Counter, - eviction_count: U64Counter, - _key_marker: PhantomData, - _value_marker: PhantomData, -} - -impl Default for Metrics -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - fn default() -> Self { - Self::new() - } -} - -impl Metrics -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - pub fn new() -> Self { - Self { - get_hit_count: Default::default(), - get_miss_count: Default::default(), - get_miss_already_loading_count: Default::default(), - get_cancelled_count: Default::default(), - put_count: Default::default(), - eviction_count: Default::default(), - _key_marker: Default::default(), - _value_marker: Default::default(), - } - } - - pub fn get_hit_count(&self) -> u64 { - self.get_hit_count.fetch() - } - - pub fn get_miss_count(&self) -> u64 { - self.get_miss_count.fetch() - } - - pub fn get_miss_already_loading_count(&self) -> u64 { - self.get_miss_already_loading_count.fetch() - } - - pub fn get_cancelled_count(&self) -> u64 { - self.get_cancelled_count.fetch() - } - - pub fn put_count(&self) -> u64 { - self.put_count.fetch() - } - - pub fn eviction_count(&self) -> u64 { - self.eviction_count.fetch() - } -} - -// Since we don't store K and V directly, it will be safe. -unsafe impl Sync for Metrics -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ -} - -impl LoadingCacheListener for Metrics -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - type K = K; - type V = V; - - fn listen_on_get_if_present(&self, _k: Self::K, v: Option) { - if v.is_some() { - &self.get_hit_count - } else { - &self.get_miss_count - } - .inc(1); - } - - fn listen_on_get(&self, _k: Self::K, _v: Self::V, status: CacheGetStatus) { - match status { - CacheGetStatus::Hit => &self.get_hit_count, - - CacheGetStatus::Miss => &self.get_miss_count, - - CacheGetStatus::MissAlreadyLoading => &self.get_miss_already_loading_count, - } - .inc(1); - } - - fn listen_on_put(&self, _k: Self::K, _v: Self::V) { - // Do nothing - } - - fn listen_on_invalidate(&self, _k: Self::K) { - // Do nothing - } - - fn listen_on_get_cancelling(&self, _k: Self::K) { - self.get_cancelled_count.inc(1); - } -} - -impl CachePolicyListener for Metrics -where - K: Clone + Eq + Hash + Ord + Debug + Send + 'static, - V: Clone + Debug + Send + 'static, -{ - type K = K; - type V = V; - - fn listen_on_get(&self, _k: Self::K, _v: Option) { - // Do nothing - } - - fn listen_on_peek(&self, _k: Self::K, _v: Option) { - // Do nothing - } - - fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option) { - self.put_count.inc(1); - } - - fn listen_on_remove(&self, _k: Self::K, _v: Option) { - self.eviction_count.inc(1); - } - - fn listen_on_pop(&self, _entry: (Self::K, Self::V)) { - self.eviction_count.inc(1); - } -} - -/// A monotonic counter -#[derive(Debug, Clone, Default)] -pub struct U64Counter { - counter: Arc, -} - -impl U64Counter { - pub fn inc(&self, count: u64) { - self.counter.fetch_add(count, Ordering::Relaxed); - } - - pub fn fetch(&self) -> u64 { - self.counter.load(Ordering::Relaxed) - } -} - -#[cfg(test)] -mod tests { - use crate::backend::policy::lru::lru_cache::LruCache; - use crate::backend::policy::lru::DefaultResourceCounter; - use crate::create_loading_cache_with_metrics; - use crate::loading_cache::loader::CacheLoader; - use crate::loading_cache::LoadingCache; - use async_trait::async_trait; - use std::sync::Arc; - - #[tokio::test] - async fn test_metrics() { - let cache_policy = - LruCache::with_resource_counter(DefaultResourceCounter::new(3)); - let loader = TestStringCacheLoader { - prefix: "file".to_string(), - }; - let (loading_cache, metrics) = - create_loading_cache_with_metrics(cache_policy, Arc::new(loader)); - - assert_eq!( - "file1".to_string(), - loading_cache.get("1".to_string(), ()).await - ); - assert_eq!( - "file2".to_string(), - loading_cache.get("2".to_string(), ()).await - ); - assert_eq!( - "file3".to_string(), - loading_cache.get("3".to_string(), ()).await - ); - assert_eq!(3, metrics.get_miss_count()); - - assert_eq!( - "file4".to_string(), - loading_cache.get("4".to_string(), ()).await - ); - assert_eq!(0, metrics.get_hit_count()); - assert_eq!(4, metrics.get_miss_count()); - assert_eq!(4, metrics.put_count()); - assert_eq!(1, metrics.eviction_count()); - - assert!(loading_cache.get_if_present("1".to_string()).is_none()); - assert_eq!(0, metrics.get_hit_count()); - assert_eq!(5, metrics.get_miss_count()); - assert_eq!(4, metrics.put_count()); - assert_eq!(1, metrics.eviction_count()); - - loading_cache - .put("2".to_string(), "file2-bak".to_string()) - .await; - assert_eq!(0, metrics.get_hit_count()); - assert_eq!(5, metrics.get_miss_count()); - assert_eq!(5, metrics.put_count()); - assert_eq!(1, metrics.eviction_count()); - - assert_eq!( - "file5".to_string(), - loading_cache.get("5".to_string(), ()).await - ); - assert_eq!(0, metrics.get_hit_count()); - assert_eq!(6, metrics.get_miss_count()); - assert_eq!(6, metrics.put_count()); - assert_eq!(2, metrics.eviction_count()); - - assert!(loading_cache.get_if_present("3".to_string()).is_none()); - assert_eq!(0, metrics.get_hit_count()); - assert_eq!(7, metrics.get_miss_count()); - assert_eq!(6, metrics.put_count()); - assert_eq!(2, metrics.eviction_count()); - - assert!(loading_cache.get_if_present("2".to_string()).is_some()); - assert_eq!(1, metrics.get_hit_count()); - assert_eq!(7, metrics.get_miss_count()); - assert_eq!(6, metrics.put_count()); - assert_eq!(2, metrics.eviction_count()); - - loading_cache.invalidate("2".to_string()); - assert_eq!(1, metrics.get_hit_count()); - assert_eq!(7, metrics.get_miss_count()); - assert_eq!(6, metrics.put_count()); - assert_eq!(3, metrics.eviction_count()); - } - - #[derive(Debug)] - struct TestStringCacheLoader { - prefix: String, - } - - #[async_trait] - impl CacheLoader for TestStringCacheLoader { - type K = String; - type V = String; - type Extra = (); - - async fn load(&self, k: Self::K, _extra: Self::Extra) -> Self::V { - format!("{}{k}", self.prefix) - } - } -} diff --git a/ballista/cache/src/metrics/mod.rs b/ballista/cache/src/metrics/mod.rs deleted file mode 100644 index c1b6bcbbb..000000000 --- a/ballista/cache/src/metrics/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod loading_cache; diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index d8d4bfcc5..55ac29815 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -47,7 +47,6 @@ simd = ["datafusion/simd"] ahash = { version = "0.8", default-features = false } arrow-flight = { workspace = true } async-trait = "0.1.41" -ballista-cache = { path = "../cache", version = "0.11.0" } bytes = "1.0" chrono = { version = "0.4", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } diff --git a/ballista/core/src/cache_layer/medium/local_disk.rs b/ballista/core/src/cache_layer/medium/local_disk.rs deleted file mode 100644 index e30d80893..000000000 --- a/ballista/core/src/cache_layer/medium/local_disk.rs +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cache_layer::medium::CacheMedium; -use crate::cache_layer::object_store::ObjectStoreWithKey; -use object_store::local::LocalFileSystem; -use object_store::path::{Path, DELIMITER}; -use object_store::ObjectStore; -use std::any::Any; -use std::fmt::{Display, Formatter}; -use std::sync::Arc; - -#[derive(Debug, Clone)] -pub struct LocalDiskMedium { - cache_object_store: Arc, - root_cache_dir: Path, -} - -impl LocalDiskMedium { - pub fn new(root_cache_dir: String) -> Self { - Self { - cache_object_store: Arc::new(LocalFileSystem::new()), - root_cache_dir: Path::from(root_cache_dir), - } - } -} - -impl Display for LocalDiskMedium { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache medium with local disk({})", self.root_cache_dir) - } -} - -impl CacheMedium for LocalDiskMedium { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_object_store(&self) -> Arc { - self.cache_object_store.clone() - } - - fn get_mapping_location( - &self, - source_location: &Path, - source_object_store: &ObjectStoreWithKey, - ) -> Path { - let cache_location = format!( - "{}{DELIMITER}{}{DELIMITER}{source_location}", - self.root_cache_dir, - source_object_store.key(), - ); - Path::from(cache_location) - } -} diff --git a/ballista/core/src/cache_layer/medium/local_memory.rs b/ballista/core/src/cache_layer/medium/local_memory.rs deleted file mode 100644 index 2b89392e1..000000000 --- a/ballista/core/src/cache_layer/medium/local_memory.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cache_layer::medium::CacheMedium; -use crate::cache_layer::object_store::ObjectStoreWithKey; -use object_store::memory::InMemory; -use object_store::path::{Path, DELIMITER}; -use object_store::ObjectStore; -use std::any::Any; -use std::fmt::{Display, Formatter}; -use std::sync::Arc; - -#[derive(Debug, Clone)] -pub struct LocalMemoryMedium { - cache_object_store: Arc, -} - -impl LocalMemoryMedium { - pub fn new() -> Self { - Self { - cache_object_store: Arc::new(InMemory::new()), - } - } -} - -impl Default for LocalMemoryMedium { - fn default() -> Self { - Self::new() - } -} - -impl Display for LocalMemoryMedium { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "Cache medium with local memory") - } -} - -impl CacheMedium for LocalMemoryMedium { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_object_store(&self) -> Arc { - self.cache_object_store.clone() - } - - fn get_mapping_location( - &self, - source_location: &Path, - source_object_store: &ObjectStoreWithKey, - ) -> Path { - let cache_location = format!( - "{}{DELIMITER}{}{DELIMITER}{source_location}", - "memory", - source_object_store.key(), - ); - Path::from(cache_location) - } -} diff --git a/ballista/core/src/cache_layer/medium/mod.rs b/ballista/core/src/cache_layer/medium/mod.rs deleted file mode 100644 index f2082e23b..000000000 --- a/ballista/core/src/cache_layer/medium/mod.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cache_layer::object_store::ObjectStoreWithKey; -use object_store::path::Path; -use object_store::ObjectStore; -use std::any::Any; -use std::fmt::{Debug, Display}; -use std::sync::Arc; - -pub mod local_disk; -pub mod local_memory; - -pub trait CacheMedium: Debug + Send + Sync + Display + 'static { - /// Returns the cache layer policy as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Get the ObjectStore for the cache storage - fn get_object_store(&self) -> Arc; - - /// Get the mapping location on the cache ObjectStore for the source location on the source ObjectStore - fn get_mapping_location( - &self, - source_location: &Path, - source_object_store: &ObjectStoreWithKey, - ) -> Path; -} diff --git a/ballista/core/src/cache_layer/mod.rs b/ballista/core/src/cache_layer/mod.rs deleted file mode 100644 index 86e52395f..000000000 --- a/ballista/core/src/cache_layer/mod.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cache_layer::medium::local_disk::LocalDiskMedium; -use crate::cache_layer::medium::local_memory::LocalMemoryMedium; -use crate::cache_layer::policy::file::FileCacheLayer; -use std::sync::Arc; - -pub mod medium; -pub mod object_store; -pub mod policy; - -#[derive(Debug, Clone)] -pub enum CacheLayer { - /// The local disk will be used as the cache layer medium - /// and the cache level will be the whole file. - LocalDiskFile(Arc>), - - /// The local memory will be used as the cache layer medium - /// and the cache level will be the whole file. - LocalMemoryFile(Arc>), -} - -#[cfg(test)] -mod tests { - use ballista_cache::loading_cache::LoadingCache; - use futures::TryStreamExt; - use object_store::local::LocalFileSystem; - use object_store::path::Path; - use object_store::{GetResultPayload, ObjectStore}; - use std::io::Write; - use std::sync::Arc; - use tempfile::NamedTempFile; - - use crate::cache_layer::medium::local_memory::LocalMemoryMedium; - use crate::cache_layer::object_store::file::FileCacheObjectStore; - use crate::cache_layer::object_store::ObjectStoreWithKey; - use crate::cache_layer::policy::file::FileCacheLayer; - use crate::error::{BallistaError, Result}; - - #[tokio::test] - async fn test_cache_file_to_memory() -> Result<()> { - let test_data = "test_cache_file_to_memory"; - let test_bytes = test_data.as_bytes(); - - let mut test_file = NamedTempFile::new()?; - let source_location = Path::from(test_file.as_ref().to_str().unwrap()); - - let test_size = test_file.write(test_bytes)?; - assert_eq!(test_bytes.len(), test_size); - - // Check the testing data on the source object store - let source_object_store = Arc::new(LocalFileSystem::new()); - let source_key = "file"; - let source_object_store_with_key = Arc::new(ObjectStoreWithKey::new( - source_key.to_string(), - source_object_store.clone(), - )); - let actual_source = source_object_store.get(&source_location).await.unwrap(); - match actual_source.payload { - GetResultPayload::File(file, _) => { - assert_eq!(test_bytes.len(), file.metadata()?.len() as usize); - } - _ => { - return Err(BallistaError::General( - "File instead of data stream should be returned".to_string(), - )) - } - } - - // Check the testing data on the cache object store - let cache_medium = LocalMemoryMedium::new(); - let cache_layer = FileCacheLayer::new(1000, 1, cache_medium); - let cache_meta = cache_layer - .cache() - .get( - source_location.clone(), - source_object_store_with_key.clone(), - ) - .await; - assert_eq!(test_bytes.len(), cache_meta.size); - - let cache_object_store = FileCacheObjectStore::new( - Arc::new(cache_layer), - source_object_store_with_key.clone(), - ); - let actual_cache = cache_object_store.get(&source_location).await.unwrap(); - match actual_cache.payload { - GetResultPayload::File(_, _) => { - return Err(BallistaError::General( - "Data stream instead of file should be returned".to_string(), - )) - } - GetResultPayload::Stream(s) => { - let mut buf: Vec = vec![]; - s.try_fold(&mut buf, |acc, part| async move { - let mut part: Vec = part.into(); - acc.append(&mut part); - Ok(acc) - }) - .await - .unwrap(); - let actual_cache_data = String::from_utf8(buf).unwrap(); - assert_eq!(test_data, actual_cache_data); - } - } - - test_file.close()?; - - std::mem::forget(cache_object_store); - - Ok(()) - } -} diff --git a/ballista/core/src/cache_layer/object_store/file.rs b/ballista/core/src/cache_layer/object_store/file.rs deleted file mode 100644 index 169d2b5ce..000000000 --- a/ballista/core/src/cache_layer/object_store/file.rs +++ /dev/null @@ -1,268 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cache_layer::medium::CacheMedium; -use crate::cache_layer::object_store::ObjectStoreWithKey; -use crate::cache_layer::policy::file::FileCacheLayer; -use crate::error::BallistaError; -use async_trait::async_trait; -use ballista_cache::loading_cache::LoadingCache; -use bytes::Bytes; -use futures::stream::{self, BoxStream, StreamExt}; -use log::info; -use object_store::path::Path; -use object_store::{ - Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, - PutOptions, PutResult, -}; -use std::fmt::{Debug, Display, Formatter}; -use std::ops::Range; -use std::sync::Arc; -use tokio::io::AsyncWrite; - -#[derive(Debug)] -pub struct FileCacheObjectStore -where - M: CacheMedium, -{ - cache_layer: Arc>, - inner: Arc, -} - -impl FileCacheObjectStore -where - M: CacheMedium, -{ - pub fn new( - cache_layer: Arc>, - inner: Arc, - ) -> Self { - Self { cache_layer, inner } - } -} - -impl Display for FileCacheObjectStore -where - M: CacheMedium, -{ - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Object store {} with file level cache on {}", - self.inner, - self.cache_layer.cache_store() - ) - } -} - -#[async_trait] -impl ObjectStore for FileCacheObjectStore -where - M: CacheMedium, -{ - async fn put( - &self, - _location: &Path, - _bytes: Bytes, - ) -> object_store::Result { - Err(Error::NotSupported { - source: Box::new(BallistaError::General( - "Write path is not supported".to_string(), - )), - }) - } - - async fn put_opts( - &self, - _location: &Path, - _bytes: Bytes, - _opts: PutOptions, - ) -> object_store::Result { - Err(Error::NotSupported { - source: Box::new(BallistaError::General( - "Write path is not supported".to_string(), - )), - }) - } - - async fn put_multipart( - &self, - _location: &Path, - ) -> object_store::Result<(MultipartId, Box)> { - Err(Error::NotSupported { - source: Box::new(BallistaError::General( - "Write path is not supported".to_string(), - )), - }) - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> object_store::Result<()> { - Err(Error::NotSupported { - source: Box::new(BallistaError::General( - "Write path is not supported".to_string(), - )), - }) - } - - /// If it already exists in cache, use the cached result. - /// Otherwise, trigger a task to load the data into cache; At the meanwhile, - /// get the result from the data source - async fn get(&self, location: &Path) -> object_store::Result { - if let Some(cache_object_mata) = - self.cache_layer.cache().get_if_present(location.clone()) - { - info!("Data for {} is cached", location); - let cache_location = &cache_object_mata.location; - self.cache_layer.cache_store().get(cache_location).await - } else { - let io_runtime = self.cache_layer.io_runtime(); - let cache_layer = self.cache_layer.clone(); - let key = location.clone(); - let extra = self.inner.clone(); - io_runtime.spawn(async move { - info!("Going to cache data for {}", key); - cache_layer.cache().get(key.clone(), extra).await; - info!("Data for {} has been cached", key); - }); - self.inner.get(location).await - } - } - - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> object_store::Result { - if let Some(cache_object_mata) = - self.cache_layer.cache().get_if_present(location.clone()) - { - info!("Data for {} is cached", location); - let cache_location = &cache_object_mata.location; - self.cache_layer - .cache_store() - .get_opts(cache_location, options) - .await - } else { - let io_runtime = self.cache_layer.io_runtime(); - let cache_layer = self.cache_layer.clone(); - let key = location.clone(); - let extra = self.inner.clone(); - io_runtime.spawn(async move { - info!("Going to cache data for {}", key); - cache_layer.cache().get(key.clone(), extra).await; - info!("Data for {} has been cached", key); - }); - self.inner.get_opts(location, options).await - } - } - - /// If it already exists in cache, use the cached result. - /// Otherwise, trigger a task to load the data into cache; At the meanwhile, - /// get the result from the data source - async fn get_range( - &self, - location: &Path, - range: Range, - ) -> object_store::Result { - if let Some(cache_object_mata) = - self.cache_layer.cache().get_if_present(location.clone()) - { - info!("Data for {} is cached", location); - let cache_location = &cache_object_mata.location; - self.cache_layer - .cache_store() - .get_range(cache_location, range) - .await - } else { - let io_runtime = self.cache_layer.io_runtime(); - let cache_layer = self.cache_layer.clone(); - let key = location.clone(); - let extra = self.inner.clone(); - io_runtime.spawn(async move { - info!("Going to cache data for {}", key); - cache_layer.cache().get(key.clone(), extra).await; - info!("Data for {} has been cached", key); - }); - self.inner.get_range(location, range).await - } - } - - /// If it already exists in cache, use the cached result. - /// Otherwise, get the result from the data source. - /// It will not trigger the task to load data into cache. - async fn head(&self, location: &Path) -> object_store::Result { - if let Some(cache_object_mata) = - self.cache_layer.cache().get_if_present(location.clone()) - { - let cache_location = &cache_object_mata.location; - self.cache_layer.cache_store().head(cache_location).await - } else { - self.inner.head(location).await - } - } - - async fn delete(&self, _location: &Path) -> object_store::Result<()> { - Err(Error::NotSupported { - source: Box::new(BallistaError::General( - "Delete is not supported".to_string(), - )), - }) - } - - fn list( - &self, - _prefix: Option<&Path>, - ) -> BoxStream<'_, object_store::Result> { - stream::once(async { - Err(Error::NotSupported { - source: Box::new(BallistaError::General( - "List is not supported".to_string(), - )), - }) - }) - .boxed() - } - - async fn list_with_delimiter( - &self, - _prefix: Option<&Path>, - ) -> object_store::Result { - Err(Error::NotSupported { - source: Box::new(BallistaError::General("List is not supported".to_string())), - }) - } - - async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { - Err(Error::NotSupported { - source: Box::new(BallistaError::General("Copy is not supported".to_string())), - }) - } - - async fn copy_if_not_exists( - &self, - _from: &Path, - _to: &Path, - ) -> object_store::Result<()> { - Err(Error::NotSupported { - source: Box::new(BallistaError::General("Copy is not supported".to_string())), - }) - } -} diff --git a/ballista/core/src/cache_layer/object_store/mod.rs b/ballista/core/src/cache_layer/object_store/mod.rs deleted file mode 100644 index 6d754ecaf..000000000 --- a/ballista/core/src/cache_layer/object_store/mod.rs +++ /dev/null @@ -1,169 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod file; - -use async_trait::async_trait; -use bytes::Bytes; -use futures::stream::BoxStream; -use object_store::path::Path; -use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, - PutResult, -}; -use std::fmt::{Debug, Display, Formatter}; -use std::ops::Range; -use std::sync::Arc; -use tokio::io::AsyncWrite; - -/// An [`ObjectStore`] wrapper with a specific key which is used for registration in [`ObjectStoreRegistry`]. -/// -/// The [`key`] can be used for the cache path prefix. -#[derive(Debug)] -pub struct ObjectStoreWithKey { - key: String, - inner: Arc, -} - -impl ObjectStoreWithKey { - pub fn new(key: String, inner: Arc) -> Self { - Self { key, inner } - } - - pub fn key(&self) -> &str { - &self.key - } -} - -impl Display for ObjectStoreWithKey { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Registered object store {} with key {}", - self.inner, self.key, - ) - } -} - -#[async_trait] -impl ObjectStore for ObjectStoreWithKey { - async fn put( - &self, - location: &Path, - bytes: Bytes, - ) -> object_store::Result { - self.inner.put(location, bytes).await - } - - async fn put_opts( - &self, - location: &Path, - bytes: Bytes, - opts: PutOptions, - ) -> object_store::Result { - self.inner.put_opts(location, bytes, opts).await - } - - async fn put_multipart( - &self, - location: &Path, - ) -> object_store::Result<(MultipartId, Box)> { - self.inner.put_multipart(location).await - } - - async fn abort_multipart( - &self, - location: &Path, - multipart_id: &MultipartId, - ) -> object_store::Result<()> { - self.inner.abort_multipart(location, multipart_id).await - } - - async fn get(&self, location: &Path) -> object_store::Result { - self.inner.get(location).await - } - - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> object_store::Result { - self.inner.get_opts(location, options).await - } - - async fn get_range( - &self, - location: &Path, - range: Range, - ) -> object_store::Result { - self.inner.get_range(location, range).await - } - - async fn get_ranges( - &self, - location: &Path, - ranges: &[Range], - ) -> object_store::Result> { - self.inner.get_ranges(location, ranges).await - } - - async fn head(&self, location: &Path) -> object_store::Result { - self.inner.head(location).await - } - - async fn delete(&self, location: &Path) -> object_store::Result<()> { - self.inner.delete(location).await - } - - fn list( - &self, - prefix: Option<&Path>, - ) -> BoxStream<'_, object_store::Result> { - self.inner.list(prefix) - } - - async fn list_with_delimiter( - &self, - prefix: Option<&Path>, - ) -> object_store::Result { - self.inner.list_with_delimiter(prefix).await - } - - async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.inner.copy(from, to).await - } - - async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.inner.rename(from, to).await - } - - async fn copy_if_not_exists( - &self, - from: &Path, - to: &Path, - ) -> object_store::Result<()> { - self.inner.copy_if_not_exists(from, to).await - } - - async fn rename_if_not_exists( - &self, - from: &Path, - to: &Path, - ) -> object_store::Result<()> { - self.inner.rename_if_not_exists(from, to).await - } -} diff --git a/ballista/core/src/cache_layer/policy/file.rs b/ballista/core/src/cache_layer/policy/file.rs deleted file mode 100644 index dfc6c83fc..000000000 --- a/ballista/core/src/cache_layer/policy/file.rs +++ /dev/null @@ -1,299 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cache_layer::medium::CacheMedium; -use crate::cache_layer::object_store::ObjectStoreWithKey; -use crate::error::{BallistaError, Result}; -use async_trait::async_trait; -use ballista_cache::backend::policy::lru::lru_cache::LruCache; -use ballista_cache::backend::policy::lru::ResourceCounter; -use ballista_cache::listener::cache_policy::{ - CachePolicyListener, CachePolicyWithListener, -}; -use ballista_cache::loading_cache::loader::CacheLoader; -use ballista_cache::{ - create_loading_cache_with_metrics, DefaultLoadingCache, LoadingCacheMetrics, -}; -use log::{error, info, warn}; -use object_store::path::Path; -use object_store::{ObjectMeta, ObjectStore}; -use std::ops::Range; -use std::sync::Arc; -use tokio::runtime::Runtime; - -type DefaultFileLoadingCache = - DefaultLoadingCache>; -type FileCacheMetrics = LoadingCacheMetrics; - -#[derive(Debug)] -pub struct FileCacheLayer -where - M: CacheMedium, -{ - cache_store: Arc, - loading_cache: DefaultFileLoadingCache, - io_runtime: Runtime, - metrics: Arc, -} - -impl FileCacheLayer -where - M: CacheMedium, -{ - pub fn new(capacity: usize, cache_io_concurrency: u32, cache_medium: M) -> Self { - let cache_store = cache_medium.get_object_store(); - - let cache_counter = FileCacheCounter::new(capacity); - let lru_cache = LruCache::with_resource_counter(cache_counter); - let file_cache_loader = Arc::new(FileCacheLoader::new(cache_medium)); - let cache_with_removal_listener = - CachePolicyWithListener::new(lru_cache, vec![file_cache_loader.clone()]); - let (loading_cache, metrics) = create_loading_cache_with_metrics( - cache_with_removal_listener, - file_cache_loader, - ); - let io_runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name("loading_cache") - .worker_threads(cache_io_concurrency as usize) - .build() - .expect("Creating tokio runtime"); - - Self { - cache_store, - loading_cache, - io_runtime, - metrics, - } - } - - pub fn cache_store(&self) -> Arc { - self.cache_store.clone() - } - - pub fn cache(&self) -> &DefaultFileLoadingCache { - &self.loading_cache - } - - pub fn io_runtime(&self) -> &Runtime { - &self.io_runtime - } - - pub fn metrics(&self) -> &FileCacheMetrics { - self.metrics.as_ref() - } -} - -#[derive(Debug)] -pub struct FileCacheLoader -where - M: CacheMedium, -{ - cache_medium: Arc, -} - -impl FileCacheLoader -where - M: CacheMedium, -{ - fn new(cache_medium: M) -> Self { - Self { - cache_medium: Arc::new(cache_medium), - } - } - - fn remove_object(&self, source_path: Path, object_meta: ObjectMeta) { - let cache_store = self.cache_medium.get_object_store(); - let location = object_meta.location; - tokio::runtime::Handle::try_current().unwrap().block_on( async { - if let Err(e) = cache_store.delete(&location).await { - error!("Fail to delete file {location} on the cache ObjectStore for source {source_path} due to {e}"); - } - }); - } -} - -/// Will return the location of the cached file on the cache object store. -/// -/// The last_modified of the ObjectMeta will be from the source file, which will be useful -/// for checking whether the source file changed or not. -/// -/// The size will be the one of cached file rather than the one of the source file in case of changing the data format -async fn load_object( - cache_medium: Arc, - source_location: Path, - source_store: &ObjectStoreWithKey, -) -> Result -where - M: CacheMedium, -{ - let source_meta = source_store.head(&source_location).await.map_err(|e| { - BallistaError::General(format!( - "Fail to read head info for {source_location} due to {e}" - )) - })?; - - let cache_store = cache_medium.get_object_store(); - let cache_location = - cache_medium.get_mapping_location(&source_location, source_store); - - // Check whether the cache location exist or not. If exists, delete it first. - if cache_store.head(&cache_location).await.is_ok() { - if let Err(e) = cache_store.delete(&cache_location).await { - error!( - "Fail to delete file {cache_location} on the cache ObjectStore due to {e}" - ); - } - } - - info!( - "Going to cache object from {} to {}", - source_location, cache_location - ); - let range = Range { - start: 0, - end: source_meta.size, - }; - let data = source_store - .get_range(&source_location, range) - .await - .map_err(|e| { - BallistaError::General(format!( - "Fail to get file data from {source_location} due to {e}" - )) - })?; - info!( - "{} bytes will be cached for {}", - data.len(), - source_location - ); - cache_store.put(&cache_location, data).await.map_err(|e| { - BallistaError::General(format!( - "Fail to write out data to {cache_location} due to {e}" - )) - })?; - info!( - "Object {} has already been cached to {}", - source_location, cache_location - ); - - let cache_meta = cache_store.head(&cache_location).await.map_err(|e| { - BallistaError::General(format!( - "Fail to read head info for {cache_location} due to {e}" - )) - })?; - - Ok(ObjectMeta { - location: cache_location, - last_modified: source_meta.last_modified, - size: cache_meta.size, - e_tag: source_meta.e_tag, - version: None, - }) -} - -#[async_trait] -impl CacheLoader for FileCacheLoader -where - M: CacheMedium, -{ - type K = Path; - type V = ObjectMeta; - type Extra = Arc; - - async fn load(&self, source_location: Self::K, source_store: Self::Extra) -> Self::V { - match load_object(self.cache_medium.clone(), source_location, &source_store).await - { - Ok(object_meta) => object_meta, - Err(e) => panic!("{}", e), - } - } -} - -impl CachePolicyListener for FileCacheLoader -where - M: CacheMedium, -{ - type K = Path; - type V = ObjectMeta; - - fn listen_on_get(&self, _k: Self::K, _v: Option) { - // Do nothing - } - - fn listen_on_peek(&self, _k: Self::K, _v: Option) { - // Do nothing - } - - fn listen_on_put(&self, _k: Self::K, _v: Self::V, _old_v: Option) { - // Do nothing - } - - fn listen_on_remove(&self, k: Self::K, v: Option) { - if let Some(v) = v { - self.remove_object(k, v); - } else { - warn!("The entry does not exist for key {k}"); - } - } - - fn listen_on_pop(&self, entry: (Self::K, Self::V)) { - self.remove_object(entry.0, entry.1); - } -} - -#[derive(Debug, Clone, Copy)] -pub struct FileCacheCounter { - /// The maximum data size to be cached - capacity: usize, - /// The data size already be cached - cached_size: usize, -} - -impl FileCacheCounter { - pub fn new(capacity: usize) -> Self { - FileCacheCounter { - capacity, - cached_size: 0, - } - } - - pub fn capacity(&self) -> usize { - self.capacity - } - - pub fn cached_size(&self) -> usize { - self.cached_size - } -} - -impl ResourceCounter for FileCacheCounter { - type K = Path; - type V = ObjectMeta; - - fn consume(&mut self, _k: &Self::K, v: &Self::V) { - self.cached_size += v.size; - } - - fn restore(&mut self, _k: &Self::K, v: &Self::V) { - self.cached_size -= v.size; - } - - fn exceed_capacity(&self) -> bool { - self.cached_size > self.capacity - } -} diff --git a/ballista/core/src/cache_layer/policy/mod.rs b/ballista/core/src/cache_layer/policy/mod.rs deleted file mode 100644 index 0cb4c86b3..000000000 --- a/ballista/core/src/cache_layer/policy/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod file; diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index 6a5d8772d..973756be0 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -22,8 +22,6 @@ pub fn print_version() { println!("Ballista version: {BALLISTA_VERSION}") } -#[cfg(not(windows))] -pub mod cache_layer; pub mod client; pub mod config; pub mod error; diff --git a/ballista/core/src/object_store_registry/cache.rs b/ballista/core/src/object_store_registry/cache.rs deleted file mode 100644 index 621e30a18..000000000 --- a/ballista/core/src/object_store_registry/cache.rs +++ /dev/null @@ -1,86 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cache_layer::object_store::file::FileCacheObjectStore; -use crate::cache_layer::object_store::ObjectStoreWithKey; -use crate::cache_layer::CacheLayer; -use crate::object_store_registry::BallistaObjectStoreRegistry; -use datafusion::datasource::object_store::ObjectStoreRegistry; -use datafusion::execution::runtime_env::RuntimeConfig; -use object_store::ObjectStore; -use std::sync::Arc; -use url::Url; - -/// Get a RuntimeConfig with CachedBasedObjectStoreRegistry -pub fn with_cache_layer(config: RuntimeConfig, cache_layer: CacheLayer) -> RuntimeConfig { - let registry = Arc::new(BallistaObjectStoreRegistry::default()); - let registry = Arc::new(CachedBasedObjectStoreRegistry::new(registry, cache_layer)); - config.with_object_store_registry(registry) -} - -/// An object store registry wrapped an existing one with a cache layer. -/// -/// During [`get_store`], after getting the source [`ObjectStore`], based on the url, -/// it will firstly be wrapped with a key which will be used as the cache prefix path. -/// And then it will be wrapped with the [`cache_layer`]. -#[derive(Debug)] -pub struct CachedBasedObjectStoreRegistry { - inner: Arc, - cache_layer: CacheLayer, -} - -impl CachedBasedObjectStoreRegistry { - pub fn new(inner: Arc, cache_layer: CacheLayer) -> Self { - Self { inner, cache_layer } - } -} - -impl ObjectStoreRegistry for CachedBasedObjectStoreRegistry { - fn register_store( - &self, - url: &Url, - store: Arc, - ) -> Option> { - self.inner.register_store(url, store) - } - - fn get_store(&self, url: &Url) -> datafusion::common::Result> { - let source_object_store = self.inner.get_store(url)?; - let object_store_with_key = Arc::new(ObjectStoreWithKey::new( - get_url_key(url), - source_object_store, - )); - Ok(match &self.cache_layer { - CacheLayer::LocalDiskFile(cache_layer) => Arc::new( - FileCacheObjectStore::new(cache_layer.clone(), object_store_with_key), - ), - CacheLayer::LocalMemoryFile(cache_layer) => Arc::new( - FileCacheObjectStore::new(cache_layer.clone(), object_store_with_key), - ), - }) - } -} - -/// Get the key of a url for object store cache prefix path. -/// The credential info will be removed. -fn get_url_key(url: &Url) -> String { - format!( - "{}://{}", - url.scheme(), - &url[url::Position::BeforeHost..url::Position::AfterPort], - ) -} diff --git a/ballista/core/src/object_store_registry/mod.rs b/ballista/core/src/object_store_registry/mod.rs index be527e717..2e394a7cb 100644 --- a/ballista/core/src/object_store_registry/mod.rs +++ b/ballista/core/src/object_store_registry/mod.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -#[cfg(not(windows))] -pub mod cache; - use datafusion::common::DataFusionError; use datafusion::datasource::object_store::{ DefaultObjectStoreRegistry, ObjectStoreRegistry, diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index db2a0005c..7bccaa3f8 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -40,14 +40,8 @@ use uuid::Uuid; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; -#[cfg(not(windows))] -use ballista_core::cache_layer::{ - medium::local_disk::LocalDiskMedium, policy::file::FileCacheLayer, CacheLayer, -}; use ballista_core::config::{DataCachePolicy, LogRotationPolicy, TaskSchedulingPolicy}; use ballista_core::error::BallistaError; -#[cfg(not(windows))] -use ballista_core::object_store_registry::cache::CachedBasedObjectStoreRegistry; use ballista_core::object_store_registry::with_object_store_registry; use ballista_core::serde::protobuf::executor_resource::Resource; use ballista_core::serde::protobuf::executor_status::Status; @@ -195,50 +189,13 @@ pub async fn start_executor_process(opt: Arc) -> Result<( })?) }; - // Set the object store registry - #[cfg(not(windows))] - let runtime_with_data_cache = { - let cache_dir = opt.cache_dir.clone(); - let cache_capacity = opt.cache_capacity; - let cache_io_concurrency = opt.cache_io_concurrency; - let cache_layer = - opt.data_cache_policy - .map(|data_cache_policy| match data_cache_policy { - DataCachePolicy::LocalDiskFile => { - let cache_dir = cache_dir.unwrap(); - let cache_layer = FileCacheLayer::new( - cache_capacity as usize, - cache_io_concurrency, - LocalDiskMedium::new(cache_dir), - ); - CacheLayer::LocalDiskFile(Arc::new(cache_layer)) - } - }); - if let Some(cache_layer) = cache_layer { - let registry = Arc::new(CachedBasedObjectStoreRegistry::new( - runtime.object_store_registry.clone(), - cache_layer, - )); - Some(Arc::new(RuntimeEnv { - memory_pool: runtime.memory_pool.clone(), - disk_manager: runtime.disk_manager.clone(), - cache_manager: runtime.cache_manager.clone(), - object_store_registry: registry, - })) - } else { - None - } - }; - #[cfg(windows)] - let runtime_with_data_cache = { None }; - let metrics_collector = Arc::new(LoggingMetricsCollector::default()); let executor = Arc::new(Executor::new( executor_meta, &work_dir, runtime, - runtime_with_data_cache, + None, metrics_collector, concurrent_tasks, opt.execution_engine.clone(),