Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Tree Transaction Backfiller #114

Merged
merged 5 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = [
"digital_asset_types",
"metaplex-rpc-proxy",
"nft_ingester",
"tree_backfiller",
"tools/acc_forwarder",
"tools/bgtask_creator",
"tools/fetch_trees",
Expand Down
Empty file added backfiller.yaml
Empty file.
11 changes: 9 additions & 2 deletions digital_asset_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ publish = false

[dependencies]
spl-concurrent-merkle-tree = "0.2.0"
sea-orm = { optional = true, version = "0.10.6", features = ["macros", "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", "mock"] }
sea-orm = { optional = true, version = "0.10.6", features = [
"macros",
"runtime-tokio-rustls",
"sqlx-postgres",
"with-chrono",
] }
sea-query = { version = "0.28.1", features = ["postgres-array"] }
serde = { version = "1.0.137", optional = true }
serde_json = { version = "1.0.81", optional = true, features=["preserve_order"] }
serde_json = { version = "1.0.81", optional = true, features = [
"preserve_order",
] }
bs58 = "0.4.0"
borsh = { version = "~0.10.3", optional = true }
borsh-derive = { version = "~0.10.3", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions digital_asset_types/src/dao/generated/cl_audits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ impl From<crate::dao::cl_items::ActiveModel> for ActiveModel {
seq: item.seq,
leaf_idx: item.leaf_idx,
..Default::default()
}
};
}
}
}
1 change: 1 addition & 0 deletions digital_asset_types/src/dao/generated/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pub mod sea_orm_active_enums;
pub mod tasks;
pub mod token_accounts;
pub mod tokens;
pub mod tree_transactions;
1 change: 1 addition & 0 deletions digital_asset_types/src/dao/generated/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ pub use super::raw_txn::Entity as RawTxn;
pub use super::tasks::Entity as Tasks;
pub use super::token_accounts::Entity as TokenAccounts;
pub use super::tokens::Entity as Tokens;
pub use super::tree_transactions::Entity as TreeTransactions;
67 changes: 67 additions & 0 deletions digital_asset_types/src/dao/generated/tree_transactions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Copy, Clone, Default, Debug, DeriveEntity)]
pub struct Entity;

impl EntityName for Entity {
fn table_name(&self) -> &str {
"tree_transactions"
}
}

#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)]
pub struct Model {
pub signature: String,
pub tree: Vec<u8>,
pub slot: i64,
pub created_at: Option<DateTimeWithTimeZone>,
pub processed_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
pub enum Column {
Signature,
Tree,
Slot,
CreatedAt,
ProcessedAt,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
pub enum PrimaryKey {
Signature,
}

impl PrimaryKeyTrait for PrimaryKey {
type ValueType = String;
fn auto_increment() -> bool {
false
}
}

#[derive(Copy, Clone, Debug, EnumIter)]
pub enum Relation {}

impl ColumnTrait for Column {
type EntityName = Entity;
fn def(&self) -> ColumnDef {
match self {
Self::Signature => ColumnType::Char(Some(84u32)).def(),
Self::Tree => ColumnType::Binary.def(),
Self::Slot => ColumnType::BigInteger.def(),
Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(),
Self::ProcessedAt => ColumnType::TimestampWithTimeZone.def().null(),
}
}
}

impl RelationTrait for Relation {
fn def(&self) -> RelationDef {
panic!("No RelationDef")
}
}

impl ActiveModelBehavior for ActiveModel {}
14 changes: 11 additions & 3 deletions digital_asset_types/src/dao/scopes/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ pub async fn get_signatures_for_asset(
) -> Result<Vec<(String, Option<String>)>, DbErr> {
// if tree_id and leaf_idx are provided, use them directly to fetch transactions
if let (Some(tree_id), Some(leaf_idx)) = (tree_id, leaf_idx) {
let transactions = fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
let transactions =
fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
return Ok(transactions);
}

Expand Down Expand Up @@ -442,7 +443,8 @@ pub async fn get_signatures_for_asset(
let leaf_id = asset
.nonce
.ok_or(DbErr::RecordNotFound("Leaf ID does not exist".to_string()))?;
let transactions = fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
let transactions =
fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
Ok(transactions)
} else {
Ok(Vec::new())
Expand All @@ -461,7 +463,13 @@ pub async fn fetch_transactions(
stmt = stmt.filter(cl_audits::Column::LeafIdx.eq(leaf_id));
stmt = stmt.order_by(cl_audits::Column::CreatedAt, sea_orm::Order::Desc);

stmt = paginate(pagination, limit, stmt, sort_direction, cl_audits::Column::Id);
stmt = paginate(
pagination,
limit,
stmt,
sort_direction,
cl_audits::Column::Id,
);
let transactions = stmt.all(conn).await?;
let transaction_list: Vec<(String, Option<String>)> = transactions
.into_iter()
Expand Down
2 changes: 2 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod m20230918_182123_add_raw_name_symbol;
mod m20230919_072154_cl_audits;
mod m20231101_120101_add_instruction_into_cl_audit;
mod m20231101_120101_cl_audit_table_index;
mod m20231208_103949_create_tree_transactions_table;

pub struct Migrator;

Expand Down Expand Up @@ -71,6 +72,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230919_072154_cl_audits::Migration),
Box::new(m20231101_120101_add_instruction_into_cl_audit::Migration),
Box::new(m20231101_120101_cl_audit_table_index::Migration),
Box::new(m20231208_103949_create_tree_transactions_table::Migration),
]
}
}
61 changes: 61 additions & 0 deletions migration/src/m20231208_103949_create_tree_transactions_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(TreeTransactions::Table)
.if_not_exists()
.col(
ColumnDef::new(TreeTransactions::Signature)
.char_len(88)
.not_null()
.primary_key(),
)
.col(ColumnDef::new(TreeTransactions::Tree).binary().not_null())
.col(ColumnDef::new(TreeTransactions::Slot).big_integer().not_null())
.col(ColumnDef::new(TreeTransactions::CreatedAt).timestamp_with_time_zone().default("now()"))
.col(ColumnDef::new(TreeTransactions::ProcessedAt).timestamp_with_time_zone())
.to_owned(),
)
.await?;

