Skip to content

Commit

Permalink
Extract indexing engine
Browse files Browse the repository at this point in the history
commit-id:bf2373d1
  • Loading branch information
tarrencev committed Jun 3, 2023
1 parent 3ae23da commit 95c1fc8
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 126 deletions.
4 changes: 2 additions & 2 deletions crates/torii/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use url::Url;

use crate::indexer::start_indexer;

mod processors;

mod engine;
mod graphql;
mod indexer;
mod processors;
mod storage;
mod tests;

Expand Down
161 changes: 161 additions & 0 deletions crates/torii/src/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

use starknet::core::types::{
BlockId, BlockWithTxs, Event, InvokeTransaction, MaybePendingBlockWithTxs,
MaybePendingTransactionReceipt, StarknetError, Transaction, TransactionReceipt,
};
use starknet::providers::jsonrpc::{JsonRpcClient, JsonRpcTransport};
use starknet::providers::{Provider, ProviderError};
use tokio::time::sleep;
use tracing::error;

use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::storage::Storage;

pub struct Processors<S: Storage, T: JsonRpcTransport + Sync + Send> {
block: Vec<Arc<dyn BlockProcessor<S, T>>>,
transaction: Vec<Arc<dyn TransactionProcessor<S, T>>>,
event: Vec<Arc<dyn EventProcessor<S, T>>>,
}

impl<S: Storage, T: JsonRpcTransport + Sync + Send> Default for Processors<S, T> {
fn default() -> Self {
Self { block: vec![], transaction: vec![], event: vec![] }
}
}

pub struct Engine<'a, S: Storage, T: JsonRpcTransport + Sync + Send> {
storage: &'a S,
provider: &'a JsonRpcClient<T>,
processors: Processors<S, T>,
}

impl<'a, S: Storage, T: JsonRpcTransport + Sync + Send> Engine<'a, S, T> {
pub fn new(
storage: &'a S,
provider: &'a JsonRpcClient<T>,
processors: Processors<S, T>,
) -> Self {
Self { storage, provider, processors }
}

pub async fn start(&self) -> Result<(), Box<dyn Error>> {
let mut current_block_number = self.storage.head().await?;

loop {
sleep(Duration::from_secs(1)).await;

let block_with_txs =
match self.provider.get_block_with_txs(BlockId::Number(current_block_number)).await
{
Ok(block_with_txs) => block_with_txs,
Err(e) => {
if let ProviderError::StarknetError(StarknetError::BlockNotFound) = e {
continue;
}

error!("getting block: {}", e);
continue;
}
};

let block_with_txs = match block_with_txs {
MaybePendingBlockWithTxs::Block(block_with_txs) => block_with_txs,
_ => continue,
};

process_block(self.storage, self.provider, &self.processors.block, &block_with_txs)
.await?;

for transaction in block_with_txs.transactions {
let invoke_transaction = match &transaction {
Transaction::Invoke(invoke_transaction) => invoke_transaction,
_ => continue,
};

let invoke_transaction = match invoke_transaction {
InvokeTransaction::V1(invoke_transaction) => invoke_transaction,
_ => continue,
};

let receipt = match self
.provider
.get_transaction_receipt(invoke_transaction.transaction_hash)
.await
{
Ok(receipt) => receipt,
_ => continue,
};

let receipt = match receipt {
MaybePendingTransactionReceipt::Receipt(receipt) => receipt,
_ => continue,
};

process_transaction(
self.storage,
self.provider,
&self.processors.transaction,
&receipt.clone(),
)
.await?;

if let TransactionReceipt::Invoke(invoke_receipt) = receipt.clone() {
for event in &invoke_receipt.events {
process_event(
self.storage,
self.provider,
&self.processors.event,
&receipt,
event,
)
.await?;
}
}
}

current_block_number += 1;
}
}
}

async fn process_block<S: Storage, T: starknet::providers::jsonrpc::JsonRpcTransport>(
storage: &S,
provider: &JsonRpcClient<T>,
processors: &[Arc<dyn BlockProcessor<S, T>>],
block: &BlockWithTxs,
) -> Result<(), Box<dyn Error>> {
for processor in processors {
processor.process(storage, provider, block).await?;
}
Ok(())
}

async fn process_transaction<S: Storage, T: starknet::providers::jsonrpc::JsonRpcTransport>(
storage: &S,
provider: &JsonRpcClient<T>,
processors: &[Arc<dyn TransactionProcessor<S, T>>],
receipt: &TransactionReceipt,
) -> Result<(), Box<dyn Error>> {
for processor in processors {
processor.process(storage, provider, receipt).await?;
}

Ok(())
}

async fn process_event<S: Storage, T: starknet::providers::jsonrpc::JsonRpcTransport>(
storage: &S,
provider: &JsonRpcClient<T>,
processors: &[Arc<dyn EventProcessor<S, T>>],
_receipt: &TransactionReceipt,
event: &Event,
) -> Result<(), Box<dyn Error>> {
for processor in processors {
processor.process(storage, provider, event).await?;
}

Ok(())
}
128 changes: 4 additions & 124 deletions crates/torii/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

