Skip to content

Commit

Permalink
feat: support optional block timestamp (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshstevens19 authored Oct 8, 2024
1 parent cf3b98c commit d1683fb
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cli/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ start_sophon:
start_xenea:
RUSTFLAGS='-C target-cpu=native' RUST_BACKTRACE='full' cargo run --release --features jemalloc -- start --path $(CURDIR)/../examples/xenea-bug graphql
codegen_typings_lens:
RUSTFLAGS='-C target-cpu=native' cargo run --release --features jemalloc -- codegen --path $(CURDIR)/../../lens-backend/crates/indexer typings
cargo run -- codegen --path $(CURDIR)/../../lens-backend/crates/lens-indexer typings
codegen_indexer_lens:
RUSTFLAGS='-C target-cpu=native' cargo run --release --features jemalloc -- codegen --path $(CURDIR)/../../lens-backend/crates/indexer indexer
start_uniswap_base:
Expand Down
13 changes: 8 additions & 5 deletions core/src/event/callback_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::{debug, error};
use crate::{
event::contract_setup::{ContractInformation, NetworkContract},
indexer::start::ProcessedNetworkContract,
provider::WrappedLog,
};

pub type Decoder = Arc<dyn Fn(Vec<H256>, Bytes) -> Arc<dyn Any + Send + Sync> + Send + Sync>;
Expand All @@ -30,6 +31,7 @@ pub struct TxInformation {
pub address: Address,
pub block_hash: H256,
pub block_number: U64,
pub block_timestamp: Option<U256>,
pub transaction_hash: H256,
pub log_index: U256,
pub transaction_index: U64,
Expand All @@ -52,20 +54,21 @@ pub struct EventResult {
impl EventResult {
pub fn new(
network_contract: Arc<NetworkContract>,
log: Log,
log: WrappedLog,
start_block: U64,
end_block: U64,
) -> Self {
let log_meta = LogMeta::from(&log);
let log_address = log.address;
let log_meta = LogMeta::from(&log.inner);
let log_address = log.inner.address;
Self {
log: log.clone(),
decoded_data: network_contract.decode_log(log),
log: log.inner.clone(),
decoded_data: network_contract.decode_log(log.inner),
tx_information: TxInformation {
network: network_contract.network.to_string(),
address: log_address,
block_hash: log_meta.block_hash,
block_number: log_meta.block_number,
block_timestamp: log.block_timestamp,
transaction_hash: log_meta.transaction_hash,
transaction_index: log_meta.transaction_index,
log_index: log_meta.log_index,
Expand Down
11 changes: 7 additions & 4 deletions core/src/indexer/fetch_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{error::Error, str::FromStr, sync::Arc, time::Duration};
use ethers::{
addressbook::Address,
middleware::MiddlewareError,
prelude::{BlockNumber, JsonRpcError, Log, ValueOrArray, H256, U64},
prelude::{BlockNumber, JsonRpcError, ValueOrArray, H256, U64},
};
use regex::Regex;
use tokio::{
Expand All @@ -16,11 +16,11 @@ use tracing::{debug, error, info, warn};
use crate::{
event::{config::EventProcessingConfig, RindexerEventFilter},
indexer::{log_helpers::is_relevant_block, IndexingEventProgressStatus},
provider::JsonRpcCachedProvider,
provider::{JsonRpcCachedProvider, WrappedLog},
};

pub struct FetchLogsResult {
pub logs: Vec<Log>,
pub logs: Vec<WrappedLog>,
pub from_block: U64,
pub to_block: U64,
}
Expand Down Expand Up @@ -243,6 +243,7 @@ async fn fetch_historic_logs_stream(

if let Some(last_log) = last_log {
let next_from_block = last_log
.inner
.block_number
.expect("block number should always be present in a log") +
U64::from(1);
Expand Down Expand Up @@ -466,7 +467,9 @@ async fn live_indexing_stream(
to_block
);
} else if let Some(last_log) = last_log {
if let Some(last_log_block_number) = last_log.block_number {
if let Some(last_log_block_number) =
last_log.inner.block_number
{
current_filter = current_filter.set_from_block(
last_log_block_number + U64::from(1),
);
Expand Down
1 change: 1 addition & 0 deletions core/src/indexer/no_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbackType {
block_hash,
block_number,
transaction_hash,
block_timestamp: None,
log_index,
transaction_index,
},
Expand Down
2 changes: 1 addition & 1 deletion core/src/indexer/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ async fn live_indexing_for_contract_event_dependencies<'a>(
);
} else if let Some(last_log) = last_log {
if let Some(last_log_block_number) =
last_log.block_number
last_log.inner.block_number
{
ordering_live_indexing_details.filter =
ordering_live_indexing_details
Expand Down
18 changes: 16 additions & 2 deletions core/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ethers::{
types::{Block, BlockNumber, H256, U256, U64},
};
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::Mutex;
use url::Url;
Expand All @@ -23,6 +24,16 @@ pub struct JsonRpcCachedProvider {
pub max_block_range: Option<U64>,
}

/// TODO: This is a temporary type until we migrate to alloy
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct WrappedLog {
#[serde(flatten)]
pub inner: Log,
#[serde(rename = "blockTimestamp")]
#[serde(skip_serializing_if = "Option::is_none")]
pub block_timestamp: Option<U256>,
}

impl JsonRpcCachedProvider {
pub fn new(provider: Provider<RetryClient<Http>>, max_block_range: Option<U64>) -> Self {
JsonRpcCachedProvider {
Expand Down Expand Up @@ -58,8 +69,11 @@ impl JsonRpcCachedProvider {
self.provider.get_block_number().await
}

pub async fn get_logs(&self, filter: &RindexerEventFilter) -> Result<Vec<Log>, ProviderError> {
self.provider.get_logs(filter.raw_filter()).await
pub async fn get_logs(
&self,
filter: &RindexerEventFilter,
) -> Result<Vec<WrappedLog>, ProviderError> {
self.provider.request("eth_getLogs", [filter.raw_filter()]).await
}

pub async fn get_chain_id(&self) -> Result<U256, ProviderError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ pub struct TxInformation {
pub address: Address,
pub block_hash: H256,
pub block_number: U64,
pub block_timestamp: Option<U256>,
pub transaction_hash: H256,
pub log_index: U256,
pub transaction_index: U64,
Expand Down

0 comments on commit d1683fb

Please sign in to comment.