Skip to content

Commit

Permalink
continue rpc node implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vgantchev committed Aug 3, 2023
1 parent ab92f8d commit 1e677d7
Show file tree
Hide file tree
Showing 6 changed files with 475 additions and 130 deletions.
106 changes: 106 additions & 0 deletions node/src/cfg new partial.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#[allow(clippy::type_complexity)]
pub fn new_partial<RuntimeApi, BIQ>(
config: &Configuration,
build_import_queue: BIQ,
) -> Result<
PartialComponents<
FullClient<RuntimeApi>,
FullBackend,
(),
sc_consensus::DefaultImportQueue<Block, FullClient<RuntimeApi>>,
sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi>>,
(
ParachainBlockImport<RuntimeApi>,
Option<Telemetry>,
Option<TelemetryWorkerHandle>,
),
>,
sc_service::Error,
>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>
+ sp_api::Metadata<Block>
+ sp_session::SessionKeys<Block>
+ sp_api::ApiExt<Block, StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>
+ sp_offchain::OffchainWorkerApi<Block>
+ sp_block_builder::BlockBuilder<Block>,
sc_client_api::StateBackendFor<FullBackend, Block>: sp_api::StateBackend<BlakeTwo256>,
BIQ: FnOnce(
Arc<FullClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
&Configuration,
Option<TelemetryHandle>,
&TaskManager,
) -> Result<
sc_consensus::DefaultImportQueue<Block, FullClient<RuntimeApi>>,
sc_service::Error,
>,
{
let telemetry = config
.telemetry_endpoints
.clone()
.filter(|x| !x.is_empty())
.map(|endpoints| -> Result<_, sc_telemetry::Error> {
let worker = TelemetryWorker::new(16)?;
let telemetry = worker.handle().new_telemetry(endpoints);
Ok((worker, telemetry))
})
.transpose()?;

let executor = sc_executor::WasmExecutor::<HostFunctions>::new(
config.wasm_method,
config.default_heap_pages,
config.max_runtime_instances,
None,
config.runtime_cache_size,
);

let (client, backend, keystore_container, task_manager) =
sc_service::new_full_parts::<Block, RuntimeApi, _>(
config,
telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
executor,
)?;
let client = Arc::new(client);

let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager
.spawn_handle()
.spawn("telemetry", None, worker.run());
telemetry
});

let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_essential_handle(),
client.clone(),
);

let block_import = ParachainBlockImport::new(client.clone(), backend.clone());

let import_queue = build_import_queue(
client.clone(),
block_import.clone(),
config,
telemetry.as_ref().map(|telemetry| telemetry.handle()),
&task_manager,
)?;

let params = PartialComponents {
backend,
client,
import_queue,
keystore_container,
task_manager,
transaction_pool,
select_chain: (),
other: (block_import, telemetry, telemetry_worker_handle),
};

Ok(params)
}
172 changes: 172 additions & 0 deletions node/src/cfg start node impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
cfg start node impl

async fn start_node_impl<RuntimeApi, RB, BIQ, BIC>(
parachain_config: Configuration,
polkadot_config: Configuration,
collator_options: CollatorOptions,
id: ParaId,
rpc_ext_builder: RB,
build_import_queue: BIQ,
build_consensus: BIC,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
RuntimeApi::RuntimeApi: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>
+ sp_api::Metadata<Block>
+ sp_session::SessionKeys<Block>
+ sp_api::ApiExt<Block, StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>
+ sp_offchain::OffchainWorkerApi<Block>
+ sp_block_builder::BlockBuilder<Block>
+ cumulus_primitives_core::CollectCollationInfo<Block>,
sc_client_api::StateBackendFor<FullBackend, Block>: sp_api::StateBackend<BlakeTwo256>,
RB: Fn(
Arc<FullClient<RuntimeApi>>,
Arc<sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi>>>,
DenyUnsafe,
) -> Result<rpc::RpcExtension, sc_service::Error>
+ 'static,
BIQ: FnOnce(
Arc<FullClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
&Configuration,
Option<TelemetryHandle>,
&TaskManager,
) -> Result<
sc_consensus::DefaultImportQueue<Block, FullClient<RuntimeApi>>,
sc_service::Error,
>,
BIC: FnOnce(
Arc<FullClient<RuntimeApi>>,
ParachainBlockImport<RuntimeApi>,
Option<&Registry>,
Option<TelemetryHandle>,
&TaskManager,
Arc<dyn RelayChainInterface>,
Arc<sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi>>>,
Arc<NetworkService<Block, Hash>>,
SyncCryptoStorePtr,
bool,
) -> Result<Box<dyn ParachainConsensus<Block>>, sc_service::Error>,
{
let parachain_config = prepare_node_config(parachain_config);

let params = new_partial::<RuntimeApi, BIQ>(&parachain_config, build_import_queue)?;
let (block_import, mut telemetry, telemetry_worker_handle) = params.other;

let client = params.client.clone();
let backend = params.backend.clone();
let mut task_manager = params.task_manager;

let (relay_chain_interface, collator_key) = build_relay_chain_interface(
polkadot_config,
&parachain_config,
telemetry_worker_handle,
&mut task_manager,
collator_options.clone(),
None,
)
.await
.map_err(|e| match e {
RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x,
s => s.to_string().into(),
})?;
let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id);

let force_authoring = parachain_config.force_authoring;
let validator = parachain_config.role.is_authority();
let prometheus_registry = parachain_config.prometheus_registry().cloned();
let transaction_pool = params.transaction_pool.clone();
let import_queue_service = params.import_queue.service();
let (network, system_rpc_tx, tx_handler_controller, start_network) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &parachain_config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue: params.import_queue,
block_announce_validator_builder: Some(Box::new(|_| {
Box::new(block_announce_validator)
})),
warp_sync: None,
})?;