use num::BigUint;
use starknet::core::types::{
BlockId, BlockWithTxs, Event, InvokeTransaction, MaybePendingBlockWithTxs,
MaybePendingTransactionReceipt, StarknetError, Transaction, TransactionReceipt,
};
use starknet::providers::jsonrpc::{JsonRpcClient, JsonRpcTransport};
use starknet::providers::{Provider, ProviderError};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use tracing::info;

use crate::engine::{Engine, Processors};
// use crate::processors::component_register::ComponentRegistrationProcessor;
// use crate::processors::component_state_update::ComponentStateUpdateProcessor;
// use crate::processors::system_register::SystemRegistrationProcessor;
use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::storage::Storage;

pub async fn start_indexer<S: Storage, T: JsonRpcTransport + Sync + Send>(
Expand All @@ -27,120 +19,8 @@ pub async fn start_indexer<S: Storage, T: JsonRpcTransport + Sync + Send>(
) -> Result<(), Box<dyn Error>> {
info!("starting indexer");

let block_processors: Vec<Arc<dyn BlockProcessor<S, T>>> = vec![];
let transaction_processors: Vec<Arc<dyn TransactionProcessor<S, T>>> = vec![];
let event_processors: Vec<Arc<dyn EventProcessor<S, T>>> = vec![];
let engine = Engine::new(storage, provider, Processors::default());
engine.start().await?;

let mut current_block_number = storage.head().await?;

loop {
sleep(Duration::from_secs(1)).await;

let block_with_txs =
match provider.get_block_with_txs(BlockId::Number(current_block_number)).await {
Ok(block_with_txs) => block_with_txs,
Err(e) => {
if let ProviderError::StarknetError(StarknetError::BlockNotFound) = e {
continue;
}

error!("getting block: {}", e);
continue;
}
};

let block_with_txs = match block_with_txs {
MaybePendingBlockWithTxs::Block(block_with_txs) => block_with_txs,
_ => continue,
};

process_block(storage, provider, &block_processors, &block_with_txs).await?;

for transaction in block_with_txs.transactions {
let invoke_transaction = match &transaction {
Transaction::Invoke(invoke_transaction) => invoke_transaction,
_ => continue,
};

let invoke_transaction = match invoke_transaction {
InvokeTransaction::V1(invoke_transaction) => invoke_transaction,
_ => continue,
};

let receipt =
match provider.get_transaction_receipt(invoke_transaction.transaction_hash).await {
Ok(receipt) => receipt,
_ => continue,
};

let receipt = match receipt {
MaybePendingTransactionReceipt::Receipt(receipt) => receipt,
_ => continue,
};

process_transaction(storage, provider, &transaction_processors, &receipt.clone())
.await?;

if let TransactionReceipt::Invoke(invoke_receipt) = receipt.clone() {
for event in &invoke_receipt.events {
process_event(storage, provider, &event_processors, &receipt, event).await?;
}
}
}

current_block_number += 1;
}
}

async fn process_block<S: Storage, T: starknet::providers::jsonrpc::JsonRpcTransport>(
storage: &S,
provider: &JsonRpcClient<T>,
processors: &[Arc<dyn BlockProcessor<S, T>>],
block: &BlockWithTxs,
) -> Result<(), Box<dyn Error>> {
for processor in processors {
processor.process(storage, provider, block).await?;
}
Ok(())
}

async fn process_transaction<S: Storage, T: starknet::providers::jsonrpc::JsonRpcTransport>(
storage: &S,
provider: &JsonRpcClient<T>,
processors: &[Arc<dyn TransactionProcessor<S, T>>],
receipt: &TransactionReceipt,
) -> Result<(), Box<dyn Error>> {
for processor in processors {
processor.process(storage, provider, receipt).await?;
}

Ok(())
}

async fn process_event<S: Storage, T: starknet::providers::jsonrpc::JsonRpcTransport>(
storage: &S,
provider: &JsonRpcClient<T>,
processors: &[Arc<dyn EventProcessor<S, T>>],
_receipt: &TransactionReceipt,
event: &Event,
) -> Result<(), Box<dyn Error>> {
for processor in processors {
processor.process(storage, provider, event).await?;
}

Ok(())
}

// #[test]
// fn test_indexer() {
// use crate::start_apibara;

// let rpc_url = "http://localhost:5050";
// let (sequencer, rpc) = build_mock_rpc(5050);
// let ct = CancellationToken::new();
// let pool = sqlx::sqlite::SqlitePool::connect("sqlite::memory:").unwrap();
// let world = BigUint::from(0x1234567890);
// let provider = JsonRpcClient::new(HttpTransport::new(Uri::parse(rpc_url)));

// start_indexer(ct, world, Uri::from_str("http://localhost:7171").unwrap(), pool, &provider)
// }

0 comments on commit 95c1fc8

Please sign in to comment.