Skip to content

Commit

Permalink
feat: crawlers states (#73)
Browse files Browse the repository at this point in the history
* feat: initial crawlers states

* refactor: rename status to state

* fix: some fixes

* feat: crawlers state endpoint

* chore: crawler state cleanup

* chore: fmt fix

* feat: use crawler in interval crawlers

* chore: fmt fix

* fix: transactions crawler starting from last saved transaction

* feat: fix upserts and remove clear_db call

* feat: make interval crawlers process right away

* feat: return timestamp 0 when there are no values in the db
  • Loading branch information
mateuszjasiuk authored Jul 16, 2024
1 parent 5f5d94e commit 0aa9369
Show file tree
Hide file tree
Showing 79 changed files with 1,762 additions and 830 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ deadpool-diesel = { version = "0.5.0", features = ["postgres"] }
diesel = { version = "2.1.0", features = [
"postgres",
"serde_json",
"numeric"
"numeric",
"chrono",
] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
orm = { path = "orm" }
Expand All @@ -82,3 +83,5 @@ duration-str = "0.7.1"
fake = { version = "2.9.2", features = ["derive"] }
rand = "0.8.5"
bigdecimal = "0.4.5"
strum = "0.26.3"
strum_macros = "0.26.3"
1 change: 1 addition & 0 deletions chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
serde_json.workspace = true
chrono.workspace = true
clap.workspace = true
anyhow.workspace = true
namada_sdk.workspace = true
Expand Down
95 changes: 70 additions & 25 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::identity;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -14,6 +15,7 @@ use chain::services::{
db as db_service, namada as namada_service,
tendermint as tendermint_service,
};
use chrono::{NaiveDateTime, Utc};
use clap::Parser;
use clap_verbosity_flag::LevelFilter;
use deadpool_diesel::postgres::Object;
Expand All @@ -23,7 +25,7 @@ use shared::block::Block;
use shared::block_result::BlockResult;
use shared::checksums::Checksums;
use shared::crawler::crawl;
use shared::crawler_state::CrawlerState;
use shared::crawler_state::ChainCrawlerState;
use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
use tendermint_rpc::HttpClient;
use tokio::time::sleep;
Expand Down Expand Up @@ -73,13 +75,10 @@ async fn main() -> Result<(), MainError> {

initial_query(&client, &conn, config.initial_query_retry_time).await?;

let last_block_height = db_service::get_last_synched_block(&conn)
let crawler_state = db_service::get_chain_crawler_state(&conn)
.await
.into_db_error()?;

let next_block =
last_block_height.expect("Last block height has to be set!");

crawl(
move |block_height| {
crawling_fn(
Expand All @@ -89,7 +88,7 @@ async fn main() -> Result<(), MainError> {
checksums.clone(),
)
},
next_block,
crawler_state.last_processed_block,
)
.await
}
Expand All @@ -100,14 +99,15 @@ async fn crawling_fn(
conn: Arc<Object>,
checksums: Checksums,
) -> Result<(), MainError> {
tracing::info!("Attempting to process block: {}...", block_height);
let should_process = can_process(block_height, client.clone()).await?;

if !should_process {
let timestamp = Utc::now().naive_utc();
update_crawler_timestamp(&conn, timestamp).await?;

if !namada_service::is_block_committed(&client, block_height)
.await
.into_rpc_error()?
{
tracing::warn!("Block {} was not processed, retry...", block_height);
return Err(MainError::RpcError);

return Err(MainError::NoAction);
}

tracing::info!("Query block...");
Expand Down Expand Up @@ -195,8 +195,11 @@ async fn crawling_fn(

let timestamp_in_sec = DateTimeUtc::now().0.timestamp();

let crawler_state =
CrawlerState::new(block_height, epoch, timestamp_in_sec);
let crawler_state = ChainCrawlerState {
last_processed_block: block_height,
last_processed_epoch: epoch,
timestamp: timestamp_in_sec,
};

conn.interact(move |conn| {
conn.build_transaction()
Expand Down Expand Up @@ -239,7 +242,7 @@ async fn crawling_fn(
revealed_pks,
)?;

repository::crawler::insert_crawler_state(
repository::crawler_state::upsert_crawler_state(
transaction_conn,
crawler_state,
)?;
Expand Down Expand Up @@ -268,14 +271,16 @@ async fn initial_query(
.into_rpc_error()?;

loop {
let pos_crawler_epoch =
let pos_crawler_state =
get_pos_crawler_state(conn).await.into_db_error();

match pos_crawler_epoch {
match pos_crawler_state {
// >= in case epochs are really short
Ok(pos_crawler_epoch) if pos_crawler_epoch.epoch >= epoch => {
Ok(pos_crawler_state)
if pos_crawler_state.last_processed_epoch >= epoch =>
{
// We assign pos crawler epoch as epoch to process
epoch = pos_crawler_epoch.epoch;
epoch = pos_crawler_state.last_processed_epoch;
break;
}
_ => {}
Expand Down Expand Up @@ -307,9 +312,13 @@ async fn initial_query(
.await
.into_rpc_error()?;

let timestamp_in_sec = DateTimeUtc::now().0.timestamp();
let crawler_state =
CrawlerState::new(block_height, epoch, timestamp_in_sec);
let timestamp = DateTimeUtc::now().0.timestamp();

let crawler_state = ChainCrawlerState {
last_processed_block: block_height,
last_processed_epoch: epoch,
timestamp,
};

tracing::info!("Inserting initial data... ");

Expand All @@ -335,7 +344,7 @@ async fn initial_query(
repository::pos::insert_bonds(transaction_conn, bonds)?;
repository::pos::insert_unbonds(transaction_conn, unbonds)?;

repository::crawler::insert_crawler_state(
repository::crawler_state::upsert_crawler_state(
transaction_conn,
crawler_state,
)?;
Expand All @@ -345,7 +354,43 @@ async fn initial_query(
})
.await
.context_db_interact_error()
.into_db_error()?
.context("Commit initial db transaction error")
.and_then(identity)
.into_db_error()
}

/// Decides whether `block_height` is ready to be processed.
///
/// Queries the chain for the last committed block height and returns
/// `Ok(true)` iff that height is at least `block_height`. A failed RPC
/// query is logged and surfaced as [`MainError::RpcError`].
async fn can_process(
    block_height: u32,
    client: Arc<HttpClient>,
) -> Result<bool, MainError> {
    tracing::info!("Attempting to process block: {}...", block_height);

    match namada_service::query_last_block_height(&client).await {
        // The block exists on chain once the tip has reached its height.
        Ok(last_committed) => Ok(last_committed >= block_height),
        Err(e) => {
            tracing::error!(
                "Failed to query Namada's last committed block: {}",
                e
            );
            Err(MainError::RpcError)
        }
    }
}

/// Persists a fresh `timestamp` for the chain crawler's state row.
///
/// Runs the repository update on the pooled connection's blocking
/// executor; both interaction failures and the inner DB error are
/// collapsed into [`MainError`].
async fn update_crawler_timestamp(
    conn: &Object,
    timestamp: NaiveDateTime,
) -> Result<(), MainError> {
    let interact_result = conn
        .interact(move |transaction_conn| {
            // The repository call already returns `anyhow::Result<()>`,
            // so it can be forwarded as the closure's result directly.
            repository::crawler_state::update_crawler_timestamp(
                transaction_conn,
                timestamp,
            )
        })
        .await;

    interact_result
        .context_db_interact_error()
        // Flatten Result<Result<_, _>, _> produced by `interact`.
        .and_then(identity)
        .into_db_error()
}
18 changes: 0 additions & 18 deletions chain/src/repository/crawler.rs

This file was deleted.

43 changes: 43 additions & 0 deletions chain/src/repository/crawler_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use anyhow::Context;
use chrono::NaiveDateTime;
use diesel::upsert::excluded;
use diesel::{ExpressionMethods, PgConnection, RunQueryDsl};
use orm::crawler_state::{ChainStateInsertDb, CrawlerNameDb};
use orm::schema::crawler_state;
use shared::crawler_state::{ChainCrawlerState, CrawlerName};

/// Inserts the chain crawler's state row, or — if a row with the same
/// `name` already exists — overwrites its timestamp and last-processed
/// block/epoch with the freshly crawled values.
pub fn upsert_crawler_state(
    transaction_conn: &mut PgConnection,
    crawler_state: ChainCrawlerState,
) -> anyhow::Result<()> {
    // Convert the shared state into the DB row type up front so diesel
    // can infer the insert target without a turbofish.
    let row: ChainStateInsertDb = (CrawlerName::Chain, crawler_state).into();

    diesel::insert_into(crawler_state::table)
        .values(&row)
        .on_conflict(crawler_state::name)
        .do_update()
        .set((
            crawler_state::last_processed_block
                .eq(excluded(crawler_state::last_processed_block)),
            crawler_state::last_processed_epoch
                .eq(excluded(crawler_state::last_processed_epoch)),
            crawler_state::timestamp.eq(excluded(crawler_state::timestamp)),
        ))
        .execute(transaction_conn)
        .context("Failed to update crawler state in db")?;

    Ok(())
}

/// Refreshes only the `timestamp` column of the chain crawler's state
/// row, leaving last-processed block/epoch untouched.
pub fn update_crawler_timestamp(
    transaction_conn: &mut PgConnection,
    timestamp: NaiveDateTime,
) -> anyhow::Result<()> {
    // Target exactly the chain crawler's row by name.
    let chain_crawler_name = CrawlerNameDb::from(CrawlerName::Chain);

    diesel::update(crawler_state::table)
        .filter(crawler_state::name.eq(chain_crawler_name))
        .set(crawler_state::timestamp.eq(timestamp))
        .execute(transaction_conn)
        .context("Failed to update crawler timestamp in db")?;

    Ok(())
}
28 changes: 23 additions & 5 deletions chain/src/repository/gov.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use anyhow::Context;
use diesel::{PgConnection, RunQueryDsl};
use diesel::upsert::excluded;
use diesel::{ExpressionMethods, PgConnection, RunQueryDsl};
use orm::governance_proposal::GovernanceProposalInsertDb;
use orm::governance_votes::GovernanceProposalVoteInsertDb;
use orm::schema::{governance_proposals, governance_votes};
use shared::proposal::{GovernanceProposal, TallyType};
use shared::vote::GovernanceVote;

pub fn insert_proposals(
transaction_conn: &mut PgConnection,
proposals: Vec<(GovernanceProposal, TallyType)>,
) -> anyhow::Result<()> {
diesel::insert_into(orm::schema::governance_proposals::table)
diesel::insert_into(governance_proposals::table)
.values::<&Vec<GovernanceProposalInsertDb>>(
&proposals
.into_iter()
Expand All @@ -20,7 +22,18 @@ pub fn insert_proposals(
})
.collect::<Vec<_>>(),
)
.on_conflict_do_nothing()
.on_conflict(governance_proposals::id)
.do_update()
.set((
governance_proposals::result
.eq(excluded(governance_proposals::result)),
governance_proposals::yay_votes
.eq(excluded(governance_proposals::yay_votes)),
governance_proposals::nay_votes
.eq(excluded(governance_proposals::nay_votes)),
governance_proposals::abstain_votes
.eq(excluded(governance_proposals::abstain_votes)),
))
.execute(transaction_conn)
.context("Failed to update governance proposals in db")?;

Expand All @@ -31,7 +44,7 @@ pub fn insert_votes(
transaction_conn: &mut PgConnection,
proposals_votes: Vec<GovernanceVote>,
) -> anyhow::Result<()> {
diesel::insert_into(orm::schema::governance_votes::table)
diesel::insert_into(governance_votes::table)
.values::<&Vec<GovernanceProposalVoteInsertDb>>(
&proposals_votes
.into_iter()
Expand All @@ -40,7 +53,12 @@ pub fn insert_votes(
})
.collect::<Vec<_>>(),
)
.on_conflict_do_nothing()
.on_conflict((
governance_votes::voter_address,
governance_votes::proposal_id,
))
.do_update()
.set((governance_votes::kind.eq(excluded(governance_votes::kind)),))
.execute(transaction_conn)
.context("Failed to update governance votes in db")?;

Expand Down
2 changes: 1 addition & 1 deletion chain/src/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod balance;
pub mod crawler;
pub mod crawler_state;
pub mod gov;
pub mod pos;
pub mod revealed_pk;
1 change: 1 addition & 0 deletions chain/src/repository/revealed_pk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub fn insert_revealed_pks(
.map(|(pk, address)| RevealedPkInsertDb::from(pk, address))
.collect::<Vec<_>>(),
)
// If pk was already revealed, do nothing
.on_conflict_do_nothing()
.execute(transaction_conn)
.context("Failed to update balances in db")?;
Expand Down
Loading

0 comments on commit 0aa9369

Please sign in to comment.