Skip to content

Commit

Permalink
[WIP] Tree Transaction Backfiller (#114)
Browse files Browse the repository at this point in the history
* refactor(backfiller): tree backfilling using getSignaturesForAdress. fetch all trees, fetch associated transactions

* feat(backfiller): generate table and model for  query last transaction record for fast forwarding tree transaction crawling

* feat(backfiller): push transaction payloads to redis through the perkle messenger. mark tree transactons as processed_at so know it completed the index loop.

* fix(backfiller): git history changes made from running formatter. just include changes needed by the backfiller.

* fix(backfiller): support mock feature for sea-orm by switching to pg pool and sea_orm adapter.
  • Loading branch information
kespinola authored and linuskendall committed Jan 11, 2024
1 parent f3986ae commit 2a12b85
Show file tree
Hide file tree
Showing 20 changed files with 887 additions and 39 deletions.
57 changes: 57 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"metaplex-rpc-proxy",
"migration",
"nft_ingester",
"tree_backfiller",
"tools/acc_forwarder",
"tools/bgtask_creator",
"tools/fetch_trees",
Expand Down
Empty file added backfiller.yaml
Empty file.
2 changes: 1 addition & 1 deletion digital_asset_types/src/dao/generated/cl_audits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ impl From<crate::dao::cl_items::ActiveModel> for ActiveModel {
seq: item.seq,
leaf_idx: item.leaf_idx,
..Default::default()
}
};
}
}
1 change: 1 addition & 0 deletions digital_asset_types/src/dao/generated/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pub mod sea_orm_active_enums;
pub mod tasks;
pub mod token_accounts;
pub mod tokens;
pub mod tree_transactions;
1 change: 1 addition & 0 deletions digital_asset_types/src/dao/generated/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ pub use super::raw_txn::Entity as RawTxn;
pub use super::tasks::Entity as Tasks;
pub use super::token_accounts::Entity as TokenAccounts;
pub use super::tokens::Entity as Tokens;
pub use super::tree_transactions::Entity as TreeTransactions;
67 changes: 67 additions & 0 deletions digital_asset_types/src/dao/generated/tree_transactions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Copy, Clone, Default, Debug, DeriveEntity)]
pub struct Entity;

impl EntityName for Entity {
fn table_name(&self) -> &str {
"tree_transactions"
}
}

