Skip to content

Commit

Permalink
indexing for delegations, undelegations, and rewards
Browse files Browse the repository at this point in the history
  • Loading branch information
avahowell committed Sep 20, 2024
1 parent 7f08fcc commit a0d8d3b
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-stake = {workspace = true, default-features = false}
penumbra-app = {workspace = true, default-features = false}
penumbra-dex = {workspace = true, default-features = false}
penumbra-keys = {workspace = true, default-features = false}
penumbra-governance = {workspace = true, default-features = false}
penumbra-num = {workspace = true, default-features = false}
penumbra-asset = {workspace = true, default-features = false}
Expand Down
242 changes: 218 additions & 24 deletions crates/bin/pindexer/src/supply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use anyhow::{anyhow, Context, Result};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use penumbra_app::genesis::AppState;
use penumbra_asset::asset;
use penumbra_keys::Address;
use penumbra_num::Amount;
use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent};
use penumbra_proto::{event::ProtoEvent, penumbra::core::component::funding::v1 as pb};
use penumbra_stake::{validator::Validator, IdentityKey};
use sqlx::{types::chrono::DateTime, PgPool, Postgres, Transaction};
use std::str::FromStr;

/// Supply-relevant events.
/// The supply of the native staking token can change:
Expand All @@ -21,32 +23,211 @@ use sqlx::{types::chrono::DateTime, PgPool, Postgres, Transaction};
enum Event {
/// A parsed version of [pb::EventUndelegate]
Undelegate {
height: u64,
identity_key: IdentityKey,
unbonded_amount: Amount,
},
/// A parsed version of [pb::EventDelegate]
Delegate {
height: u64,
identity_key: IdentityKey,
amount: Amount,
}, // TKTK....
},
/// A parsed version of [pb::EventFundingStreamReward]
FundingStreamReward {
height: u64,
recipient: Address,
epoch_index: u64,
reward_amount: Amount,
},
}

impl Event {
const NAMES: [&'static str; 2] = [
const NAMES: [&'static str; 4] = [
"penumbra.core.component.stake.v1.EventUndelegate",
"penumbra.core.component.stake.v1.EventDelegate",
"penumbra.core.component.stake.v1.EventRateDataChange",
"penumbra.core.component.funding.v1.EventFundingStreamReward",
];

async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> {
match self {
Event::Delegate {
identity_key: _,
amount: _,
} => Ok(()),
height,
identity_key,
amount,
} => {
let previous_total_um: i64 = sqlx::query_scalar(
r#"SELECT total_um FROM supply_total_unstaked ORDER BY height DESC LIMIT 1"#,
)
.fetch_optional(dbtx.as_mut())
.await?
.unwrap_or(0);

let new_total_um = previous_total_um - amount.value() as i64;

sqlx::query(
r#"INSERT INTO supply_total_unstaked (height, total_um) VALUES ($1, $2)"#,
)
.bind(*height as i64)
.bind(new_total_um)
.execute(dbtx.as_mut())
.await?;

// Update supply_total_staked
let prev_um_eq_delegations: i64 = sqlx::query_scalar(
r"SELECT um_equivalent_delegations FROM supply_total_staked WHERE validator_id = $1 ORDER BY height DESC LIMIT 1"
)
.bind(identity_key.to_string())
.fetch_optional(dbtx.as_mut())
.await?
.unwrap_or(0);
let prev_delegations: i64 = sqlx::query_scalar(
"SELECT delegations FROM supply_total_staked WHERE validator_id = $1 ORDER BY height DESC LIMIT 1"
)
.bind(identity_key.to_string())
.fetch_optional(dbtx.as_mut())
.await?
.unwrap_or(0);

let new_um_eq_delegations = prev_um_eq_delegations + amount.value() as i64;

let new_delegations = if prev_um_eq_delegations == 0 {
amount.value() as i64
} else {
let delta_delegations = (amount.value() as f64 / prev_um_eq_delegations as f64)
* prev_delegations as f64;
prev_delegations + delta_delegations as i64
};

sqlx::query(
r#"INSERT INTO supply_total_staked (height, validator_id, um_equivalent_delegations, delegations) VALUES ($1, $2, $3, $4)"#,
)
.bind(*height as i64)
.bind(identity_key.to_string())
.bind(new_um_eq_delegations)
.bind(new_delegations)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
Event::Undelegate {
identity_key: _,
unbonded_amount: _,
} => Ok(()),
height,
identity_key,
unbonded_amount,
} => {
let previous_total_um: i64 = sqlx::query_scalar(
r#"SELECT total_um FROM supply_total_unstaked ORDER BY height DESC LIMIT 1"#,
)
.fetch_optional(dbtx.as_mut())
.await?
.unwrap_or(0);

let new_total_um = previous_total_um + unbonded_amount.value() as i64;

sqlx::query(
r#"INSERT INTO supply_total_unstaked (height, total_um) VALUES ($1, $2)"#,
)
.bind(*height as i64)
.bind(new_total_um)
.execute(dbtx.as_mut())
.await?;

let prev_um_eq_delegations: i64 = sqlx::query_scalar(
r"SELECT um_equivalent_delegations FROM supply_total_staked WHERE validator_id = $1 ORDER BY height DESC LIMIT 1"
)
.bind(identity_key.to_string())
.fetch_optional(dbtx.as_mut())
.await?
.unwrap_or(0);

let prev_delegations: i64 = sqlx::query_scalar(
"SELECT delegations FROM supply_total_staked WHERE validator_id = $1 ORDER BY height DESC LIMIT 1"
)
.bind(identity_key.to_string())
.fetch_optional(dbtx.as_mut())
.await?
.unwrap_or(0);

let new_um_eq_delegations = prev_um_eq_delegations - unbonded_amount.value() as i64;

if prev_um_eq_delegations == 0 {
return Err(anyhow::anyhow!(
"Previous um_equivalent_delegations is zero"
));
}

let delta_delegations = (unbonded_amount.value() as f64
/ prev_um_eq_delegations as f64)
* prev_delegations as f64;
let new_delegations = prev_delegations - delta_delegations as i64;

sqlx::query(
r#"INSERT INTO supply_total_staked (height, validator_id, um_equivalent_delegations, delegations) VALUES ($1, $2, $3, $4)"#,
)
.bind(*height as i64)
.bind(identity_key.to_string())
.bind(new_um_eq_delegations)
.bind(new_delegations)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
Event::FundingStreamReward {
height,
recipient: _,
epoch_index: _,
reward_amount,
} => {
let prev_unstaked: i64 = sqlx::query_scalar(
"SELECT total_um FROM supply_total_unstaked ORDER BY height DESC LIMIT 1",
)
.fetch_optional(dbtx.as_mut())
.await?
.ok_or(anyhow!("couldnt look up the previous supply"))?;

let new_unstaked = prev_unstaked + reward_amount.value() as i64;

sqlx::query("INSERT INTO supply_total_unstaked (height, total_um) VALUES ($1, $2)")
.bind(*height as i64)
.bind(new_unstaked)
.execute(dbtx.as_mut())
.await?;

Ok(())
}
}
}
}

impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
type Error = anyhow::Error;

fn try_from(event: &'a ContextualizedEvent) -> Result<Self, Self::Error> {
match event.event.kind.as_str() {
// undelegation
x if x == Event::NAMES[0] => {}
// delegation
x if x == Event::NAMES[1] => {}
// funding stream reward
x if x == Event::NAMES[2] => {
let pe = pb::EventFundingStreamReward::from_event(event.as_ref())?;
let recipient = Address::from_str(&pe.recipient)?;
let epoch_index = pe.epoch_index;
let reward_amount = Amount::try_from(
pe.reward_amount
.ok_or(anyhow!("event missing in funding stream reward"))?,
)?;
Ok(Self::FundingStreamReward {
height: event.block_height,
recipient,
epoch_index,
reward_amount,
})
}
x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
}
}
}
Expand All @@ -60,17 +241,22 @@ async fn add_genesis_native_token_allocation_supply<'a>(
.content()
.ok_or_else(|| anyhow::anyhow!("cannot initialized indexer from checkpoint genesis"))?;

