Skip to content

Commit

Permalink
Scx1332/improvements in scan (#154)
Browse files Browse the repository at this point in the history
* Improvements in blockchain scan

* f

* f

* f

* f

* f

* f

* f

* f

* f

* f

* f

* f

* Improvements in scanner

* fmt

* f

* f
  • Loading branch information
scx1332 authored Mar 27, 2024
1 parent d1cadd1 commit 20f300f
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 50 deletions.
149 changes: 149 additions & 0 deletions crates/erc20_payment_lib/src/server/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use actix_web::http::header::HeaderValue;
use actix_web::http::{header, StatusCode};
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse, Responder, Scope};
use chrono::{DateTime, Utc};
use erc20_payment_lib_common::ops::*;
use erc20_payment_lib_common::utils::datetime_from_u256_timestamp;
use erc20_payment_lib_common::{export_metrics_to_prometheus, FaucetData};
Expand Down Expand Up @@ -660,6 +661,153 @@ async fn new_transfer(
Ok("success".to_string())
}

#[derive(Deserialize)]
pub struct StatsTransferRequest {
receiver: Option<String>,
from: Option<String>,
to: Option<String>,
chain: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct StatsTransferResult {
request_time: f64,
transfers: Vec<ChainTransferRespObj>,
}

#[derive(Serialize, sqlx::FromRow, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ChainTransferRespObj {
pub id: i64,
pub from_addr: String,
pub receiver_addr: String,
pub chain_id: i64,
pub token_addr: Option<String>,
pub token_amount: String,
pub tx_hash: String,
pub block_number: i64,
pub fee_paid: Option<String>,
pub block_date: DateTime<Utc>,
pub block_timestamp: i64,
pub to_addr: String,
pub caller_addr: String,
}

pub async fn stats_transfers(
data: Data<Box<ServerData>>,
info: web::Query<StatsTransferRequest>,
) -> actix_web::Result<web::Json<StatsTransferResult>> {
let time_start = std::time::Instant::now();
let receiver = if info.receiver.clone() == Some("all".to_string()) {
None
} else {
let account = Address::from_str(
&info
.receiver
.clone()
.ok_or(actix_web::error::ErrorBadRequest("account not found"))?,
)
.map_err(|err| {
actix_web::error::ErrorBadRequest(format!("account has to be valid address {err}"))
})?;
Some(account)
};
let account_str = receiver.map(|account| format!("{:#x}", account));

let from = chrono::DateTime::from_timestamp(
i64::from_str(
&info
.from
.clone()
.ok_or(actix_web::error::ErrorBadRequest("From not found"))?,
)
.map_err(|err| {
actix_web::error::ErrorBadRequest(format!("From is not a valid timestamp {err}"))
})?,
0,
)
.ok_or(actix_web::error::ErrorBadRequest(
"From is not a valid timestamp.",
))?;
let to = chrono::DateTime::from_timestamp(
i64::from_str(
&info
.to
.clone()
.ok_or(actix_web::error::ErrorBadRequest("To not found"))?,
)
.map_err(|err| {
actix_web::error::ErrorBadRequest(format!("To is not a valid timestamp {err}"))
})?,
0,
)
.ok_or(actix_web::error::ErrorBadRequest(
"To is not a valid timestamp.",
))?;

let chain_id = i64::from_str(
&info
.chain
.clone()
.ok_or(actix_web::error::ErrorBadRequest("Chain id not found"))?,
)
.map_err(|err| actix_web::error::ErrorBadRequest(format!("Chain id a valid {err}")))?;

let conn = data.db_connection.lock().await.clone();
let transf = if let Some(receiver) = account_str.as_ref() {
let transf =
get_all_chain_transfers_by_receiver_ext(&conn, chain_id, from, to, receiver, None)
.await;
transf.map_err(|err| {
actix_web::error::ErrorBadRequest(format!("Unknown server error: {}", err))
})?
} else {
let transf = get_all_chain_transfers_ext(&conn, chain_id, from, to, None).await;
transf.map_err(|err| {
actix_web::error::ErrorBadRequest(format!("Unknown server error: {}", err))
})?
};

let mut resp = Vec::new();
for trans in transf.into_iter() {
let Some(blockchain_date) = trans.blockchain_date else {
continue;
};

if blockchain_date < from {
continue;
}
if let Some(account_str) = account_str.as_ref() {
if trans.receiver_addr != *account_str {
continue;
}
}

resp.push(ChainTransferRespObj {
id: trans.id,
from_addr: trans.from_addr,
receiver_addr: trans.receiver_addr,
chain_id: trans.chain_id,
token_addr: trans.token_addr,
token_amount: trans.token_amount,
tx_hash: trans.tx_hash,
block_number: trans.block_number,
fee_paid: trans.fee_paid,
block_date: blockchain_date,
block_timestamp: blockchain_date.timestamp(),
to_addr: trans.to_addr,
caller_addr: trans.caller_addr,
})
}

let time_end = time_start.elapsed().as_secs_f64();
//serialize
Ok(web::Json(StatsTransferResult {
request_time: time_end,
transfers: resp,
}))
}

