Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Txpool v2 features for futures connections with other modules #2216

Draft
wants to merge 18 commits into
base: txpool-v2-api
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2189](https://github.com/FuelLabs/fuel-core/pull/2151): Select next DA height to never include more than u16::MAX -1 transactions from L1.
- [2162](https://github.com/FuelLabs/fuel-core/pull/2162): Pool structure with dependencies, etc.. for the next transaction pool module.
- [2193](https://github.com/FuelLabs/fuel-core/pull/2193): Insertion in PoolV2 and tests refactoring

- [2216](https://github.com/FuelLabs/fuel-core/pull/2216): Add more function to the state and task of TxPoolV2 to handle the future interactions with others modules (PoA, BlockProducer, BlockImporter and P2P)

### Changed

Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/services/txpool_v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@ petgraph = "0.6.5"
rayon = { workspace = true }
tokio = { workspace = true, default-features = false, features = ["sync"] }
tokio-rayon = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
proptest = { workspace = true }
test-strategy = { workspace = true }


[features]
test-helpers = [
"dep:mockall",
Expand Down
16 changes: 8 additions & 8 deletions crates/services/txpool_v2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ impl BlackList {
pub struct Config {
/// Enable UTXO validation (will check if UTXO exists in the database and has correct data).
pub utxo_validation: bool,
/// Maximum block size in bytes.
pub max_block_size: u64,
/// Maximum gas per block.
pub max_block_gas: u64,
/// Maximum of subscriptions to listen to updates of a transaction.
pub max_tx_update_subscriptions: usize,
/// Maximum transactions per dependencies chain.
pub max_txs_chain_count: usize,
/// Pool limits
pub pool_limits: PoolLimits,
/// Interval for checking the time to live of transactions.
pub ttl_check_interval: Duration,
/// Maximum transaction time to live.
pub max_txs_ttl: Duration,
/// Heavy async processing configuration.
Expand Down Expand Up @@ -158,16 +158,16 @@ impl Default for Config {
fn default() -> Self {
Self {
utxo_validation: true,
max_block_gas: 100000000,
max_block_size: 1000000000,
max_tx_update_subscriptions: 1000,
max_txs_chain_count: 1000,
ttl_check_interval: Duration::from_secs(60),
max_txs_ttl: Duration::from_secs(60 * 10),
black_list: BlackList::default(),
pool_limits: PoolLimits {
max_txs: 10000,
max_gas: 100_000_000_000,
max_bytes_size: 10_000_000_000,
},
max_txs_ttl: Duration::from_secs(60 * 10),
black_list: BlackList::default(),
heavy_work: HeavyWorkConfig {
number_threads_verif_insert_transactions: 4,
number_pending_tasks_threads_verif_insert_transactions: 100,
Expand Down
5 changes: 4 additions & 1 deletion crates/services/txpool_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ mod pool;
mod ports;
mod selection_algorithms;
mod service;
mod shared_state;
mod storage;
mod tx_status_stream;
mod update_sender;
mod verifications;

type GasPrice = Word;
Expand All @@ -25,5 +28,5 @@ use fuel_core_types::fuel_asm::Word;
pub use service::{
new_service,
Service,
SharedState,
};
pub use shared_state::SharedState;
86 changes: 62 additions & 24 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
use std::collections::HashMap;
use std::{
collections::HashMap,
sync::Arc,
};

use fuel_core_types::{
fuel_tx::{
consensus_parameters::gas,
field::BlobId,
Transaction,
TxId,
},
fuel_vm::checked_transaction::Checked,
services::txpool::PoolTransaction,
services::txpool::{
ArcPoolTx,
PoolTransaction,
},
};
use num_rational::Ratio;
use tracing::instrument;
Expand Down Expand Up @@ -87,7 +94,7 @@ where
/// Each result is a list of transactions that were removed from the pool
/// because of the insertion of the new transaction.
#[instrument(skip(self))]
pub fn insert(&mut self, tx: PoolTransaction) -> Result<Vec<PoolTransaction>, Error> {
pub fn insert(&mut self, tx: ArcPoolTx) -> Result<RemovedTransactions, Error> {
let latest_view = self
.persistent_storage_provider
.latest_view()
Expand Down Expand Up @@ -169,41 +176,72 @@ where
/// based on the constraints given in the configuration and the selection algorithm used.
pub fn extract_transactions_for_block(
&mut self,
) -> Result<Vec<PoolTransaction>, Error> {
max_gas: u64,
) -> Result<Vec<ArcPoolTx>, Error> {
self.selection_algorithm
.gather_best_txs(
Constraints {
max_gas: self.config.max_block_gas,
},
&self.storage,
)?
.gather_best_txs(Constraints { max_gas }, &self.storage)?
.into_iter()
.map(|storage_id| {
let storage_data = self
let transaction = self
.storage
.remove_transaction_without_dependencies(storage_id)?;
self.collision_manager
.on_removed_transaction(&storage_data.transaction)?;
.on_removed_transaction(&transaction)?;
self.selection_algorithm
.on_removed_transaction(&storage_data.transaction)?;
self.tx_id_to_storage_id
.remove(&storage_data.transaction.id());
Ok(storage_data.transaction)
.on_removed_transaction(&transaction)?;
self.tx_id_to_storage_id.remove(&transaction.id());
Ok(transaction)
})
.collect()
}

/// Prune transactions from the pool.
pub fn prune(&mut self) -> Result<Vec<PoolTransaction>, Error> {
Ok(vec![])
}

pub fn find_one(&self, tx_id: &TxId) -> Option<&PoolTransaction> {
pub fn find_one(&self, tx_id: &TxId) -> Option<ArcPoolTx> {
Storage::get(&self.storage, self.tx_id_to_storage_id.get(tx_id)?)
.map(|data| &data.transaction)
.map(|data| data.transaction.clone())
.ok()
}

pub fn contains(&self, tx_id: &TxId) -> bool {
self.tx_id_to_storage_id.contains_key(tx_id)
}

pub fn iter_tx_ids(&self) -> impl Iterator<Item = &TxId> {
self.tx_id_to_storage_id.keys()
}

/// Remove transaction but keep its dependents.
/// The dependents become executables.
pub fn remove_transaction(&mut self, tx_ids: Vec<TxId>) -> Result<(), Error> {
for tx_id in tx_ids {
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents = self.storage.get_dependents(storage_id)?.collect();
let transaction = self
.storage
.remove_transaction_without_dependencies(storage_id)?;
self.selection_algorithm
.new_executable_transactions(dependents, &self.storage)?;
self.update_components_and_caches_on_removal(&[transaction])?;
}
}
Ok(())
}

/// Remove transaction and its dependents.
pub fn remove_transaction_and_dependents(
&mut self,
tx_ids: Vec<TxId>,
) -> Result<(), Error> {
for tx_id in tx_ids {
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let removed = self
.storage
.remove_transaction_and_dependents_subtree(storage_id)?;
self.update_components_and_caches_on_removal(&removed)?;
}
}
Ok(())
}

/// Check if the pool has enough space to store a transaction.
/// It will try to see if we can free some space depending on defined rules
/// If the pool is not full, it will return an empty list
Expand Down Expand Up @@ -307,7 +345,7 @@ where

fn update_components_and_caches_on_removal(
&mut self,
removed_transactions: &Vec<PoolTransaction>,
removed_transactions: &[ArcPoolTx],
) -> Result<(), Error> {
for tx in removed_transactions {
self.collision_manager.on_removed_transaction(tx)?;
Expand Down
49 changes: 49 additions & 0 deletions crates/services/txpool_v2/src/ports.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
blockchain::header::ConsensusParametersVersion,
Expand All @@ -12,10 +13,21 @@ use fuel_core_types::{
Bytes32,
ConsensusParameters,
ContractId,
Transaction,
TxId,
UtxoId,
},
fuel_types::Nonce,
fuel_vm::interpreter::Memory,
services::{
block_importer::SharedImportResult,
p2p::{
GossipsubMessageAcceptance,
GossipsubMessageInfo,
NetworkData,
PeerId,
},
},
tai64::Tai64,
};

Expand All @@ -26,6 +38,11 @@ use crate::{

pub use fuel_core_storage::transactional::AtomicView;

pub trait BlockImporter: Send + Sync {
/// Wait until the next block is available
fn block_events(&self) -> BoxStream<SharedImportResult>;
}

/// Trait for getting the latest consensus parameters.
#[cfg_attr(feature = "test-helpers", mockall::automock)]
pub trait ConsensusParametersProvider {
Expand Down Expand Up @@ -85,3 +102,35 @@ pub trait WasmChecker {
wasm_root: &Bytes32,
) -> Result<(), WasmValidityError>;
}

#[async_trait::async_trait]
pub trait P2P: Send + Sync {
type GossipedTransaction: NetworkData<Transaction>;

// Gossip broadcast a transaction inserted via API.
fn broadcast_transaction(&self, transaction: Arc<Transaction>) -> anyhow::Result<()>;

/// Creates a stream that is filled with the peer_id when they subscribe to
/// our transactions gossip.
fn subscribe_new_peers(&self) -> BoxStream<PeerId>;

/// Creates a stream of next transactions gossiped from the network.
fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction>;

// Report the validity of a transaction received from the network.
fn notify_gossip_transaction_validity(
&self,
message_info: GossipsubMessageInfo,
validity: GossipsubMessageAcceptance,
) -> anyhow::Result<()>;

// Asks the network to gather all tx ids of a specific peer
async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>>;

// Asks the network to gather specific transactions from a specific peer
async fn request_txs(
&self,
peer_id: PeerId,
tx_ids: Vec<TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>>;
}
Loading
Loading