Skip to content

Commit

Permalink
Add a notification system for finalizations to serai-client, use in c…
Browse files Browse the repository at this point in the history
…oordinator
  • Loading branch information
kayabaNerve committed Aug 30, 2023
1 parent e9fca37 commit d5a19ec
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 32 additions & 4 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use serai_client::{primitives::NetworkId, Public, Serai};

use message_queue::{Service, client::MessageQueue};

use futures::stream::StreamExt;
use tokio::{sync::RwLock, time::sleep};

use ::tributary::{
Expand Down Expand Up @@ -70,7 +71,7 @@ async fn add_tributary<D: Db, P: P2p>(
log::info!("adding tributary {:?}", spec.set());

let tributary = Tributary::<_, Transaction, _>::new(
// TODO2: Use a db on a distinct volume
// TODO2: Use a db on a distinct volume to protect against DoS attacks
db,
spec.genesis(),
spec.start_time(),
Expand Down Expand Up @@ -102,7 +103,36 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
let mut db = substrate::SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();

let new_substrate_block_notifier = {
let serai = &serai;
move || async move {
loop {
match serai.newly_finalized_block().await {
Ok(sub) => return sub,
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
};
let mut substrate_block_notifier = new_substrate_block_notifier().await;

loop {
// await the next block, yet if our notifier had an error, re-create it
{
if substrate_block_notifier
.next()
.await
.and_then(|result| if result.is_err() { None } else { Some(()) })
.is_none()
{
substrate_block_notifier = new_substrate_block_notifier().await;
continue;
}
}

match substrate::handle_new_blocks(
&mut db,
&key,
Expand All @@ -125,9 +155,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
)
.await
{
// TODO2: Should this use a notification system for new blocks?
// Right now it's sleeping for half the block time.
Ok(()) => sleep(Duration::from_secs(3)).await,
Ok(()) => {}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ rustdoc-args = ["--cfg", "docsrs"]
zeroize = "^1.5"
thiserror = { version = "1", optional = true }

futures = "0.3"

scale = { package = "parity-scale-codec", version = "3" }
scale-info = { version = "2", optional = true }

Expand Down
14 changes: 14 additions & 0 deletions substrate/client/src/serai/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use thiserror::Error;

use futures::stream::{Stream, StreamExt};

use scale::{Encode, Decode, Compact};
mod scale_value;
pub(crate) use scale_value::{Value, Composite, scale_value, scale_composite};
Expand Down Expand Up @@ -259,6 +261,18 @@ impl Serai {
self.get_block(hash.into()).await
}

/// A stream which yields whenever new block(s) have been finalized.
pub async fn newly_finalized_block(
&self,
) -> Result<impl Stream<Item = Result<(), SeraiError>>, SeraiError> {
Ok(self.0.rpc().subscribe_finalized_block_headers().await.map_err(SeraiError::RpcError)?.map(
|next| {
next.map_err(SeraiError::RpcError)?;
Ok(())
},
))
}

pub async fn get_nonce(&self, address: &SeraiAddress) -> Result<u32, SeraiError> {
self
.0
Expand Down

0 comments on commit d5a19ec

Please sign in to comment.