Skip to content

Commit

Permalink
Change MMR store to offchain storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
shamil-gadelshin committed Sep 20, 2024
1 parent d6b1de8 commit a31f02d
Showing 1 changed file with 104 additions and 78 deletions.
182 changes: 104 additions & 78 deletions crates/subspace-service/src/mmr/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::mmr::get_offchain_key;
use crate::mmr::request_handler::{generate_protocol_name, MmrRequest, MmrResponse, MAX_MMR_ITEMS};
use futures::channel::oneshot;
use mmr_lib::util::MemStore;
use parity_scale_codec::{Decode, Encode};
use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
use sc_network_sync::SyncingService;
Expand All @@ -12,10 +11,12 @@ use sp_blockchain::HeaderBackend;
use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
use sp_core::{Hasher, H256};
use sp_mmr_primitives::mmr_lib::{MMRStoreReadOps, MMRStoreWriteOps};
use sp_mmr_primitives::utils::NodesUtils;
use sp_mmr_primitives::{mmr_lib, DataOrHash, MmrApi};
use sp_runtime::traits::{Block as BlockT, Keccak256, NumberFor};
use sp_subspace_mmr::MmrLeaf;
use std::cell::RefCell;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{BlockHash, BlockNumber};
Expand All @@ -25,8 +26,75 @@ use tracing::{debug, error, trace};
type Node<H, L> = DataOrHash<H, L>;
type MmrLeafOf = MmrLeaf<BlockNumber, BlockHash>;
type NodeOf = Node<Keccak256, MmrLeafOf>;
type MemStoreOf = MemStore<NodeOf>;
type MmrRef<'a> = mmr_lib::MMR<NodeOf, MmrHasher, &'a MemStoreOf>;
type MmrOf<OS> = mmr_lib::MMR<NodeOf, MmrHasher, OffchainMmrStorage<OS>>;

fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
let node = match NodeOf::decode(&mut data) {
Ok(node) => node,
Err(err) => {
error!(?err, "Can't decode MMR data");

return Err(mmr_lib::Error::StoreError(
"Can't decode MMR data".to_string(),
));
}
};

Ok(node)
}

struct OffchainMmrStorage<OS: OffchainStorage> {
offchain_db: RefCell<OffchainDb<OS>>,
}

impl<OS: OffchainStorage> OffchainMmrStorage<OS> {
fn new(offchain_storage: OS) -> Self {
let offchain_db = OffchainDb::new(offchain_storage);

Self {
offchain_db: RefCell::new(offchain_db),
}
}
}

impl<OS: OffchainStorage> MMRStoreReadOps<NodeOf> for OffchainMmrStorage<OS> {
fn get_elem(&self, pos: u64) -> mmr_lib::Result<Option<NodeOf>> {
let canon_key = get_offchain_key(pos);
let Some(data) = self
.offchain_db
.borrow_mut()
.local_storage_get(StorageKind::PERSISTENT, &canon_key)
else {
error!(%pos, "Can't get MMR data.");

return Ok(None);
};

let node = decode_mmr_data(data.as_slice());

node.map(Some)
}
}

impl<OS: OffchainStorage> MMRStoreWriteOps<NodeOf> for OffchainMmrStorage<OS> {
fn append(&mut self, pos: u64, elems: Vec<NodeOf>) -> mmr_lib::Result<()> {
let mut current_pos = pos;
for elem in elems {
let data = elem.encode();

let canon_key = get_offchain_key(current_pos);
self.offchain_db.borrow_mut().local_storage_set(
StorageKind::PERSISTENT,
&canon_key,
&data,
);

current_pos += 1;
}

Ok(())
}
}

/// Default Merging & Hashing behavior for MMR.
pub struct MmrHasher;
Expand Down Expand Up @@ -64,36 +132,9 @@ where
let info = client.info();
let protocol_name = generate_protocol_name(info.genesis_hash, fork_id.as_deref());

let mut offchain_db = OffchainDb::new(offchain_storage.clone());

