Skip to content

Commit

Permalink
Merge branch 'master' into 20241001_loaderv2_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
2501babe authored Oct 2, 2024
2 parents 124ebfa + 4a1b75f commit e545d22
Show file tree
Hide file tree
Showing 50 changed files with 1,672 additions and 951 deletions.
23 changes: 11 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ assert_cmd = "2.0"
assert_matches = "1.5.0"
async-channel = "1.9.0"
async-lock = "3.4.0"
async-trait = "0.1.82"
async-trait = "0.1.83"
atty = "0.2.11"
backoff = "0.4.0"
base64 = "0.22.1"
Expand Down Expand Up @@ -297,7 +297,7 @@ libsecp256k1 = { version = "0.6.0", default-features = false, features = [
light-poseidon = "0.2.0"
log = "0.4.22"
lru = "0.7.7"
lz4 = "1.27.0"
lz4 = "1.28.0"
memmap2 = "0.5.10"
memoffset = "0.9"
merlin = "3"
Expand Down Expand Up @@ -501,7 +501,7 @@ syn = "2.0"
sys-info = "0.9.1"
sysctl = "0.4.6"
systemstat = "0.2.3"
tar = "0.4.41"
tar = "0.4.42"
tarpc = "0.29.0"
tempfile = "3.12.0"
test-case = "3.3.1"
Expand Down
122 changes: 36 additions & 86 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,14 +1082,6 @@ pub struct AccountStorageEntry {
/// the append_vec, once maxed out, then emptied, can be reclaimed
count_and_status: SeqLock<(usize, AccountStorageStatus)>,

/// This is the total number of accounts stored ever since initialized to keep
/// track of lifetime count of all store operations. And this differs from
/// count_and_status in that this field won't be decremented.
///
/// This is used as a rough estimate for slot shrinking. As such a relaxed
/// use case, this value ARE NOT strictly synchronized with count_and_status!
approx_store_count: AtomicUsize,

alive_bytes: AtomicUsize,
}

Expand All @@ -1110,7 +1102,6 @@ impl AccountStorageEntry {
slot,
accounts,
count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
alive_bytes: AtomicUsize::new(0),
}
}
Expand All @@ -1127,7 +1118,6 @@ impl AccountStorageEntry {
id: self.id,
slot: self.slot,
count_and_status: SeqLock::new(*count_and_status),
approx_store_count: AtomicUsize::new(self.approx_stored_count()),
alive_bytes: AtomicUsize::new(self.alive_bytes()),
accounts,
})
Expand All @@ -1137,14 +1127,13 @@ impl AccountStorageEntry {
slot: Slot,
id: AccountsFileId,
accounts: AccountsFile,
num_accounts: usize,
_num_accounts: usize,
) -> Self {
Self {
id,
slot,
accounts,
count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(num_accounts),
alive_bytes: AtomicUsize::new(0),
}
}
Expand Down Expand Up @@ -1178,10 +1167,6 @@ impl AccountStorageEntry {
self.count_and_status.read().0
}

pub fn approx_stored_count(&self) -> usize {
self.approx_store_count.load(Ordering::Relaxed)
}

