diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs
index fa70220a..a206baa7 100644
--- a/tree_backfiller/src/backfiller.rs
+++ b/tree_backfiller/src/backfiller.rs
@@ -1,19 +1,18 @@
 use crate::db;
+use crate::tree::{TreeGapFill, TreeGapModel};
 use crate::{
     metrics::{Metrics, MetricsArgs},
     queue,
     rpc::{Rpc, SolanaRpcArgs},
     tree,
 };
-
 use anyhow::Result;
 use clap::{Parser, ValueEnum};
+use digital_asset_types::dao::cl_audits_v2;
 use indicatif::HumanDuration;
 use log::{error, info};
-use sea_orm::SqlxPostgresConnector;
-use sea_orm::{sea_query::OnConflict, EntityTrait};
+use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, SqlxPostgresConnector};
 use solana_sdk::signature::Signature;
-use std::str::FromStr;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
@@ -36,9 +35,16 @@ pub struct Args {
     #[arg(long, env, default_value = "10000")]
     pub signature_channel_size: usize,
 
+    /// The size of the gap channel. This is the number of gaps that can be queued up.
+    #[arg(long, env, default_value = "1000")]
+    pub gap_channel_size: usize,
+
     #[arg(long, env, default_value = "100")]
     pub transaction_worker_count: usize,
 
+    #[arg(long, env, default_value = "25")]
+    pub gap_worker_count: usize,
+
     #[arg(long, env, use_value_delimiter = true)]
     pub only_trees: Option<Vec<String>>,
 
@@ -125,14 +131,22 @@ impl Clone for Counter {
 /// This function returns a `Result` which is `Ok` if the backfilling process completes
 /// successfully, or an `Error` if any part of the process fails.
 pub async fn run(config: Args) -> Result<()> {
+    let pool = db::connect(config.database).await?;
+
     let solana_rpc = Rpc::from_config(config.solana);
     let transaction_solana_rpc = solana_rpc.clone();
+    let gap_solana_rpc = solana_rpc.clone();
 
     let metrics = Metrics::try_from_config(config.metrics)?;
     let tree_metrics = metrics.clone();
     let transaction_metrics = metrics.clone();
+    let gap_metrics = metrics.clone();
 
     let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);
+    let (gap_sender, mut gap_receiver) = mpsc::channel::<TreeGapFill>(config.gap_channel_size);
+
+    let gap_count = Counter::new();
+    let gap_worker_gap_count = gap_count.clone();
 
     let transaction_count = Counter::new();
     let transaction_worker_transaction_count = transaction_count.clone();
@@ -174,12 +188,47 @@ pub async fn run(config: Args) -> Result<()> {
         Ok::<(), anyhow::Error>(())
     });
 
+    tokio::spawn(async move {
+        let semaphore = Arc::new(Semaphore::new(config.gap_worker_count));
+
+        while let Some(gap) = gap_receiver.recv().await {
+            let solana_rpc = gap_solana_rpc.clone();
+            let metrics = gap_metrics.clone();
+            let sig_sender = sig_sender.clone();
+            let semaphore = semaphore.clone();
+            let count = gap_worker_gap_count.clone();
+
+            count.increment();
+
+            tokio::spawn(async move {
+                let _permit = semaphore.acquire().await?;
+
+                let timing = Instant::now();
+
+                if let Err(e) = gap.crawl(&solana_rpc, sig_sender).await {
+                    error!("tree transaction: {:?}", e);
+                    metrics.increment("gap.failed");
+                } else {
+                    metrics.increment("gap.succeeded");
+                }
+
+                metrics.time("gap.queued", timing.elapsed());
+
+                count.decrement();
+
+                Ok::<(), anyhow::Error>(())
+            });
+        }
+
+        Ok::<(), anyhow::Error>(())
+    });
+
     let started = Instant::now();
 
     let trees = if let Some(only_trees) = config.only_trees {
-        tree::find(&solana_rpc, only_trees).await?
+        tree::TreeResponse::find(&solana_rpc, only_trees).await?
     } else {
-        tree::all(&solana_rpc).await?
+        tree::TreeResponse::all(&solana_rpc).await?
     };
 
     let tree_count = trees.len();
@@ -193,28 +242,72 @@ pub async fn run(config: Args) -> Result<()> {
     let mut crawl_handles = Vec::with_capacity(tree_count);
 
     for tree in trees {
-        let client = solana_rpc.clone();
         let semaphore = semaphore.clone();
-        let sig_sender = sig_sender.clone();
+        let gap_sender = gap_sender.clone();
         let metrics = tree_metrics.clone();
+        let pool = pool.clone();
+        let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
 
         let crawl_handle = tokio::spawn(async move {
            let _permit = semaphore.acquire().await?;
 
            let timing = Instant::now();
 
-            if let Err(e) = tree.crawl(&client, sig_sender).await {
-                metrics.increment("tree.failed");
-                error!("crawling tree: {:?}", e);
-            } else {
+            let mut gaps = TreeGapModel::find(&conn, tree.pubkey)
+                .await?
+                .into_iter()
+                .map(TryInto::try_into)
+                .collect::<Result<Vec<_>, _>>()?;
+
+            let upper_known_seq = cl_audits_v2::Entity::find()
+                .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
+                .order_by_desc(cl_audits_v2::Column::Seq)
+                .one(&conn)
+                .await?;
+
+            let lower_known_seq = cl_audits_v2::Entity::find()
+                .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
+                .order_by_asc(cl_audits_v2::Column::Seq)
+                .one(&conn)
+                .await?;
+
+            if let Some(upper_seq) = upper_known_seq {
+                let signature = Signature::try_from(upper_seq.tx.as_ref())?;
+                info!(
+                    "tree {} has known highest seq {} filling tree from {}",
+                    tree.pubkey, upper_seq.seq, signature
+                );
+                gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature)));
+            } else if tree.seq > 0 {
+                info!(
+                    "tree {} has no known highest seq but the actual seq is {} filling whole tree",
+                    tree.pubkey, tree.seq
+                );
+                gaps.push(TreeGapFill::new(tree.pubkey, None, None));
+            }
+
+            if let Some(lower_seq) = lower_known_seq {
+                let signature = Signature::try_from(lower_seq.tx.as_ref())?;
+                info!(
-                    "crawled tree {} in {}",
-                    tree.pubkey,
-                    HumanDuration(timing.elapsed())
+                    "tree {} has known lowest seq {} filling tree starting at {}",
+                    tree.pubkey, lower_seq.seq, signature
                 );
-                metrics.increment("tree.succeeded");
+                gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None));
             }
 
+            let gap_count = gaps.len();
+
+            for gap in gaps {
+                if let Err(e) = gap_sender.send(gap).await {
+                    metrics.increment("gap.failed");
+                    error!("send gap: {:?}", e);
+                }
+            }
+
+            info!("crawling tree {} with {} gaps", tree.pubkey, gap_count);
+
+            metrics.increment("tree.succeeded");
             metrics.time("tree.crawled", timing.elapsed());
 
             Ok::<(), anyhow::Error>(())
@@ -224,8 +317,13 @@ pub async fn run(config: Args) -> Result<()> {
     }
 
     futures::future::try_join_all(crawl_handles).await?;
+    info!("crawled all trees");
+
+    gap_count.zero().await;
+    info!("all gaps queued");
 
     transaction_count.zero().await;
+    info!("all transactions queued");
 
     metrics.time("job.completed", started.elapsed());
 
diff --git a/tree_backfiller/src/rpc.rs b/tree_backfiller/src/rpc.rs
index 4d9e976b..361d88e1 100644
--- a/tree_backfiller/src/rpc.rs
+++ b/tree_backfiller/src/rpc.rs
@@ -62,6 +62,7 @@ impl Rpc {
         &self,
         pubkey: &Pubkey,
         before: Option<Signature>,
+        until: Option<Signature>,
     ) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>, ClientError> {
         (|| async {
             self.0
@@ -69,6 +70,7 @@ impl Rpc {
                     pubkey,
                     GetConfirmedSignaturesForAddress2Config {
                         before,
+                        until,
                         commitment: Some(CommitmentConfig {
                             commitment: CommitmentLevel::Finalized,
                         }),
diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs
index 30de2271..07f05bc3 100644
--- a/tree_backfiller/src/tree.rs
+++ b/tree_backfiller/src/tree.rs
@@ -4,9 +4,7 @@ use clap::Args;
 use flatbuffers::FlatBufferBuilder;
 use log::error;
 use plerkle_serialization::serializer::seralize_encoded_transaction_with_status;
-use sea_orm::{
-    ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder,
-};
+use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value};
 use solana_client::rpc_filter::{Memcmp, RpcFilterType};
 use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature};
 use spl_account_compression::id;
@@ -17,7 +15,6 @@ use std::str::FromStr;
 use thiserror::Error as ThisError;
 use tokio::sync::mpsc::Sender;
 
-use crate::backfiller::CrawlDirection;
 use crate::{
     queue::{QueuePool, QueuePoolError},
     rpc::Rpc,
@@ -48,7 +45,130 @@ pub enum TreeErrorKind {
     ParsePubkey(#[from] solana_sdk::pubkey::ParsePubkeyError),
     #[error("serialize tree response")]
     SerializeTreeResponse,
+    #[error("sea orm")]
+    Database(#[from] sea_orm::DbErr),
+    #[error("try from pubkey")]
+    TryFromPubkey,
+    #[error("try from signature")]
+    TryFromSignature,
+}
+
+const TREE_GAP_SQL: &str = r#"
+WITH sequenced_data AS (
+    SELECT
+        tree,
+        seq,
+        LEAD(seq) OVER (ORDER BY seq ASC) AS next_seq,
+        tx AS current_tx,
+        LEAD(tx) OVER (ORDER BY seq ASC) AS next_tx
+    FROM
+        cl_audits_v2
+    WHERE
+        tree = $1
+),
+gaps AS (
+    SELECT
+        tree,
+        seq AS gap_start_seq,
+        next_seq AS gap_end_seq,
+        current_tx AS lower_bound_tx,
+        next_tx AS upper_bound_tx
+    FROM
+        sequenced_data
+    WHERE
+        next_seq IS NOT NULL AND
+        next_seq - seq > 1
+)
+SELECT
+    tree,
+    gap_start_seq,
+    gap_end_seq,
+    lower_bound_tx,
+    upper_bound_tx
+FROM
+    gaps
+ORDER BY
+    gap_start_seq;
+"#;
+
+#[derive(Debug, FromQueryResult, PartialEq, Clone)]
+pub struct TreeGapModel {
+    pub tree: Vec<u8>,
+    pub gap_start_seq: i64,
+    pub gap_end_seq: i64,
+    pub lower_bound_tx: Vec<u8>,
+    pub upper_bound_tx: Vec<u8>,
+}
+
+impl TreeGapModel {
+    pub async fn find(conn: &DatabaseConnection, tree: Pubkey) -> Result<Vec<Self>, TreeErrorKind> {
+        let statement = Statement::from_sql_and_values(
+            DbBackend::Postgres,
+            TREE_GAP_SQL,
+            vec![Value::Bytes(Some(Box::new(tree.as_ref().to_vec())))],
+        );
+
+        TreeGapModel::find_by_statement(statement)
+            .all(conn)
+            .await
+            .map_err(Into::into)
+    }
+}
+
+impl TryFrom<TreeGapModel> for TreeGapFill {
+    type Error = TreeErrorKind;
+
+    fn try_from(model: TreeGapModel) -> Result<Self, Self::Error> {
+        let tree = Pubkey::try_from(model.tree).map_err(|_| TreeErrorKind::TryFromPubkey)?;
+        let upper = Signature::try_from(model.upper_bound_tx)
+            .map_err(|_| TreeErrorKind::TryFromSignature)?;
+        let lower = Signature::try_from(model.lower_bound_tx)
+            .map_err(|_| TreeErrorKind::TryFromSignature)?;
+
+        Ok(Self::new(tree, Some(upper), Some(lower)))
+    }
+}
+
+pub struct TreeGapFill {
+    tree: Pubkey,
+    before: Option<Signature>,
+    until: Option<Signature>,
+}
+
+impl TreeGapFill {
+    pub fn new(tree: Pubkey, before: Option<Signature>, until: Option<Signature>) -> Self {
+        Self {
+            tree,
+            before,
+            until,
+        }
+    }
+
+    pub async fn crawl(&self, client: &Rpc, sender: Sender<Signature>) -> Result<()> {
+        let mut before = self.before;
+
+        loop {
+            let sigs = client
+                .get_signatures_for_address(&self.tree, before, self.until)
+                .await?;
+
+            for sig in sigs.iter() {
+                let sig = Signature::from_str(&sig.signature)?;
+
+                sender.send(sig).await?;
+
+                before = Some(sig);
+            }
+
+            if sigs.len() < GET_SIGNATURES_FOR_ADDRESS_LIMIT {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct TreeHeaderResponse {
     pub max_depth: u32,
@@ -62,6 +182,7 @@ impl TryFrom<ConcurrentMerkleTreeHeader> for TreeHeaderResponse {
 
     fn try_from(payload: ConcurrentMerkleTreeHeader) -> Result<Self, Self::Error> {
         let size = merkle_tree_get_size(&payload)?;
+
         Ok(Self {
             max_depth: payload.get_max_depth(),
             max_buffer_size: payload.get_max_buffer_size(),
@@ -75,14 +196,23 @@ impl TryFrom<ConcurrentMerkleTreeHeader> for TreeHeaderResponse {
 pub struct TreeResponse {
     pub pubkey: Pubkey,
     pub tree_header: TreeHeaderResponse,
+    pub seq: u64,
 }
 
 impl TreeResponse {
     pub fn try_from_rpc(pubkey: Pubkey, account: Account) -> Result<Self> {
-        let (header_bytes, _rest) = account.data.split_at(CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1);
+        let bytes = account.data.as_slice();
+
+        let (header_bytes, rest) = bytes.split_at(CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1);
         let header: ConcurrentMerkleTreeHeader =
             ConcurrentMerkleTreeHeader::try_from_slice(header_bytes)?;
 
+        let merkle_tree_size = merkle_tree_get_size(&header)?;
+        let (tree_bytes, _canopy_bytes) = rest.split_at(merkle_tree_size);
+
+        let seq_bytes = tree_bytes[0..8].try_into()?;
+        let seq = u64::from_le_bytes(seq_bytes);
+
         let (auth, _) = Pubkey::find_program_address(&[pubkey.as_ref()], &mpl_bubblegum::ID);
 
         header.assert_valid_authority(&auth)?;
@@ -92,85 +222,63 @@ impl TreeResponse {
         Ok(Self {
             pubkey,
             tree_header,
+            seq,
         })
     }
 
-    pub async fn crawl(&self, client: &Rpc, sender: Sender<Signature>) -> Result<()> {
-        let mut before = None;
-
-        loop {
-            let sigs = client
-                .get_signatures_for_address(&self.pubkey, before)
-                .await?;
-
-            for sig in sigs.iter() {
-                let sig = Signature::from_str(&sig.signature)?;
-
-                sender.send(sig).await?;
-                before = Some(sig);
-            }
-
-            if sigs.len() < GET_SIGNATURES_FOR_ADDRESS_LIMIT {
-                break;
-            }
-        }
-
-        Ok(())
+    pub async fn all(client: &Rpc) -> Result<Vec<Self>, TreeErrorKind> {
+        Ok(client
+            .get_program_accounts(
+                &id(),
+                Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
+                    0,
+                    vec![1u8],
+                ))]),
+            )
+            .await?
+            .into_iter()
+            .filter_map(|(pubkey, account)| Self::try_from_rpc(pubkey, account).ok())
+            .collect())
     }
-}
-
-pub async fn all(client: &Rpc) -> Result<Vec<TreeResponse>, TreeErrorKind> {
-    Ok(client
-        .get_program_accounts(
-            &id(),
-            Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
-                0,
-                vec![1u8],
-            ))]),
-        )
-        .await?
-        .into_iter()
-        .filter_map(|(pubkey, account)| TreeResponse::try_from_rpc(pubkey, account).ok())
-        .collect())
-}
-pub async fn find(client: &Rpc, pubkeys: Vec<String>) -> Result<Vec<TreeResponse>, TreeErrorKind> {
-    let pubkeys: Vec<Pubkey> = pubkeys
-        .into_iter()
-        .map(|p| Pubkey::from_str(&p))
-        .collect::<Result<Vec<Pubkey>, _>>()?;
-    let pubkey_batches = pubkeys.chunks(100);
-    let pubkey_batches_count = pubkey_batches.len();
+    pub async fn find(client: &Rpc, pubkeys: Vec<String>) -> Result<Vec<Self>, TreeErrorKind> {
+        let pubkeys: Vec<Pubkey> = pubkeys
+            .into_iter()
+            .map(|p| Pubkey::from_str(&p))
+            .collect::<Result<Vec<Pubkey>, _>>()?;
+        let pubkey_batches = pubkeys.chunks(100);
+        let pubkey_batches_count = pubkey_batches.len();
 
-    let mut gma_handles = Vec::with_capacity(pubkey_batches_count);
+        let mut gma_handles = Vec::with_capacity(pubkey_batches_count);
 
-    for batch in pubkey_batches {
-        gma_handles.push(async move {
-            let accounts = client.get_multiple_accounts(batch).await?;
+        for batch in pubkey_batches {
+            gma_handles.push(async move {
+                let accounts = client.get_multiple_accounts(batch).await?;
 
-            let results: Vec<(&Pubkey, Option<Account>)> =
-                batch.into_iter().zip(accounts).collect();
+                let results: Vec<(&Pubkey, Option<Account>)> =
+                    batch.into_iter().zip(accounts).collect();
 
-            Ok::<_, TreeErrorKind>(results)
-        })
-    }
+                Ok::<_, TreeErrorKind>(results)
+            })
+        }
 
-    let result = futures::future::try_join_all(gma_handles).await?;
+        let result = futures::future::try_join_all(gma_handles).await?;
 
-    let trees = result
-        .into_iter()
-        .flatten()
-        .filter_map(|(pubkey, account)| {
-            if let Some(account) = account {
-                Some(TreeResponse::try_from_rpc(*pubkey, account))
-            } else {
-                None
-            }
-        })
-        .collect::<Result<Vec<_>, _>>()
-        .map_err(|_| TreeErrorKind::SerializeTreeResponse)?;
+        let trees = result
+            .into_iter()
+            .flatten()
+            .filter_map(|(pubkey, account)| {
+                if let Some(account) = account {
+                    Some(Self::try_from_rpc(*pubkey, account))
+                } else {
+                    None
+                }
+            })
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|_| TreeErrorKind::SerializeTreeResponse)?;
 
-    Ok(trees)
+        Ok(trees)
+    }
 }
 
 pub async fn transaction<'a>(
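
For reference, a minimal standalone sketch (not part of the diff) of the gap detection that `TREE_GAP_SQL` expresses with `LEAD()`: over rows already ordered by `seq`, any adjacent pair whose sequence numbers differ by more than one is a gap, and its bounding transactions become the `until`/`before` cursors that `TreeGapFill::crawl` passes to `get_signatures_for_address`. The row data below is hypothetical.

```rust
// Illustration only: in-memory equivalent of the LEAD()-based gap query.
// Input rows are (seq, tx_signature) for a single tree, sorted by seq ascending.
fn find_gaps(rows: &[(i64, &str)]) -> Vec<(i64, i64, String, String)> {
    rows.windows(2)
        .filter(|pair| pair[1].0 - pair[0].0 > 1)
        .map(|pair| {
            // (gap_start_seq, gap_end_seq, lower_bound_tx, upper_bound_tx)
            (pair[0].0, pair[1].0, pair[0].1.to_string(), pair[1].1.to_string())
        })
        .collect()
}

fn main() {
    // Hypothetical cl_audits_v2 rows: seqs 3 and 4 were never indexed.
    let rows = [(1, "sigA"), (2, "sigB"), (5, "sigC"), (6, "sigD")];
    for (start, end, lower_tx, upper_tx) in find_gaps(&rows) {
        // The crawl walks backwards: before = upper bound tx, until = lower bound tx.
        println!("gap after seq {start} up to {end}: before={upper_tx} until={lower_tx}");
    }
}
```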