Skip to content

Commit

Permalink
feat(#504): Parallelize claim delegation rewards (#509)
Browse files Browse the repository at this point in the history
* Allow parallel claims

* Parallelize calls

* Appease clippy
  • Loading branch information
ChewingGlass authored Dec 19, 2023
1 parent 50471c3 commit 01b5f3d
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 27 deletions.
64 changes: 63 additions & 1 deletion packages/spl-utils/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async function withRetries<A>(
throw new Error("Failed after retries")
}

type Status = {
export type Status = {
totalProgress: number;
currentBatchProgress: number;
currentBatchSize: number;
Expand Down Expand Up @@ -637,3 +637,65 @@ async function getAllTxns(
)
).flat();
}

// Batch instructions parallel into as many txs as it takes
export async function batchParallelInstructions(
provider: AnchorProvider,
instructions: TransactionInstruction[],
onProgress?: (status: Status) => void,
triesRemaining: number = 10 // Number of blockhashes to try resending txs with before giving up
): Promise<void> {
let currentTxInstructions: TransactionInstruction[] = [];
const blockhash = (await provider.connection.getLatestBlockhash()).blockhash;
const transactions: Transaction[] = [];

for (const instruction of instructions) {
currentTxInstructions.push(instruction);
const tx = new Transaction({
feePayer: provider.wallet.publicKey,
recentBlockhash: blockhash,
});
tx.add(...currentTxInstructions);
try {
if (
tx.serialize({
requireAllSignatures: false,
verifySignatures: false,
}).length >=
1232 - (64 + 32) * tx.signatures.length
) {
// yes it's ugly to throw and catch, but .serialize can _also_ throw this error
throw new Error("Transaction too large");
}
} catch (e: any) {
if (e.toString().includes("Transaction too large")) {
currentTxInstructions.pop();
const tx = new Transaction({
feePayer: provider.wallet.publicKey,
recentBlockhash: blockhash,
});
tx.add(...currentTxInstructions);
transactions.push(tx);
currentTxInstructions = [instruction];
} else {
throw e;
}
}
}

if (currentTxInstructions.length > 0) {
const tx = new Transaction({
feePayer: provider.wallet.publicKey,
recentBlockhash: blockhash,
});
tx.add(...currentTxInstructions);
transactions.push(tx);
}

await bulkSendTransactions(
provider,
transactions,
onProgress,
triesRemaining
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
delegatedPositionKey,
init,
} from "@helium/helium-sub-daos-sdk";
import { chunks, sendMultipleInstructions } from "@helium/spl-utils";
import { batchParallelInstructions, chunks, sendMultipleInstructions, Status } from "@helium/spl-utils";
import { PublicKey, TransactionInstruction } from "@solana/web3.js";
import { useAsyncCallback } from "react-async-hook";
import { useHeliumVsrState } from "../contexts/heliumVsrContext";
Expand All @@ -19,9 +19,11 @@ export const useClaimAllPositionsRewards = () => {
async ({
positions,
programId = PROGRAM_ID,
onProgress,
}: {
positions: PositionWithMeta[];
programId?: PublicKey;
onProgress?: (status: Status) => void;
}) => {
const isInvalid =
!unixNow || !provider || !positions.every((pos) => pos.hasRewards);
Expand Down Expand Up @@ -68,6 +70,12 @@ export const useClaimAllPositionsRewards = () => {
);
}

await batchParallelInstructions(
provider,
multiDemArray.flat(),
onProgress
);

for (const positionInsturctions of multiDemArray) {
// This is an arbitrary threshold and we assume that up to 4 instructions can be inserted as a single Tx
const ixsChunks = chunks(positionInsturctions, 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
delegatedPositionKey,
init,
} from "@helium/helium-sub-daos-sdk";
import { chunks, sendMultipleInstructions } from "@helium/spl-utils";
import { batchParallelInstructions, Status } from "@helium/spl-utils";
import { PublicKey, TransactionInstruction } from "@solana/web3.js";
import { useAsyncCallback } from "react-async-hook";
import { useHeliumVsrState } from "../contexts/heliumVsrContext";
Expand All @@ -19,9 +19,11 @@ export const useClaimPositionRewards = () => {
async ({
position,
programId = PROGRAM_ID,
onProgress,
}: {
position: PositionWithMeta;
programId?: PublicKey;
onProgress?: (status: Status) => void;
}) => {
const isInvalid = !unixNow || !provider || !position.hasRewards;

Expand Down Expand Up @@ -60,13 +62,7 @@ export const useClaimPositionRewards = () => {
)
);

// This is an arbitrary threshold and we assume that up to 4 instructions can be inserted as a single Tx
const ixsChunks = chunks(instructions, 4);
await sendMultipleInstructions(
provider,
ixsChunks,
ixsChunks.map((_) => [])
);
await batchParallelInstructions(provider, instructions, onProgress);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,10 @@ pub fn handler(ctx: Context<ClaimRewardsV0>, args: ClaimRewardsArgsV0) -> Result
// check epoch that's being claimed is over
let epoch = current_epoch(registrar.clock_unix_timestamp());
if !TESTING {
require_gt!(epoch, args.epoch, ErrorCode::EpochNotOver,);
require_eq!(
args.epoch,
delegated_position.last_claimed_epoch + 1,
ErrorCode::InvalidClaimEpoch
)
require_gt!(epoch, args.epoch, ErrorCode::EpochNotOver);
if delegated_position.is_claimed(args.epoch)? {
return Err(error!(ErrorCode::InvalidClaimEpoch));
}
}

let delegated_vehnt_at_epoch = position.voting_power(
Expand All @@ -148,7 +146,7 @@ pub fn handler(ctx: Context<ClaimRewardsV0>, args: ClaimRewardsArgsV0) -> Result
)
.unwrap();

delegated_position.last_claimed_epoch += 1;
delegated_position.set_claimed(args.epoch)?;

let amount_left = ctx.accounts.delegator_pool.amount;
transfer_v0(
Expand Down
63 changes: 62 additions & 1 deletion programs/helium-sub-daos/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::error::ErrorCode;
use anchor_lang::prelude::*;

use crate::EPOCH_LENGTH;
Expand Down Expand Up @@ -126,10 +127,47 @@ pub struct DelegatedPositionV0 {
pub position: Pubkey,
pub hnt_amount: u64,
pub sub_dao: Pubkey,
pub last_claimed_epoch: u64, // the epoch number that the dnt rewards were last claimed at
pub last_claimed_epoch: u64, // the latest epoch not included claimed_epochs_bitmap
pub start_ts: i64,
pub purged: bool, // if true, this position has been removed from subdao calculations. rewards can still be claimed.
pub bump_seed: u8,
// A bitmap of epochs past last_claimed_epoch (exclusive) that have been claimed.
// This bitmap gets rotated as last_claimed_epoch increases.
// This allows for claiming ~128 epochs worth of rewards in parallel.
pub claimed_epochs_bitmap: u128,
}

impl DelegatedPositionV0 {
pub fn is_claimed(&self, epoch: u64) -> Result<bool> {
if epoch <= self.last_claimed_epoch {
Ok(true)
} else if epoch > self.last_claimed_epoch + 128 {
Err(error!(ErrorCode::InvalidClaimEpoch))
} else {
let bit_index = (epoch - self.last_claimed_epoch - 1) as u128;
Ok(self.claimed_epochs_bitmap >> (127_u128 - bit_index) & 1 == 1)
}
}

pub fn set_claimed(&mut self, epoch: u64) -> Result<()> {
if epoch <= self.last_claimed_epoch {
Err(error!(ErrorCode::InvalidClaimEpoch))
} else if epoch > self.last_claimed_epoch + 128 {
Err(error!(ErrorCode::InvalidClaimEpoch))
} else {
let bit_index = (epoch - self.last_claimed_epoch - 1) as u128;
// Set the bit at bit_index to 1
self.claimed_epochs_bitmap |= 1_u128 << (127_u128 - bit_index);

// Shift claimed_epochs_bitmap to the left until the first bit is 0
while self.claimed_epochs_bitmap & (1_u128 << 127) != 0 {
self.claimed_epochs_bitmap <<= 1;
self.last_claimed_epoch += 1;
}

Ok(())
}
}
}

#[account]
Expand Down Expand Up @@ -195,3 +233,26 @@ pub struct SubDaoV0 {
pub dc_onboarding_fees_paid: u64, // the total amount of dc onboarding fees paid to this subdao by active hotspots (inactive hotspots are excluded)
pub active_device_authority: Pubkey, // authority that can mark hotspots as active/inactive
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_claimed() {
let mut position = DelegatedPositionV0::default();
let mut epoch = 2;

assert!(!position.is_claimed(epoch).unwrap());
position.set_claimed(epoch).unwrap();
assert!(position.is_claimed(epoch).unwrap());
assert_eq!(position.last_claimed_epoch, 0);

epoch = 1;
assert!(!position.is_claimed(epoch).unwrap());
position.set_claimed(epoch).unwrap();
assert!(position.is_claimed(epoch).unwrap());
assert_eq!(position.last_claimed_epoch, 2);
assert_eq!(position.claimed_epochs_bitmap, 0);
}
}
9 changes: 0 additions & 9 deletions tests/helium-sub-daos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -818,15 +818,6 @@ describe("helium-sub-daos", () => {
await program.account.subDaoEpochInfoV0.fetch(subDaoEpochInfo)
).epoch;

const thread = PublicKey.findProgramAddressSync(
[
Buffer.from("thread", "utf8"),
subDao.toBuffer(),
Buffer.from("end-epoch", "utf8"),
],
THREAD_PID
)[0];

await program.methods
.calculateUtilityScoreV0({
epoch,
Expand Down

0 comments on commit 01b5f3d

Please sign in to comment.