From 01b5f3dbd35345302e5fc6a83667b5e5e017d4ec Mon Sep 17 00:00:00 2001 From: Noah Prince <83885631+ChewingGlass@users.noreply.github.com> Date: Tue, 19 Dec 2023 14:34:50 -0600 Subject: [PATCH] feat(#504): Parallelize claim delegation rewards (#509) * Allow parallel claims * Parallelize calls * Appease clippy --- packages/spl-utils/src/transaction.ts | 64 ++++++++++++++++++- .../src/hooks/useClaimAllPositionsRewards.ts | 10 ++- .../src/hooks/useClaimPositionRewards.ts | 12 ++-- .../delegation/claim_rewards_v0.rs | 12 ++-- programs/helium-sub-daos/src/state.rs | 63 +++++++++++++++++- tests/helium-sub-daos.ts | 9 --- 6 files changed, 143 insertions(+), 27 deletions(-) diff --git a/packages/spl-utils/src/transaction.ts b/packages/spl-utils/src/transaction.ts index afcc35930..66a8290f3 100644 --- a/packages/spl-utils/src/transaction.ts +++ b/packages/spl-utils/src/transaction.ts @@ -467,7 +467,7 @@ async function withRetries( throw new Error("Failed after retries") } -type Status = { +export type Status = { totalProgress: number; currentBatchProgress: number; currentBatchSize: number; @@ -637,3 +637,65 @@ async function getAllTxns( ) ).flat(); } + +// Batch instructions parallel into as many txs as it takes +export async function batchParallelInstructions( + provider: AnchorProvider, + instructions: TransactionInstruction[], + onProgress?: (status: Status) => void, + triesRemaining: number = 10 // Number of blockhashes to try resending txs with before giving up +): Promise { + let currentTxInstructions: TransactionInstruction[] = []; + const blockhash = (await provider.connection.getLatestBlockhash()).blockhash; + const transactions: Transaction[] = []; + + for (const instruction of instructions) { + currentTxInstructions.push(instruction); + const tx = new Transaction({ + feePayer: provider.wallet.publicKey, + recentBlockhash: blockhash, + }); + tx.add(...currentTxInstructions); + try { + if ( + tx.serialize({ + requireAllSignatures: false, + verifySignatures: false, + }).length >= + 1232 - (64 + 32) * tx.signatures.length + ) { + // yes it's ugly to throw and catch, but .serialize can _also_ throw this error + throw new Error("Transaction too large"); + } + } catch (e: any) { + if (e.toString().includes("Transaction too large")) { + currentTxInstructions.pop(); + const tx = new Transaction({ + feePayer: provider.wallet.publicKey, + recentBlockhash: blockhash, + }); + tx.add(...currentTxInstructions); + transactions.push(tx); + currentTxInstructions = [instruction]; + } else { + throw e; + } + } + } + + if (currentTxInstructions.length > 0) { + const tx = new Transaction({ + feePayer: provider.wallet.publicKey, + recentBlockhash: blockhash, + }); + tx.add(...currentTxInstructions); + transactions.push(tx); + } + + await bulkSendTransactions( + provider, + transactions, + onProgress, + triesRemaining + ); +} diff --git a/packages/voter-stake-registry-hooks/src/hooks/useClaimAllPositionsRewards.ts b/packages/voter-stake-registry-hooks/src/hooks/useClaimAllPositionsRewards.ts index 6cd88725c..e314edc08 100644 --- a/packages/voter-stake-registry-hooks/src/hooks/useClaimAllPositionsRewards.ts +++ b/packages/voter-stake-registry-hooks/src/hooks/useClaimAllPositionsRewards.ts @@ -6,7 +6,7 @@ import { delegatedPositionKey, init, } from "@helium/helium-sub-daos-sdk"; -import { chunks, sendMultipleInstructions } from "@helium/spl-utils"; +import { batchParallelInstructions, chunks, sendMultipleInstructions, Status } from "@helium/spl-utils"; import { PublicKey, TransactionInstruction } from "@solana/web3.js"; import { useAsyncCallback } from "react-async-hook"; import { useHeliumVsrState } from "../contexts/heliumVsrContext"; @@ -19,9 +19,11 @@ export const useClaimAllPositionsRewards = () => { async ({ positions, programId = PROGRAM_ID, + onProgress, }: { positions: PositionWithMeta[]; programId?: PublicKey; + onProgress?: (status: Status) => void; }) => { const isInvalid = !unixNow || !provider || !positions.every((pos) => pos.hasRewards); @@ -68,6 +70,12 @@ export const useClaimAllPositionsRewards = () => { ); } + await batchParallelInstructions( + provider, + multiDemArray.flat(), + onProgress + ); + for (const positionInsturctions of multiDemArray) { // This is an arbitrary threshold and we assume that up to 4 instructions can be inserted as a single Tx const ixsChunks = chunks(positionInsturctions, 4); diff --git a/packages/voter-stake-registry-hooks/src/hooks/useClaimPositionRewards.ts b/packages/voter-stake-registry-hooks/src/hooks/useClaimPositionRewards.ts index baaa1989e..5c0ca3995 100644 --- a/packages/voter-stake-registry-hooks/src/hooks/useClaimPositionRewards.ts +++ b/packages/voter-stake-registry-hooks/src/hooks/useClaimPositionRewards.ts @@ -6,7 +6,7 @@ import { delegatedPositionKey, init, } from "@helium/helium-sub-daos-sdk"; -import { chunks, sendMultipleInstructions } from "@helium/spl-utils"; +import { batchParallelInstructions, Status } from "@helium/spl-utils"; import { PublicKey, TransactionInstruction } from "@solana/web3.js"; import { useAsyncCallback } from "react-async-hook"; import { useHeliumVsrState } from "../contexts/heliumVsrContext"; @@ -19,9 +19,11 @@ export const useClaimPositionRewards = () => { async ({ position, programId = PROGRAM_ID, + onProgress, }: { position: PositionWithMeta; programId?: PublicKey; + onProgress?: (status: Status) => void; }) => { const isInvalid = !unixNow || !provider || !position.hasRewards; @@ -60,13 +62,7 @@ export const useClaimPositionRewards = () => { ) ); - // This is an arbitrary threshold and we assume that up to 4 instructions can be inserted as a single Tx - const ixsChunks = chunks(instructions, 4); - await sendMultipleInstructions( - provider, - ixsChunks, - ixsChunks.map((_) => []) - ); + await batchParallelInstructions(provider, instructions, onProgress); } } ); diff --git a/programs/helium-sub-daos/src/instructions/delegation/claim_rewards_v0.rs b/programs/helium-sub-daos/src/instructions/delegation/claim_rewards_v0.rs index f128f1f58..e93c6c48f 100644 --- a/programs/helium-sub-daos/src/instructions/delegation/claim_rewards_v0.rs +++ b/programs/helium-sub-daos/src/instructions/delegation/claim_rewards_v0.rs @@ -118,12 +118,10 @@ pub fn handler(ctx: Context, args: ClaimRewardsArgsV0) -> Result // check epoch that's being claimed is over let epoch = current_epoch(registrar.clock_unix_timestamp()); if !TESTING { - require_gt!(epoch, args.epoch, ErrorCode::EpochNotOver,); - require_eq!( - args.epoch, - delegated_position.last_claimed_epoch + 1, - ErrorCode::InvalidClaimEpoch - ) + require_gt!(epoch, args.epoch, ErrorCode::EpochNotOver); + if delegated_position.is_claimed(args.epoch)? { + return Err(error!(ErrorCode::InvalidClaimEpoch)); + } } let delegated_vehnt_at_epoch = position.voting_power( @@ -148,7 +146,7 @@ pub fn handler(ctx: Context, args: ClaimRewardsArgsV0) -> Result ) .unwrap(); - delegated_position.last_claimed_epoch += 1; + delegated_position.set_claimed(args.epoch)?; let amount_left = ctx.accounts.delegator_pool.amount; transfer_v0( diff --git a/programs/helium-sub-daos/src/state.rs b/programs/helium-sub-daos/src/state.rs index 50f38bae0..749688e10 100644 --- a/programs/helium-sub-daos/src/state.rs +++ b/programs/helium-sub-daos/src/state.rs @@ -1,3 +1,4 @@ +use crate::error::ErrorCode; use anchor_lang::prelude::*; use crate::EPOCH_LENGTH; @@ -126,10 +127,47 @@ pub struct DelegatedPositionV0 { pub position: Pubkey, pub hnt_amount: u64, pub sub_dao: Pubkey, - pub last_claimed_epoch: u64, // the epoch number that the dnt rewards were last claimed at + pub last_claimed_epoch: u64, // the latest epoch not included claimed_epochs_bitmap pub start_ts: i64, pub purged: bool, // if true, this position has been removed from subdao calculations. rewards can still be claimed. pub bump_seed: u8, + // A bitmap of epochs past last_claimed_epoch (exclusive) that have been claimed. + // This bitmap gets rotated as last_claimed_epoch increases. + // This allows for claiming ~128 epochs worth of rewards in parallel. + pub claimed_epochs_bitmap: u128, +} + +impl DelegatedPositionV0 { + pub fn is_claimed(&self, epoch: u64) -> Result { + if epoch <= self.last_claimed_epoch { + Ok(true) + } else if epoch > self.last_claimed_epoch + 128 { + Err(error!(ErrorCode::InvalidClaimEpoch)) + } else { + let bit_index = (epoch - self.last_claimed_epoch - 1) as u128; + Ok(self.claimed_epochs_bitmap >> (127_u128 - bit_index) & 1 == 1) + } + } + + pub fn set_claimed(&mut self, epoch: u64) -> Result<()> { + if epoch <= self.last_claimed_epoch { + Err(error!(ErrorCode::InvalidClaimEpoch)) + } else if epoch > self.last_claimed_epoch + 128 { + Err(error!(ErrorCode::InvalidClaimEpoch)) + } else { + let bit_index = (epoch - self.last_claimed_epoch - 1) as u128; + // Set the bit at bit_index to 1 + self.claimed_epochs_bitmap |= 1_u128 << (127_u128 - bit_index); + + // Shift claimed_epochs_bitmap to the left until the first bit is 0 + while self.claimed_epochs_bitmap & (1_u128 << 127) != 0 { + self.claimed_epochs_bitmap <<= 1; + self.last_claimed_epoch += 1; + } + + Ok(()) + } + } } #[account] @@ -195,3 +233,26 @@ pub struct SubDaoV0 { pub dc_onboarding_fees_paid: u64, // the total amount of dc onboarding fees paid to this subdao by active hotspots (inactive hotspots are excluded) pub active_device_authority: Pubkey, // authority that can mark hotspots as active/inactive } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_claimed() { + let mut position = DelegatedPositionV0::default(); + let mut epoch = 2; + + assert!(!position.is_claimed(epoch).unwrap()); + position.set_claimed(epoch).unwrap(); + assert!(position.is_claimed(epoch).unwrap()); + assert_eq!(position.last_claimed_epoch, 0); + + epoch = 1; + assert!(!position.is_claimed(epoch).unwrap()); + position.set_claimed(epoch).unwrap(); + assert!(position.is_claimed(epoch).unwrap()); + assert_eq!(position.last_claimed_epoch, 2); + assert_eq!(position.claimed_epochs_bitmap, 0); + } +} diff --git a/tests/helium-sub-daos.ts b/tests/helium-sub-daos.ts index 8c504d1bf..8a1dcc774 100644 --- a/tests/helium-sub-daos.ts +++ b/tests/helium-sub-daos.ts @@ -818,15 +818,6 @@ describe("helium-sub-daos", () => { await program.account.subDaoEpochInfoV0.fetch(subDaoEpochInfo) ).epoch; - const thread = PublicKey.findProgramAddressSync( - [ - Buffer.from("thread", "utf8"), - subDao.toBuffer(), - Buffer.from("end-epoch", "utf8"), - ], - THREAD_PID - )[0]; - await program.methods .calculateUtilityScoreV0({ epoch,