Skip to content

Commit

Permalink
use database for tracking address balance and only return latest anal…
Browse files Browse the repository at this point in the history
…ytics for richest addresses and token distribution endpoints
  • Loading branch information
Alex Coats committed Feb 28, 2024
1 parent 5fc8c22 commit e07404b
Show file tree
Hide file tree
Showing 28 changed files with 566 additions and 419 deletions.
40 changes: 0 additions & 40 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ uuid = { version = "1.3", default-features = false, features = [ "v4" ] }
# Optional
chrono = { version = "0.4", default-features = false, features = [ "std" ], optional = true }
influxdb = { version = "0.7", default-features = false, features = [ "use-serde", "reqwest-client-rustls", "derive" ], optional = true }
rayon = { version = "1.8", default-features = false }

# API
auth-helper = { version = "0.3", default-features = false, optional = true }
Expand Down
7 changes: 1 addition & 6 deletions documentation/api/api-explorer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ paths:
tags:
- ledger
summary: Returns the top richest addresses.
description: >-
Returns the top richest addresses at the ledger state specified by the provided index.
parameters:
- $ref: "#/components/parameters/ledgerIndex"
- $ref: "#/components/parameters/top"
responses:
"200":
Expand All @@ -248,9 +245,7 @@ paths:
- ledger
summary: Returns the current token distribution.
description: >-
Returns the distribution of IOTA tokens at the ledger state specified by the provided index.
parameters:
- $ref: "#/components/parameters/ledgerIndex"
Returns the latest distribution of IOTA tokens.
responses:
"200":
description: Successful operation.
Expand Down
12 changes: 7 additions & 5 deletions src/analytics/ledger/active_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,35 @@ impl IntervalAnalytics for AddressActivityMeasurement {
}
}

