Skip to content

Commit

Permalink
filter out non-finalized blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex6323 committed Apr 30, 2024
1 parent 57ba866 commit 343285b
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions src/tangle/slot_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::{
task::{Context, Poll},
};

use futures::{stream::BoxStream, Stream, TryStreamExt};
use iota_sdk::types::block::slot::{SlotCommitment, SlotCommitmentId, SlotIndex};
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use iota_sdk::types::{
api::core::BlockState,
block::slot::{SlotCommitment, SlotCommitmentId, SlotIndex},
};

use super::InputSource;
use crate::model::{
Expand Down Expand Up @@ -43,25 +46,35 @@ impl<'a, I: InputSource> Slot<'a, I> {
pub async fn accepted_block_stream(
&self,
) -> Result<impl Stream<Item = Result<BlockWithTransactionMetadata, I::Error>> + '_, I::Error> {
Ok(self.source.accepted_blocks(self.index()).await?.and_then(|res| async {
let transaction = if let Some(transaction_id) = res
.block
.inner()
.body()
.as_basic_opt()
.and_then(|body| body.payload())
.and_then(|p| p.as_signed_transaction_opt())
.map(|txn| txn.transaction().id())
{
Some(self.source.transaction_metadata(transaction_id).await?)
} else {
None
};
Ok(BlockWithTransactionMetadata {
transaction,
block: res,
Ok(self
.source
.accepted_blocks(self.index())
.await?
.filter(|res| {
futures::future::ready(matches!(
res,
Ok(block_with_metadata)
if block_with_metadata.metadata.block_state == Some(BlockState::Finalized)))
})
}))
.and_then(|res| async {
let transaction = if let Some(transaction_id) = res
.block
.inner()
.body()
.as_basic_opt()
.and_then(|body| body.payload())
.and_then(|p| p.as_signed_transaction_opt())
.map(|txn| txn.transaction().id())
{
Some(self.source.transaction_metadata(transaction_id).await?)
} else {
None
};
Ok(BlockWithTransactionMetadata {
transaction,
block: res,
})
}))
}

/// Returns the ledger update store.
Expand Down

0 comments on commit 343285b

Please sign in to comment.