pub fn alive_bytes(&self) -> usize {
self.alive_bytes.load(Ordering::Acquire)
}
Expand Down Expand Up @@ -1217,8 +1202,6 @@ impl AccountStorageEntry {
fn add_accounts(&self, num_accounts: usize, num_bytes: usize) {
let mut count_and_status = self.count_and_status.lock_write();
*count_and_status = (count_and_status.0 + num_accounts, count_and_status.1);
self.approx_store_count
.fetch_add(num_accounts, Ordering::Relaxed);
self.alive_bytes.fetch_add(num_bytes, Ordering::Release);
}

Expand Down Expand Up @@ -4385,7 +4368,7 @@ impl AccountsDb {
let shrink_collect =
self.shrink_collect::<AliveAccounts<'_>>(store, &unique_accounts, &self.shrink_stats);

// This shouldn't happen if alive_bytes/approx_stored_count are accurate.
// This shouldn't happen if alive_bytes is accurate.
// However, it is possible that the remaining alive bytes could be 0. In that case, the whole slot should be marked dead by clean.
if Self::should_not_shrink(
shrink_collect.alive_total_bytes as u64,
Expand Down Expand Up @@ -4593,17 +4576,13 @@ impl AccountsDb {
alive_ratio: f64,
store: Arc<AccountStorageEntry>,
}
let mut measure = Measure::start("select_top_sparse_storage_entries-ms");
let mut store_usage: Vec<StoreUsageInfo> = Vec::with_capacity(shrink_slots.len());
let mut total_alive_bytes: u64 = 0;
let mut candidates_count: usize = 0;
let mut total_bytes: u64 = 0;
let mut total_candidate_stores: usize = 0;
for slot in shrink_slots {
let Some(store) = self.storage.get_slot_storage_entry(*slot) else {
continue;
};
candidates_count += 1;
let alive_bytes = store.alive_bytes();
total_alive_bytes += alive_bytes as u64;
total_bytes += store.capacity();
Expand All @@ -4613,7 +4592,6 @@ impl AccountsDb {
alive_ratio,
store: store.clone(),
});
total_candidate_stores += 1;
}
store_usage.sort_by(|a, b| {
a.alive_ratio
Expand Down Expand Up @@ -4650,20 +4628,6 @@ impl AccountsDb {
shrink_slots.insert(usage.slot, Arc::clone(store));
}
}
measure.stop();
inc_new_counter_debug!(
"shrink_select_top_sparse_storage_entries-ms",
measure.as_ms() as usize
);
inc_new_counter_debug!(
"shrink_select_top_sparse_storage_entries-seeds",
candidates_count
);
inc_new_counter_debug!(
"shrink_total_preliminary_candidate_stores",
total_candidate_stores
);

(shrink_slots, shrink_slots_next_batch)
}

Expand Down Expand Up @@ -5067,8 +5031,8 @@ impl AccountsDb {

let shrink_candidates_slots =
std::mem::take(&mut *self.shrink_candidate_slots.lock().unwrap());

let (shrink_slots, shrink_slots_next_batch) = {
let candidates_count = shrink_candidates_slots.len();
let ((shrink_slots, shrink_slots_next_batch), select_time_us) = measure_us!({
if let AccountShrinkThreshold::TotalSpace { shrink_ratio } = self.shrink_ratio {
let (shrink_slots, shrink_slots_next_batch) =
self.select_candidates_by_total_usage(&shrink_candidates_slots, shrink_ratio);
Expand All @@ -5087,7 +5051,7 @@ impl AccountsDb {
None,
)
}
};
});

if shrink_slots.is_empty()
&& shrink_slots_next_batch
Expand All @@ -5101,41 +5065,43 @@ impl AccountsDb {
let _guard = (!shrink_slots.is_empty())
.then_some(|| self.active_stats.activate(ActiveStatItem::Shrink));

let mut measure_shrink_all_candidates = Measure::start("shrink_all_candidate_slots-ms");
let num_candidates = shrink_slots.len();
let shrink_candidates_count = shrink_slots.len();
self.thread_pool_clean.install(|| {
shrink_slots
.into_par_iter()
.for_each(|(slot, slot_shrink_candidate)| {
if self.ancient_append_vec_offset.is_some() && slot < oldest_non_ancient_slot {
self.shrink_stats
.num_ancient_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
}
let mut measure = Measure::start("shrink_candidate_slots-ms");
self.shrink_storage(&slot_shrink_candidate);
measure.stop();
inc_new_counter_info!("shrink_candidate_slots-ms", measure.as_ms() as usize);
});
let num_selected = shrink_slots.len();
let (_, shrink_all_us) = measure_us!({
self.thread_pool_clean.install(|| {
shrink_slots
.into_par_iter()
.for_each(|(slot, slot_shrink_candidate)| {
if self.ancient_append_vec_offset.is_some()
&& slot < oldest_non_ancient_slot
{
self.shrink_stats
.num_ancient_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
}
self.shrink_storage(&slot_shrink_candidate);
});
})
});
measure_shrink_all_candidates.stop();
inc_new_counter_info!(
"shrink_all_candidate_slots-ms",
measure_shrink_all_candidates.as_ms() as usize
);
inc_new_counter_info!("shrink_all_candidate_slots-count", shrink_candidates_count);

let mut pended_counts: usize = 0;
if let Some(shrink_slots_next_batch) = shrink_slots_next_batch {
let mut shrink_slots = self.shrink_candidate_slots.lock().unwrap();
pended_counts += shrink_slots_next_batch.len();
pended_counts = shrink_slots_next_batch.len();
for slot in shrink_slots_next_batch {
shrink_slots.insert(slot);
}
}
inc_new_counter_info!("shrink_pended_stores-count", pended_counts);

num_candidates
datapoint_info!(
"shrink_candidate_slots",
("select_time_us", select_time_us, i64),
("shrink_all_us", shrink_all_us, i64),
("candidates_count", candidates_count, i64),
("selected_count", num_selected, i64),
("deferred_to_next_round_count", pended_counts, i64)
);

num_selected
}

/// This is only called at startup from bank when we are being extra careful such as when we downloaded a snapshot.
Expand Down Expand Up @@ -8040,16 +8006,14 @@ impl AccountsDb {

fn is_shrinking_productive(store: &AccountStorageEntry) -> bool {
let alive_count = store.count();
let stored_count = store.approx_stored_count();
let alive_bytes = store.alive_bytes() as u64;
let total_bytes = store.capacity();

if Self::should_not_shrink(alive_bytes, total_bytes) {
trace!(
"shrink_slot_forced ({}): not able to shrink at all: alive/stored: {}/{} ({}b / {}b) save: {}",
"shrink_slot_forced ({}): not able to shrink at all: num alive: {}, bytes alive: {}, bytes total: {}, bytes saved: {}",
store.slot(),
alive_count,
stored_count,
alive_bytes,
total_bytes,
total_bytes.saturating_sub(alive_bytes),
Expand Down Expand Up @@ -9378,12 +9342,6 @@ impl AccountsDb {
store
.alive_bytes
.store(entry.stored_size, Ordering::Release);
assert!(
store.approx_stored_count() >= entry.count,
"{}, {}",
store.approx_stored_count(),
entry.count
);
} else {
trace!("id: {} clearing count", id);
store.count_and_status.lock_write().0 = 0;
Expand Down Expand Up @@ -9424,11 +9382,10 @@ impl AccountsDb {
for slot in &slots {
let entry = self.storage.get_slot_storage_entry(*slot).unwrap();
info!(
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}",
" slot: {} id: {} count_and_status: {:?} len: {} capacity: {}",
slot,
entry.id(),
entry.count_and_status.read(),
entry.approx_store_count.load(Ordering::Relaxed),
entry.accounts.len(),
entry.accounts.capacity(),
);
Expand Down Expand Up @@ -9653,10 +9610,7 @@ impl AccountsDb {
pub fn all_account_count_in_accounts_file(&self, slot: Slot) -> usize {
let store = self.storage.get_slot_storage_entry(slot);
if let Some(store) = store {
let count = store.accounts_count();
let stored_count = store.approx_stored_count();
assert_eq!(stored_count, count);
count
store.accounts_count()
} else {
0
}
Expand Down Expand Up @@ -9921,10 +9875,6 @@ pub mod tests {

// construct append vec with account to generate an index from
append_vec.accounts.append_accounts(&storable_accounts, 0);
// append vecs set this at load
append_vec
.approx_store_count
.store(data.len(), Ordering::Relaxed);

let genesis_config = GenesisConfig::default();
assert!(!db.accounts_index.contains(&pubkey));
Expand Down
4 changes: 4 additions & 0 deletions accounts-db/src/accounts_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,10 @@ pub struct AccountLtHash(pub LtHash);
pub const ZERO_LAMPORT_ACCOUNT_LT_HASH: AccountLtHash =
AccountLtHash(LtHash([0; LtHash::NUM_ELEMENTS]));

/// Lattice hash of all accounts
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AccountsLtHash(pub LtHash);

/// Hash of accounts
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum AccountsHashKind {
Expand Down
Loading

0 comments on commit e545d22

Please sign in to comment.