Update MMR sync. #3104

Closed
wants to merge 2 commits
Changes from 1 commit
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/subspace-service/src/lib.rs
@@ -893,7 +893,7 @@ where
if let Some(offchain_storage) = backend.offchain_storage() {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) =
MmrRequestHandler::new::<NetworkWorker<Block, <Block as BlockT>::Hash>, _>(
MmrRequestHandler::new::<NetworkWorker<Block, <Block as BlockT>::Hash>>(
&config.base.protocol_id(),
fork_id.as_deref(),
client.clone(),
6 changes: 6 additions & 0 deletions crates/subspace-service/src/mmr.rs
@@ -1,9 +1,15 @@
use sp_core::H256;
use sp_mmr_primitives::utils::NodesUtils;
use sp_mmr_primitives::{NodeIndex, INDEXING_PREFIX};
use subspace_runtime_primitives::opaque::Header;

pub(crate) mod request_handler;
pub(crate) mod sync;

pub(crate) fn get_offchain_key(index: NodeIndex) -> Vec<u8> {
NodesUtils::node_canon_offchain_key(INDEXING_PREFIX, index)
}

pub(crate) fn get_temp_key(index: NodeIndex, hash: H256) -> Vec<u8> {
NodesUtils::node_temp_offchain_key::<Header>(INDEXING_PREFIX, index, hash)
}
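
Side note (not part of this PR's diff): get_offchain_key addresses nodes whose block has already been canonicalized, while get_temp_key addresses nodes that have not been canonicalized yet and are stored under a key scoped to a block hash. A minimal sketch, assuming the caller already knows the hash used for the temporary key, of how the two are meant to be combined when reading a node (the request handler below does essentially this):

use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
use sp_core::H256;
use sp_mmr_primitives::NodeIndex;

// Sketch only: try the canonical key first, then fall back to the temporary,
// hash-scoped key for nodes whose block has not been canonicalized yet.
fn read_mmr_node<OS: OffchainStorage>(
    offchain_db: &mut OffchainDb<OS>,
    position: NodeIndex,
    block_hash: Option<H256>,
) -> Option<Vec<u8>> {
    let canon_key = get_offchain_key(position);
    if let Some(value) = offchain_db.local_storage_get(StorageKind::PERSISTENT, &canon_key) {
        return Some(value);
    }
    let temp_key = get_temp_key(position, block_hash?);
    offchain_db.local_storage_get(StorageKind::PERSISTENT, &temp_key)
}
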
51 changes: 39 additions & 12 deletions crates/subspace-service/src/mmr/request_handler.rs
@@ -14,7 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::mmr::get_offchain_key;
use crate::mmr::sync::decode_mmr_data;
use crate::mmr::{get_offchain_key, get_temp_key};
use futures::channel::oneshot;
use futures::stream::StreamExt;
use parity_scale_codec::{Decode, Encode};
@@ -23,8 +24,10 @@ use sc_network::config::ProtocolId;
use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
use sc_network::{NetworkBackend, PeerId};
use schnellru::{ByLength, LruMap};
use sp_blockchain::HeaderBackend;
use sp_core::offchain::storage::OffchainDb;
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
use sp_mmr_primitives::utils::NodesUtils;
use sp_runtime::codec;
use sp_runtime::traits::Block as BlockT;
use std::collections::BTreeMap;
@@ -115,7 +118,10 @@ enum SeenRequestsValue {
}

/// Handler for incoming block requests from a remote peer.
pub struct MmrRequestHandler<Block: BlockT, OS> {
pub struct MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,
{
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
@@ -124,17 +130,20 @@ pub struct MmrRequestHandler<Block: BlockT, OS> {

offchain_db: OffchainDb<OS>,

client: Arc<Client>,

_phantom: PhantomData<Block>,
}

impl<Block, OS> MmrRequestHandler<Block, OS>
impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
where
Block: BlockT,

Block: BlockT<Hash = sp_core::H256>,
Client:
HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
OS: OffchainStorage,
{
/// Create a new [`MmrRequestHandler`].
pub fn new<NB, Client>(
pub fn new<NB>(
protocol_id: &ProtocolId,
fork_id: Option<&str>,
client: Arc<Client>,
@@ -143,7 +152,6 @@ where
) -> (Self, NB::RequestResponseProtocolConfig)
where
NB: NetworkBackend<Block, <Block as BlockT>::Hash>,
Client: BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
{
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
@@ -166,6 +174,7 @@

(
Self {
client,
request_receiver,
seen_requests,
offchain_db: OffchainDb::new(offchain_storage),
@@ -232,17 +241,35 @@ where
Err(())
} else {
let mut mmr_data = BTreeMap::new();
for block_number in
request.starting_position..(request.starting_position + request.limit)
{
let canon_key = get_offchain_key(block_number.into());
for position in request.starting_position..(request.starting_position + request.limit) {
let canon_key = get_offchain_key(position.into());
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &canon_key);

let block_number = NodesUtils::leaf_index_that_added_node(position.into());
trace!(%position, %block_number, "Storage data present: {}", storage_value.is_some());

if let Some(storage_value) = storage_value {
mmr_data.insert(block_number, storage_value);
mmr_data.insert(position, storage_value);
} else {
if let Ok(Some(hash)) = self.client.hash((block_number as u32).into()) {
let temp_key = get_temp_key(position.into(), hash);
let storage_value = self
.offchain_db
.local_storage_get(StorageKind::PERSISTENT, &temp_key);

if let Some(storage_value) = storage_value {
let data = decode_mmr_data(&storage_value);
trace!(%position, %block_number,"MMR node: {data:?}");
mmr_data.insert(position, storage_value);
continue;
} else {
debug!(%position, %block_number, ?hash, "Didn't find value in storage.")
}
} else {
debug!(%position, %block_number, "Didn't find hash.")
}
break; // No more storage values
}
}
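
Side note (not part of this PR's diff): NodesUtils::leaf_index_that_added_node maps a node position back to the leaf whose insertion created that node; the handler above uses that leaf index as the block number when deriving the temporary key. A small illustration of the mapping, assuming the standard 0-based MMR layout:

use sp_mmr_primitives::utils::NodesUtils;

fn main() {
    // Leaves occupy positions 0 and 1; their parent sits at position 2 and was
    // created when leaf 1 was appended, so its temporary key is derived from
    // the hash of the block that added leaf 1.
    assert_eq!(NodesUtils::leaf_index_that_added_node(0), 0);
    assert_eq!(NodesUtils::leaf_index_that_added_node(1), 1);
    assert_eq!(NodesUtils::leaf_index_that_added_node(2), 1);
}
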
49 changes: 24 additions & 25 deletions crates/subspace-service/src/mmr/sync.rs
@@ -28,7 +28,7 @@ type MmrLeafOf = MmrLeaf<BlockNumber, BlockHash>;
type NodeOf = Node<Keccak256, MmrLeafOf>;
type MmrOf<OS> = mmr_lib::MMR<NodeOf, MmrHasher, OffchainMmrStorage<OS>>;

fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
pub(crate) fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
let node = match NodeOf::decode(&mut data) {
Ok(node) => node,
Err(err) => {
@@ -203,35 +203,34 @@ where

// Save the MMR-nodes from response to the local storage
'data: for (position, data) in response.mmr_data.iter() {
// Ensure continuous sync
if *position == starting_position {
let node = decode_mmr_data(data);
let node = decode_mmr_data(data);

let node = match node {
Ok(node) => node,
Err(err) => {
debug!(?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");
let node = match node {
Ok(node) => node,
Err(err) => {
debug!(?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");

continue 'peers;
}
};

if matches!(node, Node::Data(_)) {
if let Err(err) = mmr.push(node) {
debug!(?peer_info, ?err, %position, "Can't add MMR data received from the peer.");
continue 'peers;
}
};

return Err(sp_blockchain::Error::Backend(
"Can't add MMR data to the MMR storage".to_string(),
));
}
if matches!(node, Node::Data(_)) {
if let Err(err) = mmr.push(node) {
debug!(?peer_info, ?err, %position, "Can't add MMR data received from the peer.");

leaves_number += 1;
return Err(sp_blockchain::Error::Backend(
"Can't add MMR data to the MMR storage".to_string(),
));
}

starting_position += 1;
} else {
debug!("MMR-sync gap detected={peer_id}, position={position}",);
break 'data; // We don't support gaps in MMR data
leaves_number += 1;
}

starting_position += 1;

if u64::from(*position) >= target_position {
debug!(%target_position, "MMR-sync: target position reached.");
break 'data;
}
}
}
Expand All @@ -249,7 +248,7 @@ where

if !verify_mmr_data(client, &mmr, leaves_number) {
return Err(sp_blockchain::Error::Application(
"Can't get starting MMR position - data verification failed.".into(),
"MMR data verification failed.".into(),
));
}

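
Side note (not part of this PR's diff): responses interleave leaf nodes with inner parent nodes in positional order, so only Node::Data entries bump leaves_number and get pushed into the local MMR. A rough illustration of the leaf/parent ratio, using the standard MMR size formula:

use sp_mmr_primitives::utils::NodesUtils;

fn main() {
    // A 4-leaf MMR holds 7 nodes (4 leaves plus 3 parents), so a peer streaming
    // positions 0..7 sends 7 entries, of which only 4 are Node::Data leaves.
    assert_eq!(NodesUtils::new(4).size(), 7);
}
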
1 change: 0 additions & 1 deletion domains/client/domain-operator/Cargo.toml
@@ -14,7 +14,6 @@ futures-timer = "3.0.3"
parking_lot = "0.12.2"
sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-consensus = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-consensus-subspace = { version = "0.1.0", default-features = false, path = "../../../crates/sc-consensus-subspace" }
sc-domains = { version = "0.1.0", path = "../../../crates/sc-domains" }
sc-network = { git = "https://github.com/subspace/polkadot-sdk", default-features = false, rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-network-common = { git = "https://github.com/subspace/polkadot-sdk", default-features = false, rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
23 changes: 1 addition & 22 deletions domains/client/domain-operator/src/snap_sync.rs
@@ -5,7 +5,6 @@ use sc_client_api::{AuxStore, Backend, ProofProvider};
use sc_consensus::{
BlockImport, BlockImportParams, ForkChoiceStrategy, ImportedState, StateAction, StorageChanges,
};
use sc_consensus_subspace::archiver::FINALIZATION_DEPTH_IN_SEGMENTS;
Member: Please make this constant non-public again

Member Author: Ok. I'd consider moving the constant next time we make it public.

use sc_network::{NetworkRequest, PeerId};
use sc_network_common::sync::message::{
BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
@@ -346,27 +345,7 @@ where
.backend
.offchain_storage()
{
// let target_block = sync_params
// .consensus_chain_sync_params
// .segment_headers_store
// .last_segment_header()
// .map(|header| header.last_archived_block().number);

let target_block = sync_params
.consensus_chain_sync_params
.segment_headers_store
.max_segment_index()
// Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments
.and_then(|max_segment_index| {
max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
})
.and_then(|segment_index| {
sync_params
.consensus_chain_sync_params
.segment_headers_store
.get_segment_header(segment_index)
})
.map(|segment_header| segment_header.last_archived_block().number);
let target_block = Some(consensus_block_number);

mmr_sync(
sync_params.consensus_chain_sync_params.fork_id,