let rpc_client = client.clone();
let pool = transaction_pool.clone();
let rpc_builder = { move |deny, _| rpc_ext_builder(rpc_client.clone(), pool.clone(), deny) };

sc_service::spawn_tasks(sc_service::SpawnTasksParams {
rpc_builder: Box::new(rpc_builder),
client: client.clone(),
transaction_pool: transaction_pool.clone(),
task_manager: &mut task_manager,
config: parachain_config,
keystore: params.keystore_container.sync_keystore(),
backend: backend.clone(),
network: network.clone(),
system_rpc_tx,
tx_handler_controller,
telemetry: telemetry.as_mut(),
})?;

let announce_block = {
let network = network.clone();
Arc::new(move |hash, data| network.announce_block(hash, data))
};

let relay_chain_slot_duration = Duration::from_secs(6);

let _overseer_handle = relay_chain_interface
.overseer_handle()
.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

if validator {
let parachain_consensus = build_consensus(
client.clone(),
block_import,
prometheus_registry.as_ref(),
telemetry.as_ref().map(|t| t.handle()),
&task_manager,
relay_chain_interface.clone(),
transaction_pool,
network,
params.keystore_container.sync_keystore(),
force_authoring,
)?;

let spawner = task_manager.spawn_handle();

let params = StartCollatorParams {
para_id: id,
block_status: client.clone(),
announce_block,
client: client.clone(),
task_manager: &mut task_manager,
relay_chain_interface,
spawner,
parachain_consensus,
import_queue: import_queue_service,
collator_key: collator_key.ok_or_else(|| {
sc_service::error::Error::Other("Collator Key is None".to_string())
})?,
relay_chain_slot_duration,
};

start_collator(params).await?;
} else {
let params = StartFullNodeParams {
client: client.clone(),
announce_block,
task_manager: &mut task_manager,
para_id: id,
relay_chain_interface,
relay_chain_slot_duration,
import_queue: import_queue_service,
};

start_full_node(params)?;
}

start_network.start_network();

Ok((task_manager, client))
}
3 changes: 2 additions & 1 deletion node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// limitations under the License.

use crate::cli::{Cli, RelayChainCli, Subcommand};
use crate::service::{evm::new_partial, HydraDXExecutorDispatch};
use crate::service::{new_partial, HydraDXExecutorDispatch};
use crate::{chain_spec, service};

use codec::Encode;
Expand Down Expand Up @@ -345,6 +345,7 @@ pub fn run() -> sc_cli::Result<()> {
if config.role.is_authority() { "yes" } else { "no" }
);

/// evmTODO ethereum_config
crate::service::start_node(config, polkadot_config, collator_options, id)
.await
.map(|r| r.1)
Expand Down
43 changes: 35 additions & 8 deletions node/src/evm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use std::{
time::Duration,
};