#[async_trait::async_trait]
impl Analytics for AddressActivityAnalytics {
type Measurement = AddressActivityMeasurement;

fn handle_transaction(
async fn handle_transaction(
&mut self,
_payload: &SignedTransactionPayload,
consumed: &[LedgerSpent],
created: &[LedgerOutput],
ctx: &dyn AnalyticsContext,
) {
) -> eyre::Result<()> {
for output in consumed {
self.add_address(output.output.locked_address(ctx.protocol_parameters()));
}

for output in created {
self.add_address(output.locked_address(ctx.protocol_parameters()));
}
Ok(())
}

fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> Self::Measurement {
AddressActivityMeasurement {
async fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> eyre::Result<Self::Measurement> {
Ok(AddressActivityMeasurement {
ed25519_count: std::mem::take(&mut self.ed25519_addresses).len(),
account_count: std::mem::take(&mut self.account_addresses).len(),
nft_count: std::mem::take(&mut self.nft_addresses).len(),
anchor_count: std::mem::take(&mut self.anchor_addresses).len(),
implicit_count: std::mem::take(&mut self.implicit_addresses).len(),
}
})
}
}

Expand Down
203 changes: 85 additions & 118 deletions src/analytics/ledger/address_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@

use std::collections::HashMap;

use iota_sdk::types::block::{
address::{AccountAddress, Address, AnchorAddress, Ed25519Address, ImplicitAccountCreationAddress, NftAddress},
payload::SignedTransactionPayload,
protocol::ProtocolParameters,
slot::SlotIndex,
};
use futures::prelude::stream::TryStreamExt;
use iota_sdk::types::block::{payload::SignedTransactionPayload, protocol::ProtocolParameters, slot::SlotIndex};
use serde::{Deserialize, Serialize};

use crate::{
analytics::{Analytics, AnalyticsContext},
model::ledger::{LedgerOutput, LedgerSpent},
db::{
mongodb::{collections::AddressBalanceCollection, DbError},
MongoDb, MongoDbCollection,
},
model::{
address::AddressDto,
ledger::{LedgerOutput, LedgerSpent},
},
};

#[derive(Debug)]
#[derive(Debug, Default)]
pub(crate) struct AddressBalanceMeasurement {
pub(crate) ed25519_address_with_balance_count: usize,
pub(crate) account_address_with_balance_count: usize,
Expand All @@ -43,145 +46,109 @@ pub(crate) struct DistributionStat {

/// Computes the number of addresses the currently hold a balance.
#[derive(Serialize, Deserialize, Default)]
pub(crate) struct AddressBalancesAnalytics {
ed25519_balances: HashMap<Ed25519Address, u64>,
account_balances: HashMap<AccountAddress, u64>,
nft_balances: HashMap<NftAddress, u64>,
anchor_balances: HashMap<AnchorAddress, u64>,
implicit_balances: HashMap<ImplicitAccountCreationAddress, u64>,
}
pub(crate) struct AddressBalancesAnalytics;

impl AddressBalancesAnalytics {
/// Initialize the analytics by reading the current ledger state.
pub(crate) fn init<'a>(
pub(crate) async fn init<'a>(
protocol_parameters: &ProtocolParameters,
slot: SlotIndex,
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
) -> Self {
let mut balances = AddressBalancesAnalytics::default();
db: &MongoDb,
) -> Result<Self, DbError> {
db.collection::<AddressBalanceCollection>()
.collection()
.drop(None)
.await?;
let mut map = HashMap::new();
for output in unspent_outputs {
balances.add_address(output.locked_address_at(slot, protocol_parameters), output.amount());
*map.entry(output.locked_address_at(slot, protocol_parameters))
.or_default() += output.amount();
}
balances
}

fn add_address(&mut self, address: Address, output_amount: u64) {
match address {
Address::Ed25519(a) => *self.ed25519_balances.entry(a).or_default() += output_amount,
Address::Account(a) => *self.account_balances.entry(a).or_default() += output_amount,
Address::Nft(a) => *self.nft_balances.entry(a).or_default() += output_amount,
Address::Anchor(a) => *self.anchor_balances.entry(a).or_default() += output_amount,
Address::ImplicitAccountCreation(a) => *self.implicit_balances.entry(a).or_default() += output_amount,
_ => (),
}
}

fn remove_amount(&mut self, address: &Address, output_amount: u64) {
match address {
Address::Ed25519(a) => {
if let Some(amount) = self.ed25519_balances.get_mut(a) {
*amount -= output_amount;
if *amount == 0 {
self.ed25519_balances.remove(a);
}
}
}
Address::Account(a) => {
if let Some(amount) = self.account_balances.get_mut(a) {
*amount -= output_amount;
if *amount == 0 {
self.account_balances.remove(a);
}
}
}
Address::Nft(a) => {
if let Some(amount) = self.nft_balances.get_mut(a) {
*amount -= output_amount;
if *amount == 0 {
self.nft_balances.remove(a);
}
}
}
Address::Anchor(a) => {
if let Some(amount) = self.anchor_balances.get_mut(a) {
*amount -= output_amount;
if *amount == 0 {
self.anchor_balances.remove(a);
}
}
}
Address::ImplicitAccountCreation(a) => {
if let Some(amount) = self.implicit_balances.get_mut(a) {
*amount -= output_amount;
if *amount == 0 {
self.implicit_balances.remove(a);
}
}
}
_ => (),
for (address, balance) in map {
db.collection::<AddressBalanceCollection>()
.add_balance(&address, balance)
.await?;
}
Ok(AddressBalancesAnalytics)
}
}

#[async_trait::async_trait]
impl Analytics for AddressBalancesAnalytics {
type Measurement = AddressBalanceMeasurement;

fn handle_transaction(
async fn handle_transaction(
&mut self,
_payload: &SignedTransactionPayload,
consumed: &[LedgerSpent],
created: &[LedgerOutput],
ctx: &dyn AnalyticsContext,
) {
) -> eyre::Result<()> {
for output in consumed {
self.remove_amount(
&output.output.locked_address(ctx.protocol_parameters()),
output.amount(),
);
ctx.database()
.collection::<AddressBalanceCollection>()
.remove_balance(
&output.output.locked_address(ctx.protocol_parameters()),
output.amount(),
)
.await?;
}

for output in created {
self.add_address(output.locked_address(ctx.protocol_parameters()), output.amount())
ctx.database()
.collection::<AddressBalanceCollection>()
.add_balance(&output.locked_address(ctx.protocol_parameters()), output.amount())
.await?;
}
Ok(())
}

fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Self::Measurement {
async fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> eyre::Result<Self::Measurement> {
let bucket_max = ctx.protocol_parameters().token_supply().ilog10() as usize + 1;
let mut token_distribution = vec![DistributionStat::default(); bucket_max];

// Balances are partitioned into ranges defined by: [10^index..10^(index+1)).
for amount in self.ed25519_balances.values() {
let index = amount.ilog10() as usize;
token_distribution[index].ed25519_count += 1;
token_distribution[index].ed25519_amount += *amount;
}
for amount in self.account_balances.values() {
let index = amount.ilog10() as usize;
token_distribution[index].account_count += 1;
token_distribution[index].account_amount += *amount;
}
for amount in self.nft_balances.values() {
let index = amount.ilog10() as usize;
token_distribution[index].nft_count += 1;
token_distribution[index].nft_amount += *amount;
}
for amount in self.anchor_balances.values() {
let index = amount.ilog10() as usize;
token_distribution[index].anchor_count += 1;
token_distribution[index].anchor_amount += *amount;
}
for amount in self.implicit_balances.values() {
let index = amount.ilog10() as usize;
token_distribution[index].implicit_count += 1;
token_distribution[index].implicit_amount += *amount;
}
AddressBalanceMeasurement {
ed25519_address_with_balance_count: self.ed25519_balances.len(),
account_address_with_balance_count: self.account_balances.len(),
nft_address_with_balance_count: self.nft_balances.len(),
anchor_address_with_balance_count: self.anchor_balances.len(),
implicit_address_with_balance_count: self.implicit_balances.len(),
token_distribution,
let mut balances = AddressBalanceMeasurement {
token_distribution: vec![DistributionStat::default(); bucket_max],
..Default::default()
};
let mut balances_stream = ctx
.database()
.collection::<AddressBalanceCollection>()
.get_all_balances()
.await?;
while let Some(rec) = balances_stream.try_next().await? {
// Balances are partitioned into ranges defined by: [10^index..10^(index+1)).
let index = rec.balance.ilog10() as usize;
match rec.address {
AddressDto::Ed25519(_) => {
balances.ed25519_address_with_balance_count += 1;
balances.token_distribution[index].ed25519_count += 1;
balances.token_distribution[index].ed25519_amount += rec.balance;
}
AddressDto::Account(_) => {
balances.account_address_with_balance_count += 1;
balances.token_distribution[index].account_count += 1;
balances.token_distribution[index].account_amount += rec.balance;
}
AddressDto::Nft(_) => {
balances.nft_address_with_balance_count += 1;
balances.token_distribution[index].nft_count += 1;
balances.token_distribution[index].nft_amount += rec.balance;
}
AddressDto::Anchor(_) => {
balances.anchor_address_with_balance_count += 1;
balances.token_distribution[index].anchor_count += 1;
balances.token_distribution[index].anchor_amount += rec.balance;
}
AddressDto::ImplicitAccountCreation(_) => {
balances.implicit_address_with_balance_count += 1;
balances.token_distribution[index].implicit_count += 1;
balances.token_distribution[index].implicit_amount += rec.balance;
}
_ => (),
}
}

Ok(balances)
}
}
Loading

0 comments on commit e07404b

Please sign in to comment.