diff --git a/Cargo.lock b/Cargo.lock index 7e33acdb..5b941223 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1556,6 +1556,63 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "das-tree-backfiller" +version = "0.8.0" +dependencies = [ + "anchor-client", + "anchor-lang", + "anyhow", + "async-trait", + "base64 0.21.4", + "blockbuster", + "borsh 0.10.3", + "bs58 0.4.0", + "cadence", + "cadence-macros", + "chrono", + "clap 4.4.6", + "digital_asset_types", + "env_logger 0.10.0", + "figment", + "flatbuffers", + "futures", + "futures-util", + "lazy_static", + "log", + "mpl-bubblegum", + "num-traits", + "plerkle_messenger", + "plerkle_serialization", + "rand 0.8.5", + "redis", + "regex", + "reqwest", + "rust-crypto", + "sea-orm", + "sea-query 0.28.5", + "serde", + "serde_json", + "solana-account-decoder", + "solana-client", + "solana-geyser-plugin-interface", + "solana-sdk", + "solana-sdk-macro", + "solana-transaction-status", + "spl-account-compression", + "spl-concurrent-merkle-tree", + "spl-token 4.0.0", + "sqlx", + "stretto", + "thiserror", + "tokio", + "tokio-postgres", + "tokio-stream", + "tracing-subscriber", + "url", + "uuid", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -6373,18 +6430,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2 1.0.69", "quote 1.0.33", diff --git a/Cargo.toml b/Cargo.toml index f4b7545b..a96d5d56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "digital_asset_types", "metaplex-rpc-proxy", "nft_ingester", + "tree_backfiller", "tools/acc_forwarder", "tools/bgtask_creator", "tools/fetch_trees", diff --git a/backfiller.yaml b/backfiller.yaml new file mode 100644 index 00000000..e69de29b diff --git a/digital_asset_types/Cargo.toml b/digital_asset_types/Cargo.toml index 8e4b18bb..07a08f46 100644 --- a/digital_asset_types/Cargo.toml +++ b/digital_asset_types/Cargo.toml @@ -7,10 +7,17 @@ publish = false [dependencies] spl-concurrent-merkle-tree = "0.2.0" -sea-orm = { optional = true, version = "0.10.6", features = ["macros", "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", "mock"] } +sea-orm = { optional = true, version = "0.10.6", features = [ + "macros", + "runtime-tokio-rustls", + "sqlx-postgres", + "with-chrono", +] } sea-query = { version = "0.28.1", features = ["postgres-array"] } serde = { version = "1.0.137", optional = true } -serde_json = { version = "1.0.81", optional = true, features=["preserve_order"] } +serde_json = { version = "1.0.81", optional = true, features = [ + "preserve_order", +] } bs58 = "0.4.0" borsh = { version = "~0.10.3", optional = true } borsh-derive = { version = "~0.10.3", optional = true } diff --git a/digital_asset_types/src/dao/generated/cl_audits.rs b/digital_asset_types/src/dao/generated/cl_audits.rs index ac22cc90..f013a713 100644 --- a/digital_asset_types/src/dao/generated/cl_audits.rs +++ 
b/digital_asset_types/src/dao/generated/cl_audits.rs @@ -92,6 +92,6 @@ impl From<cl_items::ActiveModel> for ActiveModel { seq: item.seq, leaf_idx: item.leaf_idx, ..Default::default() - } + }; } -} \ No newline at end of file +} diff --git a/digital_asset_types/src/dao/generated/mod.rs b/digital_asset_types/src/dao/generated/mod.rs index 5db9a869..64fef921 100644 --- a/digital_asset_types/src/dao/generated/mod.rs +++ b/digital_asset_types/src/dao/generated/mod.rs @@ -16,3 +16,4 @@ pub mod sea_orm_active_enums; pub mod tasks; pub mod token_accounts; pub mod tokens; +pub mod tree_transactions; diff --git a/digital_asset_types/src/dao/generated/prelude.rs b/digital_asset_types/src/dao/generated/prelude.rs index 79759cd1..76403b9e 100644 --- a/digital_asset_types/src/dao/generated/prelude.rs +++ b/digital_asset_types/src/dao/generated/prelude.rs @@ -13,3 +13,4 @@ pub use super::raw_txn::Entity as RawTxn; pub use super::tasks::Entity as Tasks; pub use super::token_accounts::Entity as TokenAccounts; pub use super::tokens::Entity as Tokens; +pub use super::tree_transactions::Entity as TreeTransactions; diff --git a/digital_asset_types/src/dao/generated/tree_transactions.rs b/digital_asset_types/src/dao/generated/tree_transactions.rs new file mode 100644 index 00000000..3fdddf05 --- /dev/null +++ b/digital_asset_types/src/dao/generated/tree_transactions.rs @@ -0,0 +1,67 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 + +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "tree_transactions" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)] +pub struct Model { + pub signature: String, + pub tree: Vec<u8>, + pub slot: i64, + pub created_at: Option<DateTimeWithTimeZone>, + pub processed_at: Option<DateTimeWithTimeZone>, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + Signature, + Tree, + Slot, + CreatedAt, + ProcessedAt, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + Signature, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = String; + fn auto_increment() -> bool { + false + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::Signature => ColumnType::Char(Some(84u32)).def(), + Self::Tree => ColumnType::Binary.def(), + Self::Slot => ColumnType::BigInteger.def(), + Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(), + Self::ProcessedAt => ColumnType::TimestampWithTimeZone.def().null(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index ebe406e2..da58b5d2 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -414,7 +414,8 @@ pub async fn get_signatures_for_asset( ) -> Result<Vec<(String, Option<String>)>, DbErr> { // if tree_id and leaf_idx are provided, use them directly to fetch transactions if let (Some(tree_id), Some(leaf_idx)) = (tree_id, leaf_idx) { - let transactions = fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?; + let transactions = + fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
return Ok(transactions); } @@ -442,7 +443,8 @@ pub async fn get_signatures_for_asset( let leaf_id = asset .nonce .ok_or(DbErr::RecordNotFound("Leaf ID does not exist".to_string()))?; - let transactions = fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?; + let transactions = + fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?; Ok(transactions) } else { Ok(Vec::new()) } @@ -461,7 +463,13 @@ pub async fn fetch_transactions( stmt = stmt.filter(cl_audits::Column::LeafIdx.eq(leaf_id)); stmt = stmt.order_by(cl_audits::Column::CreatedAt, sea_orm::Order::Desc); - stmt = paginate(pagination, limit, stmt, sort_direction, cl_audits::Column::Id); + stmt = paginate( + pagination, + limit, + stmt, + sort_direction, + cl_audits::Column::Id, + ); let transactions = stmt.all(conn).await?; let transaction_list: Vec<(String, Option<String>)> = transactions .into_iter() diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 4ae2cfe6..b2c175d1 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -32,6 +32,7 @@ mod m20230918_182123_add_raw_name_symbol; mod m20230919_072154_cl_audits; mod m20231101_120101_add_instruction_into_cl_audit; mod m20231101_120101_cl_audit_table_index; +mod m20231208_103949_create_tree_transactions_table; pub struct Migrator; @@ -71,6 +72,7 @@ impl MigratorTrait for Migrator { Box::new(m20230919_072154_cl_audits::Migration), Box::new(m20231101_120101_add_instruction_into_cl_audit::Migration), Box::new(m20231101_120101_cl_audit_table_index::Migration), + Box::new(m20231208_103949_create_tree_transactions_table::Migration), ] } } diff --git a/migration/src/m20231208_103949_create_tree_transactions_table.rs b/migration/src/m20231208_103949_create_tree_transactions_table.rs new file mode 100644 index 00000000..c3006151 --- /dev/null +++ b/migration/src/m20231208_103949_create_tree_transactions_table.rs @@ -0,0 +1,61 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(TreeTransactions::Table) + .if_not_exists() + .col( + ColumnDef::new(TreeTransactions::Signature) + .char_len(88) + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(TreeTransactions::Tree).binary().not_null()) + .col(ColumnDef::new(TreeTransactions::Slot).big_integer().not_null()) + .col(ColumnDef::new(TreeTransactions::CreatedAt).timestamp_with_time_zone().default("now()")) + .col(ColumnDef::new(TreeTransactions::ProcessedAt).timestamp_with_time_zone()) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("tree_slot_index") + .table(TreeTransactions::Table) + .col(TreeTransactions::Tree) + .col(TreeTransactions::Slot) + .unique() + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned()) + .await?; + + manager + .drop_table(Table::drop().table(TreeTransactions::Table).to_owned()) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum TreeTransactions { + Table, + Signature, + Tree, + CreatedAt, + ProcessedAt, + Slot, +} diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 6fb9f998..1115020c 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -8,15 +8,33 @@ publish =
false hex = "0.4.3" log = "0.4.17" env_logger = "0.10.0" -redis = { version = "0.22.3", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp"] } -futures = {version = "0.3.25"} +redis = { version = "0.22.3", features = [ + "aio", + "tokio-comp", + "streams", + "tokio-native-tls-comp", +] } +futures = { version = "0.3.25" } futures-util = "0.3.27" base64 = "0.21.0" thiserror = "1.0.31" serde_json = "1.0.81" tokio = { version = "1.26.0", features = ["full", "tracing"] } -sqlx = { version = "0.6.2", features = ["macros", "runtime-tokio-rustls", "postgres", "uuid", "offline", "json"] } -sea-orm = { version = "0.10.6", features = ["macros", "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", "mock"] } +sqlx = { version = "0.6.2", features = [ + "macros", + "runtime-tokio-rustls", + "postgres", + "uuid", + "offline", + "json", +] } +sea-orm = { version = "0.10.6", features = [ + "macros", + "runtime-tokio-rustls", + "sqlx-postgres", + "with-chrono", + "mock", +] } sea-query = { version = "0.28.1", features = ["postgres-array"] } chrono = "0.4.19" tokio-postgres = "0.7.7" @@ -28,7 +46,10 @@ plerkle_serialization = { version = "1.6.0" } flatbuffers = "23.1.21" lazy_static = "1.4.0" regex = "1.5.5" -digital_asset_types = { path = "../digital_asset_types", features = ["json_types", "sql_types"] } +digital_asset_types = { path = "../digital_asset_types", features = [ + "json_types", + "sql_types", +] } mpl-bubblegum = "1.0.1-beta.3" spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] } spl-concurrent-merkle-tree = "0.2.0" @@ -48,7 +69,7 @@ solana-geyser-plugin-interface = "~1.16.16" solana-sdk-macro = "~1.16.16" rand = "0.8.5" rust-crypto = "0.2.36" -url="2.3.1" +url = "2.3.1" anchor-lang = "0.28.0" borsh = "~0.10.3" stretto = { version = "0.7", features = ["async"] } diff --git a/nft_ingester/src/error/mod.rs b/nft_ingester/src/error/mod.rs index 37ed5f24..46bd678f 100644 --- a/nft_ingester/src/error/mod.rs +++ b/nft_ingester/src/error/mod.rs @@ -52,6 +52,10 @@ pub enum IngesterError { HttpError { status_code: String }, #[error("AssetIndex Error {0}")] AssetIndexError(String), + #[error("TryFromInt Error {0}")] + TryFromInt(#[from] std::num::TryFromIntError), + #[error("Chrono FixedOffset Error")] + ChronoFixedOffset, } impl From for IngesterError { diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 0d6be6df..73c7bb07 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,16 +1,78 @@ use crate::error::IngesterError; use digital_asset_types::dao::{ - asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, + asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, tree_transactions, }; -use log::{debug, info, error}; +use log::{debug, error, info}; use mpl_bubblegum::types::Collection; use sea_orm::{ - query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, + query::*, sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ColumnTrait, DbBackend, + EntityTrait, }; use spl_account_compression::events::ChangeLogEventV1; use std::convert::From; +/// Mark tree transaction as processed. If the transaction already exists, update the `processed_at` field. +/// +/// This function takes in a tree ID, slot, transaction ID, and a transaction object. +/// It first checks if a tree transaction with the given transaction ID already exists. 
+/// If it does, it updates the `processed_at` field of the existing tree transaction with the current time. +/// If it doesn't, it creates a new tree transaction with the provided parameters and saves it. +/// +/// # Arguments +/// +/// * `tree_id` - A vector of bytes representing the ID of the tree. +/// * `slot` - A 64-bit unsigned integer representing the slot. +/// * `txn_id` - A string slice representing the transaction ID. +/// * `txn` - A reference to a transaction object. +/// +/// # Returns +/// +/// This function returns a `Result` that contains an empty tuple, or an `IngesterError` if the operation fails. +pub async fn save_tree_transaction<'c, T>( + tree_id: Vec<u8>, + slot: u64, + txn_id: &str, + txn: &T, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let now = chrono::Utc::now() + .with_timezone(&chrono::FixedOffset::east_opt(0).ok_or(IngesterError::ChronoFixedOffset)?); + + let tree_transaction = tree_transactions::Entity::find() + .filter(tree_transactions::Column::Signature.eq(txn_id)) + .one(txn) + .await?; + + if let Some(tree_transaction) = tree_transaction { + let mut tree_transaction: tree_transactions::ActiveModel = tree_transaction.into(); + + tree_transaction.processed_at = Set(Some(now)); + + tree_transaction.save(txn).await?; + } else { + let tree_transaction = tree_transactions::ActiveModel { + signature: Set(txn_id.to_string()), + slot: Set(i64::try_from(slot)?), + tree: Set(tree_id.to_vec()), + processed_at: Set(Some(now)), + ..Default::default() + }; + + tree_transactions::Entity::insert(tree_transaction) + .on_conflict( + OnConflict::column(tree_transactions::Column::Signature) + .do_nothing() + .to_owned(), + ) + .exec(txn) + .await?; + } + Ok(()) +} + pub async fn save_changelog_event<'c, T>( change_log_event: &ChangeLogEventV1, slot: u64, @@ -103,36 +165,7 @@ where } } - // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert - // a single row into the `backfill_items` table. This way if an incomplete path was inserted - // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will - // fix it. - if i - 1 == depth as i64 { - // See if the tree already exists in the `backfill_items` table. - let rows = backfill_items::Entity::find() - .filter(backfill_items::Column::Tree.eq(tree_id)) - .limit(1) - .all(txn) - .await?; - - // If the tree does not exist in `backfill_items` and the sequence number is greater than 1, - // then we know we will need to backfill the tree from sequence number 1 up to the current - // sequence number. So in this case we set at flag to force checking the tree.
- let force_chk = rows.is_empty() && change_log_event.seq > 1; - - info!("Adding to backfill_items table at level {}", i - 1); - let item = backfill_items::ActiveModel { - tree: Set(tree_id.to_vec()), - seq: Set(change_log_event.seq as i64), - slot: Set(slot as i64), - force_chk: Set(force_chk), - backfilled: Set(false), - failed: Set(false), - ..Default::default() - }; - - backfill_items::Entity::insert(item).exec(txn).await?; - } + // TODO: drop `backfill_items` table if not needed anymore for backfilling Ok(()) //TODO -> set maximum size of path and break into multiple statements diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index 4e735576..e73c9004 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -84,18 +84,32 @@ where decompress::decompress(parsing_result, bundle, txn).await?; } InstructionName::VerifyCreator => { - creator_verification::process(parsing_result, bundle, txn, true, cl_audits, ix_str).await?; + creator_verification::process(parsing_result, bundle, txn, true, cl_audits, ix_str) + .await?; } InstructionName::UnverifyCreator => { - creator_verification::process(parsing_result, bundle, txn, false, cl_audits, ix_str).await?; + creator_verification::process(parsing_result, bundle, txn, false, cl_audits, ix_str) + .await?; } InstructionName::VerifyCollection | InstructionName::UnverifyCollection | InstructionName::SetAndVerifyCollection => { - collection_verification::process(parsing_result, bundle, txn, cl_audits, ix_str).await?; + collection_verification::process(parsing_result, bundle, txn, cl_audits, ix_str) + .await?; } _ => debug!("Bubblegum: Not Implemented Instruction"), } + // TODO: assuming tree update available on all transactions but need to confirm. 
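+ // Descriptive note: when the Bubblegum parser surfaces a tree update, the block below
+ // records the transaction in `tree_transactions` with `processed_at` set, which both
+ // marks the signature as ingested and gives the backfiller's per-tree crawl a resume point.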
+ if let Some(tree_update) = &parsing_result.tree_update { + save_tree_transaction( + tree_update.id.to_bytes().to_vec(), + bundle.slot, + bundle.txn_id, + txn, + ) + .await?; + } + Ok(()) } diff --git a/tree_backfiller/Cargo.toml b/tree_backfiller/Cargo.toml new file mode 100644 index 00000000..4d4b1285 --- /dev/null +++ b/tree_backfiller/Cargo.toml @@ -0,0 +1,88 @@ +[package] +name = "das-tree-backfiller" +version = "0.8.0" +edition = "2021" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +log = "0.4.17" +env_logger = "0.10.0" +anyhow = "1.0.75" +redis = { version = "0.22.3", features = [ + "aio", + "tokio-comp", + "streams", + "tokio-native-tls-comp", +] } +futures = { version = "0.3.25" } +futures-util = "0.3.27" +base64 = "0.21.0" +thiserror = "1.0.31" +serde_json = "1.0.81" +anchor-client = "0.28.0" +tokio = { version = "1.26.0", features = ["full", "tracing"] } +sqlx = { version = "0.6.2", features = [ + "macros", + "runtime-tokio-rustls", + "postgres", + "uuid", + "offline", + "json", +] } +sea-orm = { version = "0.10.6", features = [ + "macros", + "runtime-tokio-rustls", + "sqlx-postgres", + "with-chrono", + "mock", +] } +sea-query = { version = "0.28.1", features = ["postgres-array"] } +chrono = "0.4.19" +tokio-postgres = "0.7.7" +serde = "1.0.136" +bs58 = "0.4.0" +reqwest = "0.11.11" +plerkle_messenger = { version = "1.6.0", features = ['redis'] } +plerkle_serialization = { version = "1.6.0" } +flatbuffers = "23.1.21" +lazy_static = "1.4.0" +regex = "1.5.5" +digital_asset_types = { path = "../digital_asset_types", features = [ + "json_types", + "sql_types", +] } +mpl-bubblegum = "1.0.1-beta.3" +spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] } +spl-concurrent-merkle-tree = "0.2.0" +uuid = "1.0.0" +async-trait = "0.1.53" +num-traits = "0.2.15" +blockbuster = "0.9.0-beta.1" +figment = { version = "0.10.6", features = ["env", "toml", "yaml"] } +cadence = "0.29.0" +cadence-macros = "0.29.0" +solana-sdk = "~1.16.16" +solana-client = "~1.16.16" +spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] } +solana-transaction-status = "~1.16.16" +solana-account-decoder = "~1.16.16" +solana-geyser-plugin-interface = "~1.16.16" +solana-sdk-macro = "~1.16.16" +rand = "0.8.5" +rust-crypto = "0.2.36" +url = "2.3.1" +anchor-lang = "0.28.0" +borsh = "~0.10.3" +stretto = { version = "0.7", features = ["async"] } +tokio-stream = "0.1.12" +tracing-subscriber = { version = "0.3.16", features = [ + "json", + "env-filter", + "ansi", +] } +clap = { version = "4.2.2", features = ["derive", "cargo", "env"] } + +[lints] +workspace = true diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs new file mode 100644 index 00000000..ad4d34cf --- /dev/null +++ b/tree_backfiller/src/backfiller.rs @@ -0,0 +1,175 @@ +use crate::db; +use crate::{queue, tree}; +use anyhow::Result; +use clap::Parser; +use log::{debug, error, info}; +use sea_orm::SqlxPostgresConnector; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::signature::Signature; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tokio::sync::{mpsc, Semaphore}; +use tokio::time::Duration; + +#[derive(Debug, Parser, Clone)] +pub struct Args { + /// Solana RPC URL + #[arg(long, env)] + pub solana_rpc_url: String, + + /// Number of tree crawler workers + #[arg(long, env, default_value = "1")] + pub tree_crawler_count: usize, + + /// The size of the signature 
channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel. + #[arg(long, env, default_value = "1")] + pub signature_channel_size: usize, + + #[arg(long, env, default_value = "1")] + pub queue_channel_size: usize, + + #[arg(long, env, default_value = "3000")] + pub transaction_check_timeout: u64, + + /// Database configuration + #[clap(flatten)] + pub database: db::PoolArgs, + + /// Redis configuration + #[clap(flatten)] + pub queue: queue::QueueArgs, +} + +/// A thread-safe counter. +pub struct Counter(Arc<AtomicUsize>); + +impl Counter { + /// Creates a new counter initialized to zero. + pub fn new() -> Self { + Self(Arc::new(AtomicUsize::new(0))) + } + + /// Increments the counter by one. + pub fn increment(&self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + + /// Decrements the counter by one. + pub fn decrement(&self) { + self.0.fetch_sub(1, Ordering::SeqCst); + } + + /// Returns the current value of the counter. + pub fn get(&self) -> usize { + self.0.load(Ordering::SeqCst) + } + + /// Returns a future that resolves when the counter reaches zero. + /// The future periodically checks the counter value and sleeps for a short duration. + pub fn zero(&self) -> impl std::future::Future<Output = ()> { + let counter = self.clone(); + async move { + while counter.get() > 0 { + println!("Counter value: {}", counter.get()); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } +} + +impl Clone for Counter { + /// Returns a clone of the counter. + /// The returned counter shares the same underlying atomic integer. + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +/// Runs the tree backfiller. +/// +/// This function takes an `Args` struct as input and returns a `Result<()>`. +/// It creates an `RpcClient` and retrieves all trees. +/// It then spawns a task for each tree and a separate task to handle transaction workers. +/// The function waits for all tasks to finish before returning. +pub async fn run(config: Args) -> Result<()> { + let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url)); + let sig_solana_rpc = Arc::clone(&solana_rpc); + + let pool = db::connect(config.database).await?; + + let (queue_sender, mut queue_receiver) = mpsc::channel::<Vec<u8>>(config.queue_channel_size); + let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size); + + let transaction_worker_count = Counter::new(); + let transaction_worker_count_check = transaction_worker_count.clone(); + + tokio::spawn(async move { + loop { + tokio::select!
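+ // Descriptive note: each signature received from the tree crawlers spawns a task that
+ // fetches and serializes the transaction via `tree::transaction`, pushing the bytes onto
+ // `queue_sender`; `Counter` tracks in-flight tasks so `run` can wait for them to drain
+ // once crawling completes.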
{ + Some(signature) = sig_receiver.recv() => { + let solana_rpc = Arc::clone(&sig_solana_rpc); + let transaction_worker_count_sig = transaction_worker_count.clone(); + let queue_sender = queue_sender.clone(); + + transaction_worker_count_sig.increment(); + + let transaction_task = async move { + if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await { + error!("retrieving transaction: {:?}", e); + } + + transaction_worker_count_sig.decrement(); + }; + + tokio::spawn(transaction_task); + }, + else => break, + } + } + + Ok::<(), anyhow::Error>(()) + }); + + let queue_handler = tokio::spawn(async move { + let mut queue = queue::Queue::setup(config.queue).await?; + + while let Some(data) = queue_receiver.recv().await { + if let Err(e) = queue.push(&data).await { + error!("pushing to queue: {:?}", e); + } + } + + Ok::<(), anyhow::Error>(()) + }); + + let trees = tree::all(&solana_rpc).await?; + + let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count)); + let mut crawl_handlers = Vec::with_capacity(trees.len()); + + for tree in trees { + let solana_rpc = Arc::clone(&solana_rpc); + let semaphore = Arc::clone(&semaphore); + let sig_sender = sig_sender.clone(); + let pool = pool.clone(); + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + + let crawl_handler = tokio::spawn(async move { + let _permit = semaphore.acquire().await?; + + if let Err(e) = tree.crawl(solana_rpc, sig_sender, conn).await { + error!("crawling tree: {:?}", e); + } + + Ok::<(), anyhow::Error>(()) + }); + + crawl_handlers.push(crawl_handler); + } + + futures::future::try_join_all(crawl_handlers).await?; + transaction_worker_count_check.zero().await; + let _ = queue_handler.await?; + + Ok(()) +} diff --git a/tree_backfiller/src/db.rs b/tree_backfiller/src/db.rs new file mode 100644 index 00000000..fd718113 --- /dev/null +++ b/tree_backfiller/src/db.rs @@ -0,0 +1,35 @@ +use anyhow::Result; +use clap::Parser; +use sqlx::{ + postgres::{PgConnectOptions, PgPoolOptions}, + PgPool, +}; + +#[derive(Debug, Parser, Clone)] +pub struct PoolArgs { + #[arg(long, env)] + pub database_url: String, + #[arg(long, env, default_value = "125")] + pub database_max_connections: u32, + #[arg(long, env, default_value = "5")] + pub database_min_connections: u32, +} + +/// Establishes a connection to the database using the provided configuration. +/// +/// # Arguments +/// +/// * `config` - A `PoolArgs` struct containing the database URL and the minimum and maximum number of connections. +/// +/// # Returns +/// +/// * `Result<PgPool>` - On success, returns a `PgPool`. On failure, returns an error describing why the connection could not be established. +pub async fn connect(config: PoolArgs) -> Result<PgPool> { + let options: PgConnectOptions = config.database_url.parse()?; + + Ok(PgPoolOptions::new() + .min_connections(config.database_min_connections) + .max_connections(config.database_max_connections) + .connect_with(options) + .await?) +} diff --git a/tree_backfiller/src/main.rs b/tree_backfiller/src/main.rs new file mode 100644 index 00000000..2b5da240 --- /dev/null +++ b/tree_backfiller/src/main.rs @@ -0,0 +1,33 @@ +mod backfiller; +mod db; +mod queue; +mod tree; + +use anyhow::Result; +use clap::{Parser, Subcommand}; + +#[derive(Debug, Parser)] +#[clap(author, version)] struct Args { + #[command(subcommand)] + command: Command, +} + +#[derive(Debug, Clone, Subcommand)] +enum Command { + /// The 'run' command is used to cross-reference the index against on-chain accounts.
+ /// It crawls through trees and backfills any missed tree transactions. + /// This is particularly useful for ensuring data consistency and completeness. + #[command(name = "run")] + Run(backfiller::Args), +} +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + env_logger::init(); + + match args.command { + Command::Run(config) => backfiller::run(config).await, + } +} diff --git a/tree_backfiller/src/queue.rs b/tree_backfiller/src/queue.rs new file mode 100644 index 00000000..21511597 --- /dev/null +++ b/tree_backfiller/src/queue.rs @@ -0,0 +1,63 @@ +use anyhow::Result; +use clap::Parser; +use figment::value::{Dict, Value}; +use plerkle_messenger::{ + redis_messenger::RedisMessenger, Messenger, MessengerConfig, MessengerError, MessengerType, +}; + +const TRANSACTION_BACKFILL_STREAM: &'static str = "TXNFILL"; + +#[derive(Clone, Debug, Parser)] +pub struct QueueArgs { + #[arg(long, env)] + pub messenger_redis_url: String, + #[arg(long, env, default_value = "100")] + pub messenger_redis_batch_size: String, + #[arg(long, env, default_value = "10000000")] + pub messenger_stream_max_buffer_size: usize, +} + +impl From<QueueArgs> for MessengerConfig { + fn from(args: QueueArgs) -> Self { + let mut connection_config = Dict::new(); + + connection_config.insert( + "redis_connection_str".to_string(), + Value::from(args.messenger_redis_url), + ); + + connection_config.insert( + "batch_size".to_string(), + Value::from(args.messenger_redis_batch_size), + ); + + Self { + messenger_type: MessengerType::Redis, + connection_config: connection_config, + } + } +} + +#[derive(Debug)] +pub struct Queue(RedisMessenger); + +impl Queue { + pub async fn setup(config: QueueArgs) -> Result<Queue> { + let mut messenger = RedisMessenger::new(config.clone().into()).await?; + + messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?; + + messenger + .set_buffer_size( + TRANSACTION_BACKFILL_STREAM, + config.messenger_stream_max_buffer_size, + ) + .await; + + Ok(Self(messenger)) + } + + pub async fn push(&mut self, message: &[u8]) -> Result<(), MessengerError> { + self.0.send(TRANSACTION_BACKFILL_STREAM, message).await + } +} diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs new file mode 100644 index 00000000..f3214a10 --- /dev/null +++ b/tree_backfiller/src/tree.rs @@ -0,0 +1,204 @@ +use crate::queue::Queue; +use anyhow::Result; +use borsh::BorshDeserialize; +use clap::Args; +use digital_asset_types::dao::tree_transactions; +use flatbuffers::FlatBufferBuilder; +use plerkle_serialization::serializer::seralize_encoded_transaction_with_status; +use sea_orm::{ + sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, + QueryFilter, QueryOrder, +}; +use solana_account_decoder::UiAccountEncoding; +use solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_client::GetConfirmedSignaturesForAddress2Config, + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionConfig}, + rpc_filter::{Memcmp, RpcFilterType}, +}; +use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature}; +use solana_transaction_status::UiTransactionEncoding; +use spl_account_compression::id; +use spl_account_compression::state::{ + merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1, +}; +use std::str::FromStr; +use std::sync::Arc; +use thiserror::Error as ThisError; +use tokio::sync::mpsc::Sender; + +const GET_SIGNATURES_FOR_ADDRESS_LIMIT: usize = 1000; + +#[derive(Debug, Clone, Default, Args)] +pub struct
ConfigBackfiller { + /// Solana RPC URL + #[arg(long, env)] + pub solana_rpc_url: String, +} + +#[derive(ThisError, Debug)] +pub enum TreeErrorKind { + #[error("solana rpc")] + Rpc(#[from] solana_client::client_error::ClientError), + #[error("anchor")] + Anchor(#[from] anchor_client::anchor_lang::error::Error), + #[error("perkle serialize")] + PerkleSerialize(#[from] plerkle_serialization::error::PlerkleSerializationError), + #[error("perkle messenger")] + PlerkleMessenger(#[from] plerkle_messenger::MessengerError), + #[error("queue send")] + QueueSend(#[from] tokio::sync::mpsc::error::SendError<Vec<u8>>), +} +#[derive(Debug, Clone)] +pub struct TreeHeaderResponse { + pub max_depth: u32, + pub max_buffer_size: u32, + pub creation_slot: u64, + pub size: usize, +} + +impl TryFrom<ConcurrentMerkleTreeHeader> for TreeHeaderResponse { + type Error = TreeErrorKind; + + fn try_from(payload: ConcurrentMerkleTreeHeader) -> Result<Self, Self::Error> { + let size = merkle_tree_get_size(&payload)?; + Ok(Self { + max_depth: payload.get_max_depth(), + max_buffer_size: payload.get_max_buffer_size(), + creation_slot: payload.get_creation_slot(), + size, + }) + } +} + +#[derive(Debug, Clone)] +pub struct TreeResponse { + pub pubkey: Pubkey, + pub tree_header: TreeHeaderResponse, +} + +impl TreeResponse { + pub fn try_from_rpc(pubkey: Pubkey, account: Account) -> Result<Self> { + let (header_bytes, _rest) = account.data.split_at(CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1); + let header: ConcurrentMerkleTreeHeader = + ConcurrentMerkleTreeHeader::try_from_slice(header_bytes)?; + + let (auth, _) = Pubkey::find_program_address(&[pubkey.as_ref()], &mpl_bubblegum::ID); + + header.assert_valid_authority(&auth)?; + + let tree_header = header.try_into()?; + + Ok(Self { + pubkey, + tree_header, + }) + } +} + +pub async fn all(client: &Arc<RpcClient>) -> Result<Vec<TreeResponse>, TreeErrorKind> { + let config = RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![1u8], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }; + + Ok(client + .get_program_accounts_with_config(&id(), config) + .await? + .into_iter() + .filter_map(|(pubkey, account)| TreeResponse::try_from_rpc(pubkey, account).ok()) + .collect()) } + +impl TreeResponse { + pub async fn crawl( + &self, + client: Arc<RpcClient>, + sender: Sender<Signature>, + conn: DatabaseConnection, + ) -> Result<()> { + let mut before = None; + + let until = tree_transactions::Entity::find() + .filter(tree_transactions::Column::Tree.eq(self.pubkey.as_ref())) + .order_by_desc(tree_transactions::Column::Slot) + .one(&conn) + .await?
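+ // Descriptive note: the highest-slot signature already recorded for this tree becomes
+ // the `until` bound below, so repeated runs only crawl signatures newer than those
+ // already stored.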
.map(|t| Signature::from_str(&t.signature).ok()) + .flatten(); + + loop { + let sigs = client + .get_signatures_for_address_with_config( + &self.pubkey, + GetConfirmedSignaturesForAddress2Config { + before, + until, + ..GetConfirmedSignaturesForAddress2Config::default() + }, + ) + .await?; + + for sig in sigs.iter() { + let slot = i64::try_from(sig.slot)?; + let sig = Signature::from_str(&sig.signature)?; + + let tree_transaction = tree_transactions::ActiveModel { + signature: Set(sig.to_string()), + tree: Set(self.pubkey.as_ref().to_vec()), + slot: Set(slot), + ..Default::default() + }; + + tree_transactions::Entity::insert(tree_transaction) + .on_conflict( + OnConflict::column(tree_transactions::Column::Signature) + .do_nothing() + .to_owned(), + ) + .exec(&conn) + .await?; + + sender.send(sig.clone()).await?; + + before = Some(sig); + } + + if sigs.len() < GET_SIGNATURES_FOR_ADDRESS_LIMIT { + break; + } + } + + Ok(()) + } +} + +pub async fn transaction<'a>( + client: Arc<RpcClient>, + sender: Sender<Vec<u8>>, + signature: Signature, +) -> Result<(), TreeErrorKind> { + let transaction = client + .get_transaction_with_config( + &signature, + RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Base58), + max_supported_transaction_version: Some(0), + ..RpcTransactionConfig::default() + }, + ) + .await?; + + let message = seralize_encoded_transaction_with_status(FlatBufferBuilder::new(), transaction)?; + + sender.send(message.finished_data().to_vec()).await?; + + Ok(()) +}
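+
+// Hypothetical wiring, mirroring `backfiller::run` (the names `client`, `pool`, `sig_sender`,
+// and `queue_sender` are illustrative, not part of this module): discover trees, crawl each
+// into the signature channel, then hand every received signature to `transaction`, which
+// serializes the result onto the Redis-backed queue:
+//
+//   let client = Arc::new(RpcClient::new(rpc_url));
+//   for tree in all(&client).await? {
+//       let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
+//       tree.crawl(Arc::clone(&client), sig_sender.clone(), conn).await?;
+//   }
+//   // per received signature: transaction(Arc::clone(&client), queue_sender.clone(), sig).await?;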