#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)]
pub struct Model {
pub signature: String,
pub tree: Vec<u8>,
pub slot: i64,
pub created_at: Option<DateTimeWithTimeZone>,
pub processed_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
pub enum Column {
Signature,
Tree,
Slot,
CreatedAt,
ProcessedAt,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
pub enum PrimaryKey {
Signature,
}

impl PrimaryKeyTrait for PrimaryKey {
type ValueType = String;
fn auto_increment() -> bool {
false
}
}

#[derive(Copy, Clone, Debug, EnumIter)]
pub enum Relation {}

impl ColumnTrait for Column {
type EntityName = Entity;
fn def(&self) -> ColumnDef {
match self {
Self::Signature => ColumnType::Char(Some(84u32)).def(),
Self::Tree => ColumnType::Binary.def(),
Self::Slot => ColumnType::BigInteger.def(),
Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(),
Self::ProcessedAt => ColumnType::TimestampWithTimeZone.def().null(),
}
}
}

impl RelationTrait for Relation {
fn def(&self) -> RelationDef {
panic!("No RelationDef")
}
}

impl ActiveModelBehavior for ActiveModel {}
12 changes: 8 additions & 4 deletions digital_asset_types/src/dao/scopes/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,12 +469,14 @@ pub async fn get_signatures_for_asset(
asset_id: Option<Vec<u8>>,
tree_id: Option<Vec<u8>>,
leaf_idx: Option<i64>,
sort_direction: Order,
pagination: &Pagination,
limit: u64,
) -> Result<Vec<(String, Option<String>)>, DbErr> {
// if tree_id and leaf_idx are provided, use them directly to fetch transactions
if let (Some(tree_id), Some(leaf_idx)) = (tree_id, leaf_idx) {
let transactions = fetch_transactions(conn, tree_id, leaf_idx, pagination, limit).await?;
let transactions =
fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
return Ok(transactions);
}

Expand Down Expand Up @@ -502,7 +504,8 @@ pub async fn get_signatures_for_asset(
let leaf_id = asset
.nonce
.ok_or(DbErr::RecordNotFound("Leaf ID does not exist".to_string()))?;
let transactions = fetch_transactions(conn, tree, leaf_id, pagination, limit).await?;
let transactions =
fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
Ok(transactions)
} else {
Ok(Vec::new())
Expand All @@ -513,6 +516,7 @@ pub async fn fetch_transactions(
conn: &impl ConnectionTrait,
tree: Vec<u8>,
leaf_id: i64,
sort_direction: Order,
pagination: &Pagination,
limit: u64,
) -> Result<Vec<(String, Option<String>)>, DbErr> {
Expand All @@ -524,8 +528,8 @@ pub async fn fetch_transactions(
pagination,
limit,
stmt,
Order::Asc,
asset::Column::Id
sort_direction,
cl_audits::Column::Id,
);
let transactions = stmt.all(conn).await?;
let transaction_list: Vec<(String, Option<String>)> = transactions
Expand Down
6 changes: 5 additions & 1 deletion digital_asset_types/src/dapi/signatures_for_asset.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
use crate::dao::scopes;
use crate::dao::PageOptions;
use crate::rpc::filter::AssetSorting;
use crate::rpc::response::TransactionSignatureList;
use sea_orm::DatabaseConnection;
use sea_orm::DbErr;
use super::common::build_transaction_signatures_response;
use super::common::create_pagination;
use super::common::{create_pagination,create_sorting};


pub async fn get_signatures_for_asset(
db: &DatabaseConnection,
asset_id: Option<Vec<u8>>,
tree: Option<Vec<u8>>,
leaf_idx: Option<i64>,
sorting: AssetSorting,
page_options: &PageOptions,
) -> Result<TransactionSignatureList, DbErr> {
let pagination = create_pagination(&page_options)?;
let (sort_direction, sort_column) = create_sorting(sorting);
let transactions = scopes::asset::get_signatures_for_asset(
db,
asset_id,
tree,
leaf_idx,
sort_direction,
&pagination,
page_options.limit
)
Expand Down
5 changes: 5 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod m20231101_120101_add_instruction_into_cl_audit;
mod m20231101_120101_cl_audit_table_index;
mod m20231019_120101_add_seq_numbers_bgum_update_metadata;
mod m20231206_120101_remove_was_decompressed;
mod m20231208_103949_create_tree_transactions_table;

pub struct Migrator;

Expand Down Expand Up @@ -77,7 +78,11 @@ impl MigratorTrait for Migrator {
=======
Box::new(m20231101_120101_add_instruction_into_cl_audit::Migration),
Box::new(m20231101_120101_cl_audit_table_index::Migration),
<<<<<<< HEAD
>>>>>>> 4295c8f... feat: Add GetSigntaturesForAsset API
=======
Box::new(m20231208_103949_create_tree_transactions_table::Migration),
>>>>>>> 1968c00... [WIP] Tree Transaction Backfiller (#114)
]
}
}
61 changes: 61 additions & 0 deletions migration/src/m20231208_103949_create_tree_transactions_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(TreeTransactions::Table)
.if_not_exists()
.col(
ColumnDef::new(TreeTransactions::Signature)
.char_len(88)
.not_null()
.primary_key(),
)
.col(ColumnDef::new(TreeTransactions::Tree).binary().not_null())
.col(ColumnDef::new(TreeTransactions::Slot).big_integer().not_null())
.col(ColumnDef::new(TreeTransactions::CreatedAt).timestamp_with_time_zone().default("now()"))
.col(ColumnDef::new(TreeTransactions::ProcessedAt).timestamp_with_time_zone())
.to_owned(),
)
.await?;

manager
.create_index(
Index::create()
.name("tree_slot_index")
.table(TreeTransactions::Table)
.col(TreeTransactions::Tree)
.col(TreeTransactions::Slot)
.unique()
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned())
.await?;

manager
.drop_table(Table::drop().table(TreeTransactions::Table).to_owned())
.await
}
}

/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum TreeTransactions {
Table,
Signature,
Tree,
CreatedAt,
ProcessedAt,
Slot,
}
4 changes: 4 additions & 0 deletions nft_ingester/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub enum IngesterError {
HttpError { status_code: String },
#[error("AssetIndex Error {0}")]
AssetIndexError(String),
#[error("TryFromInt Error {0}")]
TryFromInt(#[from] std::num::TryFromIntError),
#[error("Chrono FixedOffset Error")]
ChronoFixedOffset,
}

impl From<reqwest::Error> for IngesterError {
Expand Down
Loading

0 comments on commit 2a12b85

Please sign in to comment.