From 31df57248eddd8b51defc2fc76012763e556e0ac Mon Sep 17 00:00:00 2001 From: paulobressan Date: Wed, 2 Aug 2023 10:46:27 -0300 Subject: [PATCH 1/8] feat(sources): added connection to utxorpc source --- Cargo.lock | 37 +++++++---- Cargo.toml | 37 ++++++----- examples/utxorpc/daemon.toml | 9 +++ src/sources/mod.rs | 18 ++++++ src/sources/utxorpc.rs | 115 +++++++++++++++++++++++++++++++++++ 5 files changed, 185 insertions(+), 31 deletions(-) create mode 100644 examples/utxorpc/daemon.toml create mode 100644 src/sources/utxorpc.rs diff --git a/Cargo.lock b/Cargo.lock index da816786..81f29000 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2065,7 +2065,7 @@ checksum = "716bf1ec3ce0f09beff948d50bca7e62ae407681d763a3a06a3babde75c255f2" dependencies = [ "deno_core", "once_cell", - "rustls 0.21.1", + "rustls 0.21.5", "rustls-native-certs", "rustls-pemfile", "serde", @@ -3256,7 +3256,7 @@ checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" dependencies = [ "http", "hyper 0.14.26", - "rustls 0.21.1", + "rustls 0.21.5", "tokio", "tokio-rustls 0.24.0", ] @@ -4261,6 +4261,7 @@ dependencies = [ "elasticsearch", "env_logger", "file-rotate", + "futures", "gasket", "google-cloud-default", "google-cloud-googleapis", @@ -4285,6 +4286,7 @@ dependencies = [ "strum_macros", "thiserror", "tokio", + "tonic", "tracing", "tracing-subscriber", "unicode-truncate", @@ -5185,7 +5187,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.1", + "rustls 0.21.5", "rustls-pemfile", "serde", "serde_json", @@ -5356,13 +5358,13 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.1" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c911ba11bc8433e811ce56fde130ccf32f5127cab0e0194e9c68c5a5b671791e" +checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.2", "sct", ] @@ -5373,16 +5375,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67c8d6cf0e464eff7cee6ba0419f56a65d29999fc164dd719c8633fbb401365f" dependencies = [ "log", - "rustls 0.21.1", + "rustls 0.21.5", "rustls-native-certs", - "rustls-webpki", + "rustls-webpki 0.100.1", ] [[package]] name = "rustls-native-certs" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", "rustls-pemfile", @@ -5409,6 +5411,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.101.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.12" @@ -6626,7 +6638,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" dependencies = [ - "rustls 0.21.1", + "rustls 0.21.5", "tokio", ] @@ -6715,6 +6727,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-native-certs", "rustls-pemfile", "tokio", "tokio-rustls 0.24.0", @@ -7329,7 +7342,7 @@ version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" dependencies = [ - "rustls-webpki", + "rustls-webpki 0.100.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9b679e1f..e2713d87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,21 @@ license = "Apache-2.0" readme = "README.md" authors = ["Santiago Carmuega "] +[features] +default = ["deno"] +deno = ["deno_core", "deno_runtime"] +sink-file-rotate = ["file-rotate"] +sink-webhook = ["reqwest"] +sink-rabbitmq = ["lapin"] +sink-kafka = ["kafka"] +sink-aws-sqs = ["aws-config", "aws-types", "aws-sdk-sqs"] +sink-aws-lambda = ["aws-config", "aws-types", "aws-sdk-lambda"] +sink-aws-s3 = ["aws-config", "aws-types", "aws-sdk-s3"] +sink-gcp-pubsub = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default"] +sink-gcp-cloudfunction = ["reqwest", "jsonwebtoken"] +sink-redis = ["r2d2_redis"] +sink-elasticsearch = ["elasticsearch"] +source-utxorpc = ["tonic","futures"] [dependencies] pallas = "0.19.0-alpha.1" @@ -28,11 +43,7 @@ clap = { version = "4.2.7", features = ["derive"] } env_logger = "0.10.0" crossterm = "0.26" merge = "0.1.0" -config = { version = "0.13.2", default-features = false, features = [ - "toml", - "yaml", - "json", -] } +config = { version = "0.13.2", default-features = false, features = ["toml","yaml","json"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.89", features = ["arbitrary_precision"] } strum = "0.24" @@ -65,18 +76,6 @@ deno_core = { version = "0.188.0", optional = true } deno_runtime = { version = "0.114.0", optional = true } jsonwebtoken = { version = "8.3.0", optional = true } file-rotate = { version = "0.7.5", optional = true } +tonic = { version = "0.9.2", features = ["tls", "tls-roots"], optional = true} +futures = { version = "0.3.28", optional = true } -[features] -default = ["deno"] -deno = ["deno_core", "deno_runtime"] -sink-file-rotate = ["file-rotate"] -sink-webhook = ["reqwest"] -sink-rabbitmq = ["lapin"] -sink-kafka = ["kafka"] -sink-aws-sqs = ["aws-config", "aws-types", "aws-sdk-sqs"] -sink-aws-lambda = ["aws-config", "aws-types", "aws-sdk-lambda"] -sink-aws-s3 = ["aws-config", "aws-types", "aws-sdk-s3"] -sink-gcp-pubsub = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default"] -sink-gcp-cloudfunction = ["reqwest", "jsonwebtoken"] -sink-redis = ["r2d2_redis"] -sink-elasticsearch = ["elasticsearch"] diff --git a/examples/utxorpc/daemon.toml b/examples/utxorpc/daemon.toml new file mode 100644 index 00000000..14988558 --- /dev/null +++ b/examples/utxorpc/daemon.toml @@ -0,0 +1,9 @@ +[source] +type = "UtxoRPC" +url = "https://50051-romantic-calmness-b55bqg.us1.demeter.run" + +[intersect] +type = "Tip" + +[sink] +type = "Stdout" diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 94121a72..a50dfca8 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -12,12 +12,18 @@ pub mod n2n; #[cfg(feature = "aws")] pub mod s3; +#[cfg(feature = "source-utxorpc")] +pub mod utxorpc; + pub enum Bootstrapper { N2N(n2n::Stage), N2C(n2c::Stage), #[cfg(feature = "aws")] S3(s3::Stage), + + #[cfg(feature = "source-utxorpc")] + UtxoRPC(utxorpc::Stage), } impl StageBootstrapper for Bootstrapper { @@ -28,6 +34,9 @@ impl StageBootstrapper for Bootstrapper { #[cfg(feature = "aws")] Bootstrapper::S3(p) => p.output.connect(adapter), + + #[cfg(feature = "source-utxorpc")] + Bootstrapper::UtxoRPC(p) => p.output.connect(adapter), } } @@ -42,6 +51,9 @@ impl StageBootstrapper for Bootstrapper { #[cfg(feature = "aws")] Bootstrapper::S3(x) => gasket::runtime::spawn_stage(x, policy), + + #[cfg(feature = "source-utxorpc")] + Bootstrapper::UtxoRPC(x) => gasket::runtime::spawn_stage(x, policy), } } } @@ -56,6 +68,9 @@ pub enum Config { #[cfg(feature = "aws")] S3(s3::Config), + + #[cfg(feature = "source-utxorpc")] + UtxoRPC(utxorpc::Config), } impl Config { @@ -66,6 +81,9 @@ impl Config { #[cfg(feature = "aws")] Config::S3(c) => Ok(Bootstrapper::S3(c.bootstrapper(ctx)?)), + + #[cfg(feature = "source-utxorpc")] + Config::UtxoRPC(c) => Ok(Bootstrapper::UtxoRPC(c.bootstrapper(ctx)?)), } } } diff --git a/src/sources/utxorpc.rs b/src/sources/utxorpc.rs new file mode 100644 index 00000000..50939967 --- /dev/null +++ b/src/sources/utxorpc.rs @@ -0,0 +1,115 @@ +use futures::StreamExt; +use gasket::framework::*; + +use serde::Deserialize; +use tonic::Streaming; +use tracing::{debug, error}; + +use utxorpc::proto::sync::v1::chain_sync_service_client::ChainSyncServiceClient; +use utxorpc::proto::sync::v1::{follow_tip_response, FollowTipRequest, FollowTipResponse}; + +use crate::framework::*; + +pub struct Worker { + stream: Streaming, +} + +#[async_trait::async_trait(?Send)] +impl gasket::framework::Worker for Worker { + async fn bootstrap(stage: &Stage) -> Result { + debug!("connecting"); + + let mut client = ChainSyncServiceClient::connect(stage.config.url.clone()) + .await + .or_panic()?; + + // TODO: configure intersect + + let stream = client + .follow_tip(FollowTipRequest::default()) + .await + .or_restart()? + .into_inner(); + + Ok(Self { stream }) + } + + async fn schedule( + &mut self, + _stage: &mut Stage, + ) -> Result, WorkerError> { + let result = self.stream.next().await; + if result.is_none() { + return Ok(WorkSchedule::Done); + } + + let result = result.unwrap(); + if let Err(err) = result { + error!("{err}"); + return Err(WorkerError::Retry); + } + + let response: FollowTipResponse = result.unwrap(); + if response.action.is_none() { + return Ok(WorkSchedule::Done); + } + + let action = response.action.unwrap(); + Ok(WorkSchedule::Unit(action)) + } + + async fn execute( + &mut self, + unit: &follow_tip_response::Action, + stage: &mut Stage, + ) -> Result<(), WorkerError> { + match unit { + follow_tip_response::Action::Apply(block) => { + debug!("APPLY {:?}", block) + } + utxorpc::proto::sync::v1::follow_tip_response::Action::Undo(any) => { + debug!("UNDO {:?}", any) + } + utxorpc::proto::sync::v1::follow_tip_response::Action::Reset(reset) => { + debug!("RESET {:?}", reset) + } + } + Ok(()) + } +} + +#[derive(Stage)] +#[stage( + name = "source-utxorpc", + unit = "follow_tip_response::Action", + worker = "Worker" +)] +pub struct Stage { + config: Config, + + pub output: SourceOutputPort, + + #[metric] + ops_count: gasket::metrics::Counter, + + #[metric] + chain_tip: gasket::metrics::Gauge, +} + +#[derive(Deserialize)] +pub struct Config { + url: String, +} + +impl Config { + pub fn bootstrapper(self, ctx: &Context) -> Result { + let stage = Stage { + config: self, + output: Default::default(), + ops_count: Default::default(), + chain_tip: Default::default(), + }; + + Ok(stage) + } +} From 79b337ef34614ebe68d1121b673fc45d3b614a25 Mon Sep 17 00:00:00 2001 From: paulobressan Date: Fri, 4 Aug 2023 10:18:38 -0300 Subject: [PATCH 2/8] feat(source/utxorpc): added payload validation --- src/sources/utxorpc.rs | 105 ++++++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 22 deletions(-) diff --git a/src/sources/utxorpc.rs b/src/sources/utxorpc.rs index 50939967..54259dd3 100644 --- a/src/sources/utxorpc.rs +++ b/src/sources/utxorpc.rs @@ -1,12 +1,16 @@ use futures::StreamExt; use gasket::framework::*; +use pallas::ledger::traverse::MultiEraBlock; +use pallas::network::miniprotocols::Point; use serde::Deserialize; use tonic::Streaming; use tracing::{debug, error}; +use utxorpc::proto::sync::v1::any_chain_block::Chain; use utxorpc::proto::sync::v1::chain_sync_service_client::ChainSyncServiceClient; -use utxorpc::proto::sync::v1::{follow_tip_response, FollowTipRequest, FollowTipResponse}; +use utxorpc::proto::sync::v1::follow_tip_response::Action; +use utxorpc::proto::sync::v1::{FollowTipRequest, FollowTipResponse}; use crate::framework::*; @@ -34,10 +38,7 @@ impl gasket::framework::Worker for Worker { Ok(Self { stream }) } - async fn schedule( - &mut self, - _stage: &mut Stage, - ) -> Result, WorkerError> { + async fn schedule(&mut self, _: &mut Stage) -> Result, WorkerError> { let result = self.stream.next().await; if result.is_none() { return Ok(WorkSchedule::Done); @@ -58,20 +59,84 @@ impl gasket::framework::Worker for Worker { Ok(WorkSchedule::Unit(action)) } - async fn execute( - &mut self, - unit: &follow_tip_response::Action, - stage: &mut Stage, - ) -> Result<(), WorkerError> { + async fn execute(&mut self, unit: &Action, stage: &mut Stage) -> Result<(), WorkerError> { match unit { - follow_tip_response::Action::Apply(block) => { - debug!("APPLY {:?}", block) + Action::Apply(block) => { + if let Some(chain) = &block.chain { + match chain { + Chain::Cardano(block) => { + if block.body.is_some() { + let header = block.header.as_ref().unwrap(); + + let block = block.body.as_ref().unwrap(); + + for tx in block.tx.clone() { + let evt = ChainEvent::Apply( + Point::Specific(header.slot, header.hash.to_vec()), + Record::ParsedTx(tx), + ); + + stage.output.send(evt.into()).await.or_panic()?; + stage.chain_tip.set(header.slot as i64); + } + } + } + Chain::Raw(bytes) => { + let block = MultiEraBlock::decode(bytes).or_panic()?; + + let evt = ChainEvent::Apply( + Point::Specific(block.slot(), block.hash().to_vec()), + Record::CborBlock(bytes.to_vec()), + ); + + stage.output.send(evt.into()).await.or_panic()?; + stage.chain_tip.set(block.slot() as i64); + } + } + } } - utxorpc::proto::sync::v1::follow_tip_response::Action::Undo(any) => { - debug!("UNDO {:?}", any) + Action::Undo(block) => { + if let Some(chain) = &block.chain { + match chain { + Chain::Cardano(block) => { + if block.body.is_some() { + let header = block.header.as_ref().unwrap(); + + let block = block.body.as_ref().unwrap(); + + for tx in block.tx.clone() { + let evt = ChainEvent::Undo( + Point::Specific(header.slot, header.hash.to_vec()), + Record::ParsedTx(tx), + ); + + stage.output.send(evt.into()).await.or_panic()?; + stage.chain_tip.set(header.slot as i64); + } + } + } + Chain::Raw(bytes) => { + let block = MultiEraBlock::decode(bytes).or_panic()?; + + let evt = ChainEvent::Undo( + Point::Specific(block.slot(), block.hash().to_vec()), + Record::CborBlock(bytes.to_vec()), + ); + + stage.output.send(evt.into()).await.or_panic()?; + stage.chain_tip.set(block.slot() as i64); + } + } + } } - utxorpc::proto::sync::v1::follow_tip_response::Action::Reset(reset) => { - debug!("RESET {:?}", reset) + Action::Reset(reset) => { + stage + .output + .send(ChainEvent::reset(Point::new(reset.index, reset.hash.to_vec())).into()) + .await + .or_panic()?; + + stage.chain_tip.set(reset.index as i64); } } Ok(()) @@ -79,11 +144,7 @@ impl gasket::framework::Worker for Worker { } #[derive(Stage)] -#[stage( - name = "source-utxorpc", - unit = "follow_tip_response::Action", - worker = "Worker" -)] +#[stage(name = "source-utxorpc", unit = "Action", worker = "Worker")] pub struct Stage { config: Config, @@ -102,7 +163,7 @@ pub struct Config { } impl Config { - pub fn bootstrapper(self, ctx: &Context) -> Result { + pub fn bootstrapper(self, _: &Context) -> Result { let stage = Stage { config: self, output: Default::default(), From 1cfe6f60a292fb877ae949e8c62c054a505db4ea Mon Sep 17 00:00:00 2001 From: paulobressan Date: Mon, 7 Aug 2023 17:52:47 -0300 Subject: [PATCH 3/8] feat(sources/utxorpc): implemented intersect and cursor --- src/sources/utxorpc.rs | 178 +++++++++++++++++++++++++++++------------ 1 file changed, 127 insertions(+), 51 deletions(-) diff --git a/src/sources/utxorpc.rs b/src/sources/utxorpc.rs index 54259dd3..0f0f2fca 100644 --- a/src/sources/utxorpc.rs +++ b/src/sources/utxorpc.rs @@ -4,63 +4,26 @@ use gasket::framework::*; use pallas::ledger::traverse::MultiEraBlock; use pallas::network::miniprotocols::Point; use serde::Deserialize; +use tonic::transport::Channel; use tonic::Streaming; use tracing::{debug, error}; use utxorpc::proto::sync::v1::any_chain_block::Chain; use utxorpc::proto::sync::v1::chain_sync_service_client::ChainSyncServiceClient; use utxorpc::proto::sync::v1::follow_tip_response::Action; -use utxorpc::proto::sync::v1::{FollowTipRequest, FollowTipResponse}; +use utxorpc::proto::sync::v1::{BlockRef, DumpHistoryRequest, FollowTipRequest, FollowTipResponse}; use crate::framework::*; pub struct Worker { - stream: Streaming, + client: ChainSyncServiceClient, + stream: Option>, + block_ref: Option, } -#[async_trait::async_trait(?Send)] -impl gasket::framework::Worker for Worker { - async fn bootstrap(stage: &Stage) -> Result { - debug!("connecting"); - - let mut client = ChainSyncServiceClient::connect(stage.config.url.clone()) - .await - .or_panic()?; - - // TODO: configure intersect - - let stream = client - .follow_tip(FollowTipRequest::default()) - .await - .or_restart()? - .into_inner(); - - Ok(Self { stream }) - } - - async fn schedule(&mut self, _: &mut Stage) -> Result, WorkerError> { - let result = self.stream.next().await; - if result.is_none() { - return Ok(WorkSchedule::Done); - } - - let result = result.unwrap(); - if let Err(err) = result { - error!("{err}"); - return Err(WorkerError::Retry); - } - - let response: FollowTipResponse = result.unwrap(); - if response.action.is_none() { - return Ok(WorkSchedule::Done); - } - - let action = response.action.unwrap(); - Ok(WorkSchedule::Unit(action)) - } - - async fn execute(&mut self, unit: &Action, stage: &mut Stage) -> Result<(), WorkerError> { - match unit { +impl Worker { + async fn process_next(&self, stage: &mut Stage, action: &Action) -> Result<(), WorkerError> { + match action { Action::Apply(block) => { if let Some(chain) = &block.chain { match chain { @@ -132,27 +95,137 @@ impl gasket::framework::Worker for Worker { Action::Reset(reset) => { stage .output - .send(ChainEvent::reset(Point::new(reset.index, reset.hash.to_vec())).into()) + .send(ChainEvent::Reset(Point::new(reset.index, reset.hash.to_vec())).into()) .await .or_panic()?; stage.chain_tip.set(reset.index as i64); } } + + Ok(()) + } + + async fn next_stream(&mut self) -> Result>, WorkerError> { + if self.stream.is_none() { + let stream = self + .client + .follow_tip(FollowTipRequest::default()) + .await + .or_restart()? + .into_inner(); + + self.stream = Some(stream); + } + + let result = self.stream.as_mut().unwrap().next().await; + + if result.is_none() { + return Ok(WorkSchedule::Idle); + } + + let result = result.unwrap(); + if let Err(err) = result { + error!("{err}"); + return Err(WorkerError::Retry); + } + + let response: FollowTipResponse = result.unwrap(); + if response.action.is_none() { + return Ok(WorkSchedule::Idle); + } + + let action = response.action.unwrap(); + + Ok(WorkSchedule::Unit(vec![action])) + } + + async fn next_dump_history(&mut self) -> Result>, WorkerError> { + let mut dump_history_request = DumpHistoryRequest::default(); + dump_history_request.start_token = self.block_ref.clone(); + dump_history_request.max_items = 20; + + let result = self + .client + .dump_history(dump_history_request) + .await + .or_restart()? + .into_inner(); + + self.block_ref = result.next_token; + + if !result.block.is_empty() { + let actions: Vec = result.block.into_iter().map(|b| Action::Apply(b)).collect(); + return Ok(WorkSchedule::Unit(actions)); + } + + return Ok(WorkSchedule::Idle); + } +} + +#[async_trait::async_trait(?Send)] +impl gasket::framework::Worker for Worker { + async fn bootstrap(stage: &Stage) -> Result { + debug!("connecting"); + + let client = ChainSyncServiceClient::connect(stage.config.url.clone()) + .await + .or_panic()?; + + let mut point: Option<(u64, Vec)> = match stage.intersect.clone() { + IntersectConfig::Point(slot, hash) => Some((slot, hash.into())), + _ => None, + }; + + if let Some(latest_point) = stage.cursor.latest_known_point() { + point = match latest_point { + Point::Specific(slot, hash) => Some((slot, hash)), + _ => None, + }; + } + + let block_ref = if let Some((slot, hash)) = point { + let mut block_ref = BlockRef::default(); + block_ref.index = slot; + block_ref.hash = hash.into(); + Some(block_ref) + } else { + None + }; + + Ok(Self { + client, + stream: None, + block_ref, + }) + } + + async fn schedule(&mut self, _: &mut Stage) -> Result>, WorkerError> { + if self.block_ref.is_some() { + return Ok(self.next_dump_history().await?); + } + + Ok(self.next_stream().await?) + } + + async fn execute(&mut self, unit: &Vec, stage: &mut Stage) -> Result<(), WorkerError> { + for action in unit { + self.process_next(stage, action).await.or_retry()?; + } + Ok(()) } } #[derive(Stage)] -#[stage(name = "source-utxorpc", unit = "Action", worker = "Worker")] +#[stage(name = "source-utxorpc", unit = "Vec", worker = "Worker")] pub struct Stage { config: Config, - + cursor: Cursor, + intersect: IntersectConfig, pub output: SourceOutputPort, - #[metric] ops_count: gasket::metrics::Counter, - #[metric] chain_tip: gasket::metrics::Gauge, } @@ -160,12 +233,15 @@ pub struct Stage { #[derive(Deserialize)] pub struct Config { url: String, + max_items } impl Config { - pub fn bootstrapper(self, _: &Context) -> Result { + pub fn bootstrapper(self, ctx: &Context) -> Result { let stage = Stage { config: self, + cursor: ctx.cursor.clone(), + intersect: ctx.intersect.clone(), output: Default::default(), ops_count: Default::default(), chain_tip: Default::default(), From 5c9801c6e758495f725d45dadb5155710a56696e Mon Sep 17 00:00:00 2001 From: paulobressan Date: Tue, 8 Aug 2023 14:19:50 -0300 Subject: [PATCH 4/8] feat(source/utxorpc): added max_items to dump history --- src/sources/utxorpc.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/sources/utxorpc.rs b/src/sources/utxorpc.rs index 0f0f2fca..b7c57062 100644 --- a/src/sources/utxorpc.rs +++ b/src/sources/utxorpc.rs @@ -19,6 +19,7 @@ pub struct Worker { client: ChainSyncServiceClient, stream: Option>, block_ref: Option, + max_items: u32, } impl Worker { @@ -143,7 +144,7 @@ impl Worker { async fn next_dump_history(&mut self) -> Result>, WorkerError> { let mut dump_history_request = DumpHistoryRequest::default(); dump_history_request.start_token = self.block_ref.clone(); - dump_history_request.max_items = 20; + dump_history_request.max_items = self.max_items; let result = self .client @@ -193,9 +194,12 @@ impl gasket::framework::Worker for Worker { None }; + let max_items = stage.config.max_items.unwrap_or(20); + Ok(Self { client, stream: None, + max_items, block_ref, }) } @@ -233,7 +237,7 @@ pub struct Stage { #[derive(Deserialize)] pub struct Config { url: String, - max_items + max_items: Option, } impl Config { From 076cdbd212cbb0a316af517ee86ce8bab731fd63 Mon Sep 17 00:00:00 2001 From: paulobressan Date: Tue, 8 Aug 2023 15:05:00 -0300 Subject: [PATCH 5/8] docs(sources): added utxorpc docs --- docs/pages/v2/sources/utxorpc.mdx | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 docs/pages/v2/sources/utxorpc.mdx diff --git a/docs/pages/v2/sources/utxorpc.mdx b/docs/pages/v2/sources/utxorpc.mdx new file mode 100644 index 00000000..35b4a29e --- /dev/null +++ b/docs/pages/v2/sources/utxorpc.mdx @@ -0,0 +1,31 @@ +# UtxoRPC with Dolos + +The UtxoRPC with Dolos (UtxoRPC) source uses gRPC to fetch blocks and receive blocks from a no Dolos. + +## Configuration + +The following snippet shows an example of how to set up a typical UtxoRPC source: + +```toml +[source] +type = "UtxoRPC" +url = ["https://"] +max_items = 20 +``` + +### Section `source`: + +- `type`: this field must be set to the literal value `UtxoRPC` +- `url`: A string contains Dolos gRPC url +- `max_items`: Number of blocks that will be requested from Dolos when using a chain distance point. Default value is `20` + +## Examples + +Connecting to a remote Dolos node in preprod through gRPC: + +```toml +[source] +type = "UtxoRPC" +url = ["https://50051-romantic-calmness-b55bqg.us1.demeter.run"] +max_items = 20 +``` From d0288d18a0d5225598a936b8e7c8d255c69980ed Mon Sep 17 00:00:00 2001 From: paulobressan Date: Tue, 8 Aug 2023 16:20:29 -0300 Subject: [PATCH 6/8] chore(examples): added dolos source example --- examples/dolos_source/daemon.toml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 examples/dolos_source/daemon.toml diff --git a/examples/dolos_source/daemon.toml b/examples/dolos_source/daemon.toml new file mode 100644 index 00000000..14988558 --- /dev/null +++ b/examples/dolos_source/daemon.toml @@ -0,0 +1,9 @@ +[source] +type = "UtxoRPC" +url = "https://50051-romantic-calmness-b55bqg.us1.demeter.run" + +[intersect] +type = "Tip" + +[sink] +type = "Stdout" From 3279546ea0041d3b4e69170f975df7c438b6952c Mon Sep 17 00:00:00 2001 From: paulobressan Date: Tue, 8 Aug 2023 16:20:41 -0300 Subject: [PATCH 7/8] chore(examples): added n2c source example --- examples/n2c_source/.gitignore | 1 + examples/n2c_source/daemon.toml | 9 +++++++++ examples/n2c_source/docker-compose.yml | 20 ++++++++++++++++++++ examples/utxorpc/daemon.toml | 9 --------- 4 files changed, 30 insertions(+), 9 deletions(-) create mode 100644 examples/n2c_source/.gitignore create mode 100644 examples/n2c_source/daemon.toml create mode 100644 examples/n2c_source/docker-compose.yml delete mode 100644 examples/utxorpc/daemon.toml diff --git a/examples/n2c_source/.gitignore b/examples/n2c_source/.gitignore new file mode 100644 index 00000000..32b6e495 --- /dev/null +++ b/examples/n2c_source/.gitignore @@ -0,0 +1 @@ +node \ No newline at end of file diff --git a/examples/n2c_source/daemon.toml b/examples/n2c_source/daemon.toml new file mode 100644 index 00000000..b01b5492 --- /dev/null +++ b/examples/n2c_source/daemon.toml @@ -0,0 +1,9 @@ +[source] +type = "N2C" +socket_path = "examples/n2c_source/node/node.socket" + +[intersect] +type = "Tip" + +[sink] +type = "Stdout" diff --git a/examples/n2c_source/docker-compose.yml b/examples/n2c_source/docker-compose.yml new file mode 100644 index 00000000..f0cd6f9e --- /dev/null +++ b/examples/n2c_source/docker-compose.yml @@ -0,0 +1,20 @@ +version: "3" +services: + node: + image: inputoutput/cardano-node + container_name: node + ports: + - 3001:3001 + volumes: + - ./node:/data:rw + - ./node:/ipc:rw + environment: + - NETWORK=preprod + - CARDANO_BIND_ADDR=0.0.0.0 + - CARDANO_PORT=3001 + networks: + - dolos-network + +networks: + dolos-network: + driver: bridge diff --git a/examples/utxorpc/daemon.toml b/examples/utxorpc/daemon.toml deleted file mode 100644 index 14988558..00000000 --- a/examples/utxorpc/daemon.toml +++ /dev/null @@ -1,9 +0,0 @@ -[source] -type = "UtxoRPC" -url = "https://50051-romantic-calmness-b55bqg.us1.demeter.run" - -[intersect] -type = "Tip" - -[sink] -type = "Stdout" From 708aa3006268d6e9ca6c6578dd6354bdf214caa3 Mon Sep 17 00:00:00 2001 From: paulobressan Date: Tue, 8 Aug 2023 19:56:38 -0300 Subject: [PATCH 8/8] chore(sources/utxorpc): changed max_items to max_items_per_page --- docs/pages/v2/sources/utxorpc.mdx | 6 +++--- src/sources/utxorpc.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/pages/v2/sources/utxorpc.mdx b/docs/pages/v2/sources/utxorpc.mdx index 35b4a29e..e95b0313 100644 --- a/docs/pages/v2/sources/utxorpc.mdx +++ b/docs/pages/v2/sources/utxorpc.mdx @@ -10,14 +10,14 @@ The following snippet shows an example of how to set up a typical UtxoRPC source [source] type = "UtxoRPC" url = ["https://"] -max_items = 20 +max_items_per_page = 20 ``` ### Section `source`: - `type`: this field must be set to the literal value `UtxoRPC` - `url`: A string contains Dolos gRPC url -- `max_items`: Number of blocks that will be requested from Dolos when using a chain distance point. Default value is `20` +- `max_items_per_page`: Number of blocks that will be requested from Dolos when using a chain distance point. Default value is `20` ## Examples @@ -27,5 +27,5 @@ Connecting to a remote Dolos node in preprod through gRPC: [source] type = "UtxoRPC" url = ["https://50051-romantic-calmness-b55bqg.us1.demeter.run"] -max_items = 20 +max_items_per_page = 20 ``` diff --git a/src/sources/utxorpc.rs b/src/sources/utxorpc.rs index b7c57062..9c9c51a4 100644 --- a/src/sources/utxorpc.rs +++ b/src/sources/utxorpc.rs @@ -19,7 +19,7 @@ pub struct Worker { client: ChainSyncServiceClient, stream: Option>, block_ref: Option, - max_items: u32, + max_items_per_page: u32, } impl Worker { @@ -144,7 +144,7 @@ impl Worker { async fn next_dump_history(&mut self) -> Result>, WorkerError> { let mut dump_history_request = DumpHistoryRequest::default(); dump_history_request.start_token = self.block_ref.clone(); - dump_history_request.max_items = self.max_items; + dump_history_request.max_items = self.max_items_per_page; let result = self .client @@ -194,12 +194,12 @@ impl gasket::framework::Worker for Worker { None }; - let max_items = stage.config.max_items.unwrap_or(20); + let max_items_per_page = stage.config.max_items_per_page.unwrap_or(20); Ok(Self { client, stream: None, - max_items, + max_items_per_page, block_ref, }) } @@ -237,7 +237,7 @@ pub struct Stage { #[derive(Deserialize)] pub struct Config { url: String, - max_items: Option, + max_items_per_page: Option, } impl Config {