// Look for existing local MMR-nodes
let mut starting_position = {
let mut starting_position: Option<u32> = None;
for position in 0..=u32::MAX {
let canon_key = get_offchain_key(position.into());
if offchain_db
.local_storage_get(StorageKind::PERSISTENT, &canon_key)
.is_none()
{
starting_position = Some(position);
break;
}
}

match starting_position {
None => {
error!("Can't get starting MMR position - MMR storage is corrupted.");
return Err(sp_blockchain::Error::Application(
"Can't get starting MMR position - MMR storage is corrupted.".into(),
));
}
Some(last_processed_position) => {
debug!("MMR-sync last processed position: {last_processed_position}");

last_processed_position
}
}
};
let mut mmr = MmrOf::new(0, OffchainMmrStorage::new(offchain_storage));
let mut leaves_number = 0u32;
let mut starting_position = 0;

'outer: loop {
let peers_info = match sync_service.peers_info().await {
Expand Down Expand Up @@ -164,12 +205,28 @@ where
'data: for (position, data) in response.mmr_data.iter() {
// Ensure continuous sync
if *position == starting_position {
let canon_key = get_offchain_key((*position).into());
offchain_db.local_storage_set(
StorageKind::PERSISTENT,
&canon_key,
data,
);
let node = decode_mmr_data(data);

let node = match node {
Ok(node) => node,
Err(err) => {
debug!(?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");

continue 'peers;
}
};

if matches!(node, Node::Data(_)) {
if let Err(err) = mmr.push(node) {
debug!(?peer_info, ?err, %position, "Can't add MMR data received from the peer.");

return Err(sp_blockchain::Error::Backend(
"Can't add MMR data to the MMR storage".to_string(),
));
}

leaves_number += 1;
}

starting_position += 1;
} else {
Expand All @@ -190,7 +247,7 @@ where
if target_position <= starting_position.into() {
debug!("Target position reached: {target_position}");

if !verify_mmr_data(client, offchain_storage, target_position) {
if !verify_mmr_data(client, &mmr, leaves_number) {
return Err(sp_blockchain::Error::Application(
"Can't get starting MMR position - data verification failed.".into(),
));
Expand All @@ -209,53 +266,22 @@ where
Ok(())
}

pub(crate) fn verify_mmr_data<Block, OS, Client>(
fn verify_mmr_data<Block, OS, Client>(
client: Arc<Client>,
offchain_storage: OS,
target_position: u64,
mmr: &MmrOf<OS>,
leaves_number: u32,
) -> bool
where
Block: BlockT,
OS: OffchainStorage,
Client: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
{
let store = MemStoreOf::default();
let mut mmr = MmrRef::new(0, &store);
let mut leaves_num = 0u32;

debug!("Verifying MMR data...");

let mut offchain_db = OffchainDb::new(offchain_storage);

for position in 0..=target_position {
let canon_key = get_offchain_key(position);
let Some(data) = offchain_db.local_storage_get(StorageKind::PERSISTENT, &canon_key) else {
error!(%target_position, %position, "Can't get MMR data.");

return false;
};

let node = match NodeOf::decode(&mut data.as_slice()) {
Ok(node) => node,
Err(err) => {
error!(%position, ?err, "MMR data verification: error during leaf acquiring");
return false;
}
};

if matches!(node, NodeOf::Data(_),) {
if let Err(err) = mmr.push(node) {
error!(%position, ?err, "MMR data verification: error during adding the node.");
return false;
}
leaves_num += 1;
}
}

let block_number = leaves_num;
let block_number = leaves_number;
let Ok(Some(hash)) = client.hash(block_number.into()) else {
error!(%target_position, %block_number, "MMR data verification: error during hash acquisition");
error!(%leaves_number, %block_number, "MMR data verification: error during hash acquisition");
return false;
};

Expand All @@ -265,12 +291,12 @@ where
trace!("API root: {:?}", api_root);

let Ok(Node::Hash(mmr_root_hash)) = mmr_root.clone() else {
error!(%target_position, %block_number, ?mmr_root, "Can't get MMR root from local storage.");
error!(%leaves_number, %block_number, ?mmr_root, "Can't get MMR root from local storage.");
return false;
};

let Ok(Ok(api_root_hash)) = api_root else {
error!(%target_position, %block_number, ?mmr_root, "Can't get MMR root from API.");
error!(%leaves_number, %block_number, ?mmr_root, "Can't get MMR root from API.");
return false;
};

Expand Down

0 comments on commit a31f02d

Please sign in to comment.