Skip to content

Commit

Permalink
stub
Browse files Browse the repository at this point in the history
  • Loading branch information
Longarithm committed Sep 17, 2024
1 parent ce8ca01 commit 71b6358
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 41 deletions.
106 changes: 92 additions & 14 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ use near_primitives::views::{
};
use near_store::config::StateSnapshotType;
use near_store::flat::{store_helper, FlatStorageReadyStatus, FlatStorageStatus};
use near_store::get_genesis_state_roots;
use near_store::trie::mem::resharding::RetainMode;
use near_store::trie::mem::updating::apply_memtrie_changes;
use near_store::DBCol;
use near_store::{get_genesis_state_roots, PartialStorage};
use node_runtime::bootstrap_congestion_info;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::sync::Arc;
use time::ext::InstantExt as _;
use tracing::{debug, debug_span, error, info, warn, Span};
Expand Down Expand Up @@ -1847,6 +1850,77 @@ impl Chain {
});
}

/// If shard layout changes after the given block, creates temporary
/// memtries for new shards to be able to process them in the next epoch.
/// Note this doesn't complete resharding, proper memtries are to be
/// created later.
fn process_instant_resharding_storage_update(
&mut self,
block: &Block,
shard_uid: ShardUId,
) -> Result<(), Error> {
let block_hash = block.hash();
let block_height = block.header().height();
let prev_hash = block.header().prev_hash();
if !self.epoch_manager.will_shard_layout_change(prev_hash)? {
return Ok(());
}

let next_epoch_id = self.epoch_manager.get_next_epoch_id_from_prev_block(prev_hash)?;
let next_shard_layout = self.epoch_manager.get_shard_layout(&next_epoch_id)?;
let children_shard_uids =
next_shard_layout.get_children_shards_uids(shard_uid.shard_id()).unwrap();

// Hack to ensure this logic is not applied before ReshardingV3.
// TODO(#12019): proper logic.
if next_shard_layout.version() < 3 || children_shard_uids.len() == 1 {
return Ok(());
}
assert_eq!(children_shard_uids.len(), 2);

let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?;
let tries = self.runtime_adapter.get_tries();
let Some(mem_tries) = tries.get_mem_tries(shard_uid) else {
// TODO(#12019): what if node doesn't have memtrie? just pause
// processing?
return Ok(());
};
let boundary_account = AccountId::from_str("boundary.near").unwrap();

// TODO(#12019): leave only tracked shards.
for (new_shard_uid, retain_mode) in [
(children_shard_uids[0], RetainMode::Left),
(children_shard_uids[1], RetainMode::Right),
] {
let mut mem_tries = mem_tries.write().unwrap();
let mut mem_trie_update = mem_tries.update(*chunk_extra.state_root(), false)?;

let (trie_changes, partial_state) =
mem_trie_update.cut(boundary_account.clone(), retain_mode);
let partial_storage = PartialStorage { nodes: partial_state };
let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
let new_state_root = apply_memtrie_changes(&mut mem_tries, &mem_changes, block_height);
let mut new_chunk_extra = ChunkExtra::clone(&chunk_extra);
*new_chunk_extra.state_root_mut() = new_state_root;

let mut chain_store_update = ChainStoreUpdate::new(&mut self.chain_store);
chain_store_update.save_chunk_extra(block_hash, &new_shard_uid, new_chunk_extra);
chain_store_update.save_state_transition_data(
*block_hash,
new_shard_uid.shard_id(),
Some(partial_storage),
CryptoHash::default(),
);
chain_store_update.commit()?;

let mut store_update = self.chain_store.store().store_update();
tries.apply_insertions(&trie_changes, new_shard_uid, &mut store_update);
store_update.commit()?;
}

Ok(())
}

#[tracing::instrument(level = "debug", target = "chain", "postprocess_block_only", skip_all)]
fn postprocess_block_only(
&mut self,
Expand All @@ -1867,6 +1941,7 @@ impl Chain {
should_save_state_transition_data,
)?;
chain_update.commit()?;

Ok(new_head)
}

Expand Down Expand Up @@ -1936,30 +2011,33 @@ impl Chain {
true,
);
let care_about_shard_this_or_next_epoch = care_about_shard || will_care_about_shard;
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id).unwrap();
if care_about_shard_this_or_next_epoch {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id).unwrap();
shards_cares_this_or_next_epoch.push(shard_uid);
}

// Update flat storage head to be the last final block. Note that this update happens
// in a separate db transaction from the update from block processing. This is intentional
// because flat_storage need to be locked during the update of flat head, otherwise
// flat_storage is in an inconsistent state that could be accessed by the other
// apply chunks processes. This means, the flat head is not always the same as
// the last final block on chain, which is OK, because in the flat storage implementation
// we don't assume that.
let need_flat_storage_update = if is_caught_up {
// If we already caught up this epoch, then flat storage exists for both shards which we already track
let need_storage_update = if is_caught_up {
// If we already caught up this epoch, then storage exists for both shards which we already track
// and shards which will be tracked in next epoch, so we can update them.
care_about_shard_this_or_next_epoch
} else {
// If we didn't catch up, we can update only shards tracked right now. Remaining shards will be updated
// during catchup of this block.
care_about_shard
};
tracing::debug!(target: "chain", shard_id, need_flat_storage_update, "Updating flat storage");

if need_flat_storage_update {
tracing::debug!(target: "chain", shard_id, need_storage_update, "Updating storage");

if need_storage_update {
// TODO(#12019): consider adding to catchup flow.
self.process_instant_resharding_storage_update(&block, shard_uid)?;

// Update flat storage head to be the last final block. Note that this update happens
// in a separate db transaction from the update from block processing. This is intentional
// because flat_storage need to be locked during the update of flat head, otherwise
// flat_storage is in an inconsistent state that could be accessed by the other
// apply chunks processes. This means, the flat head is not always the same as
// the last final block on chain, which is OK, because in the flat storage implementation
// we don't assume that.
self.update_flat_storage_and_memtrie(&block, shard_id)?;
}
}
Expand Down
1 change: 1 addition & 0 deletions core/store/src/trie/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod lookup;
pub mod metrics;
pub mod node;
mod parallel_loader;
pub mod resharding;
pub mod updating;

/// Check this, because in the code we conveniently assume usize is 8 bytes.
Expand Down
Loading

0 comments on commit 71b6358

Please sign in to comment.