use cumulus_client_consensus_common::{ParachainBlockImportMarker};
use crate::rpc::{RuntimeApiStorageOverride, SchemaV1Override, SchemaV2Override, SchemaV3Override, StorageOverride};
use crate::service::FullClient;
use cumulus_client_consensus_common::ParachainBlockImportMarker;
use fc_consensus::FrontierBlockImport;
use fc_db::Backend as FrontierBackend;
use fc_mapping_sync::{MappingSyncWorker, SyncStrategy};
Expand All @@ -38,17 +40,44 @@ use futures::{future, StreamExt};
use hydradx_runtime::Block;
use polkadot_cli::Cli;
use sc_cli::SubstrateCli;
use sc_client_api::{backend::AuxStore, BlockOf, BlockchainEvents, StateBackend, StorageProvider};
use sc_client_api::{backend::AuxStore, Backend, BlockOf, BlockchainEvents, StateBackend, StorageProvider};
use sc_consensus::{BlockCheckParams, BlockImport as BlockImportT, BlockImportParams, ImportResult};
use sc_service::{BasePath, Configuration, TFullBackend, TaskManager};
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{Backend, Error as BlockchainError, well_known_cache_keys::Id as CacheKeyId, HeaderBackend, HeaderMetadata};
use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, Error as BlockchainError, HeaderBackend, HeaderMetadata};
use sp_consensus::Error as ConsensusError;
use sp_core::H256;
use sp_runtime::traits::{BlakeTwo256, Block as BlockT};
use crate::service::FullClient;
use crate::rpc::{RuntimeApiStorageOverride, SchemaV1Override, SchemaV2Override, SchemaV3Override, StorageOverride};

/// The ethereum-compatibility configuration used to run a node.
/// evmTODO: revise settings, these are by Centrifuge
#[derive(Clone, Copy, Debug, clap::Parser)]
pub struct EthereumConfig {
/// Maximum number of logs in a query.
#[clap(long, default_value = "10000")]
pub max_past_logs: u32,

/// Maximum fee history cache size.
#[clap(long, default_value = "2048")]
pub fee_history_limit: u64,

#[clap(long)]
pub enable_dev_signer: bool,

/// Maximum allowed gas limit will be `block.gas_limit *
/// execute_gas_limit_multiplier` when using eth_call/eth_estimateGas.
#[clap(long, default_value = "10")]
pub execute_gas_limit_multiplier: u64,

/// Size in bytes of the LRU cache for block data.
#[clap(long, default_value = "50")]
pub eth_log_block_cache: usize,

/// Size in bytes of the LRU cache for transactions statuses data.
#[clap(long, default_value = "50")]
pub eth_statuses_cache: usize,
}

pub type Hash = sp_core::H256;

Expand Down Expand Up @@ -168,9 +197,7 @@ where
C: ProvideRuntimeApi<B> + StorageProvider<B, BE> + AuxStore,
C: HeaderBackend<B> + HeaderMetadata<B, Error = BlockchainError>,
C: Send + Sync + 'static,
C::Api: sp_api::ApiExt<B>
+ fp_rpc::EthereumRuntimeRPCApi<B>
+ fp_rpc::ConvertTransactionRuntimeApi<B>,
C::Api: sp_api::ApiExt<B> + fp_rpc::EthereumRuntimeRPCApi<B> + fp_rpc::ConvertTransactionRuntimeApi<B>,
BE: Backend<B> + 'static,
BE::State: StateBackend<BlakeTwo256>,
{
Expand Down
10 changes: 5 additions & 5 deletions node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@

use std::sync::Arc;

use hydradx_runtime::{opaque::Block, AccountId, Balance, Index};
use fc_db::Backend as FrontierBackend;
pub use fc_rpc::{
EthBlockDataCacheTask, OverrideHandle, RuntimeApiStorageOverride, SchemaV1Override,
SchemaV2Override, SchemaV3Override, StorageOverride,
EthBlockDataCacheTask, OverrideHandle, RuntimeApiStorageOverride, SchemaV1Override, SchemaV2Override,
SchemaV3Override, StorageOverride,
};
pub use fc_rpc_core::types::{FeeHistoryCache, FeeHistoryCacheLimit, FilterPool};
use fp_rpc::{ConvertTransaction, ConvertTransactionRuntimeApi, EthereumRuntimeRPCApi};
use hydradx_runtime::{opaque::Block, AccountId, Balance, Index};
use jsonrpsee::RpcModule;
use sc_client_api::{
backend::{Backend, StateBackend, StorageProvider},
Expand Down Expand Up @@ -140,8 +140,8 @@ where
CT: ConvertTransaction<<B as BlockT>::Extrinsic> + Send + Sync + 'static,
{
use fc_rpc::{
Eth, EthApiServer, EthDevSigner, EthFilter, EthFilterApiServer, EthPubSub,
EthPubSubApiServer, EthSigner, Net, NetApiServer, Web3, Web3ApiServer,
Eth, EthApiServer, EthDevSigner, EthFilter, EthFilterApiServer, EthPubSub, EthPubSubApiServer, EthSigner, Net,
NetApiServer, Web3, Web3ApiServer,
};

let Deps {
Expand Down
Loading

0 comments on commit 1e677d7

Please sign in to comment.