diff --git a/rpc-server/src/modules/blocks/methods.rs b/rpc-server/src/modules/blocks/methods.rs index 0f74ec9b..b6ee3beb 100644 --- a/rpc-server/src/modules/blocks/methods.rs +++ b/rpc-server/src/modules/blocks/methods.rs @@ -143,7 +143,7 @@ async fn block_call( mut block_request: near_jsonrpc::primitives::types::blocks::RpcBlockRequest, ) -> Result { tracing::debug!("`block` called with parameters: {:?}", block_request); - let result = fetch_block(&data, block_request.block_reference.clone(), "block").await; + let result = fetch_block(&data, &block_request.block_reference, "block").await; #[cfg(feature = "shadow_data_consistency")] { @@ -180,7 +180,7 @@ async fn changes_in_block_call( { let cache_block = fetch_block_from_cache_or_get( &data, - params.block_reference.clone(), + ¶ms.block_reference, "EXPERIMENTAL_changes_in_block", ) .await @@ -213,13 +213,10 @@ async fn changes_in_block_by_type_call( data: Data, mut params: near_jsonrpc::primitives::types::changes::RpcStateChangesInBlockByTypeRequest, ) -> Result { - let cache_block = fetch_block_from_cache_or_get( - &data, - params.block_reference.clone(), - "EXPERIMENTAL_changes", - ) - .await - .map_err(near_jsonrpc::primitives::errors::RpcError::from)?; + let cache_block = + fetch_block_from_cache_or_get(&data, ¶ms.block_reference, "EXPERIMENTAL_changes") + .await + .map_err(near_jsonrpc::primitives::errors::RpcError::from)?; let result = fetch_changes_in_block_by_type( &data, cache_block, @@ -251,7 +248,7 @@ async fn changes_in_block_by_type_call( #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))] pub async fn fetch_block( data: &Data, - block_reference: near_primitives::types::BlockReference, + block_reference: &near_primitives::types::BlockReference, method_name: &str, ) -> Result< near_jsonrpc::primitives::types::blocks::RpcBlockResponse, @@ -260,11 +257,11 @@ pub async fn fetch_block( tracing::debug!("`fetch_block` call"); let block_height = match block_reference { near_primitives::types::BlockReference::BlockId(block_id) => match block_id { - near_primitives::types::BlockId::Height(block_height) => Ok(block_height), + near_primitives::types::BlockId::Height(block_height) => Ok(*block_height), near_primitives::types::BlockId::Hash(block_hash) => { match data .db_manager - .get_block_by_hash(block_hash, method_name) + .get_block_by_hash(*block_hash, method_name) .await { Ok(block_height) => Ok(block_height), diff --git a/rpc-server/src/modules/blocks/utils.rs b/rpc-server/src/modules/blocks/utils.rs index 4c99d113..b4ec01e0 100644 --- a/rpc-server/src/modules/blocks/utils.rs +++ b/rpc-server/src/modules/blocks/utils.rs @@ -78,16 +78,16 @@ pub async fn fetch_chunk_from_s3( #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))] pub async fn fetch_block_from_cache_or_get( data: &jsonrpc_v2::Data, - block_reference: near_primitives::types::BlockReference, + block_reference: &near_primitives::types::BlockReference, method_name: &str, ) -> Result { - let block = match block_reference.clone() { + let block = match block_reference { near_primitives::types::BlockReference::BlockId(block_id) => { let block_height = match block_id { - near_primitives::types::BlockId::Height(block_height) => block_height, + near_primitives::types::BlockId::Height(block_height) => *block_height, near_primitives::types::BlockId::Hash(hash) => data .db_manager - .get_block_by_hash(hash, method_name) + .get_block_by_hash(*hash, method_name) .await .map_err(|err| { near_jsonrpc::primitives::types::blocks::RpcBlockError::UnknownBlock { diff --git a/rpc-server/src/modules/gas/methods.rs b/rpc-server/src/modules/gas/methods.rs index 2ce4b46a..a2799cf4 100644 --- a/rpc-server/src/modules/gas/methods.rs +++ b/rpc-server/src/modules/gas/methods.rs @@ -62,7 +62,7 @@ async fn gas_price_call( data: &Data, block_reference: near_primitives::types::BlockReference, ) -> Result { - let block = fetch_block_from_cache_or_get(data, block_reference, "gas_price") + let block = fetch_block_from_cache_or_get(data, &block_reference, "gas_price") .await .map_err(|err| { near_jsonrpc::primitives::types::gas_price::RpcGasPriceError::UnknownBlock { diff --git a/rpc-server/src/modules/network/methods.rs b/rpc-server/src/modules/network/methods.rs index 41b112e7..89c63ede 100644 --- a/rpc-server/src/modules/network/methods.rs +++ b/rpc-server/src/modules/network/methods.rs @@ -154,9 +154,12 @@ pub async fn validators_ordered( if let Some(block_id) = &request.block_id { let block_reference = near_primitives::types::BlockReference::from(block_id.clone()); - if let Ok(block) = - fetch_block_from_cache_or_get(&data, block_reference, "EXPERIMENTAL_validators_ordered") - .await + if let Ok(block) = fetch_block_from_cache_or_get( + &data, + &block_reference, + "EXPERIMENTAL_validators_ordered", + ) + .await { let final_block = data.blocks_info_by_finality.final_cache_block().await; // `expected_earliest_available_block` calculated by formula: @@ -237,7 +240,7 @@ async fn validators_call( })?, near_primitives::types::EpochReference::BlockId(block_id) => { let block_reference = near_primitives::types::BlockReference::BlockId(block_id.clone()); - let block = fetch_block_from_cache_or_get(data, block_reference, "validators") + let block = fetch_block_from_cache_or_get(data, &block_reference, "validators") .await .map_err(|_err| { near_jsonrpc::primitives::types::validator::RpcValidatorError::UnknownEpoch @@ -263,7 +266,7 @@ async fn protocol_config_call( near_jsonrpc::primitives::types::config::RpcProtocolConfigError, > { let block = - fetch_block_from_cache_or_get(data, block_reference, "EXPERIMENTAL_protocol_config") + fetch_block_from_cache_or_get(data, &block_reference, "EXPERIMENTAL_protocol_config") .await .map_err(|err| { near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock { diff --git a/rpc-server/src/modules/queries/contract_runner/code_storage.rs b/rpc-server/src/modules/queries/contract_runner/code_storage.rs new file mode 100644 index 00000000..ecd86267 --- /dev/null +++ b/rpc-server/src/modules/queries/contract_runner/code_storage.rs @@ -0,0 +1,295 @@ +use std::collections::HashMap; + +use crate::modules::queries::utils::get_state_key_value_from_db; +use database::ReaderDbManager; +use futures::executor::block_on; + +pub type Result = ::std::result::Result; + +pub struct CodeStorage { + db_manager: std::sync::Arc>, + account_id: near_primitives::types::AccountId, + block_height: near_primitives::types::BlockHeight, + validators: HashMap, + data_count: u64, + is_optimistic: bool, + optimistic_data: + HashMap>, +} + +pub struct StorageValuePtr { + value: Vec, +} + +impl near_vm_runner::logic::ValuePtr for StorageValuePtr { + fn len(&self) -> u32 { + self.value.len() as u32 + } + + fn deref(&self) -> Result> { + Ok(self.value.clone()) + } +} + +impl CodeStorage { + pub fn init( + db_manager: std::sync::Arc>, + account_id: near_primitives::types::AccountId, + block_height: near_primitives::types::BlockHeight, + validators: HashMap, + optimistic_data: HashMap< + readnode_primitives::StateKey, + Option, + >, + ) -> Self { + Self { + db_manager, + account_id, + block_height, + validators, + data_count: Default::default(), // TODO: Using for generate_data_id + is_optimistic: !optimistic_data.is_empty(), + optimistic_data, + } + } + + fn get_state_key_data(&self, key: &[u8]) -> readnode_primitives::StateValue { + let get_db_data = get_state_key_value_from_db( + &self.db_manager, + &self.account_id, + self.block_height, + key.to_vec(), + "query_call_function", + ); + let (_, data) = block_on(get_db_data); + data + } + + fn optimistic_storage_get( + &self, + key: &[u8], + ) -> Result>> { + if let Some(value) = self.optimistic_data.get(key) { + Ok(value.as_ref().map(|data| { + Box::new(StorageValuePtr { + value: data.clone(), + }) as Box<_> + })) + } else { + self.database_storage_get(key) + } + } + + fn database_storage_get( + &self, + key: &[u8], + ) -> Result>> { + let data = self.get_state_key_data(key); + Ok(if !data.is_empty() { + Some(Box::new(StorageValuePtr { value: data }) as Box<_>) + } else { + None + }) + } + + fn optimistic_storage_has_key(&mut self, key: &[u8]) -> Result { + if let Some(value) = self.optimistic_data.get(key) { + Ok(value.is_some()) + } else { + self.database_storage_has_key(key) + } + } + + fn database_storage_has_key(&mut self, key: &[u8]) -> Result { + Ok(!self.get_state_key_data(key).is_empty()) + } +} + +impl near_vm_runner::logic::External for CodeStorage { + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] + fn storage_set(&mut self, _key: &[u8], _value: &[u8]) -> Result<()> { + Err(near_vm_runner::logic::VMLogicError::HostError( + near_vm_runner::logic::HostError::ProhibitedInView { + method_name: String::from("storage_set"), + }, + )) + } + + #[cfg_attr( + feature = "tracing-instrumentation", + tracing::instrument(skip(self, _mode)) + )] + fn storage_get( + &self, + key: &[u8], + _mode: near_vm_runner::logic::StorageGetMode, + ) -> Result>> { + if self.is_optimistic { + self.optimistic_storage_get(key) + } else { + self.database_storage_get(key) + } + } + + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] + fn storage_remove(&mut self, _key: &[u8]) -> Result<()> { + Err(near_vm_runner::logic::VMLogicError::HostError( + near_vm_runner::logic::HostError::ProhibitedInView { + method_name: String::from("storage_remove"), + }, + )) + } + + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] + fn storage_remove_subtree(&mut self, _prefix: &[u8]) -> Result<()> { + Err(near_vm_runner::logic::VMLogicError::HostError( + near_vm_runner::logic::HostError::ProhibitedInView { + method_name: String::from("storage_remove_subtree"), + }, + )) + } + + #[cfg_attr( + feature = "tracing-instrumentation", + tracing::instrument(skip(self, _mode)) + )] + fn storage_has_key( + &mut self, + key: &[u8], + _mode: near_vm_runner::logic::StorageGetMode, + ) -> Result { + if self.is_optimistic { + self.optimistic_storage_has_key(key) + } else { + self.database_storage_has_key(key) + } + } + + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] + fn generate_data_id(&mut self) -> near_primitives::hash::CryptoHash { + // TODO: Should be improvement in future + // Generates some hash for the data ID to receive data. + // This hash should not be functionality + let data_id = near_primitives::hash::hash(&self.data_count.to_le_bytes()); + self.data_count += 1; + data_id + } + + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] + fn get_trie_nodes_count(&self) -> near_vm_runner::logic::TrieNodesCount { + near_vm_runner::logic::TrieNodesCount { + db_reads: 0, + mem_reads: 0, + } + } + + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] + fn validator_stake( + &self, + account_id: &near_primitives::types::AccountId, + ) -> Result> { + Ok(self.validators.get(account_id).cloned()) + } + + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] + fn validator_total_stake(&self) -> Result { + Ok(self.validators.values().sum()) + } + + fn create_receipt( + &mut self, + _receipt_indices: Vec, + _receiver_id: near_primitives::types::AccountId, + ) -> Result { + Err(near_vm_runner::logic::VMLogicError::HostError( + near_vm_runner::logic::HostError::ProhibitedInView { + method_name: String::from("create_receipt"), + }, + )) + } + + fn append_action_create_account( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + ) -> Result<()> { + Ok(()) + } + + fn append_action_deploy_contract( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _code: Vec, + ) -> Result<()> { + Ok(()) + } + + fn append_action_function_call_weight( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _method_name: Vec, + _args: Vec, + _attached_deposit: near_primitives::types::Balance, + _prepaid_gas: near_primitives::types::Gas, + _gas_weight: near_primitives::types::GasWeight, + ) -> Result<()> { + Ok(()) + } + + fn append_action_transfer( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _deposit: near_primitives::types::Balance, + ) -> Result<()> { + Ok(()) + } + + fn append_action_stake( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _stake: near_primitives::types::Balance, + _public_key: near_crypto::PublicKey, + ) { + } + + fn append_action_add_key_with_full_access( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _public_key: near_crypto::PublicKey, + _nonce: near_primitives::types::Nonce, + ) { + } + + fn append_action_add_key_with_function_call( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _public_key: near_crypto::PublicKey, + _nonce: near_primitives::types::Nonce, + _allowance: Option, + _receiver_id: near_primitives::types::AccountId, + _method_names: Vec>, + ) -> Result<()> { + Ok(()) + } + + fn append_action_delete_key( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _public_key: near_crypto::PublicKey, + ) { + } + + fn append_action_delete_account( + &mut self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + _beneficiary_id: near_primitives::types::AccountId, + ) -> Result<()> { + Ok(()) + } + + fn get_receipt_receiver( + &self, + _receipt_index: near_vm_runner::logic::types::ReceiptIndex, + ) -> &near_primitives::types::AccountId { + panic!("Prohibited in view. `get_receipt_receiver`"); + } +} diff --git a/rpc-server/src/modules/queries/contract_runner/mod.rs b/rpc-server/src/modules/queries/contract_runner/mod.rs new file mode 100644 index 00000000..72e1178b --- /dev/null +++ b/rpc-server/src/modules/queries/contract_runner/mod.rs @@ -0,0 +1,242 @@ +use std::collections::HashMap; + +use near_vm_runner::internal::VMKindExt; +use near_vm_runner::ContractRuntimeCache; + +use crate::errors::FunctionCallError; +use crate::modules::blocks::BlocksInfoByFinality; + +use code_storage::CodeStorage; + +mod code_storage; + +pub struct RunContractResponse { + pub result: Vec, + pub logs: Vec, + pub block_height: near_primitives::types::BlockHeight, + pub block_hash: near_primitives::hash::CryptoHash, +} + +#[allow(clippy::too_many_arguments)] +#[cfg_attr( + feature = "tracing-instrumentation", + tracing::instrument(skip(db_manager, compiled_contract_code_cache, contract_code_cache)) +)] +pub async fn run_contract( + account_id: &near_primitives::types::AccountId, + method_name: &str, + args: &near_primitives::types::FunctionArgs, + db_manager: &std::sync::Arc>, + compiled_contract_code_cache: &std::sync::Arc, + contract_code_cache: &std::sync::Arc< + crate::cache::RwLockLruMemoryCache>, + >, + blocks_info_by_finality: &std::sync::Arc, + block: crate::modules::blocks::CacheBlock, + max_gas_burnt: near_primitives::types::Gas, + optimistic_data: HashMap< + readnode_primitives::StateKey, + Option, + >, +) -> Result { + let contract = db_manager + .get_account(account_id, block.block_height, "query_call_function") + .await + .map_err(|_| FunctionCallError::AccountDoesNotExist { + requested_account_id: account_id.clone(), + })?; + + let (epoch_height, validators) = + epoch_height_and_validators_with_balances(db_manager, blocks_info_by_finality, block) + .await?; + + // Prepare context for the VM run contract + let public_key = near_crypto::PublicKey::empty(near_crypto::KeyType::ED25519); + let random_seed = near_primitives::utils::create_random_seed( + block.latest_protocol_version, + near_primitives::hash::CryptoHash::default(), + block.state_root, + ); + let context = near_vm_runner::logic::VMContext { + current_account_id: account_id.clone(), + signer_account_id: account_id.clone(), + signer_account_pk: borsh::to_vec(&public_key).expect("Failed to serialize"), + predecessor_account_id: account_id.clone(), + input: args.to_vec(), + block_height: block.block_height, + block_timestamp: block.block_timestamp, + epoch_height, + account_balance: contract.data.amount(), + account_locked_balance: contract.data.locked(), + storage_usage: contract.data.storage_usage(), + attached_deposit: 0, + prepaid_gas: max_gas_burnt, + random_seed, + view_config: Some(near_primitives::config::ViewConfig { max_gas_burnt }), + output_data_receivers: vec![], + }; + + // Init runtime config for each protocol version + let store = near_parameters::RuntimeConfigStore::free(); + let config = store + .get_config(block.latest_protocol_version) + .wasm_config + .clone(); + let vm_config = near_parameters::vm::Config { + vm_kind: config.vm_kind.replace_with_wasmtime_if_unsupported(), + ..config + }; + let code_hash = contract.data.code_hash(); + + // Check if the contract code is already in the cache + let key = near_vm_runner::get_contract_cache_key(code_hash, &vm_config); + let contract_code = if compiled_contract_code_cache.has(&key).unwrap_or(false) { + None + } else { + Some(match contract_code_cache.get(&code_hash).await { + Some(code) => near_vm_runner::ContractCode::new(code, Some(code_hash)), + None => { + let code = db_manager + .get_contract_code(account_id, block.block_height, "query_call_function") + .await + .map_err(|_| FunctionCallError::InvalidAccountId { + requested_account_id: account_id.clone(), + })?; + contract_code_cache + .put(contract.data.code_hash(), code.data.clone()) + .await; + near_vm_runner::ContractCode::new(code.data, Some(contract.data.code_hash())) + } + }) + }; + + // Init an external scylla interface for the Runtime logic + let code_storage = CodeStorage::init( + db_manager.clone(), + account_id.clone(), + block.block_height, + validators, + optimistic_data, + ); + + // Execute the contract in the near VM + let result = run_code_in_vm_runner( + code_hash, + contract_code, + method_name.to_string(), + context, + code_storage, + vm_config, + compiled_contract_code_cache, + ) + .await + .map_err(|e| FunctionCallError::InternalError { + error_message: e.to_string(), + })?; + + if let Some(err) = result.aborted { + let message = format!("wasm execution failed with error: {:?}", err); + Err(FunctionCallError::VMError { + error_message: message, + }) + } else { + let logs = result.logs; + let result = match result.return_data { + near_vm_runner::logic::ReturnData::Value(buf) => buf, + near_vm_runner::logic::ReturnData::ReceiptIndex(_) + | near_vm_runner::logic::ReturnData::None => vec![], + }; + Ok(RunContractResponse { + result, + logs, + block_height: block.block_height, + block_hash: block.block_hash, + }) + } +} + +async fn epoch_height_and_validators_with_balances( + db_manager: &std::sync::Arc>, + blocks_info_by_finality: &std::sync::Arc, + block: crate::modules::blocks::CacheBlock, +) -> Result<(u64, HashMap), FunctionCallError> { + let (epoch_height, epoch_validators) = + if blocks_info_by_finality.final_cache_block().await.epoch_id == block.epoch_id { + let validators = blocks_info_by_finality.validators().await; + (validators.epoch_height, validators.current_validators) + } else { + let validators = db_manager + .get_validators_by_epoch_id(block.epoch_id, "query_call_function") + .await + .map_err(|_| FunctionCallError::InternalError { + error_message: "Failed to get epoch info".to_string(), + })?; + ( + validators.epoch_height, + validators.validators_info.current_validators, + ) + }; + Ok(( + epoch_height, + epoch_validators + .iter() + .map(|validator| (validator.account_id.clone(), validator.stake)) + .collect(), + )) +} + +#[allow(clippy::too_many_arguments)] +#[cfg_attr( + feature = "tracing-instrumentation", + tracing::instrument(skip(context, code_storage, contract_code, compiled_contract_code_cache)) +)] +async fn run_code_in_vm_runner( + code_hash: near_primitives::hash::CryptoHash, + contract_code: Option, + method_name: String, + context: near_vm_runner::logic::VMContext, + mut code_storage: CodeStorage, + vm_config: near_parameters::vm::Config, + compiled_contract_code_cache: &std::sync::Arc, +) -> Result { + let compiled_contract_code_cache_handle = compiled_contract_code_cache.handle(); + let span = tracing::debug_span!("run_code_in_vm_runner"); + + let results = tokio::task::spawn_blocking(move || { + let _entered = span.entered(); + let promise_results = vec![]; + let fees = near_parameters::RuntimeFeesConfig::free(); + + let runtime = vm_config + .vm_kind + .runtime(vm_config.clone()) + .expect("runtime has not been enabled at compile time"); + + if let Some(code) = &contract_code { + runtime + .precompile(code, &compiled_contract_code_cache_handle) + .expect("Compilation failed") + .expect("Cache failed"); + }; + + runtime.run( + code_hash, + None, + &method_name, + &mut code_storage, + &context, + &fees, + &promise_results, + Some(&compiled_contract_code_cache_handle), + ) + }) + .await; + match results { + Ok(result) => result, + Err(err) => Err( + near_vm_runner::logic::errors::VMRunnerError::WasmUnknownError { + debug_message: format!("Failed to run contract: {:?}", err), + }, + ), + } +} diff --git a/rpc-server/src/modules/queries/methods.rs b/rpc-server/src/modules/queries/methods.rs index 815b02c1..720bb88f 100644 --- a/rpc-server/src/modules/queries/methods.rs +++ b/rpc-server/src/modules/queries/methods.rs @@ -2,12 +2,14 @@ use crate::config::ServerContext; use crate::errors::RPCError; use crate::modules::blocks::utils::fetch_block_from_cache_or_get; use crate::modules::blocks::CacheBlock; -#[cfg(feature = "account_access_keys")] -use crate::modules::queries::utils::fetch_list_access_keys_from_db; -use crate::modules::queries::utils::{get_state_from_db, run_contract, RunContractResponse}; use jsonrpc_v2::{Data, Params}; use near_jsonrpc::RpcRequest; +use super::contract_runner; +#[cfg(feature = "account_access_keys")] +use super::utils::fetch_list_access_keys_from_db; +use super::utils::get_state_from_db; + /// `query` rpc method implementation /// calls proxy_rpc_call to get `query` from near-rpc if request parameters not supported by read-rpc /// as example: BlockReference for Finality::None is not supported by read-rpc when near_state_indexer is not running @@ -54,7 +56,7 @@ async fn query_call( ) -> Result { tracing::debug!("`query` call. Params: {:?}", query_request,); - let block = fetch_block_from_cache_or_get(data, query_request.block_reference.clone(), "query") + let block = fetch_block_from_cache_or_get(data, &query_request.block_reference, "query") .await .map_err(near_jsonrpc::primitives::errors::RpcError::from)?; let result = match &query_request.request { @@ -370,38 +372,17 @@ async fn function_call( is_optimistic, ); - let call_results = if is_optimistic { - optimistic_function_call(data, block, account_id, method_name, args).await + // Depending on the optimistic flag we need to run the contract with the optimistic + // state changes or not. + let maybe_optimistic_data = if is_optimistic { + data.blocks_info_by_finality + .optimistic_state_changes_in_block(account_id, &[]) + .await } else { - database_function_call(data, block, account_id, method_name, args).await + Default::default() }; - let call_results = - call_results.map_err(|err| err.to_rpc_query_error(block.block_height, block.block_hash))?; - Ok(near_jsonrpc::primitives::types::query::RpcQueryResponse { - kind: near_jsonrpc::primitives::types::query::QueryResponseKind::CallResult( - near_primitives::views::CallResult { - result: call_results.result, - logs: call_results.logs, - }, - ), - block_height: block.block_height, - block_hash: block.block_hash, - }) -} -#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))] -async fn optimistic_function_call( - data: &Data, - block: CacheBlock, - account_id: &near_primitives::types::AccountId, - method_name: &str, - args: &near_primitives::types::FunctionArgs, -) -> Result { - let optimistic_data = data - .blocks_info_by_finality - .optimistic_state_changes_in_block(account_id, &[]) - .await; - run_contract( + let call_results = contract_runner::run_contract( account_id, method_name, args, @@ -411,32 +392,22 @@ async fn optimistic_function_call( &data.blocks_info_by_finality, block, data.max_gas_burnt, - optimistic_data, // run contract with optimistic data + maybe_optimistic_data, ) - .await -} + .await; -#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))] -async fn database_function_call( - data: &Data, - block: CacheBlock, - account_id: &near_primitives::types::AccountId, - method_name: &str, - args: &near_primitives::types::FunctionArgs, -) -> Result { - run_contract( - account_id, - method_name, - args, - &data.db_manager, - &data.compiled_contract_code_cache, - &data.contract_code_cache, - &data.blocks_info_by_finality, - block, - data.max_gas_burnt, - Default::default(), // run contract with empty optimistic data - ) - .await + let call_results = + call_results.map_err(|err| err.to_rpc_query_error(block.block_height, block.block_hash))?; + Ok(near_jsonrpc::primitives::types::query::RpcQueryResponse { + kind: near_jsonrpc::primitives::types::query::QueryResponseKind::CallResult( + near_primitives::views::CallResult { + result: call_results.result, + logs: call_results.logs, + }, + ), + block_height: block.block_height, + block_hash: block.block_hash, + }) } #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))] diff --git a/rpc-server/src/modules/queries/mod.rs b/rpc-server/src/modules/queries/mod.rs index ae861ab3..e363fa87 100644 --- a/rpc-server/src/modules/queries/mod.rs +++ b/rpc-server/src/modules/queries/mod.rs @@ -1,298 +1,3 @@ -use std::collections::HashMap; - -use crate::modules::queries::utils::get_state_key_value_from_db; -use database::ReaderDbManager; -use futures::executor::block_on; - +mod contract_runner; pub mod methods; pub mod utils; - -pub type Result = ::std::result::Result; - -pub struct CodeStorage { - db_manager: std::sync::Arc>, - account_id: near_primitives::types::AccountId, - block_height: near_primitives::types::BlockHeight, - validators: HashMap, - data_count: u64, - is_optimistic: bool, - optimistic_data: - HashMap>, -} - -pub struct StorageValuePtr { - value: Vec, -} - -impl near_vm_runner::logic::ValuePtr for StorageValuePtr { - fn len(&self) -> u32 { - self.value.len() as u32 - } - - fn deref(&self) -> Result> { - Ok(self.value.clone()) - } -} - -impl CodeStorage { - pub fn init( - db_manager: std::sync::Arc>, - account_id: near_primitives::types::AccountId, - block_height: near_primitives::types::BlockHeight, - validators: HashMap, - optimistic_data: HashMap< - readnode_primitives::StateKey, - Option, - >, - ) -> Self { - Self { - db_manager, - account_id, - block_height, - validators, - data_count: Default::default(), // TODO: Using for generate_data_id - is_optimistic: !optimistic_data.is_empty(), - optimistic_data, - } - } - - fn get_state_key_data(&self, key: &[u8]) -> readnode_primitives::StateValue { - let get_db_data = get_state_key_value_from_db( - &self.db_manager, - &self.account_id, - self.block_height, - key.to_vec(), - "query_call_function", - ); - let (_, data) = block_on(get_db_data); - data - } - - fn optimistic_storage_get( - &self, - key: &[u8], - ) -> Result>> { - if let Some(value) = self.optimistic_data.get(key) { - Ok(value.as_ref().map(|data| { - Box::new(StorageValuePtr { - value: data.clone(), - }) as Box<_> - })) - } else { - self.database_storage_get(key) - } - } - - fn database_storage_get( - &self, - key: &[u8], - ) -> Result>> { - let data = self.get_state_key_data(key); - Ok(if !data.is_empty() { - Some(Box::new(StorageValuePtr { value: data }) as Box<_>) - } else { - None - }) - } - - fn optimistic_storage_has_key(&mut self, key: &[u8]) -> Result { - if let Some(value) = self.optimistic_data.get(key) { - Ok(value.is_some()) - } else { - self.database_storage_has_key(key) - } - } - - fn database_storage_has_key(&mut self, key: &[u8]) -> Result { - Ok(!self.get_state_key_data(key).is_empty()) - } -} - -impl near_vm_runner::logic::External for CodeStorage { - #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] - fn storage_set(&mut self, _key: &[u8], _value: &[u8]) -> Result<()> { - Err(near_vm_runner::logic::VMLogicError::HostError( - near_vm_runner::logic::HostError::ProhibitedInView { - method_name: String::from("storage_set"), - }, - )) - } - - #[cfg_attr( - feature = "tracing-instrumentation", - tracing::instrument(skip(self, _mode)) - )] - fn storage_get( - &self, - key: &[u8], - _mode: near_vm_runner::logic::StorageGetMode, - ) -> Result>> { - if self.is_optimistic { - self.optimistic_storage_get(key) - } else { - self.database_storage_get(key) - } - } - - #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] - fn storage_remove(&mut self, _key: &[u8]) -> Result<()> { - Err(near_vm_runner::logic::VMLogicError::HostError( - near_vm_runner::logic::HostError::ProhibitedInView { - method_name: String::from("storage_remove"), - }, - )) - } - - #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] - fn storage_remove_subtree(&mut self, _prefix: &[u8]) -> Result<()> { - Err(near_vm_runner::logic::VMLogicError::HostError( - near_vm_runner::logic::HostError::ProhibitedInView { - method_name: String::from("storage_remove_subtree"), - }, - )) - } - - #[cfg_attr( - feature = "tracing-instrumentation", - tracing::instrument(skip(self, _mode)) - )] - fn storage_has_key( - &mut self, - key: &[u8], - _mode: near_vm_runner::logic::StorageGetMode, - ) -> Result { - if self.is_optimistic { - self.optimistic_storage_has_key(key) - } else { - self.database_storage_has_key(key) - } - } - - #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] - fn generate_data_id(&mut self) -> near_primitives::hash::CryptoHash { - // TODO: Should be improvement in future - // Generates some hash for the data ID to receive data. - // This hash should not be functionality - let data_id = near_primitives::hash::hash(&self.data_count.to_le_bytes()); - self.data_count += 1; - data_id - } - - #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] - fn get_trie_nodes_count(&self) -> near_vm_runner::logic::TrieNodesCount { - near_vm_runner::logic::TrieNodesCount { - db_reads: 0, - mem_reads: 0, - } - } - - #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] - fn validator_stake( - &self, - account_id: &near_primitives::types::AccountId, - ) -> Result> { - Ok(self.validators.get(account_id).cloned()) - } - - #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] - fn validator_total_stake(&self) -> Result { - Ok(self.validators.values().sum()) - } - - fn create_receipt( - &mut self, - _receipt_indices: Vec, - _receiver_id: near_primitives::types::AccountId, - ) -> Result { - Err(near_vm_runner::logic::VMLogicError::HostError( - near_vm_runner::logic::HostError::ProhibitedInView { - method_name: String::from("create_receipt"), - }, - )) - } - - fn append_action_create_account( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - ) -> Result<()> { - Ok(()) - } - - fn append_action_deploy_contract( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _code: Vec, - ) -> Result<()> { - Ok(()) - } - - fn append_action_function_call_weight( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _method_name: Vec, - _args: Vec, - _attached_deposit: near_primitives::types::Balance, - _prepaid_gas: near_primitives::types::Gas, - _gas_weight: near_primitives::types::GasWeight, - ) -> Result<()> { - Ok(()) - } - - fn append_action_transfer( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _deposit: near_primitives::types::Balance, - ) -> Result<()> { - Ok(()) - } - - fn append_action_stake( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _stake: near_primitives::types::Balance, - _public_key: near_crypto::PublicKey, - ) { - } - - fn append_action_add_key_with_full_access( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _public_key: near_crypto::PublicKey, - _nonce: near_primitives::types::Nonce, - ) { - } - - fn append_action_add_key_with_function_call( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _public_key: near_crypto::PublicKey, - _nonce: near_primitives::types::Nonce, - _allowance: Option, - _receiver_id: near_primitives::types::AccountId, - _method_names: Vec>, - ) -> Result<()> { - Ok(()) - } - - fn append_action_delete_key( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _public_key: near_crypto::PublicKey, - ) { - } - - fn append_action_delete_account( - &mut self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - _beneficiary_id: near_primitives::types::AccountId, - ) -> Result<()> { - Ok(()) - } - - fn get_receipt_receiver( - &self, - _receipt_index: near_vm_runner::logic::types::ReceiptIndex, - ) -> &near_primitives::types::AccountId { - panic!("Prohibited in view. `get_receipt_receiver`"); - } -} diff --git a/rpc-server/src/modules/queries/utils.rs b/rpc-server/src/modules/queries/utils.rs index 70400748..320e789b 100644 --- a/rpc-server/src/modules/queries/utils.rs +++ b/rpc-server/src/modules/queries/utils.rs @@ -1,19 +1,6 @@ use std::collections::HashMap; use futures::StreamExt; -use near_vm_runner::internal::VMKindExt; -use near_vm_runner::ContractRuntimeCache; - -use crate::errors::FunctionCallError; -use crate::modules::blocks::BlocksInfoByFinality; -use crate::modules::queries::CodeStorage; - -pub struct RunContractResponse { - pub result: Vec, - pub logs: Vec, - pub block_height: near_primitives::types::BlockHeight, - pub block_hash: near_primitives::hash::CryptoHash, -} // Function to get state key value from the database // This function to wrap the database call to get state key value @@ -140,214 +127,3 @@ pub async fn fetch_list_access_keys_from_db( .collect(); Ok(account_keys_view) } - -#[allow(clippy::too_many_arguments)] -#[cfg_attr( - feature = "tracing-instrumentation", - tracing::instrument(skip(context, code_storage, contract_code, compiled_contract_code_cache)) -)] -async fn run_code_in_vm_runner( - code_hash: near_primitives::hash::CryptoHash, - contract_code: Option, - method_name: String, - context: near_vm_runner::logic::VMContext, - mut code_storage: CodeStorage, - vm_config: near_parameters::vm::Config, - compiled_contract_code_cache: &std::sync::Arc, -) -> Result { - let compiled_contract_code_cache_handle = compiled_contract_code_cache.handle(); - let span = tracing::debug_span!("run_code_in_vm_runner"); - - let results = tokio::task::spawn_blocking(move || { - let _entered = span.entered(); - let promise_results = vec![]; - let fees = near_parameters::RuntimeFeesConfig::free(); - - let runtime = vm_config - .vm_kind - .runtime(vm_config.clone()) - .expect("runtime has not been enabled at compile time"); - - if let Some(code) = &contract_code { - runtime - .precompile(code, &compiled_contract_code_cache_handle) - .expect("Compilation failed") - .expect("Cache failed"); - }; - - runtime.run( - code_hash, - None, - &method_name, - &mut code_storage, - &context, - &fees, - &promise_results, - Some(&compiled_contract_code_cache_handle), - ) - }) - .await; - match results { - Ok(result) => result, - Err(err) => Err( - near_vm_runner::logic::errors::VMRunnerError::WasmUnknownError { - debug_message: format!("Failed to run contract: {:?}", err), - }, - ), - } -} - -#[allow(clippy::too_many_arguments)] -#[cfg_attr( - feature = "tracing-instrumentation", - tracing::instrument(skip(db_manager, compiled_contract_code_cache, contract_code_cache)) -)] -pub async fn run_contract( - account_id: &near_primitives::types::AccountId, - method_name: &str, - args: &near_primitives::types::FunctionArgs, - db_manager: &std::sync::Arc>, - compiled_contract_code_cache: &std::sync::Arc, - contract_code_cache: &std::sync::Arc< - crate::cache::RwLockLruMemoryCache>, - >, - blocks_info_by_finality: &std::sync::Arc, - block: crate::modules::blocks::CacheBlock, - max_gas_burnt: near_primitives::types::Gas, - optimistic_data: HashMap< - readnode_primitives::StateKey, - Option, - >, -) -> Result { - let contract = db_manager - .get_account(account_id, block.block_height, "query_call_function") - .await - .map_err(|_| FunctionCallError::AccountDoesNotExist { - requested_account_id: account_id.clone(), - })?; - - let (epoch_height, epoch_validators) = - if blocks_info_by_finality.final_cache_block().await.epoch_id == block.epoch_id { - let validators = blocks_info_by_finality.validators().await; - (validators.epoch_height, validators.current_validators) - } else { - let validators = db_manager - .get_validators_by_epoch_id(block.epoch_id, "query_call_function") - .await - .map_err(|_| FunctionCallError::InternalError { - error_message: "Failed to get epoch info".to_string(), - })?; - ( - validators.epoch_height, - validators.validators_info.current_validators, - ) - }; - let validators = epoch_validators - .iter() - .map(|validator| (validator.account_id.clone(), validator.stake)) - .collect(); - - // Prepare context for the VM run contract - let public_key = near_crypto::PublicKey::empty(near_crypto::KeyType::ED25519); - let random_seed = near_primitives::utils::create_random_seed( - block.latest_protocol_version, - near_primitives::hash::CryptoHash::default(), - block.state_root, - ); - let context = near_vm_runner::logic::VMContext { - current_account_id: account_id.clone(), - signer_account_id: account_id.clone(), - signer_account_pk: borsh::to_vec(&public_key).expect("Failed to serialize"), - predecessor_account_id: account_id.clone(), - input: args.to_vec(), - block_height: block.block_height, - block_timestamp: block.block_timestamp, - epoch_height, - account_balance: contract.data.amount(), - account_locked_balance: contract.data.locked(), - storage_usage: contract.data.storage_usage(), - attached_deposit: 0, - prepaid_gas: max_gas_burnt, - random_seed, - view_config: Some(near_primitives::config::ViewConfig { max_gas_burnt }), - output_data_receivers: vec![], - }; - - // Init runtime config for each protocol version - let store = near_parameters::RuntimeConfigStore::free(); - let config = store - .get_config(block.latest_protocol_version) - .wasm_config - .clone(); - let vm_config = near_parameters::vm::Config { - vm_kind: config.vm_kind.replace_with_wasmtime_if_unsupported(), - ..config - }; - let code_hash = contract.data.code_hash(); - // Check if the contract code is already in the cache - - let key = near_vm_runner::get_contract_cache_key(code_hash, &vm_config); - let contract_code = if compiled_contract_code_cache.has(&key).unwrap_or(false) { - None - } else { - Some(match contract_code_cache.get(&code_hash).await { - Some(code) => near_vm_runner::ContractCode::new(code, Some(code_hash)), - None => { - let code = db_manager - .get_contract_code(account_id, block.block_height, "query_call_function") - .await - .map_err(|_| FunctionCallError::InvalidAccountId { - requested_account_id: account_id.clone(), - })?; - contract_code_cache - .put(contract.data.code_hash(), code.data.clone()) - .await; - near_vm_runner::ContractCode::new(code.data, Some(contract.data.code_hash())) - } - }) - }; - - // Init an external scylla interface for the Runtime logic - let code_storage = CodeStorage::init( - db_manager.clone(), - account_id.clone(), - block.block_height, - validators, - optimistic_data, - ); - - // Execute the contract in the near VM - let result = run_code_in_vm_runner( - code_hash, - contract_code, - method_name.to_string(), - context, - code_storage, - vm_config, - compiled_contract_code_cache, - ) - .await - .map_err(|e| FunctionCallError::InternalError { - error_message: e.to_string(), - })?; - - if let Some(err) = result.aborted { - let message = format!("wasm execution failed with error: {:?}", err); - Err(FunctionCallError::VMError { - error_message: message, - }) - } else { - let logs = result.logs; - let result = match result.return_data { - near_vm_runner::logic::ReturnData::Value(buf) => buf, - near_vm_runner::logic::ReturnData::ReceiptIndex(_) - | near_vm_runner::logic::ReturnData::None => vec![], - }; - Ok(RunContractResponse { - result, - logs, - block_height: block.block_height, - block_hash: block.block_hash, - }) - } -} diff --git a/rpc-server/src/modules/state/methods.rs b/rpc-server/src/modules/state/methods.rs index e06b5b23..eeaf1b29 100644 --- a/rpc-server/src/modules/state/methods.rs +++ b/rpc-server/src/modules/state/methods.rs @@ -9,7 +9,7 @@ pub async fn view_state_paginated( Params(params): Params, ) -> Result { let block_reference = near_primitives::types::BlockReference::BlockId(params.block_id.clone()); - let block = fetch_block_from_cache_or_get(&data, block_reference, "view_state_paginated") + let block = fetch_block_from_cache_or_get(&data, &block_reference, "view_state_paginated") .await .map_err(near_jsonrpc::primitives::errors::RpcError::from)?;