diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index af886da75c7df3..02449fcc021c33 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -1352,8 +1352,6 @@ impl StoreAccountsTiming { struct CleaningInfo { slot_list: SlotList, ref_count: u64, - /// True for pubkeys mapping to older versions of accounts that should be purged. - should_purge: bool, } /// This is the return type of AccountsDb::construct_candidate_clean_keys. @@ -2752,59 +2750,54 @@ impl AccountsDb { .expect("Cluster type must be set at initialization") } - /// Reclaim older states of accounts older than max_clean_root_inclusive for AccountsDb bloat mitigation. - /// Any accounts which are removed from the accounts index are returned in PubkeysRemovedFromAccountsIndex. - /// These should NOT be unref'd later from the accounts index. - fn clean_accounts_older_than_root( + /// While scanning cleaning candidates obtain slots that can be + /// reclaimed for each pubkey. In addition, if the pubkey is + /// removed from the index, insert in pubkeys_removed_from_accounts_index. + fn collect_reclaims( &self, - candidates: &[RwLock>], + pubkey: &Pubkey, max_clean_root_inclusive: Option, ancient_account_cleans: &AtomicU64, epoch_schedule: &EpochSchedule, - ) -> (ReclaimResult, PubkeysRemovedFromAccountsIndex) { - let pubkeys_removed_from_accounts_index = HashSet::default(); + pubkeys_removed_from_accounts_index: &Mutex, + ) -> SlotList { let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule); - let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index); - let mut clean_rooted = Measure::start("clean_old_root-ms"); - let reclaim_vecs = candidates - .par_iter() - .filter_map(|candidates_bin| { - let mut reclaims = Vec::new(); - for (pubkey, cleaning_info) in candidates_bin.read().unwrap().iter() { - if cleaning_info.should_purge { - let removed_from_index = self.accounts_index.clean_rooted_entries( - pubkey, - &mut reclaims, - max_clean_root_inclusive, - ); - if removed_from_index { - pubkeys_removed_from_accounts_index - .lock() - .unwrap() - .insert(*pubkey); - } - } - } - - (!reclaims.is_empty()).then(|| { - // figure out how many ancient accounts have been reclaimed - let old_reclaims = reclaims - .iter() - .filter_map(|(slot, _)| (slot < &one_epoch_old).then_some(1)) - .sum(); - ancient_account_cleans.fetch_add(old_reclaims, Ordering::Relaxed); - reclaims - }) - }) - .collect::>(); + let mut reclaims = Vec::new(); + let removed_from_index = self.accounts_index.clean_rooted_entries( + pubkey, + &mut reclaims, + max_clean_root_inclusive, + ); + if removed_from_index { + pubkeys_removed_from_accounts_index + .lock() + .unwrap() + .insert(*pubkey); + } + if !reclaims.is_empty() { + // figure out how many ancient accounts have been reclaimed + let old_reclaims = reclaims + .iter() + .filter_map(|(slot, _)| (slot < &one_epoch_old).then_some(1)) + .sum(); + ancient_account_cleans.fetch_add(old_reclaims, Ordering::Relaxed); + } clean_rooted.stop(); - let pubkeys_removed_from_accounts_index = - pubkeys_removed_from_accounts_index.into_inner().unwrap(); self.clean_accounts_stats .clean_old_root_us .fetch_add(clean_rooted.as_us(), Ordering::Relaxed); + reclaims + } + /// Reclaim older states of accounts older than max_clean_root_inclusive for AccountsDb bloat mitigation. + /// Any accounts which are removed from the accounts index are returned in PubkeysRemovedFromAccountsIndex. + /// These should NOT be unref'd later from the accounts index. + fn clean_accounts_older_than_root( + &self, + reclaims: &SlotList, + pubkeys_removed_from_accounts_index: &HashSet, + ) -> ReclaimResult { let mut measure = Measure::start("clean_old_root_reclaims"); // Don't reset from clean, since the pubkeys in those stores may need to be unref'ed @@ -2812,18 +2805,18 @@ impl AccountsDb { let reset_accounts = false; let reclaim_result = self.handle_reclaims( - (!reclaim_vecs.is_empty()).then(|| reclaim_vecs.iter().flatten()), + (!reclaims.is_empty()).then(|| reclaims.iter()), None, reset_accounts, - &pubkeys_removed_from_accounts_index, + pubkeys_removed_from_accounts_index, HandleReclaims::ProcessDeadSlots(&self.clean_accounts_stats.purge_stats), ); measure.stop(); - debug!("{} {}", clean_rooted, measure); + debug!("{}", measure); self.clean_accounts_stats .clean_old_root_reclaim_us .fetch_add(measure.as_us(), Ordering::Relaxed); - (reclaim_result, pubkeys_removed_from_accounts_index) + reclaim_result } fn do_reset_uncleaned_roots(&self, max_clean_root: Option) { @@ -2856,9 +2849,8 @@ impl AccountsDb { CleaningInfo { slot_list, ref_count, - should_purge: _, }, - ) in bin.iter().filter(|x| !x.1.slot_list.is_empty()) + ) in bin.iter() { let mut failed_slot = None; let all_stores_being_deleted = slot_list.len() as RefCount == *ref_count; @@ -3349,7 +3341,10 @@ impl AccountsDb { let not_found_on_fork_accum = AtomicU64::new(0); let missing_accum = AtomicU64::new(0); let useful_accum = AtomicU64::new(0); - + let reclaims: SlotList = Vec::with_capacity(num_candidates as usize); + let reclaims = Mutex::new(reclaims); + let pubkeys_removed_from_accounts_index: PubkeysRemovedFromAccountsIndex = HashSet::new(); + let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index); // parallel scan the index. let do_clean_scan = || { candidates.par_iter().for_each(|candidates_bin| { @@ -3363,86 +3358,97 @@ impl AccountsDb { // avoid capturing the HashMap in the // closure passed to scan thus making // conflicting read and write borrows. - candidates_bin - .iter_mut() - .for_each(|(candidate_pubkey, candidate_info)| { - self.accounts_index.scan( - [*candidate_pubkey].iter(), - |_candidate_pubkey, slot_list_and_ref_count, _entry| { - let mut useless = true; - if let Some((slot_list, ref_count)) = slot_list_and_ref_count { - // find the highest rooted slot in the slot list - let index_in_slot_list = self.accounts_index.latest_slot( - None, - slot_list, - max_clean_root_inclusive, - ); + candidates_bin.retain(|candidate_pubkey, candidate_info| { + let mut should_purge = false; + self.accounts_index.scan( + [*candidate_pubkey].iter(), + |_candidate_pubkey, slot_list_and_ref_count, _entry| { + let mut useless = true; + if let Some((slot_list, ref_count)) = slot_list_and_ref_count { + // find the highest rooted slot in the slot list + let index_in_slot_list = self.accounts_index.latest_slot( + None, + slot_list, + max_clean_root_inclusive, + ); - match index_in_slot_list { - Some(index_in_slot_list) => { - // found info relative to max_clean_root - let (slot, account_info) = - &slot_list[index_in_slot_list]; - if account_info.is_zero_lamport() { + match index_in_slot_list { + Some(index_in_slot_list) => { + // found info relative to max_clean_root + let (slot, account_info) = &slot_list[index_in_slot_list]; + if account_info.is_zero_lamport() { + useless = false; + // The latest one is zero lamports. We may be able to purge it. + // Add all the rooted entries that contain this pubkey. + // We know the highest rooted entry is zero lamports. + candidate_info.slot_list = + self.accounts_index.get_rooted_entries( + slot_list, + max_clean_root_inclusive, + ); + candidate_info.ref_count = ref_count; + } else { + found_not_zero += 1; + } + if uncleaned_roots.contains(slot) { + // Assertion enforced by `accounts_index.get()`, the latest slot + // will not be greater than the given `max_clean_root` + if let Some(max_clean_root_inclusive) = + max_clean_root_inclusive + { + assert!(slot <= &max_clean_root_inclusive); + } + if slot_list.len() > 1 { + // no need to purge old accounts if there is only 1 slot in the slot list + should_purge = true; + purges_old_accounts_local += 1; useless = false; - // The latest one is zero lamports. We may be able to purge it. - // Add all the rooted entries that contain this pubkey. - // We know the highest rooted entry is zero lamports. - candidate_info.slot_list = - self.accounts_index.get_rooted_entries( - slot_list, - max_clean_root_inclusive, - ); - candidate_info.ref_count = ref_count; } else { - found_not_zero += 1; + self.clean_accounts_stats + .uncleaned_roots_slot_list_1 + .fetch_add(1, Ordering::Relaxed); } - if uncleaned_roots.contains(slot) { - // Assertion enforced by `accounts_index.get()`, the latest slot - // will not be greater than the given `max_clean_root` - if let Some(max_clean_root_inclusive) = - max_clean_root_inclusive - { - assert!(slot <= &max_clean_root_inclusive); - } - if slot_list.len() > 1 { - // no need to purge old accounts if there is only 1 slot in the slot list - candidate_info.should_purge = true; - purges_old_accounts_local += 1; - useless = false; - } else { - self.clean_accounts_stats - .uncleaned_roots_slot_list_1 - .fetch_add(1, Ordering::Relaxed); - } - } - } - None => { - // This pubkey is in the index but not in a root slot, so clean - // it up by adding it to the to-be-purged list. - // - // Also, this pubkey must have been touched by some slot since - // it was in the dirty list, so we assume that the slot it was - // touched in must be unrooted. - not_found_on_fork += 1; - candidate_info.should_purge = true; - purges_old_accounts_local += 1; - useless = false; } } - } else { - missing += 1; - } - if !useless { - useful += 1; + None => { + // This pubkey is in the index but not in a root slot, so clean + // it up by adding it to the to-be-purged list. + // + // Also, this pubkey must have been touched by some slot since + // it was in the dirty list, so we assume that the slot it was + // touched in must be unrooted. + not_found_on_fork += 1; + should_purge = true; + purges_old_accounts_local += 1; + useless = false; + } } - AccountsIndexScanResult::OnlyKeepInMemoryIfDirty - }, - None, - false, - ScanFilter::All, + } else { + missing += 1; + } + if !useless { + useful += 1; + } + AccountsIndexScanResult::OnlyKeepInMemoryIfDirty + }, + None, + false, + ScanFilter::All, + ); + if should_purge { + let reclaims_new = self.collect_reclaims( + candidate_pubkey, + max_clean_root_inclusive, + &ancient_account_cleans, + epoch_schedule, + &pubkeys_removed_from_accounts_index, ); - }); + if !reclaims_new.is_empty() { + reclaims.lock().unwrap().extend(reclaims_new); + } + } + !candidate_info.slot_list.is_empty() + }); found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed); not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed); missing_accum.fetch_add(missing, Ordering::Relaxed); @@ -3457,16 +3463,13 @@ impl AccountsDb { } accounts_scan.stop(); - + let retained_keys_count = Self::count_pubkeys(&candidates); + let reclaims = reclaims.into_inner().unwrap(); + let mut pubkeys_removed_from_accounts_index = + pubkeys_removed_from_accounts_index.into_inner().unwrap(); let mut clean_old_rooted = Measure::start("clean_old_roots"); - let ((purged_account_slots, removed_accounts), mut pubkeys_removed_from_accounts_index) = - self.clean_accounts_older_than_root( - &candidates, - max_clean_root_inclusive, - &ancient_account_cleans, - epoch_schedule, - ); - + let (purged_account_slots, removed_accounts) = + self.clean_accounts_older_than_root(&reclaims, &pubkeys_removed_from_accounts_index); self.do_reset_uncleaned_roots(max_clean_root_inclusive); clean_old_rooted.stop(); @@ -3481,13 +3484,10 @@ impl AccountsDb { CleaningInfo { slot_list, ref_count, - should_purge: _, }, ) in candidates_bin.write().unwrap().iter_mut() { - if slot_list.is_empty() { - continue; // seems simpler than filtering. `candidates` contains all the pubkeys we original started with - } + debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty"); if purged_account_slots.contains_key(pubkey) { *ref_count = self.accounts_index.ref_count_from_storage(pubkey); } @@ -3563,7 +3563,6 @@ impl AccountsDb { let CleaningInfo { slot_list, ref_count: _, - should_purge: _, } = cleaning_info; (!slot_list.is_empty()).then_some(( *pubkey, @@ -3631,6 +3630,7 @@ impl AccountsDb { ("dirty_pubkeys_count", key_timings.dirty_pubkeys_count, i64), ("useful_keys", useful_accum.load(Ordering::Relaxed), i64), ("total_keys_count", num_candidates, i64), + ("retained_keys_count", retained_keys_count, i64), ( "scan_found_not_zero", found_not_zero_accum.load(Ordering::Relaxed), @@ -3866,11 +3866,8 @@ impl AccountsDb { let CleaningInfo { slot_list, ref_count: _, - should_purge: _, } = cleaning_info; - if slot_list.is_empty() { - return false; - } + debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty"); // Only keep candidates where the entire history of the account in the root set // can be purged. All AppendVecs for those updates are dead. for (slot, _account_info) in slot_list.iter() { @@ -12959,7 +12956,6 @@ pub mod tests { CleaningInfo { slot_list: rooted_entries, ref_count, - should_purge: false, }, ); } @@ -12970,7 +12966,6 @@ pub mod tests { CleaningInfo { slot_list: list, ref_count, - should_purge: _, }, ) in candidates_bin.iter() { @@ -15275,7 +15270,6 @@ pub mod tests { CleaningInfo { slot_list: vec![(slot, account_info)], ref_count: 1, - should_purge: false, }, ); let accounts_db = AccountsDb::new_single_for_tests();