Skip to content

Commit

Permalink
mark PeerEventBus-based flow as temporary and use FetcherService Sour…
Browse files Browse the repository at this point in the history
…ceQueue
  • Loading branch information
Jaap van der Plas committed Aug 26, 2021
1 parent 4dee3df commit 30136c1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ case class BranchBuffer(byParent: Map[Hash, Block] = Map.empty, branchFound: Que
object BranchBuffer {
type Hash = ByteString

def flow(blockchainReader: BlockchainReader): Flow[Seq[Block], NonEmptyList[Block], NotUsed] =
Flow[Seq[Block]]
.mapConcat(_.sortBy(_.number).reverse)
def flow(blockchainReader: BlockchainReader): Flow[Block, NonEmptyList[Block], NotUsed] =
Flow[Block]
.scan(BranchBuffer()) { case (buffer, block) => buffer.handle(blockchainReader.getBestBranch(), block) }
.collect { case BranchBuffer(_, head +: tail) => NonEmptyList(head, tail.toList) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.iohk.ethereum.network.p2p.messages.ETH62.BlockHeaders
import io.iohk.ethereum.utils.Config.SyncConfig
import akka.stream.scaladsl.SourceQueue
import akka.stream.QueueOfferResult.Enqueued
import akka.NotUsed

//not used atm, a part of the future ExecutionSync
class FetcherService(
Expand All @@ -48,10 +49,10 @@ class FetcherService(

//TODO: add private def requestStateNode(hash: ByteString): Task[Either[RequestFailed, Seq[ByteString]]] = ???

private def placeBlockInPeerStream(peer: Peer, block: Block): Task[Either[RequestFailed, Unit]] =
def placeBlockInPeerStream(block: Block): Task[Either[String, Unit]] =
Task.deferFuture(sourceQueue.offer(block)).map {
case Enqueued => Right(())
case result => Left(RequestFailed(peer, result.toString()))
case reason => Left(s"SourceQueue.offer failed: $reason")
}

def fetchBlocksUntil(
Expand Down Expand Up @@ -87,13 +88,11 @@ class FetcherService(
bodies <- EitherT(requestBodies(headers.headers.map(_.hash)))
blocks = buildBlocks(headers.headers, bodies.bodies)
_ <- EitherT.cond[Task](blocks.length == headers.headers.length, (), RequestFailed(peer, "Unmatching bodies"))
_ <- blocks.traverse(block => EitherT(placeBlockInPeerStream(peer, block)))
_ <- blocks.traverse(block => EitherT(placeBlockInPeerStream(block)).leftMap(RequestFailed(peer, _)))
} yield peer
}

object FetcherService {
type Hashes = Seq[ByteString]

case class BlockIdentifier(transactionsRoot: ByteString, ommersHash: ByteString)
object BlockIdentifier {
def apply(blockHeader: BlockHeader): BlockIdentifier =
Expand All @@ -117,31 +116,25 @@ object FetcherService {
/** State of block fetching stream after processing a given incoming message with block headers or bodies
*
* @param outstanding headers that are yet to be matched to bodies
* @param bodiesRequest information for requesting bodies corresponding to newly outstanding headers
* @param result blocks produced by matching received headers with received bodies
*/
case class FetchState(
outstanding: Set[BlockHeader],
bodiesRequest: Option[(PeerId, Hashes)],
result: Seq[Block]
)
object FetchState {
val initial: FetchState = FetchState(Set.empty, None, Nil)
val initial: FetchState = FetchState(Set.empty, Nil)
}

def fetchBlocksForHeaders[M](bodyRequestor: Sink[(PeerId, Hashes), M]): Flow[MessageFromPeer, Seq[Block], M] =
// TODO: remove once we have the FetcherService instance integrated
val tempFlow: Flow[MessageFromPeer, Seq[Block], NotUsed] =
Flow[MessageFromPeer]
.scan(FetchState.initial) {
case (FetchState(outstanding, _, _), MessageFromPeer(BlockHeaders(headers), peerId)) =>
FetchState(outstanding.concat(headers), Some(peerId -> headers.map(_.hash)), Nil)
case (FetchState(outstanding, _, _), MessageFromPeer(BlockBodies(bodies), _)) =>
case (FetchState(outstanding, _), MessageFromPeer(BlockHeaders(headers), peerId)) =>
FetchState(outstanding.concat(headers), Nil)
case (FetchState(outstanding, _), MessageFromPeer(BlockBodies(bodies), _)) =>
val blocks = buildBlocks(outstanding.toSeq, bodies)
FetchState(outstanding.removedAll(blocks.map(_.header)), None, blocks)
FetchState(outstanding.removedAll(blocks.map(_.header)), blocks)
}
.alsoToMat(
Flow[FetchState]
.collect { case FetchState(_, Some(bodiesRequest), _) => bodiesRequest }
.toMat(bodyRequestor)(Keep.right)
)(Keep.right)
.collect { case FetchState(_, _, blocks) if blocks.nonEmpty => blocks }
.collect { case FetchState(_, blocks) if blocks.nonEmpty => blocks }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import io.iohk.ethereum.network.p2p.messages.ETH62
import io.iohk.ethereum.nodebuilder.BlockchainConfigBuilder
import io.iohk.ethereum.utils.ByteStringUtils
import io.iohk.ethereum.utils.Config.SyncConfig
import akka.stream.scaladsl.Source
import io.iohk.ethereum.network.Peer

class RegularSync(
peersClient: ActorRef,
Expand Down Expand Up @@ -107,11 +109,15 @@ class RegularSync(
BlockFetcher.PrintStatus
)

val (blockSourceQueue, blockSource) = Source.queue[Block](256, OverflowStrategy.fail).preMaterialize()
val fetcherService = new FetcherService(blockchainReader, syncConfig, blockSourceQueue)

override def receive: Receive = running(
ProgressState(startedFetching = false, initialBlock = 0, currentBlock = 0, bestKnownNetworkBlock = 0)
)

private def startNewFlow() =
private def startTemporaryBlockProducer() = {
import monix.execution.Scheduler.Implicits.global
PeerEventBusActor
.messageSource(
peerEventBus,
Expand All @@ -121,12 +127,20 @@ class RegularSync(
PeerEventBusActor.PeerSelector.AllPeers
)
)
.via(FetcherService.tempFlow)
.buffer(256, OverflowStrategy.fail)
.via(
FetcherService.fetchBlocksForHeaders(
Sink.ignore // BlockFetcher is relied on for requesting the bodies
)
)
.mapConcat(identity)
.runWith(Sink.foreachAsync(1) { block =>
fetcherService
.placeBlockInPeerStream(block)
.runToFuture
.collect { case Right(()) => () }

})
}

private def startNewFlow() =
blockSource
.via(BranchBuffer.flow(blockchainReader))
.runWith(Sink.foreach { blocks =>
importer ! BlockFetcher.PickedBlocks(blocks)
Expand Down

0 comments on commit 30136c1

Please sign in to comment.