Skip to content

Commit

Permalink
Merge branch 'dev' into eip1559-gas-fee-estimate
Browse files Browse the repository at this point in the history
* dev:
  feat(zcoin): balance event streaming (KomodoPlatform#2076)
  feat(zcoin): tx_history support for WASM target (KomodoPlatform#2077)
  • Loading branch information
dimxy committed Mar 30, 2024
2 parents 90f362a + a81a67f commit 27a9743
Show file tree
Hide file tree
Showing 15 changed files with 497 additions and 201 deletions.
1 change: 0 additions & 1 deletion mm2src/coins/my_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ where
})
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn z_coin_tx_history_rpc(
ctx: MmArc,
request: MyTxHistoryRequestV2<i64>,
Expand Down
259 changes: 91 additions & 168 deletions mm2src/coins/z_coin.rs

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions mm2src/coins/z_coin/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ pub mod blockdb;
pub use blockdb::*;

pub mod walletdb;
#[cfg(target_arch = "wasm32")] mod z_params;
#[cfg(target_arch = "wasm32")]
pub(crate) use z_params::ZcashParamsWasmImpl;

pub use walletdb::*;

use crate::z_coin::z_balance_streaming::ZBalanceEventSender;
use mm2_err_handle::mm_error::MmResult;
#[cfg(target_arch = "wasm32")]
use walletdb::wasm::storage::DataConnStmtCacheWasm;
Expand Down Expand Up @@ -55,7 +60,7 @@ pub struct CompactBlockRow {
#[derive(Clone)]
pub enum BlockProcessingMode {
Validate,
Scan(DataConnStmtCacheWrapper),
Scan(DataConnStmtCacheWrapper, Option<ZBalanceEventSender>),
}

/// Checks that the scanned blocks in the data database, when combined with the recent
Expand Down Expand Up @@ -114,7 +119,7 @@ pub async fn scan_cached_block(
params: &ZcoinConsensusParams,
block: &CompactBlock,
last_height: &mut BlockHeight,
) -> Result<(), ValidateBlocksError> {
) -> Result<usize, ValidateBlocksError> {
let mut data_guard = data.inner().clone();
// Fetch the ExtendedFullViewingKeys we are tracking
let extfvks = data_guard.get_extended_full_viewing_keys().await?;
Expand Down Expand Up @@ -201,5 +206,6 @@ pub async fn scan_cached_block(

*last_height = current_height;

Ok(())
// If there are any transactions in the block, return the transaction count
Ok(txs.len())
}
16 changes: 12 additions & 4 deletions mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::z_coin::storage::{scan_cached_block, validate_chain, BlockDbImpl, Blo
use crate::z_coin::z_coin_errors::ZcoinStorageError;

use async_trait::async_trait;
use futures_util::SinkExt;
use mm2_core::mm_ctx::MmArc;
use mm2_db::indexed_db::{BeBigUint, ConstructibleDb, DbIdentifier, DbInstance, DbLocked, DbUpgrader, IndexedDb,
IndexedDbBuilder, InitDbResult, MultiIndex, OnUpgradeResult, TableSignature};
Expand Down Expand Up @@ -123,7 +124,7 @@ impl BlockDbImpl {
}

/// Asynchronously rewinds the storage to a specified block height, effectively
/// removing data beyond the specified height from the storage.
/// removing data beyond the specified height from the storage.
pub async fn rewind_to_height(&self, height: BlockHeight) -> ZcoinStorageRes<usize> {
let locked_db = self.lock_db().await?;
let db_transaction = locked_db.get_inner().transaction().await?;
Expand Down Expand Up @@ -224,7 +225,7 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => validate_from
.map(|(height, _)| height)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
BlockProcessingMode::Scan(data) => data.inner().block_height_extrema().await.map(|opt| {
BlockProcessingMode::Scan(data, _) => data.inner().block_height_extrema().await.map(|opt| {
opt.map(|(_, max)| max)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1)
})?,
Expand All @@ -250,8 +251,15 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => {
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
},
BlockProcessingMode::Scan(data) => {
scan_cached_block(data, &params, &block, &mut from_height).await?;
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
// If there is/are transactions present in the current scanned block(s),
// we trigger a `Triggered` event to update the balance change.
if tx_size > 0 {
if let Some(mut sender) = z_balance_change_sender.clone() {
sender.send(()).await.expect("No receiver is available/dropped");
};
};
},
}
}
Expand Down
14 changes: 11 additions & 3 deletions mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::z_coin::ZcoinConsensusParams;
use common::async_blocking;
use db_common::sqlite::rusqlite::{params, Connection};
use db_common::sqlite::{query_single_row, run_optimization_pragmas, rusqlite};
use futures_util::SinkExt;
use itertools::Itertools;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
Expand Down Expand Up @@ -193,7 +194,7 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => validate_from
.map(|(height, _)| height)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
BlockProcessingMode::Scan(data) => {
BlockProcessingMode::Scan(data, _) => {
let data = data.inner();
data.block_height_extrema().await.map(|opt| {
opt.map(|(_, max)| max)
Expand Down Expand Up @@ -224,8 +225,15 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => {
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
},
BlockProcessingMode::Scan(data) => {
scan_cached_block(data, &params, &block, &mut from_height).await?;
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
// If there are transactions present in the current scanned block,
// we send a `Triggered` event to update the balance change.
if tx_size > 0 {
if let Some(mut sender) = z_balance_change_sender.clone() {
sender.send(()).await.expect("No receiver is available/dropped");
};
};
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl<'a> WalletIndexedDb {
Ok(db)
}

async fn lock_db(&self) -> ZcoinStorageRes<WalletDbInnerLocked<'_>> {
pub(crate) async fn lock_db(&self) -> ZcoinStorageRes<WalletDbInnerLocked<'_>> {
self.db
.get_or_initialize()
.await
Expand Down
File renamed without changes.
File renamed without changes.
110 changes: 110 additions & 0 deletions mm2src/coins/z_coin/z_balance_streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::common::Future01CompatExt;
use crate::hd_wallet::AsyncMutex;
use crate::z_coin::ZCoin;
use crate::{MarketCoinOps, MmCoin};

use async_trait::async_trait;
use common::executor::{AbortSettings, SpawnAbortable};
use common::log::{error, info};
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::channel::oneshot::{Receiver, Sender};
use futures_util::StreamExt;
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_event_stream::{Event, EventStreamConfiguration};
use std::sync::Arc;

pub type ZBalanceEventSender = UnboundedSender<()>;
pub type ZBalanceEventHandler = Arc<AsyncMutex<UnboundedReceiver<()>>>;

#[async_trait]
impl EventBehaviour for ZCoin {
const EVENT_NAME: &'static str = "COIN_BALANCE";
const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR";

async fn handle(self, _interval: f64, tx: Sender<EventInitStatus>) {
const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen.";

macro_rules! send_status_on_err {
($match: expr, $sender: tt, $msg: literal) => {
match $match {
Some(t) => t,
None => {
$sender
.send(EventInitStatus::Failed($msg.to_owned()))
.expect(RECEIVER_DROPPED_MSG);
panic!("{}", $msg);
},
}
};
}

let ctx = send_status_on_err!(
MmArc::from_weak(&self.as_ref().ctx),
tx,
"MM context must have been initialized already."
);
let z_balance_change_handler = send_status_on_err!(
self.z_fields.z_balance_event_handler.as_ref(),
tx,
"Z balance change receiver can not be empty."
);

tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG);

// Locks the balance change handler, iterates through received events, and updates balance changes accordingly.
let mut bal = z_balance_change_handler.lock().await;
while (bal.next().await).is_some() {
match self.my_balance().compat().await {
Ok(balance) => {
let payload = json!({
"ticker": self.ticker(),
"address": self.my_z_address_encoded(),
"balance": { "spendable": balance.spendable, "unspendable": balance.unspendable }
});

ctx.stream_channel_controller
.broadcast(Event::new(Self::EVENT_NAME.to_string(), payload.to_string()))
.await;
},
Err(err) => {
let ticker = self.ticker();
error!("Failed getting balance for '{ticker}'. Error: {err}");
let e = serde_json::to_value(err).expect("Serialization should't fail.");
return ctx
.stream_channel_controller
.broadcast(Event::new(
format!("{}:{}", Self::ERROR_EVENT_NAME, ticker),
e.to_string(),
))
.await;
},
};
}
}

async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus {
if let Some(event) = config.get_event(Self::EVENT_NAME) {
info!(
"{} event is activated for {} address {}. `stream_interval_seconds`({}) has no effect on this.",
Self::EVENT_NAME,
self.ticker(),
self.my_z_address_encoded(),
event.stream_interval_seconds
);

let (tx, rx): (Sender<EventInitStatus>, Receiver<EventInitStatus>) = oneshot::channel();
let fut = self.clone().handle(event.stream_interval_seconds, tx);
let settings =
AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker()));
self.spawner().spawn_with_settings(fut, settings);

rx.await.unwrap_or_else(|e| {
EventInitStatus::Failed(format!("Event initialization status must be received: {}", e))
})
} else {
EventInitStatus::Inactive
}
}
}
36 changes: 22 additions & 14 deletions mm2src/coins/z_coin/z_coin_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub enum ZCoinBuildError {
ZCashParamsError(String),
ZDerivationPathNotSet,
SaplingParamsInvalidChecksum,
FailedSpawningBalanceEvents(String),
}

#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -272,27 +273,32 @@ impl From<ZcoinClientInitError> for ZCoinBuildError {
fn from(err: ZcoinClientInitError) -> Self { ZCoinBuildError::RpcClientInitErr(err) }
}

#[cfg(not(target_arch = "wasm32"))]
pub(super) enum SqlTxHistoryError {
#[derive(Debug, Display)]
pub(crate) enum ZTxHistoryError {
#[cfg(not(target_arch = "wasm32"))]
Sql(SqliteError),
#[cfg(target_arch = "wasm32")]
IndexedDbError(String),
FromIdDoesNotExist(i64),
}

#[cfg(not(target_arch = "wasm32"))]
impl From<SqliteError> for SqlTxHistoryError {
fn from(err: SqliteError) -> Self { SqlTxHistoryError::Sql(err) }
impl From<ZTxHistoryError> for MyTxHistoryErrorV2 {
fn from(err: ZTxHistoryError) -> Self { MyTxHistoryErrorV2::StorageError(err.to_string()) }
}

#[cfg(not(target_arch = "wasm32"))]
impl From<SqlTxHistoryError> for MyTxHistoryErrorV2 {
fn from(err: SqlTxHistoryError) -> Self {
match err {
SqlTxHistoryError::Sql(sql) => MyTxHistoryErrorV2::StorageError(sql.to_string()),
SqlTxHistoryError::FromIdDoesNotExist(id) => {
MyTxHistoryErrorV2::StorageError(format!("from_id {} does not exist", id))
},
}
}
impl From<SqliteError> for ZTxHistoryError {
fn from(err: SqliteError) -> Self { ZTxHistoryError::Sql(err) }
}

#[cfg(target_arch = "wasm32")]
impl From<DbTransactionError> for ZTxHistoryError {
fn from(err: DbTransactionError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) }
}

#[cfg(target_arch = "wasm32")]
impl From<CursorError> for ZTxHistoryError {
fn from(err: CursorError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) }
}

pub(super) struct NoInfoAboutTx(pub(super) H256Json);
Expand All @@ -316,6 +322,7 @@ pub enum ZCoinBalanceError {
impl From<ZcoinStorageError> for ZCoinBalanceError {
fn from(value: ZcoinStorageError) -> Self { ZCoinBalanceError::BalanceError(value.to_string()) }
}

/// The `ValidateBlocksError` enum encapsulates different types of errors that may occur
/// during the validation and scanning process of zcoin blocks.
#[derive(Debug, Display)]
Expand All @@ -342,6 +349,7 @@ pub enum ValidateBlocksError {
impl From<ValidateBlocksError> for ZcoinStorageError {
fn from(value: ValidateBlocksError) -> Self { Self::ValidateBlocksError(value) }
}

impl From<MmError<ZcoinStorageError>> for ValidateBlocksError {
fn from(value: MmError<ZcoinStorageError>) -> Self { Self::ZcoinStorageError(value.to_string()) }
}
Expand Down
13 changes: 9 additions & 4 deletions mm2src/coins/z_coin/z_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use zcash_primitives::zip32::ExtendedSpendingKey;
pub(crate) mod z_coin_grpc {
tonic::include_proto!("pirate.wallet.sdk.rpc");
}
use crate::z_coin::z_balance_streaming::ZBalanceEventSender;
use z_coin_grpc::compact_tx_streamer_client::CompactTxStreamerClient;
use z_coin_grpc::{ChainSpec, CompactBlock as TonicCompactBlock};

Expand Down Expand Up @@ -507,6 +508,7 @@ pub(super) async fn init_light_client<'a>(
sync_params: &Option<SyncStartPoint>,
skip_sync_params: bool,
z_spending_key: &ExtendedSpendingKey,
z_balance_event_sender: Option<ZBalanceEventSender>,
) -> Result<(AsyncMutex<SaplingSyncConnector>, WalletDbShared), MmError<ZcoinClientInitError>> {
let coin = builder.ticker.to_string();
let (sync_status_notifier, sync_watcher) = channel(1);
Expand Down Expand Up @@ -543,7 +545,7 @@ pub(super) async fn init_light_client<'a>(
WalletDbShared::new(builder, maybe_checkpoint_block, z_spending_key, continue_from_prev_sync).await?;
// Check min_height in blocks_db and rewind blocks_db to 0 if sync_height != min_height
if !continue_from_prev_sync && (sync_height != min_height) {
// let user know we're clearing cache and resyncing from new provided height.
// let user know we're clearing cache and re-syncing from new provided height.
if min_height > 0 {
info!("Older/Newer sync height detected!, rewinding blocks_db to new height: {sync_height:?}");
}
Expand All @@ -566,6 +568,7 @@ pub(super) async fn init_light_client<'a>(
is_pre_sapling: sync_height < sapling_activation_height,
actual: sync_height.max(sapling_activation_height),
},
z_balance_event_sender,
};

let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(light_rpc_clients)));
Expand All @@ -582,6 +585,7 @@ pub(super) async fn init_native_client<'a>(
native_client: NativeClient,
blocks_db: BlockDbImpl,
z_spending_key: &ExtendedSpendingKey,
z_balance_event_sender: Option<ZBalanceEventSender>,
) -> Result<(AsyncMutex<SaplingSyncConnector>, WalletDbShared), MmError<ZcoinClientInitError>> {
let coin = builder.ticker.to_string();
let (sync_status_notifier, sync_watcher) = channel(1);
Expand Down Expand Up @@ -610,6 +614,7 @@ pub(super) async fn init_native_client<'a>(
scan_blocks_per_iteration: builder.z_coin_params.scan_blocks_per_iteration,
scan_interval_ms: builder.z_coin_params.scan_interval_ms,
first_sync_block,
z_balance_event_sender,
};
let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(native_client)));

Expand Down Expand Up @@ -708,6 +713,7 @@ pub struct SaplingSyncLoopHandle {
scan_blocks_per_iteration: u32,
scan_interval_ms: u64,
first_sync_block: FirstSyncBlock,
z_balance_event_sender: Option<ZBalanceEventSender>,
}

impl SaplingSyncLoopHandle {
Expand Down Expand Up @@ -804,8 +810,7 @@ impl SaplingSyncLoopHandle {
}
}

let latest_block_height = blocks_db.get_latest_block().await?;
let current_block = BlockHeight::from_u32(latest_block_height);
let current_block = BlockHeight::from_u32(blocks_db.get_latest_block().await?);
loop {
match wallet_ops.block_height_extrema().await? {
Some((_, max_in_wallet)) => {
Expand All @@ -822,7 +827,7 @@ impl SaplingSyncLoopHandle {
blocks_db
.process_blocks_with_mode(
self.consensus_params.clone(),
BlockProcessingMode::Scan(scan),
BlockProcessingMode::Scan(scan, self.z_balance_event_sender.clone()),
None,
Some(self.scan_blocks_per_iteration),
)
Expand Down
Loading

0 comments on commit 27a9743

Please sign in to comment.