Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tree Backfiller] Gap Filling #121

Merged
merged 2 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 114 additions & 16 deletions tree_backfiller/src/backfiller.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use crate::db;
use crate::tree::{TreeGapFill, TreeGapModel};
use crate::{
metrics::{Metrics, MetricsArgs},
queue,
rpc::{Rpc, SolanaRpcArgs},
tree,
};

use anyhow::Result;
use clap::{Parser, ValueEnum};
use digital_asset_types::dao::cl_audits_v2;
use indicatif::HumanDuration;
use log::{error, info};
use sea_orm::SqlxPostgresConnector;
use sea_orm::{sea_query::OnConflict, EntityTrait};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, SqlxPostgresConnector};
use solana_sdk::signature::Signature;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -36,9 +35,16 @@ pub struct Args {
#[arg(long, env, default_value = "10000")]
pub signature_channel_size: usize,

/// The size of the signature channel. This is the number of signatures that can be queued up.
#[arg(long, env, default_value = "1000")]
pub gap_channel_size: usize,

#[arg(long, env, default_value = "100")]
pub transaction_worker_count: usize,

#[arg(long, env, default_value = "25")]
pub gap_worker_count: usize,

#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

Expand Down Expand Up @@ -125,14 +131,22 @@ impl Clone for Counter {
/// This function returns a `Result` which is `Ok` if the backfilling process completes
/// successfully, or an `Error` if any part of the process fails.
pub async fn run(config: Args) -> Result<()> {
let pool = db::connect(config.database).await?;

let solana_rpc = Rpc::from_config(config.solana);
let transaction_solana_rpc = solana_rpc.clone();
let gap_solana_rpc = solana_rpc.clone();

let metrics = Metrics::try_from_config(config.metrics)?;
let tree_metrics = metrics.clone();
let transaction_metrics = metrics.clone();
let gap_metrics = metrics.clone();

let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);
let (gap_sender, mut gap_receiver) = mpsc::channel::<TreeGapFill>(config.gap_channel_size);

let gap_count = Counter::new();
let gap_worker_gap_count = gap_count.clone();

let transaction_count = Counter::new();
let transaction_worker_transaction_count = transaction_count.clone();
Expand Down Expand Up @@ -174,12 +188,47 @@ pub async fn run(config: Args) -> Result<()> {
Ok::<(), anyhow::Error>(())
});

tokio::spawn(async move {
let semaphore = Arc::new(Semaphore::new(config.gap_worker_count));

while let Some(gap) = gap_receiver.recv().await {
let solana_rpc = gap_solana_rpc.clone();
let metrics = gap_metrics.clone();
let sig_sender = sig_sender.clone();
let semaphore = semaphore.clone();
let count = gap_worker_gap_count.clone();

count.increment();

tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();

if let Err(e) = gap.crawl(&solana_rpc, sig_sender).await {
error!("tree transaction: {:?}", e);
metrics.increment("gap.failed");
} else {
metrics.increment("gap.succeeded");
}

metrics.time("gap.queued", timing.elapsed());

count.decrement();

Ok::<(), anyhow::Error>(())
});
}

Ok::<(), anyhow::Error>(())
});

let started = Instant::now();

let trees = if let Some(only_trees) = config.only_trees {
tree::find(&solana_rpc, only_trees).await?
tree::TreeResponse::find(&solana_rpc, only_trees).await?
} else {
tree::all(&solana_rpc).await?
tree::TreeResponse::all(&solana_rpc).await?
};
let tree_count = trees.len();

Expand All @@ -193,28 +242,72 @@ pub async fn run(config: Args) -> Result<()> {
let mut crawl_handles = Vec::with_capacity(tree_count);

for tree in trees {
let client = solana_rpc.clone();
let semaphore = semaphore.clone();
let sig_sender = sig_sender.clone();
let gap_sender = gap_sender.clone();
let metrics = tree_metrics.clone();
let pool = pool.clone();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

let crawl_handle = tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();

if let Err(e) = tree.crawl(&client, sig_sender).await {
metrics.increment("tree.failed");
error!("crawling tree: {:?}", e);
} else {
let mut gaps = TreeGapModel::find(&conn, tree.pubkey)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?;

let upper_known_seq = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_desc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?;

let lower_known_seq = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_asc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?;

if let Some(upper_seq) = upper_known_seq {
let signature = Signature::try_from(upper_seq.tx.as_ref())?;
info!(
"tree {} has known highest seq {} filling tree from {}",
tree.pubkey, upper_seq.seq, signature
);
gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature)));
} else if tree.seq > 0 {
info!(
"tree {} has no known highest seq but the actual seq is {} filling whole tree",
tree.pubkey, tree.seq
);
gaps.push(TreeGapFill::new(tree.pubkey, None, None));
}

if let Some(lower_seq) = lower_known_seq {
let signature = Signature::try_from(lower_seq.tx.as_ref())?;

info!(
"crawled tree {} in {}",
tree.pubkey,
HumanDuration(timing.elapsed())
"tree {} has known lowest seq {} filling tree starting at {}",
tree.pubkey, lower_seq.seq, signature
);
metrics.increment("tree.succeeded");
gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None));
}

let gap_count = gaps.len();

for gap in gaps {
if let Err(e) = gap_sender.send(gap).await {
metrics.increment("gap.failed");
error!("send gap: {:?}", e);
}
}

info!("crawling tree {} with {} gaps", tree.pubkey, gap_count);

metrics.increment("tree.succeeded");
metrics.time("tree.crawled", timing.elapsed());

Ok::<(), anyhow::Error>(())
Expand All @@ -224,8 +317,13 @@ pub async fn run(config: Args) -> Result<()> {
}

futures::future::try_join_all(crawl_handles).await?;
info!("crawled all trees");

gap_count.zero().await;
info!("all gaps queued");

transaction_count.zero().await;
info!("all transactions queued");

metrics.time("job.completed", started.elapsed());

Expand Down
2 changes: 2 additions & 0 deletions tree_backfiller/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ impl Rpc {
&self,
pubkey: &Pubkey,
before: Option<Signature>,
until: Option<Signature>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>, ClientError> {
(|| async {
self.0
.get_signatures_for_address_with_config(
pubkey,
GetConfirmedSignaturesForAddress2Config {
before,
until,
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
Expand Down
Loading
Loading