Skip to content

Commit

Permalink
Replace atomic::Atomic with parking_lot::Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Sep 18, 2024
1 parent c55c997 commit 7eeb365
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 63 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/sc-proof-of-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ include = [
]

[dependencies]
atomic = "0.5.3"
core_affinity = "0.8.1"
derive_more = { version = "1.0.0", features = ["full"] }
futures = "0.3.29"
Expand Down
9 changes: 4 additions & 5 deletions crates/sc-proof-of-time/src/source/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::collections::{HashMap, VecDeque};
use std::future::poll_fn;
use std::num::NonZeroU32;
use std::pin::pin;
use std::sync::{atomic, Arc};
use std::sync::Arc;
use subspace_core_primitives::{PotCheckpoints, PotSeed};
use tracing::{debug, error, trace, warn};

Expand Down Expand Up @@ -208,7 +208,7 @@ where
}

async fn handle_proof_candidate(&mut self, sender: PeerId, proof: GossipProof) {
let next_slot_input = self.state.next_slot_input(atomic::Ordering::Relaxed);
let next_slot_input = self.state.next_slot_input();

match proof.slot.cmp(&next_slot_input.slot) {
cmp::Ordering::Less => {
Expand Down Expand Up @@ -568,7 +568,7 @@ where

match GossipProof::decode(&mut data) {
Ok(proof) => {
let next_slot_input = self.state.next_slot_input(atomic::Ordering::Relaxed);
let next_slot_input = self.state.next_slot_input();
let current_slot = next_slot_input.slot - Slot::from(1);

if proof.slot < current_slot {
Expand Down Expand Up @@ -643,8 +643,7 @@ where
}

fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
let current_slot =
u64::from(self.state.next_slot_input(atomic::Ordering::Relaxed).slot) - 1;
let current_slot = u64::from(self.state.next_slot_input().slot) - 1;
Box::new(move |_topic, mut data| {
if let Ok(proof) = GossipProof::decode(&mut data) {
// Slot is the only meaningful expiration policy here
Expand Down
81 changes: 33 additions & 48 deletions crates/sc-proof-of-time/src/source/state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::verifier::PotVerifier;
use atomic::Atomic;
use parking_lot::Mutex;
use sp_consensus_slots::Slot;
use sp_consensus_subspace::{PotNextSlotInput, PotParametersChange};
use std::sync::atomic::Ordering;
use subspace_core_primitives::PotOutput;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -62,7 +61,7 @@ pub(super) enum PotStateUpdateOutcome {

#[derive(Debug)]
pub(super) struct PotState {
inner_state: Atomic<InnerState>,
inner_state: Mutex<InnerState>,
verifier: PotVerifier,
}

Expand All @@ -78,13 +77,13 @@ impl PotState {
};

Self {
inner_state: Atomic::new(inner),
inner_state: Mutex::new(inner),
verifier,
}
}

pub(super) fn next_slot_input(&self, ordering: Ordering) -> PotNextSlotInput {
self.inner_state.load(ordering).next_slot_input
pub(super) fn next_slot_input(&self) -> PotNextSlotInput {
self.inner_state.lock().next_slot_input
}

/// Extend state if it matches provided expected next slot input.
Expand All @@ -93,35 +92,24 @@ impl PotState {
/// `Err(existing_next_slot_input)` in case state was changed in the meantime.
pub(super) fn try_extend(
&self,
expected_previous_next_slot_input: PotNextSlotInput,
expected_existing_next_slot_input: PotNextSlotInput,
best_slot: Slot,
best_output: PotOutput,
maybe_updated_parameters_change: Option<Option<PotParametersChange>>,
) -> Result<PotNextSlotInput, PotNextSlotInput> {
let old_inner_state = self.inner_state.load(Ordering::Acquire);
if expected_previous_next_slot_input != old_inner_state.next_slot_input {
return Err(old_inner_state.next_slot_input);
let mut existing_inner_state = self.inner_state.lock();
if expected_existing_next_slot_input != existing_inner_state.next_slot_input {
return Err(existing_inner_state.next_slot_input);
}

let new_inner_state = old_inner_state.update(
*existing_inner_state = existing_inner_state.update(
best_slot,
best_output,
maybe_updated_parameters_change,
&self.verifier,
);

// Use `compare_exchange` to ensure we only update previously known value and not
// accidentally override something that doesn't match expectations anymore
self.inner_state
.compare_exchange(
old_inner_state,
new_inner_state,
Ordering::AcqRel,
// We don't care about the value read in case of failure
Ordering::Acquire,
)
.map(|_old_inner_state| new_inner_state.next_slot_input)
.map_err(|existing_inner_state| existing_inner_state.next_slot_input)
Ok(existing_inner_state.next_slot_input)
}

/// Update state, overriding PoT chain if it doesn't match provided values.
Expand All @@ -133,33 +121,30 @@ impl PotState {
best_output: PotOutput,
maybe_updated_parameters_change: Option<Option<PotParametersChange>>,
) -> PotStateUpdateOutcome {
let mut best_state = None;
// Use `fetch_update` such that we don't accidentally downgrade best slot to smaller value
let previous_best_state = self
.inner_state
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |inner_state| {
best_state = Some(inner_state.update(
best_slot,
best_output,
maybe_updated_parameters_change,
&self.verifier,
));

best_state
})
.expect("Callback always returns `Some`; qed");
let best_state = best_state.expect("Replaced with `Some` above; qed");

if previous_best_state.next_slot_input == best_state.next_slot_input {
let previous_best_state;
let new_best_state;
{
let mut inner_state = self.inner_state.lock();
previous_best_state = *inner_state;
new_best_state = previous_best_state.update(
best_slot,
best_output,
maybe_updated_parameters_change,
&self.verifier,
);
*inner_state = new_best_state;
}

if previous_best_state.next_slot_input == new_best_state.next_slot_input {
return PotStateUpdateOutcome::NoChange;
}

if previous_best_state.next_slot_input.slot < best_state.next_slot_input.slot {
if previous_best_state.next_slot_input.slot < new_best_state.next_slot_input.slot {
let mut slot_iterations = previous_best_state.next_slot_input.slot_iterations;
let mut seed = previous_best_state.next_slot_input.seed;

for slot in u64::from(previous_best_state.next_slot_input.slot)
..u64::from(best_state.next_slot_input.slot)
..u64::from(new_best_state.next_slot_input.slot)
{
let slot = Slot::from(slot);

Expand All @@ -181,21 +166,21 @@ impl PotState {
slot_iterations = pot_input.slot_iterations;
seed = pot_input.seed;

if next_slot == best_state.next_slot_input.slot
&& slot_iterations == best_state.next_slot_input.slot_iterations
&& seed == best_state.next_slot_input.seed
if next_slot == new_best_state.next_slot_input.slot
&& slot_iterations == new_best_state.next_slot_input.slot_iterations
&& seed == new_best_state.next_slot_input.seed
{
return PotStateUpdateOutcome::Extension {
from: previous_best_state.next_slot_input,
to: best_state.next_slot_input,
to: new_best_state.next_slot_input,
};
}
}
}

PotStateUpdateOutcome::Reorg {
from: previous_best_state.next_slot_input,
to: best_state.next_slot_input,
to: new_best_state.next_slot_input,
}
}
}
3 changes: 1 addition & 2 deletions crates/sc-proof-of-time/src/source/timekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use futures::executor::block_on;
use futures::SinkExt;
use sp_consensus_slots::Slot;
use std::num::NonZeroU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use subspace_core_primitives::{PotCheckpoints, PotSeed};
use subspace_proof_of_time::PotError;
Expand All @@ -29,7 +28,7 @@ pub(super) fn run_timekeeper(
pot_verifier: PotVerifier,
mut proofs_sender: mpsc::Sender<TimekeeperProof>,
) -> Result<(), PotError> {
let mut next_slot_input = state.next_slot_input(Ordering::Acquire);
let mut next_slot_input = state.next_slot_input();

loop {
trace!(
Expand Down

0 comments on commit 7eeb365

Please sign in to comment.