pub async fn transfers(data: Data<Box<ServerData>>, req: HttpRequest) -> impl Responder {
let tx_id = req
.match_info()
Expand Down Expand Up @@ -1081,6 +1229,7 @@ pub fn runtime_web_scope(
.route("/rpc_pool", web::get().to(rpc_pool))
.route("/rpc_pool/metrics", web::get().to(rpc_pool_metrics))
.route("/config", web::get().to(config_endpoint))
.route("/stats/transfers", web::get().to(stats_transfers))
.route("/transactions", web::get().to(transactions))
.route("/transactions/count", web::get().to(transactions_count))
.route("/transactions/next", web::get().to(transactions_next))
Expand Down
2 changes: 1 addition & 1 deletion crates/erc20_payment_lib/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn transaction_from_chain_and_into_db(
.map_err(|_err| ConversionError::from("Cannot parse tx_hash".to_string()))
.map_err(err_from!())?;

if let Some(chain_tx) = get_chain_tx_hash(conn, tx_hash.to_string())
if let Some(chain_tx) = get_chain_tx_hash(conn, format!("{:#x}", tx_hash))
.await
.map_err(err_from!())?
{
Expand Down
7 changes: 7 additions & 0 deletions crates/erc20_payment_lib/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,13 @@ pub async fn find_receipt_extended(
chain_tx_dao.priority_fee = tx.max_priority_fee_per_gas.map(|x| x.to_string());
chain_tx_dao.fee_paid = (gas_used * effective_gas_price).to_string();

chain_tx_dao.method = if tx.input.0.len() >= 4 {
// extract method
format!("0x{}", hex::encode(&tx.input.0[0..4]))
} else {
"N/A".to_string()
};

//todo: move to lazy static
let erc20_transfer_event_signature: H256 =
H256::from_str("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DELETE FROM chain_tx;
CREATE UNIQUE INDEX "idx_chain_tx_tx_hash" ON "chain_tx" ("tx_hash");
CREATE INDEX "idx_chain_tx_blockchain_date" ON "chain_tx" ("blockchain_date");
CREATE INDEX "idx_chain_transfer_blockchain_date" ON "chain_transfer" ("blockchain_date");
CREATE INDEX "idx_chain_transfer_receiver_address" ON "chain_transfer" ("receiver_addr");
CREATE INDEX "idx_chain_transfer_from_address" ON "chain_transfer" ("from_addr");

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct ChainTransferDbObj {
pub fee_paid: Option<String>,
pub blockchain_date: Option<DateTime<Utc>>,
}
/*

#[derive(Serialize, sqlx::FromRow, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ChainTransferDbObjExt {
Expand All @@ -25,5 +25,10 @@ pub struct ChainTransferDbObjExt {
pub token_addr: Option<String>,
pub token_amount: String,
pub chain_tx_id: i64,
}*/
pub fee_paid: Option<String>,
pub blockchain_date: Option<DateTime<Utc>>,
pub tx_hash: String,
pub block_number: i64,
pub to_addr: String,
pub caller_addr: String,
}
2 changes: 1 addition & 1 deletion crates/erc20_payment_lib_common/src/db/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod transfer_in_dao;
mod tx_dao;

pub use allowance_dao::AllowanceDbObj;
pub use chain_transfer_dao::ChainTransferDbObj;
pub use chain_transfer_dao::{ChainTransferDbObj, ChainTransferDbObjExt};
pub use chain_tx_dao::ChainTxDbObj;
pub use scan_dao::ScanDaoDbObj;
pub use token_transfer_dao::TokenTransferDbObj;
Expand Down
51 changes: 50 additions & 1 deletion crates/erc20_payment_lib_common/src/db/ops/chain_transfer_ops.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::model::ChainTransferDbObj;
use crate::model::ChainTransferDbObjExt;
use chrono::{DateTime, Utc};
use sqlx::Executor;
use sqlx::Sqlite;
use sqlx::SqlitePool;
Expand Down Expand Up @@ -28,15 +30,62 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING *;
.await?;
Ok(res)
}
pub async fn get_all_chain_transfers_ext(
conn: &SqlitePool,
chain_id: i64,
from: DateTime<Utc>,
to: DateTime<Utc>,
limit: Option<i64>,
) -> Result<Vec<ChainTransferDbObjExt>, sqlx::Error> {
let limit = limit.unwrap_or(i64::MAX);
let rows = sqlx::query_as::<_, ChainTransferDbObjExt>(
r"SELECT ct.*, cx.tx_hash, cx.block_number, cx.to_addr, cx.from_addr as caller_addr FROM chain_transfer as ct JOIN chain_tx as cx ON ct.chain_tx_id = cx.id WHERE ct.chain_id = $1 AND ct.blockchain_date >= $2 AND ct.blockchain_date <= $3 ORDER by id DESC LIMIT $4",
)
.bind(chain_id)
.bind(from)
.bind(to)
.bind(limit)
.fetch_all(conn)
.await?;
Ok(rows)
}

pub async fn get_all_chain_transfers_by_receiver_ext(
conn: &SqlitePool,
chain_id: i64,
from: DateTime<Utc>,
to: DateTime<Utc>,
receiver: &str,
limit: Option<i64>,
) -> Result<Vec<ChainTransferDbObjExt>, sqlx::Error> {
let limit = limit.unwrap_or(i64::MAX);
let rows = sqlx::query_as::<_, ChainTransferDbObjExt>(
r"SELECT ct.*, cx.tx_hash, cx.block_number, cx.to_addr, cx.from_addr as caller_addr FROM chain_transfer as ct JOIN chain_tx as cx ON ct.chain_tx_id = cx.id WHERE ct.chain_id = $1 AND ct.blockchain_date >= $2 AND ct.blockchain_date <= $3 AND ct.receiver_addr = $4 ORDER by id DESC LIMIT $5",
)
.bind(chain_id)
.bind(from)
.bind(to)
.bind(receiver)
.bind(limit)
.fetch_all(conn)
.await?;
Ok(rows)
}

pub async fn get_all_chain_transfers(
conn: &SqlitePool,
chain_id: i64,
from: DateTime<Utc>,
to: DateTime<Utc>,
limit: Option<i64>,
) -> Result<Vec<ChainTransferDbObj>, sqlx::Error> {
let limit = limit.unwrap_or(i64::MAX);
let rows = sqlx::query_as::<_, ChainTransferDbObj>(
r"SELECT * FROM chain_transfer ORDER by id DESC LIMIT $1",
r"SELECT * FROM chain_transfer WHERE chain_id = $1 AND blockchain_date >= $2 AND blockchain_date <= $3 ORDER by id DESC LIMIT $4",
)
.bind(chain_id)
.bind(from)
.bind(to)
.bind(limit)
.fetch_all(conn)
.await?;
Expand Down
21 changes: 21 additions & 0 deletions crates/erc20_payment_lib_common/src/db/ops/chain_tx_ops.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::model::ChainTxDbObj;
use chrono::{DateTime, Utc};
use sqlx::Executor;
use sqlx::Sqlite;
use sqlx::SqlitePool;
Expand Down Expand Up @@ -57,6 +58,26 @@ pub async fn get_chain_txs_by_chain_id(
Ok(rows)
}

pub async fn get_chain_txs_by_chain_id_and_dates(
conn: &SqlitePool,
chain_id: i64,
from: DateTime<Utc>,
to: DateTime<Utc>,
limit: Option<i64>,
) -> Result<Vec<ChainTxDbObj>, sqlx::Error> {
let limit = limit.unwrap_or(i64::MAX);
let rows = sqlx::query_as::<_, ChainTxDbObj>(
r"SELECT * FROM chain_tx WHERE chain_id = $1 AND blockchain_date >= $2 AND blockchain_date <= $3 ORDER by id DESC LIMIT $4",
)
.bind(chain_id)
.bind(from)
.bind(to)
.bind(limit)
.fetch_all(conn)
.await?;
Ok(rows)
}

pub async fn get_chain_tx(conn: &SqlitePool, id: i64) -> Result<ChainTxDbObj, sqlx::Error> {
let row = sqlx::query_as::<_, ChainTxDbObj>(r"SELECT * FROM chain_tx WHERE id = $1")
.bind(id)
Expand Down
Loading

0 comments on commit 20f300f

Please sign in to comment.