From 1b5f9426f968e13b801fff8c0425d18190fb6950 Mon Sep 17 00:00:00 2001 From: Arya Date: Wed, 12 Jun 2024 09:35:13 -0400 Subject: [PATCH] change(state): Add an init function for a standalone `ReadStateService` (#8595) * Adds an init_read_only() fn in zebra-state * moves elasticsearch initialization to `FinalizedState::new_with_debug()` * Updates callers of `FinalizedState::{new, new_with_debug}` to pass a bool to try enabling elasticsearch --------- Co-authored-by: Alfredo Garcia --- zebra-state/src/arbitrary.rs | 21 ----- zebra-state/src/lib.rs | 9 +- zebra-state/src/service.rs | 90 +++++++++++-------- zebra-state/src/service/chain_tip.rs | 16 ++-- zebra-state/src/service/finalized_state.rs | 34 ++++++- .../disk_format/tests/snapshot.rs | 2 +- .../src/service/finalized_state/tests/prop.rs | 4 +- .../zebra_db/block/tests/snapshot.rs | 2 +- .../service/non_finalized_state/tests/prop.rs | 2 +- .../non_finalized_state/tests/vectors.rs | 16 ++-- zebra-state/src/tests/setup.rs | 2 +- 11 files changed, 116 insertions(+), 82 deletions(-) diff --git a/zebra-state/src/arbitrary.rs b/zebra-state/src/arbitrary.rs index 9f87c749c98..c2176296ca7 100644 --- a/zebra-state/src/arbitrary.rs +++ b/zebra-state/src/arbitrary.rs @@ -50,27 +50,6 @@ where } } -impl From for ChainTipBlock { - fn from(prepared: SemanticallyVerifiedBlock) -> Self { - let SemanticallyVerifiedBlock { - block, - hash, - height, - new_outputs: _, - transaction_hashes, - } = prepared; - - Self { - hash, - height, - time: block.header.time, - transactions: block.transactions.clone(), - transaction_hashes, - previous_block_hash: block.header.previous_block_hash, - } - } -} - impl SemanticallyVerifiedBlock { /// Returns a [`ContextuallyVerifiedBlock`] created from this block, /// with fake zero-valued spent UTXOs. diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index 59088a931bd..58a83c9e15d 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -46,8 +46,10 @@ pub use request::{ }; pub use response::{KnownBlock, MinedTx, ReadResponse, Response}; pub use service::{ - chain_tip::{ChainTipChange, LatestChainTip, TipAction}, - check, init, spawn_init, + chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip, TipAction}, + check, init, init_read_only, + non_finalized_state::NonFinalizedState, + spawn_init, spawn_init_read_only, watch_receiver::WatchReceiver, OutputIndex, OutputLocation, TransactionIndex, TransactionLocation, }; @@ -76,7 +78,6 @@ pub use response::GetBlockTemplateChainInfo; #[cfg(any(test, feature = "proptest-impl"))] pub use service::{ arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT}, - chain_tip::{ChainTipBlock, ChainTipSender}, finalized_state::{RawBytes, KV, MAX_ON_DISK_HEIGHT}, init_test, init_test_services, }; @@ -96,4 +97,4 @@ pub(crate) use config::hidden::{ write_database_format_version_to_disk, write_state_database_format_version_to_disk, }; -pub(crate) use request::ContextuallyVerifiedBlock; +pub use request::ContextuallyVerifiedBlock; diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 994e8ad6b32..0fbe8d8eaad 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -24,15 +24,6 @@ use std::{ time::{Duration, Instant}, }; -#[cfg(feature = "elasticsearch")] -use elasticsearch::{ - auth::Credentials::Basic, - cert::CertificateValidation, - http::transport::{SingleNodeConnectionPool, TransportBuilder}, - http::Url, - Elasticsearch, -}; - use futures::future::FutureExt; use tokio::sync::{oneshot, watch}; use tower::{util::BoxService, Service, ServiceExt}; @@ -319,29 +310,12 @@ impl StateService { checkpoint_verify_concurrency_limit: usize, ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) { let timer = CodeTimer::start(); - - #[cfg(feature = "elasticsearch")] - let finalized_state = { - let conn_pool = SingleNodeConnectionPool::new( - Url::parse(config.elasticsearch_url.as_str()) - .expect("configured elasticsearch url is invalid"), - ); - let transport = TransportBuilder::new(conn_pool) - .cert_validation(CertificateValidation::None) - .auth(Basic( - config.clone().elasticsearch_username, - config.clone().elasticsearch_password, - )) - .build() - .expect("elasticsearch transport builder should not fail"); - let elastic_db = Some(Elasticsearch::new(transport)); - - FinalizedState::new(&config, network, elastic_db) - }; - - #[cfg(not(feature = "elasticsearch"))] - let finalized_state = { FinalizedState::new(&config, network) }; - + let finalized_state = FinalizedState::new( + &config, + network, + #[cfg(feature = "elasticsearch")] + true, + ); timer.finish(module_path!(), line!(), "opening finalized state database"); let timer = CodeTimer::start(); @@ -387,7 +361,7 @@ impl StateService { let read_service = ReadStateService::new( &finalized_state, - block_write_task, + Some(block_write_task), non_finalized_state_receiver, ); @@ -828,14 +802,14 @@ impl ReadStateService { /// and a watch channel for updating the shared recent non-finalized chain. pub(crate) fn new( finalized_state: &FinalizedState, - block_write_task: Arc>, + block_write_task: Option>>, non_finalized_state_receiver: watch::Receiver, ) -> Self { let read_service = Self { network: finalized_state.network(), db: finalized_state.db.clone(), non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver), - block_write_task: Some(block_write_task), + block_write_task, }; tracing::debug!("created new read-only state service"); @@ -1945,6 +1919,52 @@ pub fn init( ) } +/// Initialize a read state service from the provided [`Config`]. +/// Returns a read-only state service, +/// +/// Each `network` has its own separate on-disk database. +/// +/// To share access to the state, clone the returned [`ReadStateService`]. +pub fn init_read_only( + config: Config, + network: &Network, +) -> ( + ReadStateService, + ZebraDb, + tokio::sync::watch::Sender, +) { + let finalized_state = FinalizedState::new_with_debug( + &config, + network, + true, + #[cfg(feature = "elasticsearch")] + false, + true, + ); + let (non_finalized_state_sender, non_finalized_state_receiver) = + tokio::sync::watch::channel(NonFinalizedState::new(network)); + + ( + ReadStateService::new(&finalized_state, None, non_finalized_state_receiver), + finalized_state.db.clone(), + non_finalized_state_sender, + ) +} + +/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task. +/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender. +pub fn spawn_init_read_only( + config: Config, + network: &Network, +) -> tokio::task::JoinHandle<( + ReadStateService, + ZebraDb, + tokio::sync::watch::Sender, +)> { + let network = network.clone(); + tokio::task::spawn_blocking(move || init_read_only(config, &network)) +} + /// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task. /// Returns a [`tokio::task::JoinHandle`] with a boxed state service, /// a read state service, and receivers for state chain tip updates. diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 3f3468f4ca6..3fb016aa356 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -107,15 +107,15 @@ impl From for ChainTipBlock { } } -impl From for ChainTipBlock { - fn from(finalized: CheckpointVerifiedBlock) -> Self { - let CheckpointVerifiedBlock(SemanticallyVerifiedBlock { +impl From for ChainTipBlock { + fn from(prepared: SemanticallyVerifiedBlock) -> Self { + let SemanticallyVerifiedBlock { block, hash, height, + new_outputs: _, transaction_hashes, - .. - }) = finalized; + } = prepared; Self { hash, @@ -128,6 +128,12 @@ impl From for ChainTipBlock { } } +impl From for ChainTipBlock { + fn from(CheckpointVerifiedBlock(prepared): CheckpointVerifiedBlock) -> Self { + prepared.into() + } +} + /// A sender for changes to the non-finalized and finalized chain tips. #[derive(Debug)] pub struct ChainTipSender { diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 7256d89dac2..f8c9bade5c1 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -142,14 +142,14 @@ impl FinalizedState { pub fn new( config: &Config, network: &Network, - #[cfg(feature = "elasticsearch")] elastic_db: Option, + #[cfg(feature = "elasticsearch")] enable_elastic_db: bool, ) -> Self { Self::new_with_debug( config, network, false, #[cfg(feature = "elasticsearch")] - elastic_db, + enable_elastic_db, false, ) } @@ -162,9 +162,37 @@ impl FinalizedState { config: &Config, network: &Network, debug_skip_format_upgrades: bool, - #[cfg(feature = "elasticsearch")] elastic_db: Option, + #[cfg(feature = "elasticsearch")] enable_elastic_db: bool, read_only: bool, ) -> Self { + #[cfg(feature = "elasticsearch")] + let elastic_db = if enable_elastic_db { + use elasticsearch::{ + auth::Credentials::Basic, + cert::CertificateValidation, + http::transport::{SingleNodeConnectionPool, TransportBuilder}, + http::Url, + Elasticsearch, + }; + + let conn_pool = SingleNodeConnectionPool::new( + Url::parse(config.elasticsearch_url.as_str()) + .expect("configured elasticsearch url is invalid"), + ); + let transport = TransportBuilder::new(conn_pool) + .cert_validation(CertificateValidation::None) + .auth(Basic( + config.clone().elasticsearch_username, + config.clone().elasticsearch_password, + )) + .build() + .expect("elasticsearch transport builder should not fail"); + + Some(Elasticsearch::new(transport)) + } else { + None + }; + let db = ZebraDb::new( config, STATE_DATABASE_KIND, diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshot.rs b/zebra-state/src/service/finalized_state/disk_format/tests/snapshot.rs index 14a8dd6c2a7..eb12cf41f1b 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshot.rs +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshot.rs @@ -62,7 +62,7 @@ fn test_raw_rocksdb_column_families_with_network(network: Network) { &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); // Snapshot the column family names diff --git a/zebra-state/src/service/finalized_state/tests/prop.rs b/zebra-state/src/service/finalized_state/tests/prop.rs index d48761795f5..7ff2c3ac91b 100644 --- a/zebra-state/src/service/finalized_state/tests/prop.rs +++ b/zebra-state/src/service/finalized_state/tests/prop.rs @@ -24,7 +24,7 @@ fn blocks_with_v5_transactions() -> Result<()> { .and_then(|v| v.parse().ok()) .unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)), |((chain, count, network, _history_tree) in PreparedChain::default())| { - let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None); + let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false); let mut height = Height(0); // use `count` to minimize test failures, so they are easier to diagnose for block in chain.iter().take(count) { @@ -65,7 +65,7 @@ fn all_upgrades_and_wrong_commitments_with_fake_activation_heights() -> Result<( .unwrap_or(DEFAULT_PARTIAL_CHAIN_PROPTEST_CASES)), |((chain, _count, network, _history_tree) in PreparedChain::default().with_valid_commitments().no_shrink())| { - let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None); + let mut state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false); let mut height = Height(0); let heartwood_height = NetworkUpgrade::Heartwood.activation_height(&network).unwrap(); let heartwood_height_plus1 = (heartwood_height + 1).unwrap(); diff --git a/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs b/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs index dbd0e7c1dae..c5f1ba371d5 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block/tests/snapshot.rs @@ -169,7 +169,7 @@ fn test_block_and_transaction_data_with_network(network: Network) { &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); // Assert that empty databases are the same, regardless of the network. diff --git a/zebra-state/src/service/non_finalized_state/tests/prop.rs b/zebra-state/src/service/non_finalized_state/tests/prop.rs index d4ef55344c0..68f7d77588b 100644 --- a/zebra-state/src/service/non_finalized_state/tests/prop.rs +++ b/zebra-state/src/service/non_finalized_state/tests/prop.rs @@ -479,7 +479,7 @@ fn rejection_restores_internal_state_genesis() -> Result<()> { } ))| { let mut state = NonFinalizedState::new(&network); - let finalized_state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] None); + let finalized_state = FinalizedState::new(&Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] false); let fake_value_pool = ValueBalance::::fake_populated_pool(); finalized_state.set_finalized_value_pool(fake_value_pool); diff --git a/zebra-state/src/service/non_finalized_state/tests/vectors.rs b/zebra-state/src/service/non_finalized_state/tests/vectors.rs index 6f908f080c0..b489d6f94f0 100644 --- a/zebra-state/src/service/non_finalized_state/tests/vectors.rs +++ b/zebra-state/src/service/non_finalized_state/tests/vectors.rs @@ -157,7 +157,7 @@ fn best_chain_wins_for_network(network: Network) -> Result<()> { &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); state.commit_new_chain(block2.prepare(), &finalized_state)?; @@ -194,7 +194,7 @@ fn finalize_pops_from_best_chain_for_network(network: Network) -> Result<()> { &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); let fake_value_pool = ValueBalance::::fake_populated_pool(); @@ -245,7 +245,7 @@ fn commit_block_extending_best_chain_doesnt_drop_worst_chains_for_network( &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); let fake_value_pool = ValueBalance::::fake_populated_pool(); @@ -289,7 +289,7 @@ fn shorter_chain_can_be_best_chain_for_network(network: Network) -> Result<()> { &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); let fake_value_pool = ValueBalance::::fake_populated_pool(); @@ -334,7 +334,7 @@ fn longer_chain_with_more_work_wins_for_network(network: Network) -> Result<()> &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); let fake_value_pool = ValueBalance::::fake_populated_pool(); @@ -378,7 +378,7 @@ fn equal_length_goes_to_more_work_for_network(network: Network) -> Result<()> { &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); let fake_value_pool = ValueBalance::::fake_populated_pool(); @@ -426,7 +426,7 @@ fn history_tree_is_updated_for_network_upgrade( &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); state @@ -525,7 +525,7 @@ fn commitment_is_validated_for_network_upgrade(network: Network, network_upgrade &Config::ephemeral(), &network, #[cfg(feature = "elasticsearch")] - None, + false, ); state diff --git a/zebra-state/src/tests/setup.rs b/zebra-state/src/tests/setup.rs index d407726330e..cc53a0d7ee5 100644 --- a/zebra-state/src/tests/setup.rs +++ b/zebra-state/src/tests/setup.rs @@ -101,7 +101,7 @@ pub(crate) fn new_state_with_mainnet_genesis( // The tests that use this setup function also commit invalid blocks to the state. true, #[cfg(feature = "elasticsearch")] - None, + false, false, ); let non_finalized_state = NonFinalizedState::new(&network);