Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: script by hash reducer #25

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added dump.rdb
Binary file not shown.
63 changes: 33 additions & 30 deletions src/reducers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,12 @@ pub mod pool_by_stake;
pub mod utxo_by_address;
mod worker;

#[cfg(feature = "unstable")]
pub mod address_by_txo;
#[cfg(feature = "unstable")]
pub mod plutus_script_by_hash;
pub mod total_transactions_count;
#[cfg(feature = "unstable")]
pub mod total_transactions_count_by_contract_addresses;
#[cfg(feature = "unstable")]
pub mod transactions_count_by_contract_address;
#[cfg(feature = "unstable")]
pub mod transactions_count_by_contract_address_by_epoch;
#[cfg(feature = "unstable")]
pub mod transactions_count_by_epoch;

#[derive(Deserialize)]
Expand All @@ -31,19 +26,21 @@ pub enum Config {
PointByTx(point_by_tx::Config),
PoolByStake(pool_by_stake::Config),

#[cfg(feature = "unstable")]

AddressByTxo(address_by_txo::Config),
#[cfg(feature = "unstable")]

PlutusScriptByHash(plutus_script_by_hash::Config),

TotalTransactionsCount(total_transactions_count::Config),
#[cfg(feature = "unstable")]

TransactionsCountByEpoch(transactions_count_by_epoch::Config),
#[cfg(feature = "unstable")]

TransactionsCountByContractAddress(transactions_count_by_contract_address::Config),
#[cfg(feature = "unstable")]

TransactionsCountByContractAddressByEpoch(
transactions_count_by_contract_address_by_epoch::Config,
),
#[cfg(feature = "unstable")]

TotalTransactionsCountByContractAddresses(
total_transactions_count_by_contract_addresses::Config,
),
Expand All @@ -56,17 +53,19 @@ impl Config {
Config::PointByTx(c) => c.plugin(),
Config::PoolByStake(c) => c.plugin(),

#[cfg(feature = "unstable")]

Config::AddressByTxo(c) => c.plugin(chain),
#[cfg(feature = "unstable")]

Config::PlutusScriptByHash(c) => c.plugin(chain),

Config::TotalTransactionsCount(c) => c.plugin(),
#[cfg(feature = "unstable")]

Config::TransactionsCountByEpoch(c) => c.plugin(chain),
#[cfg(feature = "unstable")]

Config::TransactionsCountByContractAddress(c) => c.plugin(chain),
#[cfg(feature = "unstable")]

Config::TransactionsCountByContractAddressByEpoch(c) => c.plugin(chain),
#[cfg(feature = "unstable")]

Config::TotalTransactionsCountByContractAddresses(c) => c.plugin(),
}
}
Expand Down Expand Up @@ -106,19 +105,21 @@ pub enum Reducer {
PointByTx(point_by_tx::Reducer),
PoolByStake(pool_by_stake::Reducer),

#[cfg(feature = "unstable")]

AddressByTxo(address_by_txo::Reducer),
#[cfg(feature = "unstable")]

PlutusScriptByHash(plutus_script_by_hash::Reducer),

TotalTransactionsCount(total_transactions_count::Reducer),
#[cfg(feature = "unstable")]

TransactionsCountByEpoch(transactions_count_by_epoch::Reducer),
#[cfg(feature = "unstable")]

TransactionsCountByContractAddress(transactions_count_by_contract_address::Reducer),
#[cfg(feature = "unstable")]

TransactionsCountByContractAddressByEpoch(
transactions_count_by_contract_address_by_epoch::Reducer,
),
#[cfg(feature = "unstable")]

TotalTransactionsCountByContractAddresses(
total_transactions_count_by_contract_addresses::Reducer,
),
Expand All @@ -135,17 +136,19 @@ impl Reducer {
Reducer::PointByTx(x) => x.reduce_block(block, output),
Reducer::PoolByStake(x) => x.reduce_block(block, output),

#[cfg(feature = "unstable")]

Reducer::AddressByTxo(x) => x.reduce_block(block, output),
#[cfg(feature = "unstable")]

Reducer::PlutusScriptByHash(x) => x.reduce_block(block, output),

Reducer::TotalTransactionsCount(x) => x.reduce_block(block, output),
#[cfg(feature = "unstable")]

Reducer::TransactionsCountByEpoch(x) => x.reduce_block(block, output),
#[cfg(feature = "unstable")]

Reducer::TransactionsCountByContractAddress(x) => x.reduce_block(block, output),
#[cfg(feature = "unstable")]

Reducer::TransactionsCountByContractAddressByEpoch(x) => x.reduce_block(block, output),
#[cfg(feature = "unstable")]

Reducer::TotalTransactionsCountByContractAddresses(x) => x.reduce_block(block, output),
}
}
Expand Down
79 changes: 79 additions & 0 deletions src/reducers/plutus_script_by_hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use pallas::crypto::hash::{Hash, Hasher};
use pallas::ledger::primitives::alonzo;
use pallas::ledger::primitives::alonzo::PlutusScript;
use serde::Deserialize;

