diff --git a/.sqlx/query-111a622bad3c042be9ee7fdf3ce9db91900473950c921260eaee6c3bff4c7096.json b/.sqlx/query-111a622bad3c042be9ee7fdf3ce9db91900473950c921260eaee6c3bff4c7096.json
new file mode 100644
index 00000000..b8acf613
--- /dev/null
+++ b/.sqlx/query-111a622bad3c042be9ee7fdf3ce9db91900473950c921260eaee6c3bff4c7096.json
@@ -0,0 +1,16 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE nonce = $3",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int2",
+        "Timestamptz",
+        "Int8"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "111a622bad3c042be9ee7fdf3ce9db91900473950c921260eaee6c3bff4c7096"
+}
diff --git a/.sqlx/query-2096d1ddfd76357493610c7d1a11022ce99ec1d9fd603fb10d4723ca05267627.json b/.sqlx/query-2096d1ddfd76357493610c7d1a11022ce99ec1d9fd603fb10d4723ca05267627.json
new file mode 100644
index 00000000..7583fe66
--- /dev/null
+++ b/.sqlx/query-2096d1ddfd76357493610c7d1a11022ce99ec1d9fd603fb10d4723ca05267627.json
@@ -0,0 +1,23 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "SELECT EXISTS (SELECT 1 FROM l1_blob_transaction WHERE state = $1 OR state = $2) AS has_nonfinalized_transactions;",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "has_nonfinalized_transactions",
+        "type_info": "Bool"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Int2",
+        "Int2"
+      ]
+    },
+    "nullable": [
+      null
+    ]
+  },
+  "hash": "2096d1ddfd76357493610c7d1a11022ce99ec1d9fd603fb10d4723ca05267627"
+}
diff --git a/.sqlx/query-54e8d847be33e87b093cd912a5103ee17bbca6e95490b89b42ae9c0e181bafdd.json b/.sqlx/query-54e8d847be33e87b093cd912a5103ee17bbca6e95490b89b42ae9c0e181bafdd.json
deleted file mode 100644
index 965495c6..00000000
--- a/.sqlx/query-54e8d847be33e87b093cd912a5103ee17bbca6e95490b89b42ae9c0e181bafdd.json
+++ /dev/null
@@ -1,54 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n SELECT f.*\n FROM l1_fragments f\n LEFT JOIN l1_transaction_fragments tf ON tf.fragment_id = f.id\n LEFT JOIN l1_blob_transaction t ON t.id = tf.transaction_id\n JOIN bundles b ON b.id = f.bundle_id\n WHERE (t.id IS NULL OR t.state = $1) \n AND b.end_height >= $2 -- Exclude bundles ending before starting_height\n ORDER BY b.start_height ASC, f.idx ASC\n LIMIT $3;\n ",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "id",
-        "type_info": "Int4"
-      },
-      {
-        "ordinal": 1,
-        "name": "idx",
-        "type_info": "Int4"
-      },
-      {
-        "ordinal": 2,
-        "name": "data",
-        "type_info": "Bytea"
-      },
-      {
-        "ordinal": 3,
-        "name": "total_bytes",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 4,
-        "name": "unused_bytes",
-        "type_info": "Int8"
-      },
-      {
-        "ordinal": 5,
-        "name": "bundle_id",
-        "type_info": "Int4"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "Int2",
-        "Int8",
-        "Int8"
-      ]
-    },
-    "nullable": [
-      false,
-      false,
-      false,
-      false,
-      false,
-      false
-    ]
-  },
-  "hash": "54e8d847be33e87b093cd912a5103ee17bbca6e95490b89b42ae9c0e181bafdd"
-}
diff --git a/.sqlx/query-fdf01b0abadc3b475b6b057d0cb2177bd92c6739d1c13ceda27478d1a9705f1a.json b/.sqlx/query-fdf01b0abadc3b475b6b057d0cb2177bd92c6739d1c13ceda27478d1a9705f1a.json
new file mode 100644
index 00000000..0b31d99d
--- /dev/null
+++ b/.sqlx/query-fdf01b0abadc3b475b6b057d0cb2177bd92c6739d1c13ceda27478d1a9705f1a.json
@@ -0,0 +1,54 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "SELECT \n    sub.id, \n    sub.idx, \n    sub.bundle_id, \n    sub.data, \n    sub.unused_bytes, \n    sub.total_bytes\nFROM (\n    SELECT DISTINCT ON (f.id) \n        f.*, \n        b.start_height\n    FROM l1_fragments f\n    JOIN bundles b ON b.id = f.bundle_id\n    WHERE \n        b.end_height >= $2\n        AND NOT EXISTS (\n            SELECT 1\n            FROM l1_transaction_fragments tf\n            JOIN l1_blob_transaction t ON t.id = tf.transaction_id\n            WHERE tf.fragment_id = f.id \n            AND t.state <> $1\n        )\n    ORDER BY \n        f.id, \n        b.start_height ASC, \n        f.idx ASC\n) AS sub\nORDER BY \n    sub.start_height ASC, \n    sub.idx ASC\nLIMIT $3;\n",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "id",
+        "type_info": "Int4"
+      },
+      {
+        "ordinal": 1,
+        "name": "idx",
+        "type_info": "Int4"
+      },
+      {
+        "ordinal": 2,
+        "name": "bundle_id",
+        "type_info": "Int4"
+      },
+      {
+        "ordinal": 3,
+        "name": "data",
+        "type_info": "Bytea"
+      },
+      {
+        "ordinal": 4,
+        "name": "unused_bytes",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 5,
+        "name": "total_bytes",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Int2",
+        "Int8",
+        "Int8"
+      ]
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false
+    ]
+  },
+  "hash": "fdf01b0abadc3b475b6b057d0cb2177bd92c6739d1c13ceda27478d1a9705f1a"
+}
diff --git a/Cargo.lock b/Cargo.lock
index 9af3152b..41257eaf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -241,6 +241,7 @@ dependencies = [
  "alloy-contract",
  "alloy-core",
  "alloy-eips",
+ "alloy-json-rpc",
  "alloy-network",
  "alloy-node-bindings",
  "alloy-provider",
@@ -2653,8 +2654,8 @@ dependencies = [
  "mockall",
  "ports",
  "pretty_assertions",
+ "proptest",
  "rand",
- "rayon",
  "serde",
  "test-case",
  "thiserror",
diff --git a/Cargo.toml b/Cargo.toml
index 34c486d4..ad566391 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,6 +34,7 @@ clock = { path = "./packages/clock", default-features = false }
 actix-web = { version = "4", default-features = false }
 bytesize = { version = "1.3", default-features = false }
 alloy = { version = "0.3.6", default-features = false }
+proptest = { version = "1.0", default-features = false }
 rayon = { version = "1.10", default-features = false }
 num_cpus = { version = "1.16", default-features = false }
 anyhow = { version = "1.0", default-features = false }
@@ -54,7 +55,7 @@ hex = { version = "0.4", default-features = false }
 humantime = { version = "2.1", default-features = false }
 itertools = { version = "0.13", default-features = false }
 mockall = { version = "0.12", default-features = false }
-num-bigint = { version = "0.4.6", default-features = false}
+num-bigint = { version = "0.4.6", default-features = false }
 nonempty = { version = "0.10", default-features = false }
 portpicker = { version = "0.1", default-features = false }
 pretty_assertions = { version = "1.4", default-features = false }
diff --git a/committer/src/main.rs b/committer/src/main.rs
index ca2b9d37..fff06f4a 100644
--- a/committer/src/main.rs
+++ b/committer/src/main.rs
@@ -8,6 +8,7 @@ use api::launch_api_server;
 use errors::{Result, WithContext};
 use metrics::prometheus::Registry;
 use ports::l1::Contract;
+use setup::last_finalization_metric;
 use tokio_util::sync::CancellationToken;
 
 use crate::setup::shut_down;
@@ -29,7 +30,8 @@ async fn main() -> Result<()> {
 
     let metrics_registry = Registry::default();
 
-    let storage = setup::storage(&config, &metrics_registry)
+    let finalization_metric = last_finalization_metric();
+    let storage = setup::storage(&config, &metrics_registry, &finalization_metric)
         .await
         .with_context(|| "failed to connect to database")?;
 
@@ -92,6 +94,7 @@ async fn main() -> Result<()> {
         cancel_token.clone(),
         &metrics_registry,
         &config,
+        finalization_metric,
     );
     handles.push(state_committer_handle);
diff --git a/committer/src/setup.rs b/committer/src/setup.rs
index e4e2a3a2..2c6a72e7 100644
--- a/committer/src/setup.rs
+++ b/committer/src/setup.rs
@@ -1,8 +1,12 @@
 use std::{num::NonZeroU32, time::Duration};
 
 use clock::SystemClock;
-use eth::{AwsConfig, Eip4844BlobEncoder};
-use metrics::{prometheus::Registry, HealthChecker, RegistersMetrics};
+use eth::{AwsConfig, Eip4844BlobEncoder, KmsKeys};
+use metrics::{
+    prometheus::{IntGauge, Registry},
+    HealthChecker, RegistersMetrics,
+};
+use ports::storage::Storage;
 use services::{
     BlockBundler, BlockBundlerConfig, BlockCommitter, BlockValidator, Runner, WalletBalanceTracker,
 };
@@ -156,18 +160,28 @@ pub fn block_importer(
     )
 }
 
+pub fn last_finalization_metric() -> IntGauge {
+    IntGauge::new(
+        "seconds_since_last_finalized_fragment",
+        "The number of seconds since the last finalized fragment",
+    )
+    .expect("seconds_since_last_finalized_fragment gauge to be correctly configured")
+}
+
 pub fn state_listener(
     l1: L1,
     storage: Database,
     cancel_token: CancellationToken,
     registry: &Registry,
     config: &config::Config,
+    last_finalization: IntGauge,
 ) -> tokio::task::JoinHandle<()> {
     let state_listener = services::StateListener::new(
         l1,
         storage,
         config.app.num_blocks_to_finalize_tx,
         SystemClock,
+        last_finalization,
     );
 
     state_listener.register_metrics(registry);
@@ -192,12 +206,16 @@ pub async fn l1_adapter(
     let l1 = L1::connect(
         config.eth.rpc.clone(),
         config.eth.state_contract_address,
-        config.eth.main_key_arn.clone(),
-        config.eth.blob_pool_key_arn.clone(),
+        KmsKeys {
+            main_key_arn: config.eth.main_key_arn.clone(),
+            blob_pool_key_arn: config.eth.blob_pool_key_arn.clone(),
+        },
         internal_config.eth_errors_before_unhealthy,
         aws_client,
-        config.app.tx_max_fee as u128,
-        config.app.send_tx_request_timeout,
+        eth::TxConfig {
+            tx_max_fee: config.app.tx_max_fee as u128,
+            send_tx_request_timeout: config.app.send_tx_request_timeout,
+        },
     )
     .await?;
 
@@ -257,12 +275,20 @@ pub fn logger() {
         .init();
 }
 
-pub async fn storage(config: &config::Config, registry: &Registry) -> Result<Database> {
+pub async fn storage(
+    config: &config::Config,
+    registry: &Registry,
+    last_finalization: &IntGauge,
+) -> Result<Database> {
     let postgres = Database::connect(&config.app.db).await?;
     postgres.migrate().await?;
     postgres.register_metrics(registry);
 
+    if let Some(last_fragment_time) = postgres.last_time_a_fragment_was_finalized().await? {
+        last_finalization.set(last_fragment_time.timestamp());
+    }
+
     Ok(postgres)
 }
diff --git a/e2e/src/eth_node/state_contract.rs b/e2e/src/eth_node/state_contract.rs
index 3eaca93b..5ae61e0c 100644
--- a/e2e/src/eth_node/state_contract.rs
+++ b/e2e/src/eth_node/state_contract.rs
@@ -27,20 +27,23 @@ impl DeployedContract {
         address: Address,
         key: KmsKey,
         tx_max_fee: u128,
-        request_timeout: Duration,
+        send_tx_request_timeout: Duration,
     ) -> anyhow::Result<Self> {
-        let blob_wallet = None;
         let aws_client = AwsClient::new(AwsConfig::for_testing(key.url).await);
 
         let chain_state_contract = WebsocketClient::connect(
             url,
             address,
-            key.id,
-            blob_wallet,
+            eth::KmsKeys {
+                main_key_arn: key.id,
+                blob_pool_key_arn: None,
+            },
             5,
             aws_client,
-            tx_max_fee,
-            request_timeout,
+            eth::TxConfig {
+                tx_max_fee,
+                send_tx_request_timeout,
+            },
         )
         .await?;
diff --git a/helm/fuel-block-committer/templates/config.yaml b/helm/fuel-block-committer/templates/config.yaml
deleted file mode 100644
index d8ccd7a3..00000000
--- a/helm/fuel-block-committer/templates/config.yaml
+++ /dev/null
@@ -1,6 +0,0 @@
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: {{ include "fuel-block-committer.fullname" . }}-config
-data:
-  committer-config: ""
diff --git a/helm/fuel-block-committer/templates/serviceaccount.yaml b/helm/fuel-block-committer/templates/serviceaccount.yaml
deleted file mode 100644
index 5b0a04a3..00000000
--- a/helm/fuel-block-committer/templates/serviceaccount.yaml
+++ /dev/null
@@ -1,12 +0,0 @@
-{{- if .Values.serviceAccount.create -}}
-apiVersion: v1
-kind: ServiceAccount
-metadata:
-  name: {{ include "fuel-block-committer.serviceAccountName" . }}
-  labels:
-    {{- include "fuel-block-committer.labels" . | nindent 4 }}
-  {{- with .Values.serviceAccount.annotations }}
-  annotations:
-    {{- toYaml . | nindent 4 }}
-  {{- end }}
-{{- end }}
diff --git a/packages/eth/Cargo.toml b/packages/eth/Cargo.toml
index 0f95f6f6..1913ad0c 100644
--- a/packages/eth/Cargo.toml
+++ b/packages/eth/Cargo.toml
@@ -37,16 +37,17 @@ url = { workspace = true }
 
 [dev-dependencies]
 alloy = { workspace = true, features = [
-    "signer-local",
-    "node-bindings"
-]}
+    "signer-local",
+    "node-bindings",
+    "json-rpc",
+] }
 pretty_assertions = { workspace = true, features = ["default"] }
 mockall = { workspace = true }
 ports = { workspace = true, features = ["l1", "test-helpers"] }
 rand = { workspace = true, features = ["small_rng"] }
 test-case = { workspace = true }
 tokio = { workspace = true, features = ["macros"] }
-rayon = { workspace = true }
+proptest = { workspace = true, features = ["default"] }
 
 [features]
 test-helpers = []
diff --git a/packages/eth/src/blob_encoding/copied_from_alloy.rs b/packages/eth/src/blob_encoding/copied_from_alloy.rs
index ccfcf3fe..24acacdc 100644
--- a/packages/eth/src/blob_encoding/copied_from_alloy.rs
+++ b/packages/eth/src/blob_encoding/copied_from_alloy.rs
@@ -15,6 +15,7 @@ pub struct PartialSidecar {
     blobs: Vec<Blob>,
     /// The number of field elements that we have ingested, total.
     fe: usize,
+    amount_last_ingested: usize,
 }
 
 impl Default for PartialSidecar {
@@ -33,12 +34,21 @@ impl PartialSidecar {
         Self::with_capacity(2)
     }
 
+    pub fn unused_bytes_in_last_fe(&self) -> usize {
+        // The first byte of each field element can never hold data: the coder always leaves it empty, so only 31 of the 32 bytes are usable.
+        31usize.saturating_sub(self.amount_last_ingested)
+    }
+
     /// Create a new builder, preallocating room for `capacity` blobs, and push
     /// an empty blob to it.
     pub fn with_capacity(capacity: usize) -> Self {
         let mut blobs = Vec::with_capacity(capacity);
         blobs.push(Blob::new([0u8; BYTES_PER_BLOB]));
-        Self { blobs, fe: 0 }
+        Self {
+            blobs,
+            fe: 0,
+            amount_last_ingested: 0,
+        }
     }
 
     /// Get the number of unused field elements that have been allocated
@@ -92,6 +102,7 @@ impl PartialSidecar {
     /// If the data is >=32 bytes. Or if there are not enough free FEs to
     /// encode the data.
     pub fn ingest_partial_fe(&mut self, data: &[u8]) {
+        self.amount_last_ingested = data.len();
         let fe = self.next_unused_fe_mut();
         fe[1..1 + data.len()].copy_from_slice(data);
         self.fe += 1;
@@ -284,6 +295,10 @@ impl<T: SidecarCoder> SidecarBuilder<T> {
         }
     }
 
+    pub fn unused_bytes_in_last_fe(&self) -> usize {
+        self.inner.unused_bytes_in_last_fe()
+    }
+
     /// Calculate the length of bytes used by field elements in the builder.
     ///
     /// This is always strictly greater than the number of bytes that have been
diff --git a/packages/eth/src/blob_encoding/encoder.rs b/packages/eth/src/blob_encoding/encoder.rs
index f6e77c01..e73e9aac 100644
--- a/packages/eth/src/blob_encoding/encoder.rs
+++ b/packages/eth/src/blob_encoding/encoder.rs
@@ -37,19 +37,14 @@ impl Eip4844BlobEncoder {
 
 impl ports::l1::FragmentEncoder for Eip4844BlobEncoder {
     fn encode(&self, data: NonEmpty<u8>) -> ports::l1::Result<NonEmpty<Fragment>> {
-        let builder = SidecarBuilder::<SimpleCoder>::from_coder_and_data(
-            SimpleCoder::default(),
-            &Vec::from(data),
-        );
-
-        let single_blobs =
-            split_sidecar(builder).map_err(|e| ports::l1::Error::Other(e.to_string()))?;
-
-        Ok(single_blobs
+        let fragments = encode_into_blobs(data)
+            .map_err(|e| ports::l1::Error::Other(e.to_string()))?
             .into_iter()
-            .map(|blob| blob.encode())
+            .map(|blob| blob.as_fragment())
             .collect_nonempty()
-            .expect("cannot be empty"))
+            .expect("cannot be empty");
+
+        Ok(fragments)
     }
 
     fn gas_usage(&self, num_bytes: NonZeroUsize) -> u64 {
@@ -107,7 +102,7 @@ impl SingleBlob {
         })
     }
 
-    fn encode(&self) -> Fragment {
+    fn as_fragment(&self) -> Fragment {
         let mut bytes = Vec::with_capacity(Self::SIZE);
         bytes.extend_from_slice(self.blobs.as_slice());
         bytes.extend_from_slice(self.commitment.as_ref());
@@ -122,10 +117,17 @@ impl SingleBlob {
     }
 }
 
-fn split_sidecar(builder: SidecarBuilder) -> crate::error::Result<Vec<SingleBlob>> {
-    let num_bytes = u32::try_from(builder.len()).map_err(|_| {
-        crate::error::Error::Other("cannot handle more than u32::MAX bytes".to_string())
-    })?;
+fn encode_into_blobs(data: NonEmpty<u8>) -> crate::error::Result<Vec<SingleBlob>> {
+    let builder = SidecarBuilder::<SimpleCoder>::from_coder_and_data(
+        SimpleCoder::default(),
+        &Vec::from(data),
+    );
+    let total_used_bytes = u32::try_from(builder.len())
+        .map_err(|_| {
+            crate::error::Error::Other("cannot handle more than u32::MAX bytes".to_string())
+        })?
+        .saturating_sub(builder.unused_bytes_in_last_fe() as u32);
+
     let sidecar = builder
         .build()
         .map_err(|e| crate::error::Error::Other(e.to_string()))?;
@@ -138,8 +140,12 @@ fn split_sidecar(builder: SidecarBuilder) -> crate::error::Result<Vec<SingleBlob>> {
+    fn non_empty_rand_data(amount: usize) -> NonEmpty<u8> {
+        if amount == 0 {
+            panic!("cannot create empty data");
+        } else {
+            let mut rng = SmallRng::from_seed([0; 32]);
+            let mut data = vec![0; amount];
+            rng.fill(&mut data[..]);
+            NonEmpty::collect(data).unwrap()
+        }
+    }
+
     #[test]
     fn roundtrip_split_encode_decode_merge() {
-        let mut random_data = vec![0; 110_000];
-        let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);
-        rng.fill_bytes(&mut random_data);
-
-        let builder = SidecarBuilder::from_coder_and_data(SimpleCoder::default(), &random_data);
+        let random_data = non_empty_rand_data(110_000);
 
-        let single_blobs = split_sidecar(builder.clone()).unwrap();
+        let single_blobs = encode_into_blobs(random_data.clone()).unwrap();
         let merged_sidecar = merge_into_sidecar(single_blobs);
-        assert_eq!(merged_sidecar, builder.build().unwrap());
 
         let should_be_original_data = SimpleCoder::default()
             .decode_all(&merged_sidecar.blobs)
             .unwrap()
             .into_iter()
             .flatten()
-            .collect_vec();
+            .collect_nonempty()
+            .unwrap();
 
         assert_eq!(should_be_original_data, random_data);
     }
 
-    #[test]
-    fn shows_unused_bytes() {
-        let mut random_data = vec![0; 1000];
-        let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);
-        rng.fill_bytes(&mut random_data);
-
-        let sidecar = SidecarBuilder::from_coder_and_data(SimpleCoder::default(), &random_data);
-
-        let single_blobs = split_sidecar(sidecar.clone()).unwrap();
-
-        assert_eq!(single_blobs.len(), 1);
-        assert_eq!(single_blobs[0].unused_bytes, 129984);
-    }
-
-    #[test]
-    fn alloy_blob_encoding_issue_regression() {
-        let test = |amount| {
+    use proptest::prelude::*;
+    proptest::proptest! {
+        // You may want to make this limit bigger when changing the code
+        #![proptest_config(ProptestConfig { cases: 10, .. ProptestConfig::default() })]
+        #[test]
+        fn alloy_blob_encoding_issue_regression(amount in 1..=DATA_GAS_PER_BLOB*20) {
             // given
             let encoder = Eip4844BlobEncoder;
 
             let mut rng = SmallRng::from_seed([0; 32]);
-            let mut data = vec![0; amount];
+            let mut data = vec![0; amount as usize];
             rng.fill_bytes(&mut data[..]);
 
             // when
@@ -307,6 +310,7 @@ mod tests {
             let sidecar = Eip4844BlobEncoder::decode(fragments).unwrap();
 
             let mut builder = SidecarBuilder::<SimpleCoder>::new();
+            // TODO: ingest more at once
             for byte in &data {
                 builder.ingest(std::slice::from_ref(byte));
             }
@@ -320,24 +324,42 @@ mod tests {
                 .flatten()
                 .collect_vec();
 
-            if data != decoded_data {
-                Err(crate::error::Error::Other(format!(
-                    "data mismatch for {amount} B"
-                )))
-            } else {
-                Ok(amount)
+            prop_assert_eq!(data, decoded_data);
+        }
+    }
+
+    proptest::proptest! {
+        // You may want to make this limit bigger when changing the code
+        #![proptest_config(ProptestConfig { cases: 10, .. ProptestConfig::default() })]
+        #[test]
+        fn shows_unused_bytes(num_bytes in 1..=DATA_GAS_PER_BLOB*6) {
+            // given
+            let mut data = non_empty_rand_data(num_bytes as usize);
+            // make the last byte non-zero so the trailing-zero count below matches the unused bytes
+            *data.last_mut() = 1;
+
+            // when
+            let single_blobs = encode_into_blobs(data).unwrap();
+
+            // then
+            let num_blobs = single_blobs.len();
+
+            for blob in single_blobs.iter().take(num_blobs - 1) {
+                prop_assert_eq!(blob.unused_bytes, 0);
             }
-        };
 
-        let failure = (126_000..2_000_000)
-            .step_by(50_000)
-            .collect_vec()
-            .into_par_iter()
-            .map(test)
-            .find_any(|ret| ret.is_err());
+            let last_blob = single_blobs.last();
+            let unused_bytes = last_blob.unused_bytes as usize;
+
+            // a hacky way to validate but good enough when input data is random
+            let zeroes_at_end = last_blob
+                .blobs
+                .iter()
+                .rev()
+                .take_while(|byte| **byte == 0)
+                .count();
 
-        if let Some(Err(amount)) = failure {
-            panic!("Alloy blob issue found for {amount} B");
+            prop_assert_eq!(zeroes_at_end, unused_bytes);
         }
     }
 }
diff --git a/packages/eth/src/error.rs b/packages/eth/src/error.rs
index 6efc7b6b..a953505e 100644
--- a/packages/eth/src/error.rs
+++ b/packages/eth/src/error.rs
@@ -7,13 +7,20 @@ use alloy::{
 pub enum Error {
     #[error("network error: {0}")]
     Network(String),
+    #[error("tx execution error: {0}")]
+    TxExecution(String),
     #[error("other error: {0}")]
     Other(String),
 }
 
 impl From<RpcError<TransportErrorKind>> for Error {
     fn from(err: RpcError<TransportErrorKind>) -> Self {
-        Self::Network(err.to_string())
+        match err {
+            RpcError::ErrorResp(err) if err.code >= -32613 && err.code <= -32000 => {
+                Self::TxExecution(err.message)
+            }
+            _ => Self::Network(err.to_string()),
+        }
     }
 }
 
@@ -44,7 +51,50 @@ impl From<Error> for ports::l1::Error {
     fn from(err: Error) -> Self {
         match err {
             Error::Network(err) => Self::Network(err),
-            Error::Other(err) => Self::Other(err),
+            Error::Other(err) | Error::TxExecution(err) => Self::Other(err),
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use alloy::rpc::json_rpc::ErrorPayload;
+
+    use super::*;
+
+    #[test]
+    fn correctly_detects_tx_execution_error() {
+        for code in 32_000..=32613 {
+            let err = RpcError::ErrorResp(ErrorPayload {
+                code: -code,
+                message: "some message".to_owned(),
+                data: None,
+            });
+
+            let our_error = crate::error::Error::from(err);
+            let Error::TxExecution(msg) = our_error else {
+                panic!("Expected TxExecution got: {}", our_error)
+            };
+
+            assert!(msg.contains("some message"));
+        }
+    }
+
+    #[test]
+    fn rest_of_the_error_range_is_classified_as_network_caused() {
+        for code in [31_999, 32614] {
+            let err = RpcError::ErrorResp(ErrorPayload {
+                code: -code,
+                message: "some message".to_owned(),
+                data: None,
+            });
+
+            let our_error = crate::error::Error::from(err);
+            let Error::Network(msg) = our_error else {
+                panic!("Expected Network got: {}", our_error)
+            };
+
+            assert!(msg.contains("some message"));
+        }
+    }
+}
diff --git a/packages/eth/src/lib.rs b/packages/eth/src/lib.rs
index b6a3502d..80700ef4 100644
--- a/packages/eth/src/lib.rs
+++ b/packages/eth/src/lib.rs
@@ -14,7 +14,7 @@ mod websocket;
 
 pub use alloy::primitives::Address;
 pub use aws::*;
-pub use websocket::WebsocketClient;
+pub use websocket::{KmsKeys, TxConfig, WebsocketClient};
 
 impl Contract for WebsocketClient {
     delegate! {
diff --git a/packages/eth/src/websocket.rs b/packages/eth/src/websocket.rs
index a1d9fc21..a7e886c3 100644
--- a/packages/eth/src/websocket.rs
+++ b/packages/eth/src/websocket.rs
@@ -24,24 +24,34 @@ pub struct WebsocketClient {
     contract_caller_address: Address,
 }
 
+#[derive(Debug, Clone)]
+pub struct KmsKeys {
+    pub main_key_arn: String,
+    pub blob_pool_key_arn: Option<String>,
+}
+
+#[derive(Debug, Clone)]
+pub struct TxConfig {
+    pub tx_max_fee: u128,
+    pub send_tx_request_timeout: Duration,
+}
+
 impl WebsocketClient {
     pub async fn connect(
         url: Url,
         contract_address: Address,
-        main_key_arn: String,
-        blob_pool_key_arn: Option<String>,
+        keys: KmsKeys,
         unhealthy_after_n_errors: usize,
         aws_client: AwsClient,
-        tx_max_fee: u128,
-        send_tx_request_timeout: Duration,
+        tx_config: TxConfig,
     ) -> ports::l1::Result<Self> {
-        let blob_signer = if let Some(key_arn) = blob_pool_key_arn {
+        let blob_signer = if let Some(key_arn) = keys.blob_pool_key_arn {
             Some(aws_client.make_signer(key_arn).await?)
         } else {
             None
         };
 
-        let main_signer = aws_client.make_signer(main_key_arn).await?;
+        let main_signer = aws_client.make_signer(keys.main_key_arn).await?;
 
         let blob_poster_address = blob_signer.as_ref().map(|signer| signer.address());
         let contract_caller_address = main_signer.address();
@@ -51,8 +61,8 @@ impl WebsocketClient {
             contract_address,
             main_signer,
             blob_signer,
-            tx_max_fee,
-            send_tx_request_timeout,
+            tx_config.tx_max_fee,
+            tx_config.send_tx_request_timeout,
         )
         .await?;
diff --git a/packages/eth/src/websocket/connection.rs b/packages/eth/src/websocket/connection.rs
index a88d4835..fe7e79a5 100644
--- a/packages/eth/src/websocket/connection.rs
+++ b/packages/eth/src/websocket/connection.rs
@@ -125,13 +125,11 @@ impl WsConnection {
     }
 
     fn get_max_fee(tx: &L1Tx, gas_limit: u128, num_fragments: usize) -> u128 {
-        let max_fee = tx.max_fee.saturating_mul(gas_limit).saturating_add(
+        tx.max_fee.saturating_mul(gas_limit).saturating_add(
             tx.blob_fee
                 .saturating_mul(num_fragments as u128)
                 .saturating_mul(DATA_GAS_PER_BLOB as u128),
-        );
-
-        max_fee
+        )
     }
 }
 
@@ -443,8 +441,6 @@ impl WsConnection {
 #[cfg(test)]
 mod tests {
-    use std::u128;
-
     use alloy::{node_bindings::Anvil, signers::local::PrivateKeySigner};
     use ports::l1::FragmentEncoder;
diff --git a/packages/ports/src/ports/storage.rs b/packages/ports/src/ports/storage.rs
index 7bfde9df..ab9d7cfa 100644
--- a/packages/ports/src/ports/storage.rs
+++ b/packages/ports/src/ports/storage.rs
@@ -189,6 +189,7 @@ pub trait Storage: Send + Sync {
     async fn get_pending_txs(&self) -> Result<Vec<L1Tx>>;
     async fn get_latest_pending_txs(&self) -> Result<Option<L1Tx>>;
     async fn has_pending_txs(&self) -> Result<bool>;
+    async fn has_nonfinalized_txs(&self) -> Result<bool>;
     async fn oldest_nonfinalized_fragments(
         &self,
         starting_height: u32,
@@ -196,7 +197,11 @@ pub trait Storage: Send + Sync {
     ) -> Result<Vec<BundleFragment>>;
     async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> Result<Vec<BundleFragment>>;
     async fn last_time_a_fragment_was_finalized(&self) -> Result<Option<DateTime<Utc>>>;
-    async fn update_tx_state(&self, hash: [u8; 32], state: TransactionState) -> Result<()>;
+    async fn batch_update_tx_states(
+        &self,
+        selective_changes: Vec<([u8; 32], TransactionState)>,
+        noncewide_changes: Vec<([u8; 32], u32, TransactionState)>,
+    ) -> Result<()>;
 }
 
 impl<T: Storage + Send + Sync> Storage for Arc<T> {
@@ -231,6 +236,7 @@ impl<T: Storage + Send + Sync> Storage for Arc<T> {
         async fn get_pending_txs(&self) -> Result<Vec<L1Tx>>;
         async fn get_latest_pending_txs(&self) -> Result<Option<L1Tx>>;
         async fn has_pending_txs(&self) -> Result<bool>;
+        async fn has_nonfinalized_txs(&self) -> Result<bool>;
         async fn oldest_nonfinalized_fragments(
             &self,
             starting_height: u32,
@@ -238,7 +244,11 @@ impl<T: Storage + Send + Sync> Storage for Arc<T> {
        ) -> Result<Vec<BundleFragment>>;
         async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> Result<Vec<BundleFragment>>;
         async fn last_time_a_fragment_was_finalized(&self) -> Result<Option<DateTime<Utc>>>;
-        async fn update_tx_state(&self, hash: [u8; 32], state: TransactionState) -> Result<()>;
+        async fn batch_update_tx_states(
+            &self,
+            selective_changes: Vec<([u8; 32], TransactionState)>,
+            noncewide_changes: Vec<([u8; 32], u32, TransactionState)>,
+        ) -> Result<()>;
         }
     }
 }
@@ -275,6 +285,7 @@ impl<T: Storage + Send + Sync> Storage for &T {
         async fn get_pending_txs(&self) -> Result<Vec<L1Tx>>;
         async fn get_latest_pending_txs(&self) -> Result<Option<L1Tx>>;
         async fn has_pending_txs(&self) -> Result<bool>;
+        async fn has_nonfinalized_txs(&self) -> Result<bool>;
         async fn oldest_nonfinalized_fragments(
             &self,
             starting_height: u32,
@@ -282,7 +293,11 @@ impl<T: Storage + Send + Sync> Storage for &T {
        ) -> Result<Vec<BundleFragment>>;
         async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> Result<Vec<BundleFragment>>;
         async fn last_time_a_fragment_was_finalized(&self) -> Result<Option<DateTime<Utc>>>;
-        async fn update_tx_state(&self, hash: [u8; 32], state: TransactionState) -> Result<()>;
+        async fn batch_update_tx_states(
+            &self,
+            selective_changes: Vec<([u8; 32], TransactionState)>,
+            noncewide_changes: Vec<([u8; 32], u32, TransactionState)>,
+        ) -> Result<()>;
         }
     }
 }
diff --git a/packages/services/src/block_committer.rs b/packages/services/src/block_committer.rs
index f5a9fdda..0af9ec2e 100644
--- a/packages/services/src/block_committer.rs
+++ b/packages/services/src/block_committer.rs
@@ -1,9 +1,5 @@
 use std::num::NonZeroU32;
 
-use metrics::{
-    prometheus::{core::Collector, IntGauge, Opts},
-    RegistersMetrics,
-};
 use ports::{
     fuel::FuelBlock,
     storage::Storage,
@@ -236,7 +232,7 @@ mod tests {
     use rand::{rngs::StdRng, Rng, SeedableRng};
     use storage::{DbWithProcess, PostgresProcess};
 
-    use crate::{test_utils::mocks::l1::FullL1Mock, BlockValidator};
+    use crate::BlockValidator;
 
     use super::*;
diff --git a/packages/services/src/lib.rs b/packages/services/src/lib.rs
index f7b99a90..87fe0175 100644
--- a/packages/services/src/lib.rs
+++ b/packages/services/src/lib.rs
@@ -105,6 +105,7 @@ pub(crate) mod test_utils {
     use clock::TestClock;
     use eth::Eip4844BlobEncoder;
     use fuel_crypto::SecretKey;
+    use metrics::prometheus::IntGauge;
     use mocks::l1::TxStatus;
     use ports::{
         storage::Storage,
@@ -547,10 +548,16 @@ pub(crate) mod test_utils {
 
             let l1_mock = mocks::l1::txs_finished(0, 0, [(tx, TxStatus::Success)]);
 
-            StateListener::new(l1_mock, self.db(), 0, clock.clone())
-                .run()
-                .await
-                .unwrap();
+            StateListener::new(
+                l1_mock,
+                self.db(),
+                0,
+                clock.clone(),
+                IntGauge::new("test", "test").unwrap(),
+            )
+            .run()
+            .await
+            .unwrap();
         }
 
         pub async fn insert_fragments(&self, height: u32, amount: usize) -> Vec<Fragment> {
diff --git a/packages/services/src/state_committer.rs b/packages/services/src/state_committer.rs
index d8be22cf..6db03476 100644
--- a/packages/services/src/state_committer.rs
+++ b/packages/services/src/state_committer.rs
@@ -133,7 +133,7 @@ where
         }
     }
 
-    async fn get_pending_transaction(&self) -> Result<Option<L1Tx>> {
+    async fn latest_pending_transaction(&self) -> Result<Option<L1Tx>> {
         let tx = self.storage.get_latest_pending_txs().await?;
         Ok(tx)
     }
@@ -206,7 +206,11 @@ where
         }
     }
 
-    async fn resubmit_fragments_if_stalled(&self, previous_tx: L1Tx) -> Result<()> {
+    async fn resubmit_fragments_if_stalled(&self) -> Result<()> {
+        let Some(previous_tx) = self.latest_pending_transaction().await? else {
+            return Ok(());
+        };
+
         let elapsed = self.elapsed_since_tx_submitted(&previous_tx)?;
 
         if elapsed >= self.config.gas_bump_timeout {
@@ -232,9 +236,10 @@ where
     C: Clock + Send + Sync,
 {
     async fn run(&mut self) -> Result<()> {
-        match self.get_pending_transaction().await? {
-            Some(previous_tx) => self.resubmit_fragments_if_stalled(previous_tx).await?,
-            None => self.submit_fragments_if_ready().await?,
+        if self.storage.has_nonfinalized_txs().await? {
+            self.resubmit_fragments_if_stalled().await?
+        } else {
+            self.submit_fragments_if_ready().await?
         };
 
         Ok(())
diff --git a/packages/services/src/state_listener.rs b/packages/services/src/state_listener.rs
index abc0373b..79590040 100644
--- a/packages/services/src/state_listener.rs
+++ b/packages/services/src/state_listener.rs
@@ -1,3 +1,5 @@
+use std::collections::HashSet;
+
 use metrics::{
     prometheus::{core::Collector, IntGauge, Opts},
     RegistersMetrics,
@@ -20,12 +22,18 @@ pub struct StateListener<L1, Db, C> {
 }
 
 impl<L1, Db, C> StateListener<L1, Db, C> {
-    pub fn new(l1_adapter: L1, storage: Db, num_blocks_to_finalize: u64, clock: C) -> Self {
+    pub fn new(
+        l1_adapter: L1,
+        storage: Db,
+        num_blocks_to_finalize: u64,
+        clock: C,
+        last_finalization_time_metric: IntGauge,
+    ) -> Self {
         Self {
             l1_adapter,
             storage,
             num_blocks_to_finalize,
-            metrics: Metrics::default(),
+            metrics: Metrics::new(last_finalization_time_metric),
             clock,
         }
     }
@@ -40,7 +48,17 @@ where
     async fn check_non_finalized_txs(&mut self, non_finalized_txs: Vec<L1Tx>) -> crate::Result<()> {
         let current_block_number: u64 = self.l1_adapter.get_block_number().await?.into();
 
+        // we need to accumulate all the changes and then update the db atomically
+        // to avoid race conditions with other services
+        let mut skip_nonces = HashSet::new();
+        let mut selective_change = vec![];
+        let mut noncewide_changes = vec![];
+
         for tx in non_finalized_txs {
+            if skip_nonces.contains(&tx.nonce) {
+                continue;
+            }
+
+            // get response if tx is included in a block
             let Some(tx_response) = self.l1_adapter.get_transaction_response(tx.hash).await? else {
                 // not included in block - check what happened to the tx
@@ -48,9 +66,7 @@ where
                 match (tx.state, self.l1_adapter.is_squeezed_out(tx.hash).await?) {
                     (TransactionState::Pending, true) => {
                         //not in the mempool anymore set it to failed
-                        self.storage
-                            .update_tx_state(tx.hash, TransactionState::Failed)
-                            .await?;
+                        selective_change.push((tx.hash, tx.nonce, TransactionState::Failed));
 
                         info!(
                             "blob tx {} not found in mempool. Setting to failed",
@@ -60,9 +76,7 @@ where
                     (TransactionState::IncludedInBlock, false) => {
                         // if tx was in block and reorg happened now it is in the mempool - we need to set the tx to pending
-                        self.storage
-                            .update_tx_state(tx.hash, TransactionState::Pending)
-                            .await?;
+                        selective_change.push((tx.hash, tx.nonce, TransactionState::Pending));
 
                         info!(
                             "blob tx {} returned to mempool. Setting to pending",
Setting to pending", @@ -75,10 +89,11 @@ where continue; }; + skip_nonces.insert(tx.nonce); + if !tx_response.succeeded() { - self.storage - .update_tx_state(tx.hash, TransactionState::Failed) - .await?; + // set tx to failed all txs with the same nonce to failed + noncewide_changes.push((tx.hash, tx.nonce, TransactionState::Failed)); info!("failed blob tx {}", hex::encode(tx.hash)); continue; @@ -89,9 +104,8 @@ where { // tx included in block but is not yet finalized if tx.state == TransactionState::Pending { - self.storage - .update_tx_state(tx.hash, TransactionState::IncludedInBlock) - .await?; + // set tx to included and all txs with the same nonce to failed + noncewide_changes.push((tx.hash, tx.nonce, TransactionState::IncludedInBlock)); info!( "blob tx {} included in block {}", @@ -103,9 +117,11 @@ where continue; } - self.storage - .update_tx_state(tx.hash, TransactionState::Finalized(self.clock.now())) - .await?; + // st tx to finalized and all txs with the same nonce to failed + let now = self.clock.now(); + noncewide_changes.push((tx.hash, tx.nonce, TransactionState::Finalized(now))); + + self.metrics.last_finalization_time.set(now.timestamp()); info!("blob tx {} finalized", hex::encode(tx.hash)); @@ -114,6 +130,16 @@ where .set(i64::try_from(tx_response.block_number()).unwrap_or(i64::MAX)) } + selective_change.retain(|(_, nonce, _)| !skip_nonces.contains(nonce)); + let selective_change: Vec<_> = selective_change + .into_iter() + .map(|(hash, _, state)| (hash, state)) + .collect(); + + self.storage + .batch_update_tx_states(selective_change, noncewide_changes) + .await?; + Ok(()) } } @@ -140,16 +166,20 @@ where #[derive(Clone)] struct Metrics { last_eth_block_w_blob: IntGauge, + last_finalization_time: IntGauge, } impl RegistersMetrics for StateListener { fn metrics(&self) -> Vec> { - vec![Box::new(self.metrics.last_eth_block_w_blob.clone())] + vec![ + Box::new(self.metrics.last_eth_block_w_blob.clone()), + Box::new(self.metrics.last_finalization_time.clone()), + ] } } -impl Default for Metrics { - fn default() -> Self { +impl Metrics { + fn new(last_finalization_time: IntGauge) -> Self { let last_eth_block_w_blob = IntGauge::with_opts(Opts::new( "last_eth_block_w_blob", "The height of the latest Ethereum block used for state submission.", @@ -158,6 +188,7 @@ impl Default for Metrics { Self { last_eth_block_w_blob, + last_finalization_time, } } } @@ -194,8 +225,13 @@ mod tests { let test_clock = TestClock::default(); let now = test_clock.now(); - let mut listener = - StateListener::new(l1_mock, setup.db(), num_blocks_to_finalize, test_clock); + let mut listener = StateListener::new( + l1_mock, + setup.db(), + num_blocks_to_finalize, + test_clock, + IntGauge::new("test", "test").unwrap(), + ); // when listener.run().await.unwrap(); @@ -242,6 +278,7 @@ mod tests { setup.db(), num_blocks_to_finalize, TestClock::default(), + IntGauge::new("test", "test").unwrap(), ); // when @@ -289,6 +326,7 @@ mod tests { setup.db(), num_blocks_to_finalize, test_clock.clone(), + IntGauge::new("test", "test").unwrap(), ); { @@ -356,6 +394,7 @@ mod tests { setup.db(), num_blocks_to_finalize, test_clock.clone(), + IntGauge::new("test", "test").unwrap(), ); { @@ -417,6 +456,7 @@ mod tests { setup.db(), num_blocks_to_finalize, TestClock::default(), + IntGauge::new("test", "test").unwrap(), ); // when diff --git a/packages/storage/src/lib.rs b/packages/storage/src/lib.rs index 0c99a0a4..b7de8261 100644 --- a/packages/storage/src/lib.rs +++ b/packages/storage/src/lib.rs @@ -126,8 +126,18 @@ impl Storage 
         Ok(self._has_pending_txs().await?)
     }
 
-    async fn update_tx_state(&self, hash: [u8; 32], state: TransactionState) -> Result<()> {
-        Ok(self._update_tx_state(hash, state).await?)
+    async fn has_nonfinalized_txs(&self) -> Result<bool> {
+        Ok(self._has_nonfinalized_txs().await?)
+    }
+
+    async fn batch_update_tx_states(
+        &self,
+        selective_changes: Vec<([u8; 32], TransactionState)>,
+        noncewide_changes: Vec<([u8; 32], u32, TransactionState)>,
+    ) -> Result<()> {
+        Ok(self
+            ._batch_update_tx_states(selective_changes, noncewide_changes)
+            .await?)
     }
 }
 
@@ -376,18 +386,21 @@ mod tests {
         let storage = start_db().await;
         let fragment_ids = ensure_some_fragments_exists_in_the_db(&storage).await;
 
-        let hash = rand::random::<[u8; 32]>();
         let tx = L1Tx {
-            hash,
+            hash: rand::random::<[u8; 32]>(),
             ..Default::default()
         };
+        let hash = tx.hash;
+        let nonce = tx.nonce;
+
         storage.record_pending_tx(tx, fragment_ids).await.unwrap();
 
         let finalization_time = Utc::now();
 
         // when
+        let changes = vec![(hash, nonce, TransactionState::Finalized(finalization_time))];
         storage
-            .update_tx_state(hash, TransactionState::Finalized(finalization_time))
+            .batch_update_tx_states(vec![], changes)
             .await
             .unwrap();
diff --git a/packages/storage/src/mappings/tables.rs b/packages/storage/src/mappings/tables.rs
index 5441fd4b..41ba0ba2 100644
--- a/packages/storage/src/mappings/tables.rs
+++ b/packages/storage/src/mappings/tables.rs
@@ -75,7 +75,6 @@ pub struct L1FuelBlockSubmissionTx {
     pub finalized_at: Option<DateTime<Utc>>,
 }
 
-// TODO: dry this up
 impl L1FuelBlockSubmissionTx {
     pub fn parse_state(&self) -> Result<TransactionState> {
         match (self.state, self.finalized_at) {
diff --git a/packages/storage/src/postgres.rs b/packages/storage/src/postgres.rs
index 198a9143..de45a366 100644
--- a/packages/storage/src/postgres.rs
+++ b/packages/storage/src/postgres.rs
@@ -20,7 +20,6 @@ use crate::mappings::tables::{self, L1TxState};
 #[derive(Debug, Clone)]
 struct Metrics {
     height_of_latest_commitment: IntGauge,
-    seconds_since_last_finalized_fragment: IntGauge,
     lowest_unbundled_height: IntGauge,
 }
 
@@ -32,12 +31,6 @@ impl Default for Metrics {
         )
         .expect("height_of_latest_commitment gauge to be correctly configured");
 
-        let seconds_since_last_finalized_fragment = IntGauge::new(
-            "seconds_since_last_finalized_fragment",
-            "The number of seconds since the last finalized fragment",
-        )
-        .expect("seconds_since_last_finalized_fragment gauge to be correctly configured");
-
         let lowest_unbundled_height = IntGauge::new(
             "lowest_unbundled_height",
             "The height of the lowest block unbundled block",
@@ -46,7 +39,6 @@ impl Default for Metrics {
 
         Self {
             height_of_latest_commitment,
-            seconds_since_last_finalized_fragment,
             lowest_unbundled_height,
         }
     }
@@ -62,7 +54,6 @@ impl RegistersMetrics for Postgres {
     fn metrics(&self) -> Vec<Box<dyn Collector>> {
         vec![
             Box::new(self.metrics.height_of_latest_commitment.clone()),
-            Box::new(self.metrics.seconds_since_last_finalized_fragment.clone()),
             Box::new(self.metrics.lowest_unbundled_height.clone()),
         ]
     }
@@ -271,17 +262,38 @@ impl Postgres {
         let limit: i64 = limit.try_into().unwrap_or(i64::MAX);
         let fragments = sqlx::query_as!(
             tables::BundleFragment,
-            r#"
-            SELECT f.*
-            FROM l1_fragments f
-            LEFT JOIN l1_transaction_fragments tf ON tf.fragment_id = f.id
-            LEFT JOIN l1_blob_transaction t ON t.id = tf.transaction_id
-            JOIN bundles b ON b.id = f.bundle_id
-            WHERE (t.id IS NULL OR t.state = $1) 
-            AND b.end_height >= $2 -- Exclude bundles ending before starting_height
-            ORDER BY b.start_height ASC, f.idx ASC
-            LIMIT $3;
-            "#,
+            r#"SELECT 
+    sub.id, 
+    sub.idx, 
+    sub.bundle_id, 
+    sub.data, 
+    sub.unused_bytes, 
+    sub.total_bytes
+FROM (
+    SELECT DISTINCT ON (f.id) 
+        f.*, 
+        b.start_height
+    FROM l1_fragments f
+    JOIN bundles b ON b.id = f.bundle_id
+    WHERE 
+        b.end_height >= $2
+        AND NOT EXISTS (
+            SELECT 1
+            FROM l1_transaction_fragments tf
+            JOIN l1_blob_transaction t ON t.id = tf.transaction_id
+            WHERE tf.fragment_id = f.id 
+            AND t.state <> $1
+        )
+    ORDER BY 
+        f.id, 
+        b.start_height ASC, 
+        f.idx ASC
+) AS sub
+ORDER BY 
+    sub.start_height ASC, 
+    sub.idx ASC
+LIMIT $3;
+"#,
             i16::from(L1TxState::Failed),
             i64::from(starting_height),
             limit
@@ -430,15 +442,6 @@ impl Postgres {
             .await?
             .and_then(|response| response.last_fragment_time);
 
-        if let Some(last_fragment_time) = response {
-            let now = Utc::now();
-            let seconds_since_last_finalized_fragment =
-                now.signed_duration_since(last_fragment_time).num_seconds();
-            self.metrics
-                .seconds_since_last_finalized_fragment
-                .set(seconds_since_last_finalized_fragment);
-        }
-
         Ok(response)
     }
 
@@ -546,6 +549,17 @@ impl Postgres {
             .has_pending_transactions.unwrap_or(false))
     }
 
+    pub(crate) async fn _has_nonfinalized_txs(&self) -> Result<bool> {
+        Ok(sqlx::query!(
+            "SELECT EXISTS (SELECT 1 FROM l1_blob_transaction WHERE state = $1 OR state = $2) AS has_nonfinalized_transactions;",
+            i16::from(L1TxState::Pending),
+            i16::from(L1TxState::IncludedInBlock)
+        )
+        .fetch_one(&self.connection_pool)
+        .await?
+        .has_nonfinalized_transactions.unwrap_or(false))
+    }
+
     pub(crate) async fn _get_non_finalized_txs(&self) -> Result<Vec<L1Tx>> {
         sqlx::query_as!(
             tables::L1Tx,
@@ -608,6 +622,68 @@ impl Postgres {
         Ok(())
     }
 
+    pub(crate) async fn _batch_update_tx_states(
+        &self,
+        selective_changes: Vec<([u8; 32], TransactionState)>,
+        noncewide_changes: Vec<([u8; 32], u32, TransactionState)>,
+    ) -> Result<()> {
+        let mut tx = self.connection_pool.begin().await?;
+
+        for change in selective_changes {
+            let hash = change.0;
+            let state = change.1;
+
+            let finalized_at = match &state {
+                TransactionState::Finalized(date_time) => Some(*date_time),
+                _ => None,
+            };
+            let state = i16::from(L1TxState::from(&state));
+
+            sqlx::query!(
+                "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE hash = $3",
+                state,
+                finalized_at,
+                hash.as_slice(),
+            )
+            .execute(&mut *tx)
+            .await?;
+        }
+
+        for change in noncewide_changes {
+            let hash = change.0;
+            let nonce = change.1;
+            let state = change.2;
+
+            let finalized_at = match &state {
+                TransactionState::Finalized(date_time) => Some(*date_time),
+                _ => None,
+            };
+            let state = i16::from(L1TxState::from(&state));
+
+            sqlx::query!(
+                "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE nonce = $3",
+                i16::from(L1TxState::Failed),
+                Option::<DateTime<Utc>>::None,
+                i64::from(nonce),
+            )
+            .execute(&mut *tx)
+            .await?;
+
+            sqlx::query!(
+                "UPDATE l1_blob_transaction SET state = $1, finalized_at = $2 WHERE hash = $3",
+                state,
+                finalized_at,
+                hash.as_slice(),
+            )
+            .execute(&mut *tx)
+            .await?;
+        }
+
+        tx.commit().await?;
+
+        Ok(())
+    }
+
     pub(crate) async fn _insert_bundle_and_fragments(
         &self,
         block_range: RangeInclusive<u32>,
diff --git a/packages/storage/src/test_instance.rs b/packages/storage/src/test_instance.rs
index 8339dcbd..69dad5ea 100644
--- a/packages/storage/src/test_instance.rs
+++ b/packages/storage/src/test_instance.rs
@@ -196,6 +196,7 @@ impl Storage for DbWithProcess {
         async fn get_pending_txs(&self) -> ports::storage::Result<Vec<L1Tx>>;
         async fn get_latest_pending_txs(&self) -> ports::storage::Result<Option<L1Tx>>;
         async fn has_pending_txs(&self) -> ports::storage::Result<bool>;
+        async fn has_nonfinalized_txs(&self) -> ports::storage::Result<bool>;
         async fn oldest_nonfinalized_fragments(
             &self,
             starting_height: u32,
@@ -203,7 +204,11 @@ impl Storage for DbWithProcess {
        ) -> ports::storage::Result<Vec<BundleFragment>>;
         async fn fragments_submitted_by_tx(&self, tx_hash: [u8; 32]) -> ports::storage::Result<Vec<BundleFragment>>;
         async fn last_time_a_fragment_was_finalized(&self) -> ports::storage::Result<Option<DateTime<Utc>>>;
-        async fn update_tx_state(&self, hash: [u8; 32], state: TransactionState) -> ports::storage::Result<()>;
+        async fn batch_update_tx_states(
+            &self,
+            selective_changes: Vec<([u8; 32], TransactionState)>,
+            noncewide_changes: Vec<([u8; 32], u32, TransactionState)>,
+        ) -> ports::storage::Result<()>;
     }
 }
 }
diff --git a/run_tests.sh b/run_tests.sh
index 6f236515..3280743e 100755
--- a/run_tests.sh
+++ b/run_tests.sh
@@ -9,4 +9,4 @@ cargo test --manifest-path "$workspace_cargo_manifest" --workspace --exclude e2e
 
 # So that we may have a binary in `target/release`
 cargo build --release --manifest-path "$workspace_cargo_manifest" --bin fuel-block-committer
-PATH="$script_location/target/release:$PATH" cargo test --manifest-path "$workspace_cargo_manifest" --package e2e --jobs 1
+PATH="$script_location/target/release:$PATH" cargo test --manifest-path "$workspace_cargo_manifest" --package e2e -- --test-threads=1
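
The trickiest semantics in this diff are the nonce-wide updates in `_batch_update_tx_states`: a nonce-wide change first marks every transaction sharing the nonce as failed, then sets the named hash to its final state, so gas-bumped replacement transactions are resolved in one database transaction. Below is a minimal, self-contained Rust sketch of that intent — not the committer's actual code; the `Tx`/`State` types and `batch_update` function are illustrative stand-ins for the two UPDATE statements run inside the sqlx transaction:

use std::collections::HashMap;

type Hash = [u8; 32];

#[derive(Clone, Debug, PartialEq)]
enum State {
    Pending,
    Failed,
    Finalized,
}

struct Tx {
    nonce: u32,
    state: State,
}

// Apply selective (per-hash) changes first, then nonce-wide changes.
fn batch_update(
    txs: &mut HashMap<Hash, Tx>,
    selective: Vec<(Hash, State)>,
    noncewide: Vec<(Hash, u32, State)>,
) {
    for (hash, state) in selective {
        if let Some(tx) = txs.get_mut(&hash) {
            tx.state = state;
        }
    }
    for (hash, nonce, state) in noncewide {
        // first UPDATE: every tx with this nonce is marked failed
        for tx in txs.values_mut().filter(|tx| tx.nonce == nonce) {
            tx.state = State::Failed;
        }
        // second UPDATE: the tx that actually landed gets its real state
        if let Some(tx) = txs.get_mut(&hash) {
            tx.state = state;
        }
    }
}

fn main() {
    let mut txs = HashMap::new();
    txs.insert([1; 32], Tx { nonce: 7, state: State::Pending });
    // gas-bumped replacement sharing the same nonce
    txs.insert([2; 32], Tx { nonce: 7, state: State::Pending });

    batch_update(&mut txs, vec![], vec![([2; 32], 7, State::Finalized)]);

    assert_eq!(txs[&[1u8; 32]].state, State::Failed);
    assert_eq!(txs[&[2u8; 32]].state, State::Finalized);
}

This also explains why the state listener drops selective changes whose nonce already received a nonce-wide change: the nonce-wide pass would otherwise overwrite them non-deterministically.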
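
A second sketch, this time for the field-element accounting behind `unused_bytes_in_last_fe`: `SimpleCoder` packs payload into 32-byte field elements whose first byte always stays empty (so each element remains a canonical scalar), leaving at most 31 usable bytes per element. The free-standing function below mirrors the bookkeeping the diff adds to `PartialSidecar`; the numbers in `main` are illustrative, not taken from the diff:

// Byte 0 of each 32-byte field element is reserved, so 31 bytes carry data.
fn unused_bytes_in_last_fe(amount_last_ingested: usize) -> usize {
    31usize.saturating_sub(amount_last_ingested)
}

fn main() {
    assert_eq!(unused_bytes_in_last_fe(31), 0); // last field element is full
    assert_eq!(unused_bytes_in_last_fe(5), 26); // 26 bytes of the tail element wasted
    assert_eq!(unused_bytes_in_last_fe(0), 31); // nothing ingested into the tail element
}

Subtracting this tail waste from `builder.len()` is what lets `encode_into_blobs` report a tighter `unused_bytes` figure per blob than the old builder-level computation.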