Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stub for instant resharding state root creation #12083

Merged
merged 9 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 91 additions & 14 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ use near_primitives::views::{
};
use near_store::config::StateSnapshotType;
use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use near_store::get_genesis_state_roots;
use near_store::trie::mem::resharding::RetainMode;
use near_store::DBCol;
use near_store::{get_genesis_state_roots, PartialStorage};
use node_runtime::bootstrap_congestion_info;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::sync::Arc;
use time::ext::InstantExt as _;
use tracing::{debug, debug_span, error, info, warn, Span};
Expand Down Expand Up @@ -1847,6 +1849,77 @@ impl Chain {
});
}

/// If shard layout changes after the given block, creates temporary
/// memtries for new shards to be able to process them in the next epoch.
/// Note this doesn't complete resharding, proper memtries are to be
/// created later.
fn process_instant_resharding_storage_update(
Longarithm marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
block: &Block,
shard_uid: ShardUId,
) -> Result<(), Error> {
let block_hash = block.hash();
let block_height = block.header().height();
let prev_hash = block.header().prev_hash();
if !self.epoch_manager.will_shard_layout_change(prev_hash)? {
return Ok(());
}

let next_epoch_id = self.epoch_manager.get_next_epoch_id_from_prev_block(prev_hash)?;
let next_shard_layout = self.epoch_manager.get_shard_layout(&next_epoch_id)?;
let children_shard_uids =
next_shard_layout.get_children_shards_uids(shard_uid.shard_id()).unwrap();

// Hack to ensure this logic is not applied before ReshardingV3.
// TODO(#12019): proper logic.
if next_shard_layout.version() < 3 || children_shard_uids.len() == 1 {
return Ok(());
}
assert_eq!(children_shard_uids.len(), 2);

let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?;
let tries = self.runtime_adapter.get_tries();
let Some(mem_tries) = tries.get_mem_tries(shard_uid) else {
// TODO(#12019): what if node doesn't have memtrie? just pause
// processing?
return Ok(());
Longarithm marked this conversation as resolved.
Show resolved Hide resolved
};
let boundary_account = AccountId::from_str("boundary.near").unwrap();

// TODO(#12019): leave only tracked shards.
for (new_shard_uid, retain_mode) in [
(children_shard_uids[0], RetainMode::Left),
(children_shard_uids[1], RetainMode::Right),
] {
let mut mem_tries = mem_tries.write().unwrap();
let mut mem_trie_update = mem_tries.update(*chunk_extra.state_root(), false)?;

let (trie_changes, partial_state) =
mem_trie_update.cut(boundary_account.clone(), retain_mode);
Longarithm marked this conversation as resolved.
Show resolved Hide resolved
let partial_storage = PartialStorage { nodes: partial_state };
let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes);
let mut new_chunk_extra = ChunkExtra::clone(&chunk_extra);
*new_chunk_extra.state_root_mut() = new_state_root;
Longarithm marked this conversation as resolved.
Show resolved Hide resolved

let mut chain_store_update = ChainStoreUpdate::new(&mut self.chain_store);
chain_store_update.save_chunk_extra(block_hash, &new_shard_uid, new_chunk_extra);
chain_store_update.save_state_transition_data(
*block_hash,
new_shard_uid.shard_id(),
Some(partial_storage),
CryptoHash::default(),
);
chain_store_update.commit()?;

let mut store_update = self.chain_store.store().store_update();
tries.apply_insertions(&trie_changes, new_shard_uid, &mut store_update);
store_update.commit()?;
}

Ok(())
}

#[tracing::instrument(level = "debug", target = "chain", "postprocess_block_only", skip_all)]
fn postprocess_block_only(
&mut self,
Expand All @@ -1867,6 +1940,7 @@ impl Chain {
should_save_state_transition_data,
)?;
chain_update.commit()?;

Ok(new_head)
}

Expand Down Expand Up @@ -1936,30 +2010,33 @@ impl Chain {
true,
);
let care_about_shard_this_or_next_epoch = care_about_shard || will_care_about_shard;
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id).unwrap();
if care_about_shard_this_or_next_epoch {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id).unwrap();
shards_cares_this_or_next_epoch.push(shard_uid);
}

// Update flat storage head to be the last final block. Note that this update happens
// in a separate db transaction from the update from block processing. This is intentional
// because flat_storage need to be locked during the update of flat head, otherwise
// flat_storage is in an inconsistent state that could be accessed by the other
// apply chunks processes. This means, the flat head is not always the same as
// the last final block on chain, which is OK, because in the flat storage implementation
// we don't assume that.
let need_flat_storage_update = if is_caught_up {
// If we already caught up this epoch, then flat storage exists for both shards which we already track
let need_storage_update = if is_caught_up {
// If we already caught up this epoch, then storage exists for both shards which we already track
// and shards which will be tracked in next epoch, so we can update them.
care_about_shard_this_or_next_epoch
} else {
// If we didn't catch up, we can update only shards tracked right now. Remaining shards will be updated
// during catchup of this block.
care_about_shard
};
tracing::debug!(target: "chain", shard_id, need_flat_storage_update, "Updating flat storage");

if need_flat_storage_update {
tracing::debug!(target: "chain", shard_id, need_storage_update, "Updating storage");

if need_storage_update {
// TODO(#12019): consider adding to catchup flow.
self.process_instant_resharding_storage_update(&block, shard_uid)?;

// Update flat storage head to be the last final block. Note that this update happens
// in a separate db transaction from the update from block processing. This is intentional
// because flat_storage need to be locked during the update of flat head, otherwise
// flat_storage is in an inconsistent state that could be accessed by the other
// apply chunks processes. This means, the flat head is not always the same as
// the last final block on chain, which is OK, because in the flat storage implementation
// we don't assume that.
self.update_flat_storage_and_memtrie(&block, shard_id)?;
}
}
Expand Down
1 change: 1 addition & 0 deletions core/store/src/trie/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod mem_tries;
pub mod metrics;
pub mod node;
mod parallel_loader;
pub mod resharding;
pub mod updating;

/// Check this, because in the code we conveniently assume usize is 8 bytes.
Expand Down
Loading
Loading