Skip to content

Commit

Permalink
account_saver: optionally collect txs (#2793)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Sep 3, 2024
1 parent 0259407 commit 6e83982
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 89 deletions.
5 changes: 5 additions & 0 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,11 @@ impl AccountsDb {
self.base_working_path.clone()
}

/// Returns true if there is an accounts update notifier.
pub fn has_accounts_update_notifier(&self) -> bool {
self.accounts_update_notifier.is_some()
}

fn next_id(&self) -> AccountsFileId {
let next_id = self.next_id.fetch_add(1, Ordering::AcqRel);
assert!(
Expand Down
261 changes: 172 additions & 89 deletions runtime/src/account_saver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ pub fn collect_accounts_to_store<'a, T: SVMMessage>(
processing_results: &'a mut [TransactionProcessingResult],
durable_nonce: &DurableNonce,
lamports_per_signature: u64,
collect_transactions: bool,
) -> (Vec<(&'a Pubkey, &'a AccountSharedData)>, Option<Vec<&'a T>>) {
let collect_capacity = max_number_of_accounts_to_collect(txs, processing_results);
let mut accounts = Vec::with_capacity(collect_capacity);
let mut transactions = Vec::with_capacity(collect_capacity);
let mut transactions = collect_transactions.then(|| Vec::with_capacity(collect_capacity));
for (processing_result, transaction) in processing_results.iter_mut().zip(txs) {
let Some(processed_tx) = processing_result.processed_transaction_mut() else {
// Don't store any accounts if tx wasn't executed
Expand Down Expand Up @@ -87,12 +88,12 @@ pub fn collect_accounts_to_store<'a, T: SVMMessage>(
}
}
}
(accounts, Some(transactions))
(accounts, transactions)
}

fn collect_accounts_for_successful_tx<'a, T: SVMMessage>(
collected_accounts: &mut Vec<(&'a Pubkey, &'a AccountSharedData)>,
collected_account_transactions: &mut Vec<&'a T>,
collected_account_transactions: &mut Option<Vec<&'a T>>,
transaction: &'a T,
transaction_accounts: &'a [TransactionAccount],
) {
Expand All @@ -109,13 +110,15 @@ fn collect_accounts_for_successful_tx<'a, T: SVMMessage>(
})
{
collected_accounts.push((address, account));
collected_account_transactions.push(transaction);
if let Some(collected_account_transactions) = collected_account_transactions {
collected_account_transactions.push(transaction);
}
}
}

fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
collected_accounts: &mut Vec<(&'a Pubkey, &'a AccountSharedData)>,
collected_account_transactions: &mut Vec<&'a T>,
collected_account_transactions: &mut Option<Vec<&'a T>>,
transaction: &'a T,
rollback_accounts: &'a mut RollbackAccounts,
durable_nonce: &DurableNonce,
Expand All @@ -125,7 +128,9 @@ fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
match rollback_accounts {
RollbackAccounts::FeePayerOnly { fee_payer_account } => {
collected_accounts.push((fee_payer_address, &*fee_payer_account));
collected_account_transactions.push(transaction);
if let Some(collected_account_transactions) = collected_account_transactions {
collected_account_transactions.push(transaction);
}
}
RollbackAccounts::SameNonceAndFeePayer { nonce } => {
// Since we know we are dealing with a valid nonce account,
Expand All @@ -134,22 +139,28 @@ fn collect_accounts_for_failed_tx<'a, T: SVMMessage>(
.try_advance_nonce(*durable_nonce, lamports_per_signature)
.unwrap();
collected_accounts.push((nonce.address(), nonce.account()));
collected_account_transactions.push(transaction);
if let Some(collected_account_transactions) = collected_account_transactions {
collected_account_transactions.push(transaction);
}
}
RollbackAccounts::SeparateNonceAndFeePayer {
nonce,
fee_payer_account,
} => {
collected_accounts.push((fee_payer_address, &*fee_payer_account));
collected_account_transactions.push(transaction);
if let Some(collected_account_transactions) = collected_account_transactions {
collected_account_transactions.push(transaction);
}

// Since we know we are dealing with a valid nonce account,
// unwrap is safe here
nonce
.try_advance_nonce(*durable_nonce, lamports_per_signature)
.unwrap();
collected_accounts.push((nonce.address(), nonce.account()));
collected_account_transactions.push(transaction);
if let Some(collected_account_transactions) = collected_account_transactions {
collected_account_transactions.push(transaction);
}
}
}
}
Expand Down Expand Up @@ -284,20 +295,32 @@ mod tests {
];
let max_collected_accounts = max_number_of_accounts_to_collect(&txs, &processing_results);
assert_eq!(max_collected_accounts, 2);
let (collected_accounts, transactions) =
collect_accounts_to_store(&txs, &mut processing_results, &DurableNonce::default(), 0);
assert_eq!(collected_accounts.len(), 2);
assert!(collected_accounts
.iter()
.any(|(pubkey, _account)| *pubkey == &keypair0.pubkey()));
assert!(collected_accounts
.iter()
.any(|(pubkey, _account)| *pubkey == &keypair1.pubkey()));

