Skip to content

Commit

Permalink
Merge pull request #16 from moka-rs/v0.10.x
Browse files Browse the repository at this point in the history
Merge `v0.10.x` branch into `main` (2023-08-12)
  • Loading branch information
tatsuya6502 authored Aug 12, 2023
2 parents f7a15e2 + e95c168 commit 83846e6
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 77 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Mini Moka Cache — Change Log

## Version 0.10.2

### Fixed

- Fixed a memory corruption bug caused by the timing of concurrent `insert`,
`get` and removal of the same cached entry. ([#15][gh-pull-0015]).


## Version 0.10.1

Bumped the minimum supported Rust version (MSRV) to 1.61 (May 19, 2022).
Expand Down Expand Up @@ -45,6 +53,7 @@ lightweight.
<!-- Links -->
[moka-v0.9.6]: https://github.com/moka-rs/moka/tree/v0.9.6

[gh-pull-0015]: https://github.com/moka-rs/mini-moka/pull/15/
[gh-pull-0006]: https://github.com/moka-rs/mini-moka/pull/6/
[gh-pull-0005]: https://github.com/moka-rs/mini-moka/pull/5/
[gh-pull-0002]: https://github.com/moka-rs/mini-moka/pull/2/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mini-moka"
version = "0.10.1"
version = "0.10.2"
edition = "2018"
rust-version = "1.61"

Expand Down
83 changes: 18 additions & 65 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::common::{deque::DeqNode, time::Instant};

use std::{
ptr::NonNull,
sync::{Arc, Mutex},
};
use std::{ptr::NonNull, sync::Arc};
use tagptr::TagNonNull;
use triomphe::Arc as TrioArc;

Expand Down Expand Up @@ -47,11 +44,11 @@ impl<K> Clone for KeyHash<K> {

pub(crate) struct KeyDate<K> {
key: Arc<K>,
entry_info: TrioArc<EntryInfo>,
entry_info: TrioArc<EntryInfo<K>>,
}

impl<K> KeyDate<K> {
pub(crate) fn new(key: Arc<K>, entry_info: &TrioArc<EntryInfo>) -> Self {
pub(crate) fn new(key: Arc<K>, entry_info: &TrioArc<EntryInfo<K>>) -> Self {
Self {
key,
entry_info: TrioArc::clone(entry_info),
Expand All @@ -66,11 +63,11 @@ impl<K> KeyDate<K> {
pub(crate) struct KeyHashDate<K> {
key: Arc<K>,
hash: u64,
entry_info: TrioArc<EntryInfo>,
entry_info: TrioArc<EntryInfo<K>>,
}

impl<K> KeyHashDate<K> {
pub(crate) fn new(kh: KeyHash<K>, entry_info: &TrioArc<EntryInfo>) -> Self {
pub(crate) fn new(kh: KeyHash<K>, entry_info: &TrioArc<EntryInfo<K>>) -> Self {
Self {
key: kh.key,
hash: kh.hash,
Expand All @@ -86,7 +83,7 @@ impl<K> KeyHashDate<K> {
self.hash
}

pub(crate) fn entry_info(&self) -> &EntryInfo {
pub(crate) fn entry_info(&self) -> &EntryInfo<K> {
&self.entry_info
}
}
Expand Down Expand Up @@ -147,53 +144,25 @@ impl<K> AccessTime for DeqNode<KeyHashDate<K>> {
}

// DeqNode for an access order queue.
type KeyDeqNodeAo<K> = TagNonNull<DeqNode<KeyHashDate<K>>, 2>;
pub(crate) type KeyDeqNodeAo<K> = TagNonNull<DeqNode<KeyHashDate<K>>, 2>;

// DeqNode for the write order queue.
type KeyDeqNodeWo<K> = NonNull<DeqNode<KeyDate<K>>>;

pub(crate) struct DeqNodes<K> {
access_order_q_node: Option<KeyDeqNodeAo<K>>,
write_order_q_node: Option<KeyDeqNodeWo<K>>,
}

// We need this `unsafe impl` as DeqNodes have NonNull pointers.
unsafe impl<K> Send for DeqNodes<K> {}
pub(crate) type KeyDeqNodeWo<K> = NonNull<DeqNode<KeyDate<K>>>;

pub(crate) struct ValueEntry<K, V> {
pub(crate) value: V,
info: TrioArc<EntryInfo>,
nodes: Mutex<DeqNodes<K>>,
info: TrioArc<EntryInfo<K>>,
}

impl<K, V> ValueEntry<K, V> {
pub(crate) fn new(value: V, entry_info: TrioArc<EntryInfo>) -> Self {
Self {
value,
info: entry_info,
nodes: Mutex::new(DeqNodes {
access_order_q_node: None,
write_order_q_node: None,
}),
}
}

pub(crate) fn new_from(value: V, entry_info: TrioArc<EntryInfo>, other: &Self) -> Self {
let nodes = {
let other_nodes = other.nodes.lock().expect("lock poisoned");
DeqNodes {
access_order_q_node: other_nodes.access_order_q_node,
write_order_q_node: other_nodes.write_order_q_node,
}
};
pub(crate) fn new(value: V, entry_info: TrioArc<EntryInfo<K>>) -> Self {
Self {
value,
info: entry_info,
nodes: Mutex::new(nodes),
}
}

pub(crate) fn entry_info(&self) -> &TrioArc<EntryInfo> {
pub(crate) fn entry_info(&self) -> &TrioArc<EntryInfo<K>> {
&self.info
}

Expand All @@ -219,47 +188,31 @@ impl<K, V> ValueEntry<K, V> {
}

pub(crate) fn access_order_q_node(&self) -> Option<KeyDeqNodeAo<K>> {
self.nodes
.lock()
.expect("lock poisoned")
.access_order_q_node
self.info.access_order_q_node()
}

pub(crate) fn set_access_order_q_node(&self, node: Option<KeyDeqNodeAo<K>>) {
self.nodes
.lock()
.expect("lock poisoned")
.access_order_q_node = node;
self.info.set_access_order_q_node(node);
}

pub(crate) fn take_access_order_q_node(&self) -> Option<KeyDeqNodeAo<K>> {
self.nodes
.lock()
.expect("lock poisoned")
.access_order_q_node
.take()
self.info.take_access_order_q_node()
}

pub(crate) fn write_order_q_node(&self) -> Option<KeyDeqNodeWo<K>> {
self.nodes.lock().expect("lock poisoned").write_order_q_node
self.info.write_order_q_node()
}

pub(crate) fn set_write_order_q_node(&self, node: Option<KeyDeqNodeWo<K>>) {
self.nodes.lock().expect("lock poisoned").write_order_q_node = node;
self.info.set_write_order_q_node(node)
}

pub(crate) fn take_write_order_q_node(&self) -> Option<KeyDeqNodeWo<K>> {
self.nodes
.lock()
.expect("lock poisoned")
.write_order_q_node
.take()
self.info.take_write_order_q_node()
}

pub(crate) fn unset_q_nodes(&self) {
let mut nodes = self.nodes.lock().expect("lock poisoned");
nodes.access_order_q_node = None;
nodes.write_order_q_node = None;
self.info.unset_q_nodes();
}
}

Expand Down
78 changes: 73 additions & 5 deletions src/common/concurrent/entry_info.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Mutex,
};

use super::AccessTime;
use super::{AccessTime, KeyDeqNodeAo, KeyDeqNodeWo};
use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};

pub(crate) struct EntryInfo {
pub(crate) struct DeqNodes<K> {
access_order_q_node: Option<KeyDeqNodeAo<K>>,
write_order_q_node: Option<KeyDeqNodeWo<K>>,
}

// We need this `unsafe impl` as DeqNodes have NonNull pointers.
unsafe impl<K> Send for DeqNodes<K> {}

pub(crate) struct EntryInfo<K> {
/// `is_admitted` indicates that the entry has been admitted to the
/// cache. When `false`, it means the entry is _temporary_ admitted to
/// the cache or evicted from the cache (so it should not have LRU nodes).
Expand All @@ -15,9 +26,10 @@ pub(crate) struct EntryInfo {
last_accessed: AtomicInstant,
last_modified: AtomicInstant,
policy_weight: AtomicU32,
nodes: Mutex<DeqNodes<K>>,
}

impl EntryInfo {
impl<K> EntryInfo<K> {
#[inline]
pub(crate) fn new(timestamp: Instant, policy_weight: u32) -> Self {
Self {
Expand All @@ -26,6 +38,10 @@ impl EntryInfo {
last_accessed: AtomicInstant::new(timestamp),
last_modified: AtomicInstant::new(timestamp),
policy_weight: AtomicU32::new(policy_weight),
nodes: Mutex::new(DeqNodes {
access_order_q_node: None,
write_order_q_node: None,
}),
}
}

Expand Down Expand Up @@ -54,12 +70,64 @@ impl EntryInfo {
self.policy_weight.load(Ordering::Acquire)
}

#[inline]
pub(crate) fn set_policy_weight(&self, size: u32) {
self.policy_weight.store(size, Ordering::Release);
}

#[inline]
pub(crate) fn access_order_q_node(&self) -> Option<KeyDeqNodeAo<K>> {
self.nodes
.lock()
.expect("lock poisoned")
.access_order_q_node
}

#[inline]
pub(crate) fn set_access_order_q_node(&self, node: Option<KeyDeqNodeAo<K>>) {
self.nodes
.lock()
.expect("lock poisoned")
.access_order_q_node = node;
}

#[inline]
pub(crate) fn take_access_order_q_node(&self) -> Option<KeyDeqNodeAo<K>> {
self.nodes
.lock()
.expect("lock poisoned")
.access_order_q_node
.take()
}

#[inline]
pub(crate) fn write_order_q_node(&self) -> Option<KeyDeqNodeWo<K>> {
self.nodes.lock().expect("lock poisoned").write_order_q_node
}

#[inline]
pub(crate) fn set_write_order_q_node(&self, node: Option<KeyDeqNodeWo<K>>) {
self.nodes.lock().expect("lock poisoned").write_order_q_node = node;
}

#[inline]
pub(crate) fn take_write_order_q_node(&self) -> Option<KeyDeqNodeWo<K>> {
self.nodes
.lock()
.expect("lock poisoned")
.write_order_q_node
.take()
}

#[inline]
pub(crate) fn unset_q_nodes(&self) {
let mut nodes = self.nodes.lock().expect("lock poisoned");
nodes.access_order_q_node = None;
nodes.write_order_q_node = None;
}
}

impl AccessTime for EntryInfo {
impl<K> AccessTime for EntryInfo<K> {
#[inline]
fn last_accessed(&self) -> Option<Instant> {
self.last_accessed.instant()
Expand Down
15 changes: 9 additions & 6 deletions src/sync/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,10 @@ where
// Update
.and_modify(|entry| {
// NOTES on `new_value_entry_from` method:
// 1. The internal EntryInfo will be shared between the old and new ValueEntries.
// 2. This method will set the last_accessed and last_modified to the max value to
// prevent this new ValueEntry from being evicted by an expiration policy.
// 1. The internal EntryInfo will be shared between the old and new
// ValueEntries.
// 2. This method will set the dirty flag to prevent this new
// ValueEntry from being evicted by an expiration policy.
// 3. This method will update the policy_weight with the new weight.
let old_weight = entry.policy_weight();
let mut old_value = entry.value.clone();
Expand Down Expand Up @@ -359,7 +360,7 @@ where
info.set_last_accessed(timestamp);
info.set_last_modified(timestamp);
info.set_policy_weight(policy_weight);
TrioArc::new(ValueEntry::new_from(value, info, other))
TrioArc::new(ValueEntry::new(value, info))
}

#[inline]
Expand Down Expand Up @@ -829,7 +830,9 @@ where
Ok(Hit(hash, entry, timestamp)) => {
freq.increment(hash);
entry.set_last_accessed(timestamp);
deqs.move_to_back_ao(&entry)
if entry.is_admitted() {
deqs.move_to_back_ao(&entry);
}
}
Ok(Miss(hash)) => freq.increment(hash),
Err(_) => break,
Expand Down Expand Up @@ -1209,7 +1212,7 @@ where
if let Some((_k, entry)) = maybe_entry {
Self::handle_remove(deqs, entry, counters);
} else if let Some(entry) = self.cache.get(key) {
if entry.last_modified().is_none() {
if entry.is_dirty() {
deqs.move_to_back_ao(&entry);
deqs.move_to_back_wo(&entry);
} else {
Expand Down

0 comments on commit 83846e6

Please sign in to comment.