manager
.create_index(
Index::create()
.name("tree_slot_index")
.table(TreeTransactions::Table)
.col(TreeTransactions::Tree)
.col(TreeTransactions::Slot)
.unique()
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned())
.await?;

manager
.drop_table(Table::drop().table(TreeTransactions::Table).to_owned())
.await
}
}

/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum TreeTransactions {
Table,
Signature,
Tree,
CreatedAt,
ProcessedAt,
Slot,
}
33 changes: 27 additions & 6 deletions nft_ingester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,33 @@ publish = false
hex = "0.4.3"
log = "0.4.17"
env_logger = "0.10.0"
redis = { version = "0.22.3", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp"] }
futures = {version = "0.3.25"}
redis = { version = "0.22.3", features = [
"aio",
"tokio-comp",
"streams",
"tokio-native-tls-comp",
] }
futures = { version = "0.3.25" }
futures-util = "0.3.27"
base64 = "0.21.0"
thiserror = "1.0.31"
serde_json = "1.0.81"
tokio = { version = "1.26.0", features = ["full", "tracing"] }
sqlx = { version = "0.6.2", features = ["macros", "runtime-tokio-rustls", "postgres", "uuid", "offline", "json"] }
sea-orm = { version = "0.10.6", features = ["macros", "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", "mock"] }
sqlx = { version = "0.6.2", features = [
"macros",
"runtime-tokio-rustls",
"postgres",
"uuid",
"offline",
"json",
] }
sea-orm = { version = "0.10.6", features = [
"macros",
"runtime-tokio-rustls",
"sqlx-postgres",
"with-chrono",
"mock",
] }
sea-query = { version = "0.28.1", features = ["postgres-array"] }
chrono = "0.4.19"
tokio-postgres = "0.7.7"
Expand All @@ -28,7 +46,10 @@ plerkle_serialization = { version = "1.6.0" }
flatbuffers = "23.1.21"
lazy_static = "1.4.0"
regex = "1.5.5"
digital_asset_types = { path = "../digital_asset_types", features = ["json_types", "sql_types"] }
digital_asset_types = { path = "../digital_asset_types", features = [
"json_types",
"sql_types",
] }
mpl-bubblegum = "1.0.1-beta.3"
spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] }
spl-concurrent-merkle-tree = "0.2.0"
Expand All @@ -48,7 +69,7 @@ solana-geyser-plugin-interface = "~1.16.16"
solana-sdk-macro = "~1.16.16"
rand = "0.8.5"
rust-crypto = "0.2.36"
url="2.3.1"
url = "2.3.1"
anchor-lang = "0.28.0"
borsh = "~0.10.3"
stretto = { version = "0.7", features = ["async"] }
Expand Down
4 changes: 4 additions & 0 deletions nft_ingester/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub enum IngesterError {
HttpError { status_code: String },
#[error("AssetIndex Error {0}")]
AssetIndexError(String),
#[error("TryFromInt Error {0}")]
TryFromInt(#[from] std::num::TryFromIntError),
#[error("Chrono FixedOffset Error")]
ChronoFixedOffset,
}

impl From<reqwest::Error> for IngesterError {
Expand Down
Loading
Loading