Skip to content

Commit

Permalink
feat(source/utxorpc): added payload validation
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Aug 4, 2023
1 parent 31df572 commit 79b337e
Showing 1 changed file with 83 additions and 22 deletions.
105 changes: 83 additions & 22 deletions src/sources/utxorpc.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use futures::StreamExt;
use gasket::framework::*;

use pallas::ledger::traverse::MultiEraBlock;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use tonic::Streaming;
use tracing::{debug, error};

use utxorpc::proto::sync::v1::any_chain_block::Chain;
use utxorpc::proto::sync::v1::chain_sync_service_client::ChainSyncServiceClient;
use utxorpc::proto::sync::v1::{follow_tip_response, FollowTipRequest, FollowTipResponse};
use utxorpc::proto::sync::v1::follow_tip_response::Action;
use utxorpc::proto::sync::v1::{FollowTipRequest, FollowTipResponse};

use crate::framework::*;

Expand Down Expand Up @@ -34,10 +38,7 @@ impl gasket::framework::Worker<Stage> for Worker {
Ok(Self { stream })
}

async fn schedule(
&mut self,
_stage: &mut Stage,
) -> Result<WorkSchedule<follow_tip_response::Action>, WorkerError> {
async fn schedule(&mut self, _: &mut Stage) -> Result<WorkSchedule<Action>, WorkerError> {
let result = self.stream.next().await;
if result.is_none() {
return Ok(WorkSchedule::Done);
Expand All @@ -58,32 +59,92 @@ impl gasket::framework::Worker<Stage> for Worker {
Ok(WorkSchedule::Unit(action))
}

async fn execute(
&mut self,
unit: &follow_tip_response::Action,
stage: &mut Stage,
) -> Result<(), WorkerError> {
async fn execute(&mut self, unit: &Action, stage: &mut Stage) -> Result<(), WorkerError> {
match unit {
follow_tip_response::Action::Apply(block) => {
debug!("APPLY {:?}", block)
Action::Apply(block) => {
if let Some(chain) = &block.chain {
match chain {
Chain::Cardano(block) => {
if block.body.is_some() {
let header = block.header.as_ref().unwrap();

let block = block.body.as_ref().unwrap();

for tx in block.tx.clone() {
let evt = ChainEvent::Apply(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
}
Chain::Raw(bytes) => {
let block = MultiEraBlock::decode(bytes).or_panic()?;

let evt = ChainEvent::Apply(
Point::Specific(block.slot(), block.hash().to_vec()),
Record::CborBlock(bytes.to_vec()),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(block.slot() as i64);
}
}
}
}
utxorpc::proto::sync::v1::follow_tip_response::Action::Undo(any) => {
debug!("UNDO {:?}", any)
Action::Undo(block) => {
if let Some(chain) = &block.chain {
match chain {
Chain::Cardano(block) => {
if block.body.is_some() {
let header = block.header.as_ref().unwrap();

let block = block.body.as_ref().unwrap();

for tx in block.tx.clone() {
let evt = ChainEvent::Undo(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
}
Chain::Raw(bytes) => {
let block = MultiEraBlock::decode(bytes).or_panic()?;

let evt = ChainEvent::Undo(
Point::Specific(block.slot(), block.hash().to_vec()),
Record::CborBlock(bytes.to_vec()),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(block.slot() as i64);
}
}
}
}
utxorpc::proto::sync::v1::follow_tip_response::Action::Reset(reset) => {
debug!("RESET {:?}", reset)
Action::Reset(reset) => {
stage
.output
.send(ChainEvent::reset(Point::new(reset.index, reset.hash.to_vec())).into())
.await
.or_panic()?;

stage.chain_tip.set(reset.index as i64);
}
}
Ok(())
}
}

#[derive(Stage)]
#[stage(
name = "source-utxorpc",
unit = "follow_tip_response::Action",
worker = "Worker"
)]
#[stage(name = "source-utxorpc", unit = "Action", worker = "Worker")]
pub struct Stage {
config: Config,

Expand All @@ -102,7 +163,7 @@ pub struct Config {
}

impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Stage, Error> {
pub fn bootstrapper(self, _: &Context) -> Result<Stage, Error> {
let stage = Stage {
config: self,
output: Default::default(),
Expand Down

0 comments on commit 79b337e

Please sign in to comment.