let transactions = transactions.unwrap();
assert_eq!(transactions.len(), 2);
assert!(transactions.iter().any(|txn| (*txn).eq(&tx0)));
assert!(transactions.iter().any(|txn| (*txn).eq(&tx1)));

for collect_transactions in [false, true] {
let (collected_accounts, transactions) = collect_accounts_to_store(
&txs,
&mut processing_results,
&DurableNonce::default(),
0,
collect_transactions,
);
assert_eq!(collected_accounts.len(), 2);
assert!(collected_accounts
.iter()
.any(|(pubkey, _account)| *pubkey == &keypair0.pubkey()));
assert!(collected_accounts
.iter()
.any(|(pubkey, _account)| *pubkey == &keypair1.pubkey()));

if collect_transactions {
let transactions = transactions.unwrap();
assert_eq!(transactions.len(), 2);
assert!(transactions.iter().any(|txn| (*txn).eq(&tx0)));
assert!(transactions.iter().any(|txn| (*txn).eq(&tx1)));
} else {
assert!(transactions.is_none());
}
}
}

#[test]
Expand Down Expand Up @@ -343,18 +366,33 @@ mod tests {
let max_collected_accounts = max_number_of_accounts_to_collect(&txs, &processing_results);
assert_eq!(max_collected_accounts, 1);
let durable_nonce = DurableNonce::from_blockhash(&Hash::new_unique());
let (collected_accounts, _) =
collect_accounts_to_store(&txs, &mut processing_results, &durable_nonce, 0);
assert_eq!(collected_accounts.len(), 1);
assert_eq!(
collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &from_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap(),
from_account_pre,
);

for collect_transactions in [false, true] {
let (collected_accounts, transactions) = collect_accounts_to_store(
&txs,
&mut processing_results,
&durable_nonce,
0,
collect_transactions,
);
assert_eq!(collected_accounts.len(), 1);
assert_eq!(
collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &from_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap(),
from_account_pre,
);

if collect_transactions {
let transactions = transactions.unwrap();
assert_eq!(transactions.len(), collected_accounts.len());
} else {
assert!(transactions.is_none());
}
}
}

