Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unbonds withdraws improvements #66

Merged
merged 8 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use chain::repository;
use chain::services::db::get_pos_crawler_state;
use chain::services::namada::{
query_all_balances, query_all_bonds_and_unbonds, query_all_proposals,
query_last_block_height,
query_bonds, query_last_block_height,
};
use chain::services::{
db as db_service, namada as namada_service,
Expand Down Expand Up @@ -172,9 +172,7 @@ async fn crawling_fn(
tracing::info!("Creating {} governance votes...", proposals_votes.len());

let addresses = block.bond_addresses();
let bonds = namada_service::query_bonds(&client, addresses, epoch)
.await
.into_rpc_error()?;
let bonds = query_bonds(&client, addresses).await.into_rpc_error()?;
tracing::info!("Updating bonds for {} addresses", bonds.len());

let addresses = block.unbond_addresses();
Expand All @@ -183,6 +181,8 @@ async fn crawling_fn(
.into_rpc_error()?;
tracing::info!("Updating unbonds for {} addresses", unbonds.len());

let withdraw_addreses = block.withdraw_addresses();

let revealed_pks = block.revealed_pks();
tracing::info!(
"Updating revealed pks for {} addresses",
Expand Down Expand Up @@ -218,6 +218,11 @@ async fn crawling_fn(

repository::pos::insert_bonds(transaction_conn, bonds)?;
repository::pos::insert_unbonds(transaction_conn, unbonds)?;
repository::pos::remove_withdraws(
transaction_conn,
epoch,
withdraw_addreses,
)?;

repository::pos::delete_claimed_rewards(
transaction_conn,
Expand Down Expand Up @@ -284,8 +289,9 @@ async fn initial_query(
let balances = query_all_balances(client).await.into_rpc_error()?;

tracing::info!("Querying bonds and unbonds...");
let (bonds, unbonds) =
query_all_bonds_and_unbonds(client).await.into_rpc_error()?;
let (bonds, unbonds) = query_all_bonds_and_unbonds(client, None, None)
.await
.into_rpc_error()?;

tracing::info!("Querying proposals...");
let proposals = query_all_proposals(client).await.into_rpc_error()?;
Expand Down
39 changes: 37 additions & 2 deletions chain/src/repository/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ use std::collections::HashSet;
use anyhow::Context;
use diesel::upsert::excluded;
use diesel::{
ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper,
BoolExpressionMethods, ExpressionMethods, PgConnection, QueryDsl,
RunQueryDsl, SelectableHelper,
};
use orm::bond::BondInsertDb;
use orm::schema::{bonds, pos_rewards, unbonds, validators};
use orm::unbond::UnbondInsertDb;
use orm::validators::{ValidatorDb, ValidatorUpdateMetadataDb};
use shared::block::Epoch;
use shared::bond::Bonds;
use shared::id::Id;
use shared::unbond::Unbonds;
use shared::unbond::{UnbondAddresses, Unbonds};
use shared::validator::ValidatorMetadataChange;

pub fn insert_bonds(
Expand Down Expand Up @@ -91,6 +93,39 @@ pub fn insert_unbonds(
anyhow::Ok(())
}

pub fn remove_withdraws(
transaction_conn: &mut PgConnection,
current_epoch: Epoch,
unbond_addresses: Vec<UnbondAddresses>,
) -> anyhow::Result<()> {
let sources = unbond_addresses
.iter()
.map(|unbond| unbond.source.to_string())
.collect::<Vec<String>>();

let validators = unbond_addresses
.iter()
.map(|unbond| unbond.validator.to_string())
.collect::<Vec<String>>();

diesel::delete(
unbonds::table.filter(
unbonds::columns::address
.eq_any(sources)
.and(unbonds::columns::validator_id.eq_any(
validators::table.select(validators::columns::id).filter(
validators::columns::namada_address.eq_any(validators),
),
))
.and(unbonds::columns::withdraw_epoch.le(current_epoch as i32)),
),
)
.execute(transaction_conn)
.context("Failed to remove withdraws from db")?;

anyhow::Ok(())
}

pub fn delete_claimed_rewards(
transaction_conn: &mut PgConnection,
reward_claimers: HashSet<Id>,
Expand Down
130 changes: 72 additions & 58 deletions chain/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ pub async fn query_last_block_height(
// parallel)
pub async fn query_all_bonds_and_unbonds(
client: &HttpClient,
source: Option<Id>,
target: Option<Id>,
) -> anyhow::Result<(Bonds, Unbonds)> {
type Source = NamadaSdkAddress;
type Validator = NamadaSdkAddress;
Expand All @@ -173,9 +175,13 @@ pub async fn query_all_bonds_and_unbonds(
type UnbondKey = (Source, Validator, WithdrawEpoch);
type UnbondsMap = HashMap<UnbondKey, NamadaSdkAmount>;

let bonds_and_unbonds = bonds_and_unbonds(client, &None, &None)
.await
.context("Failed to query all bonds and unbonds")?;
let bonds_and_unbonds = bonds_and_unbonds(
client,
&source.map(NamadaSdkAddress::from),
&target.map(NamadaSdkAddress::from),
)
.await
.context("Failed to query all bonds and unbonds")?;

let mut bonds: BondsMap = HashMap::new();
let mut unbonds: UnbondsMap = HashMap::new();
Expand Down Expand Up @@ -206,8 +212,6 @@ pub async fn query_all_bonds_and_unbonds(
}
}

// TODO: we can iter in parallel

// Map the types, mostly because we can't add indexer amounts
let bonds = bonds
.into_iter()
Expand Down Expand Up @@ -320,84 +324,98 @@ pub async fn query_next_governance_id(
pub async fn query_bonds(
client: &HttpClient,
addresses: Vec<BondAddresses>,
epoch: Epoch,
) -> anyhow::Result<Bonds> {
let pos_parameters = rpc::get_pos_params(client)
.await
.with_context(|| "Failed to query pos parameters".to_string())?;
let pipeline_length = pos_parameters.pipeline_len as u32;

let bonds = futures::stream::iter(addresses)
.filter_map(|BondAddresses { source, target }| {
let source = NamadaSdkAddress::from_str(&source.to_string())
.expect("Failed to parse source address");
let target = NamadaSdkAddress::from_str(&target.to_string())
.expect("Failed to parse target address");

async {
let amount = RPC
.vp()
.pos()
.bond_with_slashing(
client,
&source,
&target,
&Some(to_epoch(epoch + pipeline_length)),
)
let nested_bonds = futures::stream::iter(addresses)
.filter_map(|BondAddresses { source, target }| async move {
// TODO: if this is too slow do not use query_all_bonds_and_unbonds
let (bonds, _) =
query_all_bonds_and_unbonds(client, Some(source), Some(target))
.await
.context("Failed to query bond amount")
.context("Failed to query all bonds and unbonds")
.ok()?;

Some(Bond {
source: Id::from(source),
target: Id::from(target),
amount: Amount::from(amount),
start: epoch + pipeline_length,
})
}
Some(bonds)
})
.map(futures::future::ready)
.buffer_unordered(20)
.collect::<Vec<_>>()
.await;

let bonds = nested_bonds.iter().flatten().cloned().collect();

anyhow::Ok(bonds)
}

pub async fn query_unbonds(
client: &HttpClient,
addresses: Vec<UnbondAddresses>,
) -> anyhow::Result<Unbonds> {
let unbonds = futures::stream::iter(addresses)
.filter_map(|UnbondAddresses { source, validator }| async move {
let nested_unbonds = futures::stream::iter(addresses)
.filter_map(|UnbondAddresses { source, validator }| {
let source = NamadaSdkAddress::from_str(&source.to_string())
.expect("Failed to parse source address");
let validator = NamadaSdkAddress::from_str(&validator.to_string())
.expect("Failed to parse validator address");

let res = RPC
.vp()
.pos()
.unbond_with_slashing(client, &source, &validator)
.await
.context("Failed to query unbond amount")
.ok()?;

let ((_, withdraw_epoch), amount) =
res.last().expect("Unbonds are empty");
async move {
let unbonds = RPC
.vp()
.pos()
.unbond_with_slashing(client, &source, &validator)
.await
.context("Failed to query unbond amount")
.ok()?;

Some(Unbond {
source: Id::from(source),
target: Id::from(validator),
amount: Amount::from(*amount),
withdraw_at: withdraw_epoch.0 as Epoch,
})
let mut unbonds_map: HashMap<(Id, Id, Epoch), Amount> =
HashMap::new();

for ((_, withdraw_epoch), amount) in unbonds {
let record = unbonds_map.get_mut(&(
Id::from(source.clone()),
Id::from(validator.clone()),
withdraw_epoch.0 as Epoch,
));

// We have to merge the unbonds with the same withdraw
// epoch into one otherwise we can't
// insert them into the db
match record {
Some(r) => {
*r = r.checked_add(&Amount::from(amount)).unwrap();
}
None => {
unbonds_map.insert(
(
Id::from(source.clone()),
Id::from(validator.clone()),
withdraw_epoch.0 as Epoch,
),
Amount::from(amount),
);
}
}
}

let unbonds: Vec<Unbond> = unbonds_map
.into_iter()
.map(|((source, target, start), amount)| Unbond {
source,
target,
amount,
withdraw_at: start,
})
.collect();

Some(unbonds)
}
})
.map(futures::future::ready)
.buffer_unordered(20)
.collect::<Vec<_>>()
.await;

let unbonds = nested_unbonds.iter().flatten().cloned().collect();

anyhow::Ok(unbonds)
}

Expand Down Expand Up @@ -490,7 +508,3 @@ pub async fn query_all_votes(
fn to_block_height(block_height: u32) -> NamadaSdkBlockHeight {
NamadaSdkBlockHeight::from(block_height as u64)
}

fn to_epoch(epoch: u32) -> NamadaSdkEpoch {
NamadaSdkEpoch::from(epoch as u64)
}
3 changes: 2 additions & 1 deletion orm/migrations/2024-05-31-125032_chain_parameters/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ CREATE TABLE chain_parameters (
apr VARCHAR NOT NULL,
native_token_address VARCHAR NOT NULL,
chain_id VARCHAR NOT NULL,
genesis_time BIGINT NOT NULL
genesis_time BIGINT NOT NULL,
epoch_switch_blocks_delay INT NOT NULL
);

ALTER TABLE chain_parameters ADD UNIQUE (chain_id);
3 changes: 2 additions & 1 deletion orm/src/balances.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::str::FromStr;

use bigdecimal::BigDecimal;
use diesel::{Insertable, Queryable, Selectable};
use shared::balance::Balance;
use std::str::FromStr;

use crate::schema::balances;

Expand Down
41 changes: 40 additions & 1 deletion orm/src/group_by_macros.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::schema::{bonds, validators};
use diesel::allow_columns_to_appear_in_same_group_by_clause;
use diesel::expression::{SqlLiteral, ValidGrouping};

use crate::schema::{bonds, unbonds, validators};

allow_columns_to_appear_in_same_group_by_clause!(
bonds::address,
Expand All @@ -16,3 +18,40 @@ allow_columns_to_appear_in_same_group_by_clause!(
validators::avatar,
validators::state,
);

macro_rules! impl_valid_grouping {
($valid_grouping_type:ty, $column:path) => {
impl ValidGrouping<$valid_grouping_type> for $column {
type IsAggregate = diesel::expression::is_aggregate::Yes;
}
};

($valid_grouping_type:ty, $column:path, $($columns:path),+) => {
impl ValidGrouping<$valid_grouping_type> for $column {
type IsAggregate = diesel::expression::is_aggregate::Yes;
}

impl_valid_grouping!($valid_grouping_type, $($columns),+);
};
}

impl_valid_grouping!(
(
unbonds::address,
validators::id,
SqlLiteral<diesel::sql_types::Integer>
),
unbonds::address,
validators::id,
validators::namada_address,
validators::voting_power,
validators::max_commission,
validators::commission,
validators::name,
validators::email,
validators::website,
validators::description,
validators::discord_handle,
validators::avatar,
validators::state
);
Loading
Loading