diff --git a/CHANGELOG.md b/CHANGELOG.md index 5581d07..97aa08b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Mini Moka Cache — Change Log +## Version 0.10.2 + +### Fixed + +- Fixed a memory corruption bug caused by the timing of concurrent `insert`, + `get` and removal of the same cached entry. ([#15][gh-pull-0015]). + + ## Version 0.10.1 Bumped the minimum supported Rust version (MSRV) to 1.61 (May 19, 2022). @@ -45,6 +53,7 @@ lightweight. [moka-v0.9.6]: https://github.com/moka-rs/moka/tree/v0.9.6 +[gh-pull-0015]: https://github.com/moka-rs/mini-moka/pull/15/ [gh-pull-0006]: https://github.com/moka-rs/mini-moka/pull/6/ [gh-pull-0005]: https://github.com/moka-rs/mini-moka/pull/5/ [gh-pull-0002]: https://github.com/moka-rs/mini-moka/pull/2/ diff --git a/Cargo.toml b/Cargo.toml index 226a713..626a92f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mini-moka" -version = "0.10.1" +version = "0.10.2" edition = "2018" rust-version = "1.61" diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index f08e241..239e0b6 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -1,9 +1,6 @@ use crate::common::{deque::DeqNode, time::Instant}; -use std::{ - ptr::NonNull, - sync::{Arc, Mutex}, -}; +use std::{ptr::NonNull, sync::Arc}; use tagptr::TagNonNull; use triomphe::Arc as TrioArc; @@ -47,11 +44,11 @@ impl Clone for KeyHash { pub(crate) struct KeyDate { key: Arc, - entry_info: TrioArc, + entry_info: TrioArc>, } impl KeyDate { - pub(crate) fn new(key: Arc, entry_info: &TrioArc) -> Self { + pub(crate) fn new(key: Arc, entry_info: &TrioArc>) -> Self { Self { key, entry_info: TrioArc::clone(entry_info), @@ -66,11 +63,11 @@ impl KeyDate { pub(crate) struct KeyHashDate { key: Arc, hash: u64, - entry_info: TrioArc, + entry_info: TrioArc>, } impl KeyHashDate { - pub(crate) fn new(kh: KeyHash, entry_info: &TrioArc) -> Self { + pub(crate) fn new(kh: KeyHash, entry_info: &TrioArc>) -> Self { Self { key: kh.key, hash: kh.hash, @@ -86,7 +83,7 @@ impl KeyHashDate { self.hash } - pub(crate) fn entry_info(&self) -> &EntryInfo { + pub(crate) fn entry_info(&self) -> &EntryInfo { &self.entry_info } } @@ -147,53 +144,25 @@ impl AccessTime for DeqNode> { } // DeqNode for an access order queue. -type KeyDeqNodeAo = TagNonNull>, 2>; +pub(crate) type KeyDeqNodeAo = TagNonNull>, 2>; // DeqNode for the write order queue. -type KeyDeqNodeWo = NonNull>>; - -pub(crate) struct DeqNodes { - access_order_q_node: Option>, - write_order_q_node: Option>, -} - -// We need this `unsafe impl` as DeqNodes have NonNull pointers. -unsafe impl Send for DeqNodes {} +pub(crate) type KeyDeqNodeWo = NonNull>>; pub(crate) struct ValueEntry { pub(crate) value: V, - info: TrioArc, - nodes: Mutex>, + info: TrioArc>, } impl ValueEntry { - pub(crate) fn new(value: V, entry_info: TrioArc) -> Self { - Self { - value, - info: entry_info, - nodes: Mutex::new(DeqNodes { - access_order_q_node: None, - write_order_q_node: None, - }), - } - } - - pub(crate) fn new_from(value: V, entry_info: TrioArc, other: &Self) -> Self { - let nodes = { - let other_nodes = other.nodes.lock().expect("lock poisoned"); - DeqNodes { - access_order_q_node: other_nodes.access_order_q_node, - write_order_q_node: other_nodes.write_order_q_node, - } - }; + pub(crate) fn new(value: V, entry_info: TrioArc>) -> Self { Self { value, info: entry_info, - nodes: Mutex::new(nodes), } } - pub(crate) fn entry_info(&self) -> &TrioArc { + pub(crate) fn entry_info(&self) -> &TrioArc> { &self.info } @@ -219,47 +188,31 @@ impl ValueEntry { } pub(crate) fn access_order_q_node(&self) -> Option> { - self.nodes - .lock() - .expect("lock poisoned") - .access_order_q_node + self.info.access_order_q_node() } pub(crate) fn set_access_order_q_node(&self, node: Option>) { - self.nodes - .lock() - .expect("lock poisoned") - .access_order_q_node = node; + self.info.set_access_order_q_node(node); } pub(crate) fn take_access_order_q_node(&self) -> Option> { - self.nodes - .lock() - .expect("lock poisoned") - .access_order_q_node - .take() + self.info.take_access_order_q_node() } pub(crate) fn write_order_q_node(&self) -> Option> { - self.nodes.lock().expect("lock poisoned").write_order_q_node + self.info.write_order_q_node() } pub(crate) fn set_write_order_q_node(&self, node: Option>) { - self.nodes.lock().expect("lock poisoned").write_order_q_node = node; + self.info.set_write_order_q_node(node) } pub(crate) fn take_write_order_q_node(&self) -> Option> { - self.nodes - .lock() - .expect("lock poisoned") - .write_order_q_node - .take() + self.info.take_write_order_q_node() } pub(crate) fn unset_q_nodes(&self) { - let mut nodes = self.nodes.lock().expect("lock poisoned"); - nodes.access_order_q_node = None; - nodes.write_order_q_node = None; + self.info.unset_q_nodes(); } } diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs index b438604..8a10c95 100644 --- a/src/common/concurrent/entry_info.rs +++ b/src/common/concurrent/entry_info.rs @@ -1,9 +1,20 @@ -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Mutex, +}; -use super::AccessTime; +use super::{AccessTime, KeyDeqNodeAo, KeyDeqNodeWo}; use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant}; -pub(crate) struct EntryInfo { +pub(crate) struct DeqNodes { + access_order_q_node: Option>, + write_order_q_node: Option>, +} + +// We need this `unsafe impl` as DeqNodes have NonNull pointers. +unsafe impl Send for DeqNodes {} + +pub(crate) struct EntryInfo { /// `is_admitted` indicates that the entry has been admitted to the /// cache. When `false`, it means the entry is _temporary_ admitted to /// the cache or evicted from the cache (so it should not have LRU nodes). @@ -15,9 +26,10 @@ pub(crate) struct EntryInfo { last_accessed: AtomicInstant, last_modified: AtomicInstant, policy_weight: AtomicU32, + nodes: Mutex>, } -impl EntryInfo { +impl EntryInfo { #[inline] pub(crate) fn new(timestamp: Instant, policy_weight: u32) -> Self { Self { @@ -26,6 +38,10 @@ impl EntryInfo { last_accessed: AtomicInstant::new(timestamp), last_modified: AtomicInstant::new(timestamp), policy_weight: AtomicU32::new(policy_weight), + nodes: Mutex::new(DeqNodes { + access_order_q_node: None, + write_order_q_node: None, + }), } } @@ -54,12 +70,64 @@ impl EntryInfo { self.policy_weight.load(Ordering::Acquire) } + #[inline] pub(crate) fn set_policy_weight(&self, size: u32) { self.policy_weight.store(size, Ordering::Release); } + + #[inline] + pub(crate) fn access_order_q_node(&self) -> Option> { + self.nodes + .lock() + .expect("lock poisoned") + .access_order_q_node + } + + #[inline] + pub(crate) fn set_access_order_q_node(&self, node: Option>) { + self.nodes + .lock() + .expect("lock poisoned") + .access_order_q_node = node; + } + + #[inline] + pub(crate) fn take_access_order_q_node(&self) -> Option> { + self.nodes + .lock() + .expect("lock poisoned") + .access_order_q_node + .take() + } + + #[inline] + pub(crate) fn write_order_q_node(&self) -> Option> { + self.nodes.lock().expect("lock poisoned").write_order_q_node + } + + #[inline] + pub(crate) fn set_write_order_q_node(&self, node: Option>) { + self.nodes.lock().expect("lock poisoned").write_order_q_node = node; + } + + #[inline] + pub(crate) fn take_write_order_q_node(&self) -> Option> { + self.nodes + .lock() + .expect("lock poisoned") + .write_order_q_node + .take() + } + + #[inline] + pub(crate) fn unset_q_nodes(&self) { + let mut nodes = self.nodes.lock().expect("lock poisoned"); + nodes.access_order_q_node = None; + nodes.write_order_q_node = None; + } } -impl AccessTime for EntryInfo { +impl AccessTime for EntryInfo { #[inline] fn last_accessed(&self) -> Option { self.last_accessed.instant() diff --git a/src/sync/base_cache.rs b/src/sync/base_cache.rs index 969a038..e273531 100644 --- a/src/sync/base_cache.rs +++ b/src/sync/base_cache.rs @@ -287,9 +287,10 @@ where // Update .and_modify(|entry| { // NOTES on `new_value_entry_from` method: - // 1. The internal EntryInfo will be shared between the old and new ValueEntries. - // 2. This method will set the last_accessed and last_modified to the max value to - // prevent this new ValueEntry from being evicted by an expiration policy. + // 1. The internal EntryInfo will be shared between the old and new + // ValueEntries. + // 2. This method will set the dirty flag to prevent this new + // ValueEntry from being evicted by an expiration policy. // 3. This method will update the policy_weight with the new weight. let old_weight = entry.policy_weight(); let mut old_value = entry.value.clone(); @@ -359,7 +360,7 @@ where info.set_last_accessed(timestamp); info.set_last_modified(timestamp); info.set_policy_weight(policy_weight); - TrioArc::new(ValueEntry::new_from(value, info, other)) + TrioArc::new(ValueEntry::new(value, info)) } #[inline] @@ -829,7 +830,9 @@ where Ok(Hit(hash, entry, timestamp)) => { freq.increment(hash); entry.set_last_accessed(timestamp); - deqs.move_to_back_ao(&entry) + if entry.is_admitted() { + deqs.move_to_back_ao(&entry); + } } Ok(Miss(hash)) => freq.increment(hash), Err(_) => break, @@ -1209,7 +1212,7 @@ where if let Some((_k, entry)) = maybe_entry { Self::handle_remove(deqs, entry, counters); } else if let Some(entry) = self.cache.get(key) { - if entry.last_modified().is_none() { + if entry.is_dirty() { deqs.move_to_back_ao(&entry); deqs.move_to_back_wo(&entry); } else {