Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: missing block catchup #26

Merged
merged 6 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ clap = { version ="4.3", features =["derive", "env"] }
url = "2.3"
serde_json = "1.0.96"
rusqlite = { version = "0.29", features = ["bundled"] }
strum = "0.24.1"
futures = "0.3.28"

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/adapters.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod block_fetcher;
pub mod ethereum_adapter;
pub mod fuel_adapter;
pub mod runner;
pub mod storage;
2 changes: 1 addition & 1 deletion src/adapters/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::Stream;
pub use monitored_adapter::MonitoredEthAdapter;
pub use websocket::EthereumWs;

use crate::{adapters::block_fetcher::FuelBlock, errors::Result};
use crate::{adapters::fuel_adapter::FuelBlock, errors::Result};

#[derive(Debug, Clone, Copy)]
pub struct FuelBlockCommitedOnEth {
Expand Down
2 changes: 1 addition & 1 deletion src/adapters/ethereum_adapter/monitored_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use prometheus::{IntCounter, Opts};

use crate::{
adapters::{
block_fetcher::FuelBlock,
ethereum_adapter::{EthereumAdapter, EventStreamer},
fuel_adapter::FuelBlock,
},
errors::{Error, Result},
telemetry::{ConnectionHealthTracker, HealthChecker, RegistersMetrics},
Expand Down
2 changes: 1 addition & 1 deletion src/adapters/ethereum_adapter/websocket/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use url::Url;

use crate::{
adapters::{
block_fetcher::FuelBlock,
ethereum_adapter::{
websocket::event_streamer::EthEventStreamer, EthereumAdapter, EventStreamer,
},
fuel_adapter::FuelBlock,
},
errors::{Error, Result},
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod fuel_block_fetcher;
mod fuel_client;
mod fuel_metrics;

pub use fuel_block_fetcher::FuelBlockFetcher;
pub use fuel_client::FuelClient;
use serde::{Deserialize, Serialize};

use crate::errors::Result;
Expand All @@ -14,6 +14,7 @@ pub struct FuelBlock {

#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait BlockFetcher: Send + Sync {
pub trait FuelAdapter: Send + Sync {
async fn block_at_height(&self, height: u32) -> Result<Option<FuelBlock>>;
async fn latest_block(&self) -> Result<FuelBlock>;
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
use fuel_core_client::client::FuelClient;
use fuel_core_client::client::{schema::block::Block as FuelGqlBlock, FuelClient as FuelGqlClient};
use url::Url;

use crate::{
adapters::block_fetcher::{fuel_metrics::FuelMetrics, BlockFetcher, FuelBlock},
adapters::fuel_adapter::{fuel_metrics::FuelMetrics, FuelAdapter, FuelBlock},
errors::{Error, Result},
telemetry::{ConnectionHealthTracker, HealthChecker, RegistersMetrics},
};

impl RegistersMetrics for FuelBlockFetcher {
impl RegistersMetrics for FuelClient {
fn metrics(&self) -> Vec<Box<dyn prometheus::core::Collector>> {
self.metrics.metrics()
}
}

pub struct FuelBlockFetcher {
client: FuelClient,
pub struct FuelClient {
client: FuelGqlClient,
metrics: FuelMetrics,
health_tracker: ConnectionHealthTracker,
}

impl FuelBlockFetcher {
impl FuelClient {
pub fn new(url: &Url, unhealthy_after_n_errors: usize) -> Self {
let client = FuelClient::new(url).expect("Url to be well formed");
let client = FuelGqlClient::new(url).expect("Url to be well formed");
Self {
client,
metrics: FuelMetrics::default(),
Expand All @@ -43,18 +43,32 @@ impl FuelBlockFetcher {
}
}

impl From<FuelGqlBlock> for FuelBlock {
fn from(value: FuelGqlBlock) -> Self {
FuelBlock {
hash: *value.id.0 .0,
height: value.header.height.0,
}
}
}

#[async_trait::async_trait]
impl BlockFetcher for FuelBlockFetcher {
impl FuelAdapter for FuelClient {
async fn block_at_height(&self, height: u32) -> Result<Option<FuelBlock>> {
let maybe_block = self
.client
.block_by_height(height as u64)
.await
.map_err(|e| Error::NetworkError(e.to_string()))?;

Ok(maybe_block.map(Into::into))
}

async fn latest_block(&self) -> Result<FuelBlock> {
match self.client.chain_info().await {
Ok(chain_info) => {
self.handle_network_success();

let latest_block = chain_info.latest_block;
Ok(FuelBlock {
hash: *latest_block.id.0 .0,
height: latest_block.header.height.0,
})
Ok(chain_info.latest_block.into())
}
Err(err) => {
self.handle_network_error();
Expand Down Expand Up @@ -85,28 +99,51 @@ mod tests {

let url = Url::parse(&format!("http://{addr}")).unwrap();

let block_fetcher = FuelBlockFetcher::new(&url, 1);
let fuel_adapter = FuelClient::new(&url, 1);

// when
let result = block_fetcher.latest_block().await.unwrap();
let result = fuel_adapter.latest_block().await.unwrap();

// then
assert_eq!(result.height, 5);
}

#[tokio::test]
async fn can_fetch_block_at_height() {
// given
let node_config = Config {
manual_blocks_enabled: true,
..Config::local_node()
};

let (provider, addr) =
setup_test_provider(vec![], vec![], Some(node_config), Some(Default::default())).await;
provider.produce_blocks(5, None).await.unwrap();

let url = Url::parse(&format!("http://{addr}")).unwrap();

let fuel_adapter = FuelClient::new(&url, 1);

// when
let result = fuel_adapter.block_at_height(3).await.unwrap().unwrap();

// then
assert_eq!(result.height, 3);
}

#[tokio::test]
async fn updates_metrics_in_case_of_network_err() {
// temporary 'fake' address to cause a network error the same effect will be achieved by
// killing the node once the SDK supports it.
let url = Url::parse("localhost:12344").unwrap();

let block_fetcher = FuelBlockFetcher::new(&url, 1);
let fuel_adapter = FuelClient::new(&url, 1);

let registry = Registry::default();
block_fetcher.register_metrics(&registry);
fuel_adapter.register_metrics(&registry);

// when
let result = block_fetcher.latest_block().await;
let result = fuel_adapter.latest_block().await;

// then
assert!(result.is_err());
Expand All @@ -127,18 +164,18 @@ mod tests {
// killing the node once the SDK supports it.
let url = Url::parse("http://localhost:12344").unwrap();

let block_fetcher = FuelBlockFetcher::new(&url, 3);
let health_check = block_fetcher.connection_health_checker();
let fuel_adapter = FuelClient::new(&url, 3);
let health_check = fuel_adapter.connection_health_checker();

assert!(health_check.healthy());

let _ = block_fetcher.latest_block().await;
let _ = fuel_adapter.latest_block().await;
assert!(health_check.healthy());

let _ = block_fetcher.latest_block().await;
let _ = fuel_adapter.latest_block().await;
assert!(health_check.healthy());

let _ = block_fetcher.latest_block().await;
let _ = fuel_adapter.latest_block().await;
assert!(!health_check.healthy());
}
}
2 changes: 1 addition & 1 deletion src/adapters/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod sqlite_db;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use crate::{adapters::block_fetcher::FuelBlock, errors::Result};
use crate::{adapters::fuel_adapter::FuelBlock, errors::Result};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct BlockSubmission {
Expand Down
2 changes: 1 addition & 1 deletion src/adapters/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::{sync::Mutex, task};

use crate::{
adapters::{
block_fetcher::FuelBlock,
fuel_adapter::FuelBlock,
storage::{BlockSubmission, Storage},
},
errors::{Error, Result},
Expand Down
2 changes: 1 addition & 1 deletion src/services/block_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use tracing::{error, info};

use crate::{
adapters::{
block_fetcher::FuelBlock,
ethereum_adapter::EthereumAdapter,
fuel_adapter::FuelBlock,
runner::Runner,
storage::{BlockSubmission, Storage},
},
Expand Down
Loading