let mut native_token_sum: Amount = Amount::zero();
let mut unstaked_native_token_sum: Amount = Amount::zero();
for allo in &content.shielded_pool_content.allocations {
if allo.denom().base_denom().denom == "upenumbra" {
let value = allo.value();
native_token_sum = native_token_sum.checked_add(&value.amount).unwrap();
unstaked_native_token_sum = unstaked_native_token_sum
.checked_add(&value.amount)
.unwrap();
}
}

// Given a genesis validator, we need to figure out its delegations at
// genesis by getting its delegation token then summing up all the allocations.
// Build up a table of the total allocations first.
sqlx::query("INSERT INTO supply_total_unstaked (height, total_um) VALUES ($1, $2)")
.bind(0i64)
.bind(unstaked_native_token_sum.value() as i64)
.execute(dbtx.as_mut())
.await?;

let mut allos = BTreeMap::<asset::Id, Amount>::new();
for allo in &content.shielded_pool_content.allocations {
let value = allo.value();
Expand All @@ -82,17 +268,17 @@ async fn add_genesis_native_token_allocation_supply<'a>(

// at genesis, assume a 1:1 ratio between delegation amount and native token amount.
for val in &content.stake_content.validators {
// FIXME: this shouldn't be a proto type but now that has been propagated
// all through the rest of the code for no reason
let val = Validator::try_from(val.clone())?;
let delegation_amount = allos.get(&val.token().id()).cloned().unwrap_or_default();
native_token_sum = native_token_sum + delegation_amount;
}

sqlx::query("INSERT INTO supply_initial_genesis (value) VALUES ($1)")
.bind(native_token_sum.value() as i64)
.execute(dbtx.as_mut())
.await?;
sqlx::query("INSERT INTO supply_total_staked (height, validator_id, um_equivalent_delegations, delegations) VALUES ($1, $2, $3, $4)")
.bind(0i64)
.bind(val.identity_key.to_string())
.bind(delegation_amount.value() as i64)
.bind(delegation_amount.value() as i64)
.execute(dbtx.as_mut())
.await?;
}

Ok(())
}
Expand All @@ -119,9 +305,17 @@ impl AppView for Supply {
sqlx::query(
// table name is module path + struct name
"
CREATE TABLE IF NOT EXISTS supply_initial_genesis (
value BIGINT PRIMARY KEY,
CREATE TABLE IF NOT EXISTS supply_total_unstaked (
height BIGINT PRIMARY KEY,
total_um BIGINT NOT NULL,
);
CREATE TABLE IF NOT EXISTS supply_total_staked (
height BIGINT PRIMARY KEY,
validator_id TEXT NOT NULL,
um_equivalent_delegations BIGINT NOT NULL,
delegations BIGINT NOT NULL,
)
",
)
.execute(dbtx.as_mut())
Expand Down

0 comments on commit a0d8d3b

Please sign in to comment.