Skip to content

Commit

Permalink
Merge pull request #105
Browse files Browse the repository at this point in the history
Improved concurrent performance of CachingLocal
  • Loading branch information
gfusee authored Oct 30, 2024
2 parents c7f32f6 + c887762 commit c23553a
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 84 deletions.
221 changes: 152 additions & 69 deletions caching/src/local/caching_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,42 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use tokio::task;

use novax::caching::{CachingDurationStrategy, CachingStrategy};
use novax::errors::CachingError;
use novax::errors::NovaXError;

use crate::date::get_current_timestamp::{get_current_timestamp, GetDuration};
use crate::utils::lock::MutexLike;
use crate::utils::lock::{Locker, MutexLike};

pub type CachingLocal = BaseCachingLocal<Mutex<HashMap<u64, Vec<u8>>>, Mutex<HashMap<u64, Duration>>, Mutex<Duration>, Mutex<bool>>;
pub type CachingLocal = BaseCachingLocal<RwLock<Vec<u8>>, RwLock<HashMap<u64, RwLock<Vec<u8>>>>, RwLock<Duration>, RwLock<HashMap<u64, RwLock<Duration>>>, Mutex<Duration>, Mutex<bool>>;

pub struct BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
pub struct BaseCachingLocal<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
LockerValue: Locker<T = Vec<u8>> + Debug,
LockerValueHashMap: Locker<T = HashMap<u64, LockerValue>> + Debug,
LockerExpiration: Locker<T = Duration> + Debug,
LockerExpirationHashMap: Locker<T = HashMap<u64, LockerExpiration>> + Debug,
MutexCleanupInterval: MutexLike<T = Duration> + Debug,
MutexIsCleanupProcessStarted: MutexLike<T = bool> + Debug
{
duration_strategy: CachingDurationStrategy,
value_map: Arc<MutexValue>,
expiration_timestamp_map: Arc<MutexExpiration>,
value_map: Arc<LockerValueHashMap>,
expiration_timestamp_map: Arc<LockerExpirationHashMap>,
cleanup_interval: Arc<MutexCleanupInterval>,
is_cleanup_process_started: Arc<MutexIsCleanupProcessStarted>,
}

impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> Clone for BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
impl<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted> Clone for BaseCachingLocal<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
LockerValue: Locker<T = Vec<u8>> + Debug,
LockerValueHashMap: Locker<T = HashMap<u64, LockerValue>> + Debug,
LockerExpiration: Locker<T = Duration> + Debug,
LockerExpirationHashMap: Locker<T = HashMap<u64, LockerExpiration>> + Debug,
MutexCleanupInterval: MutexLike<T = Duration> + Debug,
MutexIsCleanupProcessStarted: MutexLike<T = bool> + Debug
{
fn clone(&self) -> Self {
Self {
Expand All @@ -51,12 +55,14 @@ where
}
}

impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> Debug for BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
impl<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted> Debug for BaseCachingLocal<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
LockerValue: Locker<T = Vec<u8>> + Debug,
LockerValueHashMap: Locker<T = HashMap<u64, LockerValue>> + Debug,
LockerExpiration: Locker<T = Duration> + Debug,
LockerExpirationHashMap: Locker<T = HashMap<u64, LockerExpiration>> + Debug,
MutexCleanupInterval: MutexLike<T = Duration> + Debug,
MutexIsCleanupProcessStarted: MutexLike<T = bool> + Debug
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BaseCachingLocal")
Expand All @@ -69,34 +75,74 @@ where
}
}

impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
impl<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted> BaseCachingLocal<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
LockerValue: Locker<T = Vec<u8>> + Debug,
LockerValueHashMap: Locker<T = HashMap<u64, LockerValue>> + Debug,
LockerExpiration: Locker<T = Duration> + Debug,
LockerExpirationHashMap: Locker<T = HashMap<u64, LockerExpiration>> + Debug,
MutexCleanupInterval: MutexLike<T = Duration> + Debug,
MutexIsCleanupProcessStarted: MutexLike<T = bool> + Debug
{
pub fn empty(duration_strategy: CachingDurationStrategy) -> CachingLocal {
CachingLocal {
pub fn empty(duration_strategy: CachingDurationStrategy) -> Self {
BaseCachingLocal {
duration_strategy,
value_map: Arc::new(Mutex::new(HashMap::new())),
expiration_timestamp_map: Arc::new(Mutex::new(HashMap::new())),
cleanup_interval: Arc::new(Mutex::new(Duration::from_secs(0))),
is_cleanup_process_started: Arc::new(Mutex::new(false)),
value_map: Arc::new(LockerValueHashMap::new(HashMap::new())),
expiration_timestamp_map: Arc::new(LockerExpirationHashMap::new(HashMap::new())),
cleanup_interval: Arc::new(MutexCleanupInterval::new(Duration::from_secs(0))),
is_cleanup_process_started: Arc::new(MutexIsCleanupProcessStarted::new(false)),
}
}

async fn remove_key(&self, key: u64) {
let _ = self.expiration_timestamp_map.lock().await.remove(&key);
let _ = self.value_map.lock().await.remove(&key);
let contains_key = {
let expiration_timestamp_read_guard = self.expiration_timestamp_map.read().await;
expiration_timestamp_read_guard.contains_key(&key)
};

if contains_key {
let mut expiration_write_guard = self.expiration_timestamp_map.write().await;
let mut value_map_write_guard = self.value_map.write().await;

expiration_write_guard.remove(&key);
value_map_write_guard.remove(&key);
}
}

async fn set_value<T: Serialize + DeserializeOwned>(&self, key: u64, value: &T) -> Result<(), NovaXError> {
let contains_key = {
let expiration_timestamp_read_guard = self.expiration_timestamp_map.read().await;
expiration_timestamp_read_guard.contains_key(&key)
};

let expiration_timestamp = self.duration_strategy.get_duration_timestamp(&get_current_timestamp()?)?;
self.expiration_timestamp_map.lock().await.insert(key, expiration_timestamp);

let Ok(serialized) = rmp_serde::to_vec(value) else { return Err(CachingError::UnableToSerialize.into())};
self.value_map.lock().await.insert(key, serialized);

if contains_key {
let expiration_timestamp_map_read_guard = self.expiration_timestamp_map.read().await;
// Important: the key might have been removed since the contains_key assignment.
// If so, we won't set the cache here, but go to the "!contains_key" scope.
// We could lock the whole map but this has a terrible performance impact by creating a bottleneck.
if let Some(expiration_timestamp_locker) = expiration_timestamp_map_read_guard.get(&key) {
let mut expiration_timestamp_write = expiration_timestamp_locker.write().await;

// Let's do the same for the value
let value_map_read_guard = self.value_map.read().await;
if let Some(value_locker) = value_map_read_guard.get(&key) {
let mut value_write = value_locker.write().await;
*expiration_timestamp_write = expiration_timestamp;
*value_write = serialized;

return Ok(());
};
};
}

// The key is not found, we have to lock everything.
let mut expiration_map_write_guard = self.expiration_timestamp_map.write().await;
let mut value_map_write_guard = self.value_map.write().await;
expiration_map_write_guard.insert(key, LockerExpiration::new(expiration_timestamp));
value_map_write_guard.insert(key, LockerValue::new(serialized));

Ok(())
}
Expand Down Expand Up @@ -165,14 +211,30 @@ impl CachingLocal
}

async fn perform_cleanup(&self) -> Result<(), NovaXError> {
// Can create a bottleneck, be sure to not run this function too frequently.
let current_timestamp = get_current_timestamp()?;
let mut value_map_locked = self.value_map.lock().await;
let mut expiration_map_locked = self.expiration_timestamp_map.lock().await;
let mut expiration_map_write_guard = self.expiration_timestamp_map.write().await;
let mut value_map_write_guard = self.value_map.write().await;

let keys: Vec<u64> = expiration_map_write_guard
.keys()
.copied()
.collect();

for key in keys {
let should_remove = {
let Some(duration_locker) = expiration_map_write_guard.get(&key) else {
continue;
};

let duration_read = duration_locker.read().await;

current_timestamp > *duration_read
};

for (key, duration) in expiration_map_locked.clone().into_iter() {
if current_timestamp > duration {
value_map_locked.remove(&key);
expiration_map_locked.remove(&key);
if should_remove {
value_map_write_guard.remove(&key);
expiration_map_write_guard.remove(&key);
}
}

Expand All @@ -181,27 +243,45 @@ impl CachingLocal
}

#[async_trait]
impl<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted> CachingStrategy for BaseCachingLocal<MutexValue, MutexExpiration, MutexCleanupInterval, MutexIsCleanupProcessStarted>
impl<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted> CachingStrategy for BaseCachingLocal<LockerValue, LockerValueHashMap, LockerExpiration, LockerExpirationHashMap, MutexCleanupInterval, MutexIsCleanupProcessStarted>
where
MutexValue: MutexLike<T = HashMap<u64, Vec<u8>>>,
MutexExpiration: MutexLike<T = HashMap<u64, Duration>>,
MutexCleanupInterval: MutexLike<T = Duration>,
MutexIsCleanupProcessStarted: MutexLike<T = bool>
LockerValue: Locker<T = Vec<u8>> + Debug,
LockerValueHashMap: Locker<T = HashMap<u64, LockerValue>> + Debug,
LockerExpiration: Locker<T = Duration> + Debug,
LockerExpirationHashMap: Locker<T = HashMap<u64, LockerExpiration>> + Debug,
MutexCleanupInterval: MutexLike<T = Duration> + Debug,
MutexIsCleanupProcessStarted: MutexLike<T = bool> + Debug
{
async fn get_cache<T: Serialize + DeserializeOwned + Send>(&self, key: u64) -> Result<Option<T>, NovaXError> {
let Some(expiration_timestamp) = self.expiration_timestamp_map.lock().await.get(&key).cloned() else { return Ok(None) };
{
let expiration_timestamp = {
let read_guard = self.expiration_timestamp_map.read().await;
let Some(expiration_timestamp_locker) = read_guard.get(&key) else {
return Ok(None);
};

if get_current_timestamp()? >= expiration_timestamp {
self.remove_key(key).await;
Ok(None)
} else {
let Some(encoded_value) = self.value_map.lock().await.get(&key).cloned() else { return Ok(None) };
let Ok(value) = rmp_serde::from_slice(&encoded_value) else {
return Err(CachingError::UnableToDeserialize.into())
let expiration_timestamp_read = expiration_timestamp_locker.read().await;
*expiration_timestamp_read
};

Ok(Some(value))
}
if get_current_timestamp()? >= expiration_timestamp {
self.remove_key(key).await;
return Ok(None)
}
};

let value_map_read_guard = self.value_map.read().await;
let Some(encoded_value_locked) = value_map_read_guard.get(&key) else {
return Ok(None);
};

let encoded_value = encoded_value_locked.read().await;

let Ok(value) = rmp_serde::from_slice(&encoded_value) else {
return Err(CachingError::UnableToDeserialize.into())
};

Ok(Some(value))
}

async fn set_cache<T: Serialize + DeserializeOwned + Send + Sync>(&self, key: u64, value: &T) -> Result<(), NovaXError> {
Expand All @@ -224,8 +304,11 @@ where
}

async fn clear(&self) -> Result<(), NovaXError> {
self.expiration_timestamp_map.lock().await.clear();
self.value_map.lock().await.clear();
let mut expiration_map_write_guard = self.expiration_timestamp_map.write().await;
let mut value_map_write_guard = self.value_map.write().await;

expiration_map_write_guard.clear();
value_map_write_guard.clear();

Ok(())
}
Expand Down Expand Up @@ -443,8 +526,8 @@ mod test {
caching.set_cache(2, &"test2".to_string()).await?;
caching.clear().await?;

assert!(caching.value_map.lock().await.is_empty());
assert!(caching.expiration_timestamp_map.lock().await.is_empty());
assert!(caching.value_map.write().await.is_empty());
assert!(caching.expiration_timestamp_map.write().await.is_empty());

Ok(())
}
Expand All @@ -461,8 +544,8 @@ mod test {

caching.perform_cleanup().await?;

let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;
let value_map_locked = caching.value_map.write().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.write().await;

assert_eq!(value_map_locked.len(), 1);
assert_eq!(expiration_timestamp_locked.len(), 1);
Expand All @@ -482,8 +565,8 @@ mod test {

caching.perform_cleanup().await?;

let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;
let value_map_locked = caching.value_map.write().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.write().await;

assert!(value_map_locked.is_empty());
assert!(expiration_timestamp_locked.is_empty());
Expand Down Expand Up @@ -513,8 +596,8 @@ mod test {
set_mock_time(Duration::from_secs(11));

{
let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;
let value_map_locked = caching.value_map.write().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.write().await;

assert_eq!(value_map_locked.len(), 2);
assert_eq!(expiration_timestamp_locked.len(), 2);
Expand All @@ -523,8 +606,8 @@ mod test {
caching.perform_cleanup().await?;

{
let value_map_locked = caching.value_map.lock().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.lock().await;
let value_map_locked = caching.value_map.write().await;
let expiration_timestamp_locked = caching.expiration_timestamp_map.write().await;

assert_eq!(value_map_locked.len(), 1);
assert_eq!(expiration_timestamp_locked.len(), 1);
Expand Down
14 changes: 7 additions & 7 deletions caching/src/locked/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub type CachingLocked<C: CachingStrategy> = BaseCachingLocked<C, RwLock<()>, Mu
pub struct BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
L: Locker<T = ()>,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
pub caching: C,
Expand All @@ -28,7 +28,7 @@ where
impl<C, L, M> Clone for BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
L: Locker<T = ()>,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
fn clone(&self) -> Self {
Expand All @@ -42,7 +42,7 @@ where
impl<C, L, M> Debug for BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
L: Locker<T = ()>,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand All @@ -56,7 +56,7 @@ where
impl<C, L, M> BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
L: Locker<T = ()>,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
pub fn new(caching: C) -> BaseCachingLocked<C, L, M> {
Expand All @@ -70,15 +70,15 @@ where
impl<C, L, M> BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
L: Locker<T = ()>,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
async fn get_locker(&self, key: u64) -> Result<Arc<L>, NovaXError> {
let mut lockers_map = self._lockers_map.lock().await;
let locker = if let Some(locker) = lockers_map.get(&key) {
locker.clone()
} else {
let locker = Arc::new(L::new());
let locker = Arc::new(L::new(()));
lockers_map.insert(key, locker.clone());
locker
};
Expand All @@ -91,7 +91,7 @@ where
impl<C, L, M> CachingStrategy for BaseCachingLocked<C, L, M>
where
C: CachingStrategy,
L: Locker,
L: Locker<T = ()>,
M: MutexLike<T = HashMap<u64, Arc<L>>>,
{
async fn get_cache<T: Serialize + DeserializeOwned + Send + Sync>(&self, key: u64) -> Result<Option<T>, NovaXError> {
Expand Down
Loading

0 comments on commit c23553a

Please sign in to comment.