diff --git a/src/analytics/ledger/address_balance.rs b/src/analytics/ledger/address_balance.rs index 6c7861bfe..b584a85ab 100644 --- a/src/analytics/ledger/address_balance.rs +++ b/src/analytics/ledger/address_balance.rs @@ -1,7 +1,7 @@ // Copyright 2023 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; +use std::collections::{hash_map::Entry, HashMap}; use futures::prelude::stream::TryStreamExt; use iota_sdk::types::block::{payload::SignedTransactionPayload, protocol::ProtocolParameters, slot::SlotIndex}; @@ -53,7 +53,7 @@ impl AddressBalancesAnalytics { /// Initialize the analytics by reading the current ledger state. pub(crate) async fn init<'a>( protocol_parameters: &ProtocolParameters, - slot: SlotIndex, + _slot: SlotIndex, unspent_outputs: impl IntoIterator, db: &MongoDb, ) -> Result { @@ -61,14 +61,13 @@ impl AddressBalancesAnalytics { .collection() .drop(None) .await?; - let mut map = HashMap::new(); + let mut balances = HashMap::new(); for output in unspent_outputs { - *map.entry(output.locked_address_at(slot, protocol_parameters)) - .or_default() += output.amount(); + *balances.entry(output.locked_address(protocol_parameters)).or_default() += output.amount(); } - for (address, balance) in map { + for (address, balance) in balances { db.collection::() - .add_balance(&address, balance) + .insert_balance(&address, balance) .await?; } Ok(AddressBalancesAnalytics) @@ -87,22 +86,46 @@ impl Analytics for AddressBalancesAnalytics { created: &[LedgerOutput], ctx: &dyn AnalyticsContext, ) -> eyre::Result<()> { + let mut balances = HashMap::<_, u64>::new(); + for output in created { + let address = output.locked_address(ctx.protocol_parameters()); + let mut entry = balances.entry(address.clone()); + let balance = match entry { + Entry::Occupied(ref mut o) => o.get_mut(), + Entry::Vacant(v) => { + let balance = ctx + .database() + .collection::() + .get_balance(&address) + .await?; + v.insert(balance) + } + }; + *balance += output.amount(); + } for output in consumed { - ctx.database() - .collection::() - .remove_balance( - &output.output.locked_address(ctx.protocol_parameters()), - output.amount(), - ) - .await?; + let address = output.output.locked_address(ctx.protocol_parameters()); + let mut entry = balances.entry(address.clone()); + let balance = match entry { + Entry::Occupied(ref mut o) => o.get_mut(), + Entry::Vacant(v) => { + let balance = ctx + .database() + .collection::() + .get_balance(&address) + .await?; + v.insert(balance) + } + }; + *balance -= output.amount(); } - - for output in created { + for (address, balance) in balances { ctx.database() .collection::() - .add_balance(&output.locked_address(ctx.protocol_parameters()), output.amount()) + .insert_balance(&address, balance) .await?; } + Ok(()) } diff --git a/src/db/mongodb/collections/analytics/address_balance.rs b/src/db/mongodb/collections/analytics/address_balance.rs index e1c469650..41ba372d3 100644 --- a/src/db/mongodb/collections/analytics/address_balance.rs +++ b/src/db/mongodb/collections/analytics/address_balance.rs @@ -89,46 +89,17 @@ pub struct DistributionStat { } impl AddressBalanceCollection { - /// Add an amount of balance to the given address. - pub async fn add_balance(&self, address: &Address, amount: u64) -> Result<(), DbError> { + /// Insert a balance for an address. + pub async fn insert_balance(&self, address: &Address, balance: u64) -> Result<(), DbError> { self.update_one( doc! { "_id": AddressDto::from(address) }, - vec![doc! { "$set": { - "balance": { - "$toString": { "$add": [ - { "$toDecimal": { "$ifNull": [ "$balance", 0 ] } }, - { "$toDecimal": amount.to_string() } - ] } - } - } }], + doc! { "$set": { "balance": balance.to_string() } }, UpdateOptions::builder().upsert(true).build(), ) .await?; Ok(()) } - /// Remove an amount of balance from the given address. - pub async fn remove_balance(&self, address: &Address, amount: u64) -> Result<(), DbError> { - let address_dto = AddressDto::from(address); - self.update_one( - doc! { "_id": &address_dto }, - vec![doc! { "$set": { - "balance": { - "$toString": { "$subtract": [ - { "$toDecimal": { "$ifNull": [ "$balance", 0 ] } }, - { "$toDecimal": amount.to_string() } - ] } - } - } }], - None, - ) - .await?; - if self.get_balance(address).await? == 0 { - self.collection().delete_one(doc! { "_id": address_dto }, None).await?; - } - Ok(()) - } - /// Get the balance of an address. pub async fn get_balance(&self, address: &Address) -> Result { Ok(self