use crate::{crosscut, model};

#[derive(Deserialize)]
pub struct Config {
pub key_prefix: Option<String>,
pub filter: Option<Vec<String>>,
}

pub struct Reducer {
config: Config,
address_hrp: String,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need address_hrp for anything, probably copy and paste.

}

impl Reducer {
fn send_set_add(
&mut self,
slot: u64,
hash: Hash<28>,
cbor: PlutusScript,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
let key = format!("{}", hash);
Copy link
Collaborator

@matiwinnetou matiwinnetou Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like you are ignoring a configuration key (prefix)

let member = hex::encode::<Vec<u8>>(cbor.into());

let crdt = model::CRDTCommand::LastWriteWins(key, member, slot);

output.send(gasket::messaging::Message::from(crdt))?;

Ok(())
}

fn reduce_alonzo_compatible_tx(
&mut self,
slot: u64,
tx: &alonzo::TransactionWitnessSet,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
if let Some(plutus_scripts) = &tx.plutus_script {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your code is iterative but actually the way one should write this IMHO is that rather than returning OK at the end, one should rather map and collect all results and return them from this function. Now I am by far a Rust noob but perhaps Rust will take a first item out of all results or evaluate lazily and stop execution and return result on first error.

@scarmuega do you agree?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it return result on first error if we use the ? operator

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting operator... this is what I thought but you confirmed it.

Powerful and confusing operator.

for scr in plutus_scripts.iter() {
let hash = Hasher::<224>::hash_cbor(scr);

self.send_set_add(slot, hash, scr.clone(), output)?;
}
}

Ok(())
}

pub fn reduce_block(
&mut self,
block: &model::MultiEraBlock,
output: &mut super::OutputPort,
) -> Result<(), gasket::error::Error> {
match block {
model::MultiEraBlock::Byron(_) => Ok(()),
model::MultiEraBlock::AlonzoCompatible(x) => {
x.1.transaction_witness_sets.iter().try_for_each(|tx| {
self.reduce_alonzo_compatible_tx(x.1.header.header_body.slot, tx, output)
})
}
}
}
}

impl Config {
pub fn plugin(self, chain: &crosscut::ChainWellKnownInfo) -> super::Reducer {
let reducer = Reducer {
config: self,
address_hrp: chain.address_hrp.clone(),
};

super::Reducer::PlutusScriptByHash(reducer)
}
}
9 changes: 6 additions & 3 deletions testdrive/daemon.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ key_prefix = "c2"
type = "PoolByStake"
key_prefix = "c3"

[[reducers]]
type = "PlutusScriptByHash"
key_prefix = "c4"

[storage]
type = "Redis"
connection_params = "redis://redis:6379"
connection_params = "redis://localhost:6379"

[intersect]
type = "Point"
value = [57867490, "c491c5006192de2c55a95fb3544f60b96bd1665accaf2dfa2ab12fc7191f016b"]
type = "Tip"

[chain]
type = "Mainnet"