Skip to content

Commit

Permalink
[Tree Backfiller] Gap Filling (#121)
Browse files Browse the repository at this point in the history
* feat: find gaps in trees

* refactor(backfiller): search for gaps in cl_audits_v2. add worker queue for gaps so crawling done concurrently.
  • Loading branch information
kespinola authored and linuskendall committed Jan 11, 2024
1 parent a30096e commit c63c473
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 88 deletions.
130 changes: 114 additions & 16 deletions tree_backfiller/src/backfiller.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use crate::db;
use crate::tree::{TreeGapFill, TreeGapModel};
use crate::{
metrics::{Metrics, MetricsArgs},
queue,
rpc::{Rpc, SolanaRpcArgs},
tree,
};

use anyhow::Result;
use clap::{Parser, ValueEnum};
use digital_asset_types::dao::cl_audits_v2;
use indicatif::HumanDuration;
use log::{error, info};
use sea_orm::SqlxPostgresConnector;
use sea_orm::{sea_query::OnConflict, EntityTrait};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, SqlxPostgresConnector};
use solana_sdk::signature::Signature;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -36,9 +35,16 @@ pub struct Args {
#[arg(long, env, default_value = "10000")]
pub signature_channel_size: usize,

/// The size of the signature channel. This is the number of signatures that can be queued up.
#[arg(long, env, default_value = "1000")]
pub gap_channel_size: usize,

#[arg(long, env, default_value = "100")]
pub transaction_worker_count: usize,

#[arg(long, env, default_value = "25")]
pub gap_worker_count: usize,

#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

Expand Down Expand Up @@ -125,14 +131,22 @@ impl Clone for Counter {
/// This function returns a `Result` which is `Ok` if the backfilling process completes
/// successfully, or an `Error` if any part of the process fails.
pub async fn run(config: Args) -> Result<()> {
let pool = db::connect(config.database).await?;

let solana_rpc = Rpc::from_config(config.solana);
let transaction_solana_rpc = solana_rpc.clone();
let gap_solana_rpc = solana_rpc.clone();

let metrics = Metrics::try_from_config(config.metrics)?;
let tree_metrics = metrics.clone();
let transaction_metrics = metrics.clone();
let gap_metrics = metrics.clone();

let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);
let (gap_sender, mut gap_receiver) = mpsc::channel::<TreeGapFill>(config.gap_channel_size);

let gap_count = Counter::new();
let gap_worker_gap_count = gap_count.clone();

let transaction_count = Counter::new();
let transaction_worker_transaction_count = transaction_count.clone();
Expand Down Expand Up @@ -174,12 +188,47 @@ pub async fn run(config: Args) -> Result<()> {
Ok::<(), anyhow::Error>(())
});

tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(config.gap_worker_count));

while let Some(gap) = gap_receiver.recv().await {
let solana_rpc = gap_solana_rpc.clone();
let metrics = gap_metrics.clone();
let sig_sender = sig_sender.clone();
let semaphore = semaphore.clone();
let count = gap_worker_gap_count.clone();

count.increment();

tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();

if let Err(e) = gap.crawl(&solana_rpc, sig_sender).await {
error!("tree transaction: {:?}", e);
metrics.increment("gap.failed");
} else {
metrics.increment("gap.succeeded");
}

metrics.time("gap.queued", timing.elapsed());

count.decrement();

Ok::<(), anyhow::Error>(())
});
}

Ok::<(), anyhow::Error>(())
});

let started = Instant::now();

let trees = if let Some(only_trees) = config.only_trees {
tree::find(&solana_rpc, only_trees).await?
tree::TreeResponse::find(&solana_rpc, only_trees).await?
} else {
tree::all(&solana_rpc).await?
tree::TreeResponse::all(&solana_rpc).await?
};
let tree_count = trees.len();

Expand All @@ -193,28 +242,72 @@ pub async fn run(config: Args) -> Result<()> {
let mut crawl_handles = Vec::with_capacity(tree_count);

for tree in trees {
let client = solana_rpc.clone();
let semaphore = Arc::<tokio::sync::Semaphore>::clone(&semaphore);
let sig_sender = sig_sender.clone();
let gap_sender = gap_sender.clone();
let metrics = tree_metrics.clone();
let pool = pool.clone();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

let crawl_handle = tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();

if let Err(e) = tree.crawl(&client, sig_sender).await {
metrics.increment("tree.failed");
error!("crawling tree: {:?}", e);
} else {
let mut gaps = TreeGapModel::find(&conn, tree.pubkey)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?;

let upper_known_seq = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_desc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?;

let lower_known_seq = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_asc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?;

if let Some(upper_seq) = upper_known_seq {
let signature = Signature::try_from(upper_seq.tx.as_ref())?;
info!(
"crawled tree {} in {}",
tree.pubkey,
HumanDuration(timing.elapsed())
"tree {} has known highest seq {} filling tree from {}",
tree.pubkey, upper_seq.seq, signature
);
metrics.increment("tree.succeeded");
gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature)));
} else if tree.seq > 0 {
info!(
"tree {} has no known highest seq but the actual seq is {} filling whole tree",
tree.pubkey, tree.seq
);
gaps.push(TreeGapFill::new(tree.pubkey, None, None));
}

if let Some(lower_seq) = lower_known_seq {
let signature = Signature::try_from(lower_seq.tx.as_ref())?;

info!(
"tree {} has known lowest seq {} filling tree starting at {}",
tree.pubkey, lower_seq.seq, signature
);
gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None));
}

let gap_count = gaps.len();

for gap in gaps {
if let Err(e) = gap_sender.send(gap).await {
metrics.increment("gap.failed");
error!("send gap: {:?}", e);
}
}

info!("crawling tree {} with {} gaps", tree.pubkey, gap_count);

metrics.increment("tree.succeeded");
metrics.time("tree.crawled", timing.elapsed());

Ok::<(), anyhow::Error>(())
Expand All @@ -224,8 +317,13 @@ pub async fn run(config: Args) -> Result<()> {
}

futures::future::try_join_all(crawl_handles).await?;
info!("crawled all trees");

gap_count.zero().await;
info!("all gaps queued");

transaction_count.zero().await;
info!("all transactions queued");

metrics.time("job.completed", started.elapsed());

Expand Down
2 changes: 2 additions & 0 deletions tree_backfiller/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ impl Rpc {
&self,
pubkey: &Pubkey,
before: Option<Signature>,
until: Option<Signature>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>, ClientError> {
(|| async {
self.0
.get_signatures_for_address_with_config(
pubkey,
GetConfirmedSignaturesForAddress2Config {
before,
until,
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
Expand Down
Loading

0 comments on commit c63c473

Please sign in to comment.