Skip to content

Commit

Permalink
(tx-indexer): Drop validator feature along with code, add save_receip…
Browse files Browse the repository at this point in the history
…ts_outcomes feature by default (#334)
  • Loading branch information
khorolets committed Aug 9, 2024
1 parent 88e6595 commit 5aeb8f8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 68 deletions.
5 changes: 3 additions & 2 deletions tx-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ near-jsonrpc-client.workspace = true
near-lake-framework.workspace = true

[features]
default = []
validator = []
default = ["save_receipts_outcomes"]
# this feature enables storing receipt and outcome data to DB
save_receipts_outcomes = []
tracing-instrumentation = ["configuration/tracing-instrumentation"]
86 changes: 20 additions & 66 deletions tx-indexer/src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#[cfg(feature = "validator")]
use borsh::BorshDeserialize;

use futures::{
future::{join_all, try_join_all},
StreamExt,
Expand All @@ -12,6 +9,7 @@ use crate::storage;

const SAVE_ATTEMPTS: usize = 20;

#[allow(unused_variables)]
#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
pub(crate) async fn index_transactions(
streamer_message: &near_indexer_primitives::StreamerMessage,
Expand All @@ -26,8 +24,19 @@ pub(crate) async fn index_transactions(

let save_finished_tx_details_future =
save_finished_transaction_details(tx_collecting_storage, tx_details_storage);
let save_outcomes_and_receipts_future =
save_outcomes_and_receipts(db_manager, tx_collecting_storage);

let save_outcomes_and_receipts_future = {
#[cfg(feature = "save_outcomes_and_receipts")]
{
save_outcomes_and_receipts(db_manager, tx_collecting_storage);
}
#[cfg(not(feature = "save_outcomes_and_receipts"))]
{
// if feature is disabled just return Ok(()) to skip saving outcomes and receipts
// to the database, this is useful for testing and reindexing only transaction details
futures::future::ready(Ok(()))
}
};
futures::try_join!(
save_finished_tx_details_future,
save_outcomes_and_receipts_future
Expand Down Expand Up @@ -73,6 +82,7 @@ async fn save_finished_transaction_details(
Ok(())
}

#[cfg(feature = "save_outcomes_and_receipts")]
async fn save_outcomes_and_receipts(
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
tx_collecting_storage: &std::sync::Arc<crate::storage::CacheStorage>,
Expand Down Expand Up @@ -111,6 +121,7 @@ async fn save_outcomes_and_receipts(
Ok(())
}

#[cfg(feature = "save_outcomes_and_receipts")]
#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn save_receipts_and_outcomes_details(
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
Expand Down Expand Up @@ -152,6 +163,7 @@ async fn save_receipts_and_outcomes_details(
}
}

#[cfg(feature = "save_outcomes_and_receipts")]
async fn save_outcome_and_receipt_to_shard(
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
shard_id: database::primitives::ShardId,
Expand Down Expand Up @@ -464,11 +476,6 @@ async fn save_transaction_details_to_storage(
let transaction_hash = transaction_details.transaction.hash.to_string();
let tx_bytes = transaction_details.tx_serialize()?;

// We faced the issue when the transaction was saved to the storage but later
// was failing to deserialize. To avoid this issue and monitor the situation
// without runing the user experience, we will try to save the transaction
// to the storage and validate that it is saved correctly for 3 attempts
// before we throw an error.
let mut save_attempts = 0;
'retry: loop {
save_attempts += 1;
Expand All @@ -494,62 +501,9 @@ async fn save_transaction_details_to_storage(
save_attempts,
);
}
#[cfg(feature = "validator")]
{
// At this moment transaction seems to be stored, and we want to validate the correctness of the stored data
// To validate we will try to retrieve the transaction from the storage and validate that it is deserializable
// If the transaction is not deserializable, we will try to save it again
let mut retrieve_attempts = 0;
'validator: loop {
retrieve_attempts += 1;
if retrieve_attempts >= SAVE_ATTEMPTS {
tracing::error!(
target: crate::INDEXER,
"Failed to retrieve transaction {} for validation after {} attempts",
transaction_hash,
retrieve_attempts,
);
break 'validator;
}
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
let Ok(tx_details_bytes_from_storage) =
tx_details_storage.retrieve(&transaction_hash).await
else {
tracing::error!(
target: crate::INDEXER,
"Failed to retrieve transaction {} from storage",
transaction_hash,
);
continue 'validator;
};

match readnode_primitives::TransactionDetails::try_from_slice(
&tx_details_bytes_from_storage,
) {
Ok(_) => {
// We assume that the transaction is saved correctly
// We can remove the transaction from the cache storage
metrics::TX_IN_MEMORY_CACHE.dec();
break 'retry Ok(());
}
Err(err) => {
tracing::warn!(
target: crate::INDEXER,
"Failed to validate transaction {} \n{:#?}",
transaction_hash,
err
);
// If the transaction is not deserializable, we will try to save it again
continue 'retry;
}
}
}
}
#[cfg(not(feature = "validator"))]
{
metrics::TX_IN_MEMORY_CACHE.dec();
break 'retry Ok(());
}

metrics::TX_IN_MEMORY_CACHE.dec();
break 'retry Ok(());
}
Err(err) => {
crate::metrics::TX_STORE_ERRORS_TOTAL.inc();
Expand Down
3 changes: 3 additions & 0 deletions tx-indexer/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ impl CacheStorage {
Ok(transactions)
}

#[cfg(feature = "save_outcomes_and_receipts")]
#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
pub(crate) async fn outcomes_and_receipts_to_save(
&self,
Expand Down Expand Up @@ -459,6 +460,7 @@ impl CacheStorage {
Ok(())
}

#[cfg(feature = "save_outcomes_and_receipts")]
pub(crate) async fn return_outcomes_to_save(
&self,
shard_id: u64,
Expand All @@ -478,6 +480,7 @@ impl CacheStorage {
});
}

#[cfg(feature = "save_outcomes_and_receipts")]
pub(crate) async fn return_receipts_to_save(
&self,
shard_id: u64,
Expand Down

0 comments on commit 5aeb8f8

Please sign in to comment.