Skip to content

Commit

Permalink
Update MMR-sync
Browse files Browse the repository at this point in the history
# Conflicts:
#	domains/client/domain-operator/src/snap_sync.rs
  • Loading branch information
shamil-gadelshin committed Oct 8, 2024
1 parent a31f02d commit 91ebc88
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 62 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ where
if let Some(offchain_storage) = backend.offchain_storage() {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
MmrRequestHandler::new::<NetworkWorker<Block, <Block as BlockT>::Hash>, _>(
MmrRequestHandler::new::<NetworkWorker<Block, <Block as BlockT>::Hash>>(
&config.base.protocol_id(),
fork_id.as_deref(),
client.clone(),
Expand Down
6 changes: 6 additions & 0 deletions crates/subspace-service/src/mmr.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use sp_core::H256;
use sp_mmr_primitives::utils::NodesUtils;
use sp_mmr_primitives::{NodeIndex, INDEXING_PREFIX};
use subspace_runtime_primitives::opaque::Header;

pub(crate) mod request_handler;
pub(crate) mod sync;

pub(crate) fn get_offchain_key(index: NodeIndex) -> Vec<u8> {
NodesUtils::node_canon_offchain_key(INDEXING_PREFIX, index)
}

pub(crate) fn get_temp_key(index: NodeIndex, hash: H256) -> Vec<u8> {
NodesUtils::node_temp_offchain_key::<Header>(INDEXING_PREFIX, index, hash)
}
51 changes: 39 additions & 12 deletions crates/subspace-service/src/mmr/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::mmr::get_offchain_key;
use crate::mmr::sync::decode_mmr_data;
use crate::mmr::{get_offchain_key, get_temp_key};
use futures::channel::oneshot;
use futures::stream::StreamExt;
use parity_scale_codec::{Decode, Encode};
Expand All @@ -23,8 +24,10 @@ use sc_network::config::ProtocolId;
use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
use sc_network::{NetworkBackend, PeerId};
use schnellru::{ByLength, LruMap};
use sp_blockchain::HeaderBackend;
use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
use sp_mmr_primitives::utils::NodesUtils;
use sp_runtime::codec;
use sp_runtime::traits::Block as BlockT;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -115,7 +118,10 @@ enum SeenRequestsValue {
}

/// Handler for incoming block requests from a remote peer.
pub struct MmrRequestHandler<Block: BlockT, OS> {
pub struct MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,
{
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
Expand All @@ -124,17 +130,20 @@ pub struct MmrRequestHandler<Block: BlockT, OS> {

offchain_db: OffchainDb<OS>,

client: Arc<Client>,

_phantom: PhantomData<Block>,
}

impl<Block, OS> MmrRequestHandler<Block, OS>
impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,

Block: BlockT<Hash = sp_core::H256>,
Client:
HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
OS: OffchainStorage,
{
/// Create a new [`MmrRequestHandler`].
pub fn new<NB, Client>(
pub fn new<NB>(
protocol_id: &ProtocolId,
fork_id: Option<&str>,
client: Arc<Client>,
Expand All @@ -143,7 +152,6 @@ where
) -> (Self, NB::RequestResponseProtocolConfig)
where
NB: NetworkBackend<Block, <Block as BlockT>::Hash>,
Client: BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
{
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
Expand All @@ -166,6 +174,7 @@ where

(
Self {
client,
request_receiver,
seen_requests,
offchain_db: OffchainDb::new(offchain_storage),
Expand Down Expand Up @@ -232,17 +241,35 @@ where
Err(())
} else {
let mut mmr_data = BTreeMap::new();
for block_number in
request.starting_position..(request.starting_position + request.limit)
{
let canon_key = get_offchain_key(block_number.into());
for position in request.starting_position..(request.starting_position + request.limit) {
let canon_key = get_offchain_key(position.into());
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &canon_key);

let block_number = NodesUtils::leaf_index_that_added_node(position.into());
trace!(%position, %block_number, "Storage data present: {}", storage_value.is_some());

if let Some(storage_value) = storage_value {
mmr_data.insert(block_number, storage_value);
mmr_data.insert(position, storage_value);
} else {
if let Ok(Some(hash)) = self.client.hash((block_number as u32).into()) {
let temp_key = get_temp_key(position.into(), hash);
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &temp_key);

if let Some(storage_value) = storage_value {
let data = decode_mmr_data(&storage_value);
trace!(%position, %block_number,"MMR node: {data:?}");
mmr_data.insert(position, storage_value);
continue;
} else {
debug!(%position, %block_number, ?hash, "Didn't find value in storage.")
}
} else {
debug!(%position, %block_number, "Didn't find hash.")
}
break; // No more storage values
}
}
Expand Down
49 changes: 24 additions & 25 deletions crates/subspace-service/src/mmr/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type MmrLeafOf = MmrLeaf<BlockNumber, BlockHash>;
type NodeOf = Node<Keccak256, MmrLeafOf>;
type MmrOf<OS> = mmr_lib::MMR<NodeOf, MmrHasher, OffchainMmrStorage<OS>>;

fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
pub(crate) fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
let node = match NodeOf::decode(&mut data) {
Ok(node) => node,
Err(err) => {
Expand Down Expand Up @@ -203,35 +203,34 @@ where

// Save the MMR-nodes from response to the local storage
'data: for (position, data) in response.mmr_data.iter() {
// Ensure continuous sync
if *position == starting_position {
let node = decode_mmr_data(data);
let node = decode_mmr_data(data);

let node = match node {
Ok(node) => node,
Err(err) => {
debug!(?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");
let node = match node {
Ok(node) => node,
Err(err) => {
debug!(?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");

continue 'peers;
}
};

if matches!(node, Node::Data(_)) {
if let Err(err) = mmr.push(node) {
debug!(?peer_info, ?err, %position, "Can't add MMR data received from the peer.");
continue 'peers;
}
};

return Err(sp_blockchain::Error::Backend(
"Can't add MMR data to the MMR storage".to_string(),
));
}
if matches!(node, Node::Data(_)) {
if let Err(err) = mmr.push(node) {
debug!(?peer_info, ?err, %position, "Can't add MMR data received from the peer.");

leaves_number += 1;
return Err(sp_blockchain::Error::Backend(
"Can't add MMR data to the MMR storage".to_string(),
));
}

starting_position += 1;
} else {
debug!("MMR-sync gap detected={peer_id}, position={position}",);
break 'data; // We don't support gaps in MMR data
leaves_number += 1;
}

starting_position += 1;

if u64::from(*position) >= target_position {
debug!(%target_position, "MMR-sync: target position reached.");
break 'data;
}
}
}
Expand All @@ -249,7 +248,7 @@ where

if !verify_mmr_data(client, &mmr, leaves_number) {
return Err(sp_blockchain::Error::Application(
"Can't get starting MMR position - data verification failed.".into(),
"MMR data verification failed.".into(),
));
}

Expand Down
1 change: 0 additions & 1 deletion domains/client/domain-operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ futures-timer = "3.0.3"
parking_lot = "0.12.2"
sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-consensus = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-consensus-subspace = { version = "0.1.0", default-features = false, path = "../../../crates/sc-consensus-subspace" }
sc-domains = { version = "0.1.0", path = "../../../crates/sc-domains" }
sc-network = { git = "https://github.com/subspace/polkadot-sdk", default-features = false, rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-network-common = { git = "https://github.com/subspace/polkadot-sdk", default-features = false, rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
Expand Down
24 changes: 2 additions & 22 deletions domains/client/domain-operator/src/snap_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use sc_client_api::{AuxStore, Backend, ProofProvider};
use sc_consensus::{
BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, StateAction, StorageChanges,
};
use sc_consensus_subspace::archiver::FINALIZATION_DEPTH_IN_SEGMENTS;
use sc_network::{NetworkRequest, PeerId};
use sc_network_common::sync::message::{
BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
Expand Down Expand Up @@ -346,27 +345,8 @@ where
.backend
.offchain_storage()
{
// let target_block = sync_params
// .consensus_chain_sync_params
// .segment_headers_store
// .last_segment_header()
// .map(|header| header.last_archived_block().number);

let target_block = sync_params
.consensus_chain_sync_params
.segment_headers_store
.max_segment_index()
// Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments
.and_then(|max_segment_index| {
max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
})
.and_then(|segment_index| {
sync_params
.consensus_chain_sync_params
.segment_headers_store
.get_segment_header(segment_index)
})
.map(|segment_header| segment_header.last_archived_block().number);
let target_block = Some(consensus_block_number);


mmr_sync(
sync_params.consensus_chain_sync_params.fork_id,
Expand Down

0 comments on commit 91ebc88

Please sign in to comment.