#[test]
Expand Down Expand Up @@ -428,33 +466,48 @@ mod tests {
)];
let max_collected_accounts = max_number_of_accounts_to_collect(&txs, &processing_results);
assert_eq!(max_collected_accounts, 2);
let (collected_accounts, _) =
collect_accounts_to_store(&txs, &mut processing_results, &durable_nonce, 0);
assert_eq!(collected_accounts.len(), 2);
assert_eq!(
collected_accounts

for collect_transactions in [false, true] {
let (collected_accounts, transactions) = collect_accounts_to_store(
&txs,
&mut processing_results,
&durable_nonce,
0,
collect_transactions,
);
assert_eq!(collected_accounts.len(), 2);
assert_eq!(
collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &from_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap(),
from_account_pre,
);
let collected_nonce_account = collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &from_address)
.find(|(pubkey, _account)| *pubkey == &nonce_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap(),
from_account_pre,
);
let collected_nonce_account = collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &nonce_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap();
assert_eq!(
collected_nonce_account.lamports(),
nonce_account_pre.lamports(),
);
assert!(nonce_account::verify_nonce_account(
&collected_nonce_account,
durable_nonce.as_hash()
)
.is_some());
.unwrap();
assert_eq!(
collected_nonce_account.lamports(),
nonce_account_pre.lamports(),
);
assert!(nonce_account::verify_nonce_account(
&collected_nonce_account,
durable_nonce.as_hash()
)
.is_some());

if collect_transactions {
let transactions = transactions.unwrap();
assert_eq!(transactions.len(), collected_accounts.len());
} else {
assert!(transactions.is_none());
}
}
}

#[test]
Expand Down Expand Up @@ -526,24 +579,39 @@ mod tests {
)];
let max_collected_accounts = max_number_of_accounts_to_collect(&txs, &processing_results);
assert_eq!(max_collected_accounts, 1);
let (collected_accounts, _) =
collect_accounts_to_store(&txs, &mut processing_results, &durable_nonce, 0);
assert_eq!(collected_accounts.len(), 1);
let collected_nonce_account = collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &nonce_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap();
assert_eq!(
collected_nonce_account.lamports(),
nonce_account_pre.lamports()
);
assert!(nonce_account::verify_nonce_account(
&collected_nonce_account,
durable_nonce.as_hash()
)
.is_some());

for collect_transactions in [false, true] {
let (collected_accounts, transactions) = collect_accounts_to_store(
&txs,
&mut processing_results,
&durable_nonce,
0,
collect_transactions,
);
assert_eq!(collected_accounts.len(), 1);
let collected_nonce_account = collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &nonce_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap();
assert_eq!(
collected_nonce_account.lamports(),
nonce_account_pre.lamports()
);
assert!(nonce_account::verify_nonce_account(
&collected_nonce_account,
durable_nonce.as_hash()
)
.is_some());

if collect_transactions {
let transactions = transactions.unwrap();
assert_eq!(transactions.len(), collected_accounts.len());
} else {
assert!(transactions.is_none());
}
}
}

#[test]
Expand Down Expand Up @@ -572,17 +640,32 @@ mod tests {
let max_collected_accounts = max_number_of_accounts_to_collect(&txs, &processing_results);
assert_eq!(max_collected_accounts, 1);
let durable_nonce = DurableNonce::from_blockhash(&Hash::new_unique());
let (collected_accounts, _) =
collect_accounts_to_store(&txs, &mut processing_results, &durable_nonce, 0);
assert_eq!(collected_accounts.len(), 1);
assert_eq!(
collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &from_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap(),
from_account_pre,
);

for collect_transactions in [false, true] {
let (collected_accounts, transactions) = collect_accounts_to_store(
&txs,
&mut processing_results,
&durable_nonce,
0,
collect_transactions,
);
assert_eq!(collected_accounts.len(), 1);
assert_eq!(
collected_accounts
.iter()
.find(|(pubkey, _account)| *pubkey == &from_address)
.map(|(_pubkey, account)| *account)
.cloned()
.unwrap(),
from_account_pre,
);

if collect_transactions {
let transactions = transactions.unwrap();
assert_eq!(transactions.len(), collected_accounts.len());
} else {
assert!(transactions.is_none());
}
}
}
}
1 change: 1 addition & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3804,6 +3804,7 @@ impl Bank {
&mut processing_results,
&durable_nonce,
lamports_per_signature,
self.accounts().accounts_db.has_accounts_update_notifier(),
);
self.rc.accounts.store_cached(
(self.slot(), accounts_to_store.as_slice()),
Expand Down

0 comments on commit 6e83982

Please sign in to comment.