Skip to content

Commit

Permalink
refactor: bubblegum backfill orders off of slot
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Sep 17, 2024
1 parent febc55c commit 4b56567
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
8 changes: 4 additions & 4 deletions backfill/src/worker/program_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ impl ProgramTransformerWorkerArgs {
&self,
context: BubblegumBackfillContext,
forwarder: UnboundedSender<DownloadMetadataInfo>,
) -> Result<(JoinHandle<()>, Sender<(Option<i64>, TransactionInfo)>)> {
) -> Result<(JoinHandle<()>, Sender<TransactionInfo>)> {
let (sender, mut receiver) =
channel::<(Option<i64>, TransactionInfo)>(self.program_transformer_channel_size);
channel::<TransactionInfo>(self.program_transformer_channel_size);

let handle = tokio::spawn(async move {
let mut transactions = Vec::new();
Expand All @@ -35,9 +35,9 @@ impl ProgramTransformerWorkerArgs {
transactions.push(gap);
}

transactions.sort_by(|(a, _), (b, _)| a.cmp(b));
transactions.sort_by(|a, b| a.slot.cmp(&b.slot));

for (_, transaction) in transactions {
for transaction in transactions {
if let Err(e) = program_transformer.handle_transaction(&transaction).await {
error!("handle transaction: {:?}", e)
};
Expand Down
11 changes: 4 additions & 7 deletions backfill/src/worker/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl SignatureWorkerArgs {
pub fn start(
&self,
context: crate::BubblegumBackfillContext,
forwarder: Sender<(Option<i64>, TransactionInfo)>,
forwarder: Sender<TransactionInfo>,
) -> Result<(JoinHandle<()>, Sender<Signature>)> {
let (sig_sender, mut sig_receiver) = channel::<Signature>(self.signature_channel_size);
let worker_count = self.signature_worker_count;
Expand Down Expand Up @@ -177,16 +177,13 @@ impl SignatureWorkerArgs {

async fn queue_transaction<'a>(
client: Rpc,
sender: Sender<(Option<i64>, TransactionInfo)>,
sender: Sender<TransactionInfo>,
signature: Signature,
) -> Result<(), ErrorKind> {
let transaction = client.get_transaction(&signature).await?;

sender
.send((
transaction.block_time,
FetchedEncodedTransactionWithStatusMeta(transaction).try_into()?,
))
.send(FetchedEncodedTransactionWithStatusMeta(transaction).try_into()?)
.await
.map_err(|e| ErrorKind::Generic(e.to_string()))?;

Expand All @@ -195,7 +192,7 @@ async fn queue_transaction<'a>(

fn spawn_transaction_worker(
client: Rpc,
sender: Sender<(Option<i64>, TransactionInfo)>,
sender: Sender<TransactionInfo>,
signature: Signature,
) -> JoinHandle<()> {
tokio::spawn(async move {
Expand Down

0 comments on commit 4b56567

Please sign in to comment.