From 68b663d0f1714cb94880222217a649d1bf7a2785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Muszy=C5=84ski?= <30803273+TheMMaciek@users.noreply.github.com> Date: Mon, 26 Jul 2021 14:23:57 +0200 Subject: [PATCH] Implement joining pool --- .gitignore | 5 +- .../scala/org/constellation/schema/Kryo.scala | 0 .../schema/observation/ObservationEvent.scala | 6 + .../serialization/SchemaKryoRegistrar.scala | 7 + .../schema/snapshot/Snapshot.scala | 19 +- src/main/resources/application.conf | 1 + src/main/resources/logback-dev.xml | 10 +- src/main/resources/logback-prod.xml | 4 +- src/main/resources/logback.xml | 6 +- src/main/scala/org/constellation/API.scala | 2 +- .../constellation/ConstellationNode$.scala | 49 +++- src/main/scala/org/constellation/DAO.scala | 38 ++- .../checkpoint/CheckpointCompare.scala | 8 +- .../checkpoint/CheckpointService.scala | 16 +- .../consensus/ConsensusManager.scala | 20 +- .../consensus/ConsensusScheduler.scala | 7 +- .../CheckpointStorageAlgebra.scala | 1 + .../cluster/ClusterStorageAlgebra.scala | 31 ++- .../domain/cluster/NodeStorageAlgebra.scala | 4 +- .../domain/configuration/CliConfig.scala | 5 +- .../domain/configuration/NodeConfig.scala | 8 +- .../HealthCheckConsensusManagerBase.scala | 10 +- .../PingHealthCheckConsensusManager.scala | 14 +- ...gProposalHealthCheckConsensusManager.scala | 10 +- .../observation/ObservationService.scala | 24 +- .../p2p/client/ClusterClientAlgebra.scala | 5 + .../p2p/client/SnapshotClientAlgebra.scala | 3 + .../domain/redownload/DownloadService.scala | 2 +- .../domain/redownload/RedownloadService.scala | 245 +++++++++++++++--- .../redownload/RedownloadStorageAlgebra.scala | 4 + .../snapshot/SnapshotStorageAlgebra.scala | 3 +- .../transaction/TransactionGossiping.scala | 2 +- .../sampling/PartitionerPeerSampling.scala | 17 +- .../gossip/sampling/RandomPeerSampling.scala | 1 + .../CheckpointStorageInterpreter.scala | 11 + .../cluster/ClusterStorageInterpreter.scala | 183 ++++++++++++- .../configuration/CliConfigParser.scala | 14 +- .../endpoints/ClusterEndpoints.scala | 34 ++- .../endpoints/ConsensusEndpoints.scala | 5 +- .../endpoints/SnapshotEndpoints.scala | 25 +- .../p2p/client/ClusterClientInterpreter.scala | 11 + .../client/SnapshotClientInterpreter.scala | 9 + .../redownload/RedownloadPeriodicCheck.scala | 17 +- .../RedownloadStorageInterpreter.scala | 13 +- .../snapshot/SnapshotStorageInterpreter.scala | 5 +- .../scala/org/constellation/p2p/Cluster.scala | 99 ++++++- .../p2p/PeerRegistrationRequest.scala | 3 +- .../rewards/RewardsManager.scala | 2 +- .../rollback/RollbackService.scala | 18 +- .../snapshot/SnapshotTrigger.scala | 3 + .../storage/SnapshotService.scala | 242 +++++++++++++++-- .../trust/SelfAvoidingWalk.scala | 21 +- .../trust/TrustDataPollingScheduler.scala | 11 +- .../constellation/trust/TrustManager.scala | 13 +- .../constellation/util/HealthChecker.scala | 2 +- .../constellation/util/MetricsUpdater.scala | 3 + .../scala/org/constellation/Fixtures.scala | 5 +- .../scala/org/constellation/TestHelpers.scala | 8 +- .../RedownloadPeriodicCheckTest.scala | 4 +- .../snapshot/SnapshotDiskStorageTest.scala | 5 +- .../SnapshotInfoLocalStorageTest.scala | 5 +- 61 files changed, 1143 insertions(+), 215 deletions(-) create mode 100644 schema/src/main/scala/org/constellation/schema/Kryo.scala diff --git a/.gitignore b/.gitignore index eb10f8eba..82ff747dc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ .idea/ +.vscode/ +.metals/ target/ *.iml .DS_STORE @@ -18,8 +20,7 @@ terraform/aws/whitelisting-* .dag 
*.log /logs -.metals/ .bloop/ -project/metals.sbt +metals.sbt keys/ integration-tests/src/test/resources/terraform-output.json diff --git a/schema/src/main/scala/org/constellation/schema/Kryo.scala b/schema/src/main/scala/org/constellation/schema/Kryo.scala new file mode 100644 index 000000000..e69de29bb diff --git a/schema/src/main/scala/org/constellation/schema/observation/ObservationEvent.scala b/schema/src/main/scala/org/constellation/schema/observation/ObservationEvent.scala index 290882860..01421df59 100644 --- a/schema/src/main/scala/org/constellation/schema/observation/ObservationEvent.scala +++ b/schema/src/main/scala/org/constellation/schema/observation/ObservationEvent.scala @@ -11,6 +11,12 @@ case class CheckpointBlockWithMissingSoe(checkpointBaseHash: String) extends Obs case class RequestTimeoutOnConsensus(roundId: RoundId) extends ObservationEvent case class RequestTimeoutOnResolving(hashes: List[String]) extends ObservationEvent case class CheckpointBlockInvalid(checkpointBaseHash: String, reason: String) extends ObservationEvent +case object NodeMemberOfActivePool extends ObservationEvent +case object NodeNotMemberOfActivePool extends ObservationEvent +case object NodeJoinsTheCluster extends ObservationEvent +case object NodeLeavesTheCluster extends ObservationEvent +case object NodeKickedOutByHealthcheck extends ObservationEvent + object ObservationEvent { implicit val encodeEvent: Encoder[ObservationEvent] = deriveEncoder diff --git a/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala b/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala index 5cb8b8940..ed43a6040 100644 --- a/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala +++ b/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala @@ -12,6 +12,7 @@ import org.constellation.schema.signature.{HashSignature, SignatureBatch, Signed import org.constellation.schema.snapshot.{ FilterData, HeightRange, + NextActiveNodes, Snapshot, SnapshotInfo, SnapshotInfoV1, @@ -113,6 +114,12 @@ object SchemaKryoRegistrar (classOf[HeightRange], 204, DefaultSerializer), (classOf[CheckpointBlockPayload], 205, DefaultSerializer), (classOf[FinishedCheckpointBlock], 206, DefaultSerializer), + (NodeMemberOfActivePool.getClass, 207, DefaultSerializer), + (NodeNotMemberOfActivePool.getClass, 208, DefaultSerializer), + (NodeJoinsTheCluster.getClass, 209, DefaultSerializer), + (NodeLeavesTheCluster.getClass, 210, DefaultSerializer), + (NodeKickedOutByHealthcheck.getClass, 211, DefaultSerializer), + (classOf[NextActiveNodes], 212, DefaultSerializer), (classOf[CheckpointCache], 1034, CompatibleSerializer), (classOf[StoredSnapshot], 1035, CompatibleSerializer), (classOf[SnapshotInfo], 1036, CompatibleSerializer) diff --git a/schema/src/main/scala/org/constellation/schema/snapshot/Snapshot.scala b/schema/src/main/scala/org/constellation/schema/snapshot/Snapshot.scala index 2fac2f717..75e8e83cd 100644 --- a/schema/src/main/scala/org/constellation/schema/snapshot/Snapshot.scala +++ b/schema/src/main/scala/org/constellation/schema/snapshot/Snapshot.scala @@ -5,11 +5,22 @@ import org.constellation.schema.signature.Signable import scala.collection.SortedMap -case class Snapshot(lastSnapshot: String, checkpointBlocks: Seq[String], publicReputation: SortedMap[Id, Double]) - extends Signable { - override def toEncode = checkpointBlocks :+ lastSnapshot +case class Snapshot( + lastSnapshot: String, + checkpointBlocks: 
Seq[String], + publicReputation: SortedMap[Id, Double], + nextActiveNodes: NextActiveNodes, + authorizedNodes: Set[Id] +) extends Signable { + override def toEncode: Seq[String] = + (checkpointBlocks :+ lastSnapshot) ++ + nextActiveNodes.light.toSeq.map(_.hex).sorted ++ + nextActiveNodes.full.toSeq.map(_.hex).sorted ++ + authorizedNodes.toSeq.map(_.hex).sorted } object Snapshot { - val snapshotZero: Snapshot = Snapshot("", Seq(), SortedMap.empty) + val snapshotZero: Snapshot = Snapshot("", Seq(), SortedMap.empty, NextActiveNodes(Set.empty, Set.empty), Set.empty) } + +case class NextActiveNodes(light: Set[Id], full: Set[Id]) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 509d10402..8873c5e06 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -82,6 +82,7 @@ constellation { stallCountThreshold = 2 proposalLookupLimit = 6 distanceFromMajority = 8 + activePeersRotationInterval = 100 } schema { v1 { diff --git a/src/main/resources/logback-dev.xml b/src/main/resources/logback-dev.xml index 790b40632..ed7323e91 100644 --- a/src/main/resources/logback-dev.xml +++ b/src/main/resources/logback-dev.xml @@ -1,7 +1,7 @@ - + @@ -38,12 +38,12 @@ - - + + - + - \ No newline at end of file + diff --git a/src/main/resources/logback-prod.xml b/src/main/resources/logback-prod.xml index b344f5141..7ae5ce69c 100644 --- a/src/main/resources/logback-prod.xml +++ b/src/main/resources/logback-prod.xml @@ -1,7 +1,7 @@ - + @@ -50,4 +50,4 @@ - \ No newline at end of file + diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index b0ba629f0..ab207c53c 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -1,7 +1,7 @@ - + @@ -32,10 +32,10 @@ - + - + diff --git a/src/main/scala/org/constellation/API.scala b/src/main/scala/org/constellation/API.scala index f60898429..15817d706 100644 --- a/src/main/scala/org/constellation/API.scala +++ b/src/main/scala/org/constellation/API.scala @@ -15,7 +15,7 @@ case class PeerMetadata( timeAdded: Long = System.currentTimeMillis(), auxHost: String = "", auxAddresses: Seq[String] = Seq(), // for testing multi key address partitioning - nodeType: NodeType = NodeType.Full, + nodeType: NodeType, resourceInfo: ResourceInfo ) { def toPeerClientMetadata: PeerClientMetadata = PeerClientMetadata(host, httpPort, id) diff --git a/src/main/scala/org/constellation/ConstellationNode$.scala b/src/main/scala/org/constellation/ConstellationNode$.scala index e29e6d1ee..3f2c0b7c5 100644 --- a/src/main/scala/org/constellation/ConstellationNode$.scala +++ b/src/main/scala/org/constellation/ConstellationNode$.scala @@ -29,7 +29,9 @@ import org.constellation.infrastructure.p2p.ClientInterpreter import org.constellation.infrastructure.p2p.PeerResponse.PeerClientMetadata import org.constellation.keytool.KeyStoreUtils import org.constellation.p2p.DataResolver +import org.constellation.schema.NodeType.Full import org.constellation.schema.checkpoint.{CheckpointBlock, CheckpointCache} +import org.constellation.schema.snapshot.NextActiveNodes import org.constellation.schema.{GenesisObservation, Id, NodeState} import org.constellation.serialization.KryoSerializer import org.constellation.session.SessionTokenService @@ -47,6 +49,7 @@ import org.slf4j.MDC import pureconfig._ import pureconfig.generic.auto._ +import scala.collection.immutable.ListMap import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.io.Source @@ -166,6 +169,7 @@ object 
ConstellationNode$ extends IOApp with IOApp.WithContext { dao.snapshotInfoStorage, dao.snapshotService, dao.nodeStorage, + dao.redownloadService, dao.redownloadStorage, dao.snapshotProposalGossipService, dao.messageValidator, @@ -300,9 +304,30 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext { ClientInterpreter[IO](client, sessionTokenService)(ce, cs) } - clusterStorage = new ClusterStorageInterpreter[IO]() keyPair = nodeConfig.primaryKeyPair id = keyPair.getPublic.toId + clusterStorage = new ClusterStorageInterpreter[IO](id, nodeConfig.nodeType) + _ <- Stream.eval { + val authorizedPeers = + if (nodeConfig.initialActiveFullNodes.contains(id)) nodeConfig.initialActiveFullNodes + else Set.empty[Id] + + clusterStorage.setAuthorizedPeers(authorizedPeers) + } + _ <- Stream.eval { + val activeFullPeers = + if (nodeConfig.initialActiveFullNodes.contains(id)) + nodeConfig.initialActiveFullNodes - id + else + Set.empty[Id] + + clusterStorage.setActiveFullPeers(activeFullPeers) + } + _ <- Stream.eval { + if (nodeConfig.initialActiveFullNodes.contains(id)) + clusterStorage.setAsActivePeer(Full) + else IO.unit + } metrics = new Metrics( registry, clusterStorage, @@ -320,7 +345,7 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext { transactionChainService = TransactionChainService[IO] transactionService = TransactionService[IO](transactionChainService, rateLimiting, metrics) trustManager = TrustManager[IO](id, clusterStorage) - observationService = new ObservationService[IO](trustManager, metrics) + observationService = new ObservationService[IO](trustManager, clusterStorage, metrics) dataResolver = DataResolver[IO]( keyPair, @@ -467,7 +492,7 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext { } } - private def getWhitelisting[F[_]: Sync](cliConfig: CliConfig): F[Map[Id, Option[String]]] = + private def getWhitelisting[F[_]: Sync](cliConfig: CliConfig): F[ListMap[Id, Option[String]]] = for { source <- Sync[F].delay { Source.fromFile(cliConfig.whitelisting) @@ -475,10 +500,10 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext { lines = source.getLines().filter(_.nonEmpty).toList values = lines.map(_.split(",").map(_.trim).toList) mappedValues = values.map { - case id :: alias :: Nil => Map(Id(id) -> Some(alias)) - case id :: Nil => Map(Id(id) -> None) - case _ => Map.empty[Id, Option[String]] - }.fold(Map.empty[Id, Option[String]])(_ ++ _) + case id :: alias :: Nil => ListMap(Id(id) -> Some(alias)) + case id :: Nil => ListMap(Id(id) -> None) + case _ => ListMap.empty[Id, Option[String]] + }.fold(ListMap.empty[Id, Option[String]])(_ ++ _) _ <- Sync[F].delay { source.close() } @@ -527,11 +552,16 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext { whitelisting <- getWhitelisting(cliConfig) + initialActiveFullNodes <- whitelisting match { + case ids if ids.size >= 3 => ids.take(3).keySet.pure[F] + case _ => Sync[F].raiseError(new Throwable(s"Not enough peers whitelisted to pick initial active peers!")) + } + nodeConfig = NodeConfig( seeds = Seq.empty[HostPort], primaryKeyPair = keyPair, isGenesisNode = cliConfig.genesisNode, - isLightNode = cliConfig.lightNode, + nodeType = cliConfig.nodeType, isRollbackNode = cliConfig.rollbackNode, rollbackHeight = cliConfig.rollbackHeight, rollbackHash = cliConfig.rollbackHash, @@ -548,7 +578,8 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext { dataPollingManagerOn = config.getBoolean("constellation.dataPollingManagerOn"), allocAccountBalances = allocAccountBalances, 
whitelisting = whitelisting, - minRequiredSpace = constellationConfig.getInt("min-required-space") + minRequiredSpace = constellationConfig.getInt("min-required-space"), + initialActiveFullNodes = initialActiveFullNodes ) } yield nodeConfig } diff --git a/src/main/scala/org/constellation/DAO.scala b/src/main/scala/org/constellation/DAO.scala index e496171e0..8ab4d0c29 100644 --- a/src/main/scala/org/constellation/DAO.scala +++ b/src/main/scala/org/constellation/DAO.scala @@ -32,7 +32,7 @@ import org.constellation.domain.snapshot.SnapshotStorageAlgebra import org.constellation.domain.storage.LocalFileStorage import org.constellation.domain.transaction.{ TransactionChainService, - TransactionGossiping, + //TransactionGossiping, TransactionService, TransactionValidator } @@ -220,8 +220,8 @@ class DAO( val messageValidator: MessageValidator = MessageValidator(id) val eigenTrust: EigenTrust[IO] = new EigenTrust[IO](id) - val transactionGossiping: TransactionGossiping[IO] = - new TransactionGossiping[IO](transactionService, clusterStorage, processingConfig.txGossipingFanout, id) +// val transactionGossiping: TransactionGossiping[IO] = +// new TransactionGossiping[IO](transactionService, clusterStorage, processingConfig.txGossipingFanout, id) val transactionValidator: TransactionValidator[IO] = new TransactionValidator[IO](transactionService) val genesis: Genesis[IO] = new Genesis[IO]( @@ -236,15 +236,21 @@ class DAO( metrics ) - val partitionerPeerSampling: PartitionerPeerSampling[IO] = - PartitionerPeerSampling[IO](id, clusterStorage, trustManager) + val snapshotPartitionerPeerSampling: PartitionerPeerSampling[IO] = + PartitionerPeerSampling[IO](id, () => clusterStorage.getActiveFullPeersIds(), trustManager) // TODO: separate functions for fetching id's only? 
here and in next line + + val blockPartitionerPeerSampling: PartitionerPeerSampling[IO] = + PartitionerPeerSampling[IO](id, () => clusterStorage.getActivePeersIds(), trustManager) val trustDataPollingScheduler: TrustDataPollingScheduler = TrustDataPollingScheduler( ConfigUtil.config, trustManager, clusterStorage, apiClient, - partitionerPeerSampling, + List( + snapshotPartitionerPeerSampling, + blockPartitionerPeerSampling + ), unboundedExecutionContext, metrics ) @@ -257,10 +263,10 @@ class DAO( // ) val snapshotProposalGossipService: SnapshotProposalGossipService[IO] = - SnapshotProposalGossipService[IO](id, keyPair, partitionerPeerSampling, clusterStorage, apiClient, metrics) + SnapshotProposalGossipService[IO](id, keyPair, snapshotPartitionerPeerSampling, clusterStorage, apiClient, metrics) val checkpointBlockGossipService: CheckpointBlockGossipService[IO] = - CheckpointBlockGossipService[IO](id, keyPair, partitionerPeerSampling, clusterStorage, apiClient, metrics) + CheckpointBlockGossipService[IO](id, keyPair, blockPartitionerPeerSampling, clusterStorage, apiClient, metrics) val checkpointBlockValidator: CheckpointBlockValidator[IO] = new CheckpointBlockValidator[IO]( addressService, @@ -300,7 +306,7 @@ class DAO( checkpointsQueueInstance ) - val checkpointCompare = new CheckpointCompare(checkpointService, unboundedExecutionContext) + val checkpointCompare = new CheckpointCompare(checkpointService, clusterStorage, unboundedExecutionContext) val rewardsManager: RewardsManager[IO] = new RewardsManager[IO]( eigenTrust, @@ -342,6 +348,8 @@ class DAO( ) val snapshotService: SnapshotService[IO] = SnapshotService[IO]( + apiClient, + clusterStorage, addressService, checkpointStorage, snapshotServiceStorage, @@ -360,7 +368,9 @@ class DAO( unboundedExecutionContext, metrics, processingConfig, - id + id, + keyPair, + nodeConfig ) // ConfigUtil.constellation.getInt("snapshot.meaningfulSnapshotsCount"), @@ -414,10 +424,12 @@ class DAO( redownloadStorage, downloadService, eigenTrust, + observationService, broadcastService, processingConfig, Blocker.liftExecutionContext(unboundedExecutionContext), id, + keyPair, alias.getOrElse("alias"), metrics, nodeConfig, @@ -436,6 +448,7 @@ class DAO( ConfigUtil.config, consensusManager, nodeStorage, + clusterStorage, checkpointStorage, snapshotServiceStorage, redownloadStorage, @@ -549,7 +562,8 @@ class DAO( val redownloadPeriodicCheck: RedownloadPeriodicCheck = new RedownloadPeriodicCheck( processingConfig.redownloadPeriodicCheckTimeSeconds, unboundedExecutionContext, - redownloadService + redownloadService, + clusterStorage ) def idDir: File = File(s"tmp/${id.medium}") @@ -570,7 +584,7 @@ class DAO( f } - val nodeType: NodeType = NodeType.Full + def nodeType: NodeType = nodeConfig.nodeType lazy val messageService: MessageService[IO] = new MessageService[IO]() diff --git a/src/main/scala/org/constellation/checkpoint/CheckpointCompare.scala b/src/main/scala/org/constellation/checkpoint/CheckpointCompare.scala index 2c4d93d13..36955c959 100644 --- a/src/main/scala/org/constellation/checkpoint/CheckpointCompare.scala +++ b/src/main/scala/org/constellation/checkpoint/CheckpointCompare.scala @@ -1,6 +1,8 @@ package org.constellation.checkpoint import cats.effect.IO +import cats.syntax.all._ +import org.constellation.domain.cluster.ClusterStorageAlgebra import org.constellation.util.PeriodicIO import scala.concurrent.ExecutionContext @@ -8,10 +10,14 @@ import scala.concurrent.duration._ class CheckpointCompare( checkpointService: CheckpointService[IO], + 
clusterStorage: ClusterStorageAlgebra[IO], unboundedExecutionContext: ExecutionContext ) extends PeriodicIO("CheckpointCompare", unboundedExecutionContext) { - override def trigger(): IO[Unit] = checkpointService.compareAcceptedCheckpoints() + override def trigger(): IO[Unit] = clusterStorage.isAnActivePeer.ifM( + checkpointService.compareAcceptedCheckpoints(), + IO.unit + ) schedule(10.seconds) } diff --git a/src/main/scala/org/constellation/checkpoint/CheckpointService.scala b/src/main/scala/org/constellation/checkpoint/CheckpointService.scala index e51f2e981..3ff15c0e1 100644 --- a/src/main/scala/org/constellation/checkpoint/CheckpointService.scala +++ b/src/main/scala/org/constellation/checkpoint/CheckpointService.scala @@ -454,12 +454,14 @@ class CheckpointService[F[_]: Timer: Clock]( metrics.updateMetric("activeTips", tips.size) (tips.size, readyFacilitators) match { case (size, facilitators) if size >= numFacilitatorPeers && facilitators.nonEmpty => - calculateTipsSOE(tips).flatMap { tipSOE => - facilitatorFilter.filterPeers(facilitators, numFacilitatorPeers, tipSOE).map { - case f if f.size >= numFacilitatorPeers => - Some(PulledTips(tipSOE, calculateFinalFacilitators(f, tipSOE.soe.map(_.hash).reduce(_ + _)))) - case _ => None - } + calculateTipsSOE(tips).flatMap { + tipSOE => + facilitatorFilter.filterPeers(facilitators, numFacilitatorPeers, tipSOE).map { + case f if f.size >= numFacilitatorPeers => + // TODO: joining pool note: calculateFinalFacilitators seems not needed as filterPeers will return the number of facilitators equal (or smaller) to numFacilitatorPeers + Some(PulledTips(tipSOE, calculateFinalFacilitators(f, tipSOE.soe.map(_.hash).reduce(_ + _)))) + case _ => None + } } case (size, _) if size >= numFacilitatorPeers => calculateTipsSOE(tips).map(t => Some(PulledTips(t, Map.empty[Id, PeerData]))) @@ -471,7 +473,7 @@ class CheckpointService[F[_]: Timer: Clock]( def compareAcceptedCheckpoints(): F[Unit] = for { - peers <- clusterStorage.getJoinedPeers + peers <- clusterStorage.getActivePeers randomPeerSeq <- F.delay { Random.shuffle(peers.values.toSeq).take(1) } randomPeer <- if (randomPeerSeq.nonEmpty) randomPeerSeq.head.pure[F] else F.raiseError[PeerData](new Throwable(s"No peers to compare accepted checkpoints.")) diff --git a/src/main/scala/org/constellation/consensus/ConsensusManager.scala b/src/main/scala/org/constellation/consensus/ConsensusManager.scala index 3e88b7b02..393d0d461 100644 --- a/src/main/scala/org/constellation/consensus/ConsensusManager.scala +++ b/src/main/scala/org/constellation/consensus/ConsensusManager.scala @@ -33,7 +33,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.Try -class ConsensusManager[F[_]: Concurrent: ContextShift: Timer]( +class ConsensusManager[F[_]]( transactionService: TransactionService[F], checkpointService: CheckpointService[F], checkpointStorage: CheckpointStorageAlgebra[F], @@ -51,7 +51,7 @@ class ConsensusManager[F[_]: Concurrent: ContextShift: Timer]( nodeId: Id, keyPair: KeyPair, checkpointsQueueInstance: DataResolverCheckpointsEnqueue[F] -) { +)(implicit F: Concurrent[F], CS: ContextShift[F], T: Timer[F]) { import ConsensusManager._ @@ -170,7 +170,7 @@ class ConsensusManager[F[_]: Concurrent: ContextShift: Timer]( for { transactions <- transactionService.pullForConsensus(maxTransactionThreshold) _ <- logger.info(s"Pulled for new consensus: ${transactions.size}") - facilitators <- clusterStorage.getReadyAndFullPeers + facilitators <- clusterStorage.getActivePeers tips <- 
checkpointService.pullTips(facilitators)(metrics) _ <- if (tips.isEmpty) Sync[F].raiseError[Unit](NoTipsForConsensus(roundId, transactions.map(_.transaction), List.empty[Observation])) @@ -179,7 +179,6 @@ class ConsensusManager[F[_]: Concurrent: ContextShift: Timer]( Sync[F].raiseError[Unit](NoPeersForConsensus(roundId, transactions.map(_.transaction), List.empty[Observation])) else Sync[F].unit observations <- observationService.pullForConsensus(maxObservationThreshold) - allFacilitators = tips.get.peers.values.map(_.peerMetadata.id).toSet ++ Set(nodeId) roundData = ( RoundData( roundId, @@ -198,6 +197,10 @@ class ConsensusManager[F[_]: Concurrent: ContextShift: Timer]( def participateInBlockCreationRound(roundData: RoundData): F[(ConsensusInfo[F], RoundData)] = (for { + _ <- clusterStorage.isAnActivePeer.ifM( + F.unit, + F.raiseError(NodeNotAnActiveLightNode(nodeId)) + ) _ <- metrics.incrementMetricAsync("consensus_participateInRound") state <- nodeStorage.getNodeState state <- nodeStorage.getNodeState @@ -229,6 +232,8 @@ class ConsensusManager[F[_]: Concurrent: ContextShift: Timer]( metrics.incrementMetricAsync("consensus_participateInRound_snapshotHeightAboveTipError") case InvalidNodeState(_, _) => metrics.incrementMetricAsync("consensus_participateInRound_invalidNodeStateError") + case NodeNotAnActiveLightNode(_) => + metrics.incrementMetricAsync("consensus_participateInRound_nodeNotAnActiveLightNode") case _ => metrics.incrementMetricAsync("consensus_participateInRound_unknownError") } @@ -349,7 +354,7 @@ class ConsensusManager[F[_]: Concurrent: ContextShift: Timer]( } yield () private[consensus] def adjustPeers(roundData: RoundData): F[RoundData] = - clusterStorage.getPeers.map { peers => + clusterStorage.getActivePeers.map { peers => val initiator = peers.get(roundData.facilitatorId.id) match { case Some(value) => value case None => @@ -467,4 +472,9 @@ object ConsensusManager { extends Exception( s"Can't participate in round $id snapshot height: $snapHeight is above or/equal proposed tip $tipHeight" ) + + case class NodeNotAnActiveLightNode(id: Id) + extends Exception( + s"Node is not an active light node currently.
Id=$id" + ) } diff --git a/src/main/scala/org/constellation/consensus/ConsensusScheduler.scala b/src/main/scala/org/constellation/consensus/ConsensusScheduler.scala index bff87cb76..af5529b18 100644 --- a/src/main/scala/org/constellation/consensus/ConsensusScheduler.scala +++ b/src/main/scala/org/constellation/consensus/ConsensusScheduler.scala @@ -6,7 +6,7 @@ import com.typesafe.config.Config import org.constellation.ConfigUtil import org.constellation.consensus.ConsensusManager.{ConsensusError, ConsensusStartError} import org.constellation.domain.checkpointBlock.CheckpointStorageAlgebra -import org.constellation.domain.cluster.NodeStorageAlgebra +import org.constellation.domain.cluster.{ClusterStorageAlgebra, NodeStorageAlgebra} import org.constellation.domain.redownload.RedownloadStorageAlgebra import org.constellation.domain.snapshot.SnapshotStorageAlgebra import org.constellation.schema.NodeState @@ -19,6 +19,7 @@ class ConsensusScheduler( config: Config, consensusManager: ConsensusManager[IO], nodeStorage: NodeStorageAlgebra[IO], + clusterStorage: ClusterStorageAlgebra[IO], checkpointStorage: CheckpointStorageAlgebra[IO], snapshotStorage: SnapshotStorageAlgebra[IO], redownloadStorage: RedownloadStorageAlgebra[IO], @@ -42,7 +43,9 @@ class ConsensusScheduler( canStartOwnConsensus <- nodeStorage.getNodeState.map(NodeState.canStartOwnConsensus) distanceFromMajorityNotExceeded <- isDistanceFromMajorityNotExceeded minTipDistanceNotExceeded <- isMinTipDistanceNotExceeded - _ <- if (canStartOwnConsensus && distanceFromMajorityNotExceeded && minTipDistanceNotExceeded) crossTalkConsensus + isAnActivePeer <- clusterStorage.isAnActivePeer + _ <- if (canStartOwnConsensus && distanceFromMajorityNotExceeded && minTipDistanceNotExceeded && isAnActivePeer) + crossTalkConsensus else skip } yield () diff --git a/src/main/scala/org/constellation/domain/checkpointBlock/CheckpointStorageAlgebra.scala b/src/main/scala/org/constellation/domain/checkpointBlock/CheckpointStorageAlgebra.scala index 496d6d66a..1c39b7f79 100644 --- a/src/main/scala/org/constellation/domain/checkpointBlock/CheckpointStorageAlgebra.scala +++ b/src/main/scala/org/constellation/domain/checkpointBlock/CheckpointStorageAlgebra.scala @@ -89,6 +89,7 @@ trait CheckpointStorageAlgebra[F[_]] { def removeTips(soeHashes: Set[String]): F[Unit] def setTips(tips: Set[String]): F[Unit] def countTips: F[Int] + def countMissingTips: F[Int] def getMinTipHeight: F[Long] def getMinWaitingHeight: F[Option[Long]] diff --git a/src/main/scala/org/constellation/domain/cluster/ClusterStorageAlgebra.scala b/src/main/scala/org/constellation/domain/cluster/ClusterStorageAlgebra.scala index 43b99e068..e7af4ae6c 100644 --- a/src/main/scala/org/constellation/domain/cluster/ClusterStorageAlgebra.scala +++ b/src/main/scala/org/constellation/domain/cluster/ClusterStorageAlgebra.scala @@ -1,7 +1,9 @@ package org.constellation.domain.cluster -import org.constellation.p2p.{JoinedHeight, PeerData} -import org.constellation.schema.{Id, NodeState} +import org.constellation.p2p.{JoinedHeight, MajorityHeight, PeerData} +import org.constellation.schema.snapshot.NextActiveNodes +import org.constellation.schema.{Id, NodeState, NodeType} +import org.constellation.util.Metrics trait ClusterStorageAlgebra[F[_]] { @@ -15,7 +17,6 @@ trait ClusterStorageAlgebra[F[_]] { def getPeers: F[Map[Id, PeerData]] def getReadyPeers: F[Map[Id, PeerData]] def getJoinedPeers: F[Map[Id, PeerData]] - def getReadyAndFullPeers: F[Map[Id, PeerData]] def getLeavingPeers: F[Map[Id, PeerData]] def 
getNotOfflinePeers: F[Map[Id, PeerData]] def clearPeers(): F[Unit] @@ -24,4 +25,28 @@ trait ClusterStorageAlgebra[F[_]] { def setNodeState(id: Id, state: NodeState): F[Unit] + def getActivePeersIds(withSelfId: Boolean = false): F[Set[Id]] + def getActivePeers: F[Map[Id, PeerData]] + def getActiveLightPeersIds(withSelfId: Boolean = false): F[Set[Id]] + def getActiveLightPeers(): F[Map[Id, PeerData]] + def getActiveFullPeersIds(withSelfId: Boolean = false): F[Set[Id]] + def getActiveFullPeers(): F[Map[Id, PeerData]] + def setActiveFullPeers(peers: Set[Id]): F[Unit] + def isAnActivePeer: F[Boolean] + def isAnActiveLightPeer: F[Boolean] + def isAnActiveFullPeer: F[Boolean] + def getActiveBetweenHeights: F[MajorityHeight] + def setActiveBetweenHeights(majorityHeight: MajorityHeight): F[Unit] + def clearActiveBetweenHeights(): F[Unit] + def getAuthorizedPeers: F[Set[Id]] + def setAuthorizedPeers(peers: Set[Id]): F[Unit] + def addAuthorizedPeer(id: Id): F[Unit] + def removeAuthorizedPeer(id: Id): F[Unit] + def getFullPeers(): F[Map[Id, PeerData]] + def getFullPeersIds(withSelfId: Boolean = false): F[Set[Id]] + def getLightPeers(): F[Map[Id, PeerData]] + def getLightPeersIds(withSelfId: Boolean = false): F[Set[Id]] + def setAsActivePeer(asType: NodeType): F[Unit] + def unsetAsActivePeer(): F[Unit] + def setActivePeers(nextActiveNodes: NextActiveNodes, latestMajorityHeight: MajorityHeight, metrics: Metrics): F[Unit] } diff --git a/src/main/scala/org/constellation/domain/cluster/NodeStorageAlgebra.scala b/src/main/scala/org/constellation/domain/cluster/NodeStorageAlgebra.scala index a3fcac6c0..8f862734b 100644 --- a/src/main/scala/org/constellation/domain/cluster/NodeStorageAlgebra.scala +++ b/src/main/scala/org/constellation/domain/cluster/NodeStorageAlgebra.scala @@ -1,6 +1,6 @@ package org.constellation.domain.cluster -import org.constellation.p2p.SetStateResult +import org.constellation.p2p.{MajorityHeight, SetStateResult} import org.constellation.schema.NodeState trait NodeStorageAlgebra[F[_]] { @@ -8,6 +8,8 @@ trait NodeStorageAlgebra[F[_]] { def setOwnJoinedHeight(height: Long): F[Unit] def clearOwnJoinedHeight(): F[Unit] +// def getActiveBetweenHeights: F[MajorityHeight] + def didParticipateInGenesisFlow: F[Option[Boolean]] def setParticipatedInGenesisFlow(participated: Boolean): F[Unit] diff --git a/src/main/scala/org/constellation/domain/configuration/CliConfig.scala b/src/main/scala/org/constellation/domain/configuration/CliConfig.scala index cd4612783..29f9ab706 100644 --- a/src/main/scala/org/constellation/domain/configuration/CliConfig.scala +++ b/src/main/scala/org/constellation/domain/configuration/CliConfig.scala @@ -1,5 +1,8 @@ package org.constellation.domain.configuration +import org.constellation.schema.NodeType +import org.constellation.schema.NodeType.Light + // scopt requires default args for all properties. // Make sure to check for null early -- don't propogate nulls anywhere else. 
case class CliConfig( @@ -14,7 +17,7 @@ case class CliConfig( whitelisting: String = null, debug: Boolean = false, startOfflineMode: Boolean = false, - lightNode: Boolean = false, + nodeType: NodeType = Light, genesisNode: Boolean = false, rollbackNode: Boolean = false, rollbackHeight: Long = 0L, diff --git a/src/main/scala/org/constellation/domain/configuration/NodeConfig.scala b/src/main/scala/org/constellation/domain/configuration/NodeConfig.scala index 92f77f285..4ad32147d 100644 --- a/src/main/scala/org/constellation/domain/configuration/NodeConfig.scala +++ b/src/main/scala/org/constellation/domain/configuration/NodeConfig.scala @@ -4,14 +4,15 @@ import java.security.KeyPair import org.constellation.ProcessingConfig import org.constellation.keytool.KeyUtils -import org.constellation.schema.Id +import org.constellation.schema.NodeType.Light +import org.constellation.schema.{Id, NodeType} import org.constellation.util.{AccountBalance, HostPort} case class NodeConfig( seeds: Seq[HostPort] = Seq(), primaryKeyPair: KeyPair = KeyUtils.makeKeyPair(), isGenesisNode: Boolean = false, - isLightNode: Boolean = false, + nodeType: NodeType = Light, isRollbackNode: Boolean = false, rollbackHeight: Long = 0L, rollbackHash: String = "", @@ -28,5 +29,6 @@ case class NodeConfig( dataPollingManagerOn: Boolean = false, allocAccountBalances: Seq[AccountBalance] = Seq.empty, whitelisting: Map[Id, Option[String]] = Map.empty, - minRequiredSpace: Int = 5 + minRequiredSpace: Int = 5, + initialActiveFullNodes: Set[Id] = Set.empty ) diff --git a/src/main/scala/org/constellation/domain/healthcheck/HealthCheckConsensusManagerBase.scala b/src/main/scala/org/constellation/domain/healthcheck/HealthCheckConsensusManagerBase.scala index cde0f4440..8067adf1d 100644 --- a/src/main/scala/org/constellation/domain/healthcheck/HealthCheckConsensusManagerBase.scala +++ b/src/main/scala/org/constellation/domain/healthcheck/HealthCheckConsensusManagerBase.scala @@ -43,6 +43,8 @@ abstract class HealthCheckConsensusManagerBase[ val logger: PrefixedHealthCheckLogger[F] = new PrefixedHealthCheckLogger[F](healthCheckType) // TO IMPLEMENT + def getHealthcheckPeers(): F[Map[Id, PeerData]] + def checkHealthForPeer(key: K): F[Fiber[F, A]] def periodicOperationWhenNoConsensusesInProgress(): F[Unit] @@ -63,7 +65,7 @@ abstract class HealthCheckConsensusManagerBase[ negativeOutcomePeers: Map[K, (HealthcheckConsensusDecision[K], HealthCheckConsensus[F, K, A, B, C])] ): F[Unit] = for { - peers <- clusterStorage.getPeers + peers <- getHealthcheckPeers() toMarkOfflinePeers = peers.filterKeys(negativeOutcomePeers.keySet.map(_.id).contains).values.toList _ <- markOffline(toMarkOfflinePeers) } yield () @@ -227,7 +229,7 @@ abstract class HealthCheckConsensusManagerBase[ def startOwnConsensusForId(key: K, delayedHealthCheckStatus: Fiber[F, A]): F[Unit] = for { - allPeers <- clusterStorage.getPeers + allPeers <- getHealthcheckPeers() roundId <- createHealthcheckRoundId() _ <- startNewRoundForId( key, @@ -243,7 +245,7 @@ abstract class HealthCheckConsensusManagerBase[ roundId: HealthcheckRoundId ): F[Either[HistoricalRoundData[K, A, B], HealthCheckConsensus[F, K, A, B, C]]] = for { - allPeers <- clusterStorage.getPeers + allPeers <- getHealthcheckPeers() delayedHealthCheckStatus <- checkHealthForPeer(key) result <- startNewRoundForId( key, @@ -456,7 +458,7 @@ abstract class HealthCheckConsensusManagerBase[ private def inspectConsensusPeers(consensuses: List[HealthCheckConsensus[F, K, A, B, C]]): F[Unit] = for { _ <- logger.debug("Started inspection 
of consensus peers!") - peers <- clusterStorage.getPeers + peers <- clusterStorage.getPeers // TODO: getHealthcheckPeers seems not suitable here leavingOrOfflinePeers = peers.filter { case (_, peerData) => isInvalidForJoining(peerData.peerMetadata.nodeState) }.keySet diff --git a/src/main/scala/org/constellation/domain/healthcheck/ping/PingHealthCheckConsensusManager.scala b/src/main/scala/org/constellation/domain/healthcheck/ping/PingHealthCheckConsensusManager.scala index 5c07c82b1..7829b8677 100644 --- a/src/main/scala/org/constellation/domain/healthcheck/ping/PingHealthCheckConsensusManager.scala +++ b/src/main/scala/org/constellation/domain/healthcheck/ping/PingHealthCheckConsensusManager.scala @@ -85,6 +85,8 @@ class PingHealthCheckConsensusManager[F[_]]( case (_, peerData) => canActAsJoiningSource(peerData.peerMetadata.nodeState) }.keySet + override def getHealthcheckPeers(): F[Map[Id, PeerData]] = clusterStorage.getPeers + override def periodicPeersHealthCheck(): F[Unit] = for { peersUnderConsensus <- getPeersUnderConsensus() @@ -93,7 +95,7 @@ class PingHealthCheckConsensusManager[F[_]]( unresponsivePeers <- peerHealthCheck .check(peersUnderConsensus, peersToRunHealthcheckFor) .map(_.map(_.peerMetadata.id).toSet) - peers <- clusterStorage.getPeers + peers <- getHealthcheckPeers() notOfflinePeers = getNotOfflinePeers(peers) historical <- getHistoricalRounds() peerToRunConsensusFor = unresponsivePeers @@ -127,7 +129,7 @@ class PingHealthCheckConsensusManager[F[_]]( override def checkHealthForPeer(key: PingHealthCheckKey): F[Fiber[F, PeerPingHealthCheckStatus]] = for { - allPeers <- clusterStorage.getPeers + allPeers <- getHealthcheckPeers() checkedPeerData = allPeers.get(key.id) // should I check the status if it's not offline or leaving at the moment delayedHealthCheckStatus <- F.start { checkedPeerData @@ -152,7 +154,7 @@ class PingHealthCheckConsensusManager[F[_]]( ]] ): F[Unit] = for { - peers <- clusterStorage.getPeers + peers <- getHealthcheckPeers() notOfflinePeers = getNotOfflinePeers(peers) peersWeDidntHave <- fetchAbsentPeers(consensuses) peersWeStillDontHave = peersWeDidntHave -- notOfflinePeers @@ -184,7 +186,7 @@ class PingHealthCheckConsensusManager[F[_]]( ] ): F[Unit] = for { - peers <- clusterStorage.getPeers + peers <- getHealthcheckPeers() notOfflinePeers = getNotOfflinePeers(peers) peersToAdd = positiveOutcomePeers -- notOfflinePeers.map(PingHealthCheckKey(_)) _ <- if (peersToAdd.nonEmpty) @@ -210,7 +212,7 @@ class PingHealthCheckConsensusManager[F[_]]( roundInProgress = None, joiningHeight = ownJoinedHeight ) - allPeers <- clusterStorage.getPeers + allPeers <- getHealthcheckPeers() peersWeRunConsensusForButDontHave = (rounds -- allPeers.keySet).map { case (id, roundIds) => id -> NodeReconciliationData( @@ -339,7 +341,7 @@ class PingHealthCheckConsensusManager[F[_]]( def manageSchedulingReconciliation(): F[Unit] = for { - peersCount <- clusterStorage.getPeers.map(_.size) + peersCount <- getHealthcheckPeers().map(_.size) currentEpoch <- getTimeInSeconds() runAfter <- runReconciliationRoundAfter.get _ <- runAfter match { diff --git a/src/main/scala/org/constellation/domain/healthcheck/proposal/MissingProposalHealthCheckConsensusManager.scala b/src/main/scala/org/constellation/domain/healthcheck/proposal/MissingProposalHealthCheckConsensusManager.scala index ccd21b575..ac0dcfa1d 100644 --- a/src/main/scala/org/constellation/domain/healthcheck/proposal/MissingProposalHealthCheckConsensusManager.scala +++ 
b/src/main/scala/org/constellation/domain/healthcheck/proposal/MissingProposalHealthCheckConsensusManager.scala @@ -34,13 +34,17 @@ import org.constellation.domain.healthcheck.{ } import org.constellation.domain.redownload.{MissingProposalFinder, RedownloadService, RedownloadStorageAlgebra} import org.constellation.infrastructure.p2p.ClientInterpreter -import org.constellation.p2p.{Cluster, MajorityHeight} +import org.constellation.p2p.{Cluster, MajorityHeight, PeerData} import org.constellation.schema.Id import org.constellation.schema.snapshot.HeightRange import org.constellation.util.Metrics import scala.concurrent.duration.{DurationInt, FiniteDuration} +// TODO: peer selection needs to be re-verified, and so does the negative-outcome action. We may need to +// stop removing peers here, since removal would only happen on the active full peers and not on all nodes; +// instead these nodes should probably be removed from the active pool rather than from the peers list, but +// in that case the light nodes also need to be notified. class MissingProposalHealthCheckConsensusManager[F[_]]( ownId: Id, cluster: Cluster[F], @@ -84,6 +88,8 @@ class MissingProposalHealthCheckConsensusManager[F[_]]( case class MajorityMaxHeightWithTimestamp(height: Long, timestamp: FiniteDuration) private val lastMajorityHeightWithTimestamp: Ref[F, Option[MajorityMaxHeightWithTimestamp]] = Ref.unsafe(None) + override def getHealthcheckPeers(): F[Map[Id, PeerData]] = clusterStorage.getActiveFullPeers() + override def periodicOperation(): F[Unit] = for { currentTimestamp <- getTimeInSeconds() @@ -109,7 +115,7 @@ class MissingProposalHealthCheckConsensusManager[F[_]]( for { currentTimestamp <- getTimeInSeconds() maybeLatestMajorityWithTimestamp <- lastMajorityHeightWithTimestamp.get - peers <- clusterStorage.getPeers + peers <- getHealthcheckPeers() ownPeer <- nodeStorage.getOwnJoinedHeight peersCache = peers.map { case (id, peerData) => (id, peerData.majorityHeight) } diff --git a/src/main/scala/org/constellation/domain/observation/ObservationService.scala b/src/main/scala/org/constellation/domain/observation/ObservationService.scala index 7a5179dfb..93058fce4 100644 --- a/src/main/scala/org/constellation/domain/observation/ObservationService.scala +++ b/src/main/scala/org/constellation/domain/observation/ObservationService.scala @@ -2,16 +2,18 @@ package org.constellation.domain.observation import cats.effect.Concurrent import cats.syntax.all._ -import org.constellation.DAO +import org.constellation.domain.cluster.ClusterStorageAlgebra import org.constellation.domain.consensus.{ConsensusService, ConsensusStatus} import org.constellation.schema.checkpoint.CheckpointCache -import org.constellation.schema.observation.Observation -import org.constellation.schema.transaction.TransactionCacheData +import org.constellation.schema.observation.{NodeJoinsTheCluster, NodeLeavesTheCluster, Observation} import org.constellation.trust.TrustManager import org.constellation.util.Metrics -class ObservationService[F[_]: Concurrent](trustManager: TrustManager[F], metrics: Metrics) - extends ConsensusService[F, Observation] { +class ObservationService[F[_]: Concurrent]( + trustManager: TrustManager[F], + clusterStorage: ClusterStorageAlgebra[F], + metrics: Metrics +) extends ConsensusService[F, Observation] { protected[domain] val pending = new PendingObservationsMemPool[F]() override def metricRecordPrefix: Option[String] = "Observation".some @@ -22,11 +24,13 @@ class ObservationService[F[_]: Concurrent](trustManager: TrustManager[F], metric super
.accept(o) .flatTap(_ => trustManager.updateStoredReputation(o)) + .flatTap(_ => handleAcceptedObservation(o)) .flatTap(_ => metrics.incrementMetricAsync[F]("observationAccepted")) def applyAfterRedownload(o: Observation, cpc: Option[CheckpointCache]): F[Unit] = super .accept(o) + // Why aren't we updating trustManager here? .flatTap(_ => metrics.incrementMetricAsync[F]("observationAccepted")) .flatTap(_ => metrics.incrementMetricAsync[F]("observationAcceptedFromRedownload")) @@ -41,4 +45,14 @@ class ObservationService[F[_]: Concurrent](trustManager: TrustManager[F], metric } } .void + + private def handleAcceptedObservation(o: Observation): F[Unit] = + o.signedObservationData.data.event match { + case NodeJoinsTheCluster => + clusterStorage.addAuthorizedPeer(o.signedObservationData.data.id) + case NodeLeavesTheCluster => + clusterStorage.removeAuthorizedPeer(o.signedObservationData.data.id) + case _ => + Concurrent[F].unit + } } diff --git a/src/main/scala/org/constellation/domain/p2p/client/ClusterClientAlgebra.scala b/src/main/scala/org/constellation/domain/p2p/client/ClusterClientAlgebra.scala index 795eef426..aa6f6aa3a 100644 --- a/src/main/scala/org/constellation/domain/p2p/client/ClusterClientAlgebra.scala +++ b/src/main/scala/org/constellation/domain/p2p/client/ClusterClientAlgebra.scala @@ -4,6 +4,7 @@ import org.constellation.domain.trust.TrustData import org.constellation.infrastructure.p2p.PeerResponse.PeerResponse import org.constellation.p2p.Cluster.ClusterNode import org.constellation.p2p.{JoinedHeight, PeerUnregister, SetNodeStatus} +import org.constellation.schema.Id trait ClusterClientAlgebra[F[_]] { def getInfo(): PeerResponse[F, List[ClusterNode]] @@ -15,4 +16,8 @@ trait ClusterClientAlgebra[F[_]] { def deregister(peerUnregister: PeerUnregister): PeerResponse[F, Unit] def getTrust(): PeerResponse[F, TrustData] + + def getActiveFullNodes(): PeerResponse[F, Option[Set[Id]]] + + def notifyAboutClusterJoin(): PeerResponse[F, Unit] } diff --git a/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala b/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala index 855b84eab..8a402dea1 100644 --- a/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala +++ b/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala @@ -6,6 +6,7 @@ import org.constellation.infrastructure.p2p.PeerResponse.PeerResponse import org.constellation.schema.Id import org.constellation.schema.signature.Signed import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposal, SnapshotProposalPayload} +import org.constellation.storage.JoinActivePoolCommand trait SnapshotClientAlgebra[F[_]] { def getStoredSnapshots(): PeerResponse[F, List[String]] @@ -29,4 +30,6 @@ trait SnapshotClientAlgebra[F[_]] { def getLatestMajorityHeight(): PeerResponse[F, LatestMajorityHeight] def postPeerProposal(message: GossipMessage[SnapshotProposalPayload]): PeerResponse[F, Unit] + + def notifyNextActivePeer(joinActivePoolCommand: JoinActivePoolCommand): PeerResponse[F, Unit] } diff --git a/src/main/scala/org/constellation/domain/redownload/DownloadService.scala b/src/main/scala/org/constellation/domain/redownload/DownloadService.scala index e491036bb..21521fd10 100644 --- a/src/main/scala/org/constellation/domain/redownload/DownloadService.scala +++ b/src/main/scala/org/constellation/domain/redownload/DownloadService.scala @@ -65,7 +65,7 @@ class DownloadService[F[_]]( private[redownload] def fetchAndPersistBlocks(): F[Unit]
= for { - peers <- clusterStorage.getPeers.map(_.values.toList) + peers <- clusterStorage.getActiveFullPeers().map(_.values.toList) readyPeers = peers.filter(p => NodeState.canActAsDownloadSource(p.peerMetadata.nodeState)) clients = readyPeers.map(_.peerMetadata.toPeerClientMetadata).toSet _ <- redownloadService.useRandomClient(clients) { client => diff --git a/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala b/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala index b70c063fa..6b2530780 100644 --- a/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala +++ b/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala @@ -12,19 +12,20 @@ import org.constellation.collection.MapUtils._ import org.constellation.concurrency.cuckoo.CuckooFilter import org.constellation.domain.checkpointBlock.CheckpointStorageAlgebra import org.constellation.domain.cloud.CloudService.CloudServiceEnqueue +import org.constellation.domain.healthcheck.HealthCheckLoggingHelper.logIds import org.constellation.domain.cluster.{BroadcastService, ClusterStorageAlgebra, NodeStorageAlgebra} import org.constellation.domain.redownload.RedownloadService._ import org.constellation.domain.snapshot.SnapshotStorageAlgebra import org.constellation.domain.storage.LocalFileStorage import org.constellation.infrastructure.p2p.PeerResponse.PeerClientMetadata import org.constellation.infrastructure.p2p.{ClientInterpreter, PeerResponse} -import org.constellation.p2p.MajorityHeight +import org.constellation.p2p.{Cluster, MajorityHeight, PeerData} import org.constellation.rewards.RewardsManager import org.constellation.schema.signature.Signed import org.constellation.schema.snapshot._ import org.constellation.schema.{Id, NodeState} import org.constellation.serialization.KryoSerializer -import org.constellation.storage.SnapshotService +import org.constellation.storage.{JoinActivePoolCommand, SnapshotService} import org.constellation.util.Logging.stringifyStackTrace import org.constellation.util.Metrics @@ -65,6 +66,10 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( private val stallCountThreshold = ConfigUtil.getOrElse("constellation.snapshot.stallCount", 4L) private val proposalLookupLimit = ConfigUtil.getOrElse("constellation.snapshot.proposalLookupLimit", 6L) + private val snapshotHeightInterval: Int = ConfigUtil.constellation.getInt("snapshot.snapshotHeightInterval") + private val activePeersRotationInterval: Int = ConfigUtil.constellation.getInt("snapshot.activePeersRotationInterval") + private val activePeersRotationEveryNHeights: Int = snapshotHeightInterval * activePeersRotationInterval + def clear: F[Unit] = for { _ <- redownloadStorage.clear() @@ -72,10 +77,21 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( } yield () // TODO? - def fetchAndUpdatePeersProposals(): F[Unit] = + def fetchAndUpdatePeersProposals(activeFullNodes: Option[List[PeerData]] = None): F[Unit] = for { _ <- logger.debug("Fetching and updating peer proposals") - peers <- clusterStorage.getJoinedPeers.map(_.values.toList) + //peers <- clusterStorage.getJoinedPeers.map(_.values.toList) + //TODO: should joining height be checked here after introducing joining pool? 
+ peers <- { + activeFullNodes match { + case Some(activePeers) => activePeers.pure[F] + case None => + clusterStorage.getActiveFullPeers + .map( + _.values.toList.filter(p => NodeState.isNotOffline(p.peerMetadata.nodeState)) + ) + } + } apiClients = peers.map(_.peerMetadata.toPeerClientMetadata) responses <- apiClients.traverse { client => fetchCreatedSnapshots(client).map(client.id -> _) @@ -97,9 +113,14 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( logger.error(e)(s"Fetch peers proposals error") >> F.pure(Map.empty[Long, Signed[SnapshotProposal]]) } - private[redownload] def fetchStoredSnapshotsFromAllPeers(): F[Map[PeerClientMetadata, Set[String]]] = + private[redownload] def fetchStoredSnapshotsFromAllPeers( + lastActivePeers: Option[List[PeerData]] = None + ): F[Map[PeerClientMetadata, Set[String]]] = for { - peers <- clusterStorage.getPeers.map(_.values.toList) + peers <- lastActivePeers match { + case Some(lastActive) => lastActive.pure[F] + case None => clusterStorage.getActiveFullPeers().map(_.values.toList) + } apiClients = peers.map(_.peerMetadata.toPeerClientMetadata) responses <- apiClients.traverse { client => fetchStoredSnapshots(client) @@ -166,10 +187,11 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( )(client) private def fetchAndStoreMissingSnapshots( - snapshotsToDownload: SnapshotsAtHeight + snapshotsToDownload: SnapshotsAtHeight, + lastActivePeers: Option[List[PeerData]] = None ): EitherT[F, Throwable, Unit] = for { - storedSnapshots <- fetchStoredSnapshotsFromAllPeers().attemptT + storedSnapshots <- fetchStoredSnapshotsFromAllPeers(lastActivePeers).attemptT candidates = snapshotsToDownload.values.toList.map { hash => (hash, storedSnapshots.filter { case (_, hashes) => hashes.contains(hash) }.keySet) }.toMap @@ -210,9 +232,12 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( makeAttempt((stopAt + 1) % poolArray.length) } - private def fetchAndPersistBlocksAboveMajority(majorityState: SnapshotsAtHeight): EitherT[F, Throwable, Unit] = + private def fetchAndPersistBlocksAboveMajority( + majorityState: SnapshotsAtHeight, + lastActivePeers: Option[List[PeerData]] = None + ): EitherT[F, Throwable, Unit] = for { - storedSnapshots <- fetchStoredSnapshotsFromAllPeers().attemptT + storedSnapshots <- fetchStoredSnapshotsFromAllPeers(lastActivePeers).attemptT maxMajorityHash = majorityState(redownloadStorage.maxHeight(majorityState)) peersWithMajority = storedSnapshots.filter { case (_, hashes) => hashes.contains(maxMajorityHash) }.keySet @@ -249,19 +274,23 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( checkpointService.addToAcceptance(block) } - snapshotInfoFromMemPool <- fetchSnapshotInfo(client).flatMap { snapshotInfo => - C.evalOn(boundedExecutionContext)(F.delay { - KryoSerializer.deserializeCast[SnapshotInfo](snapshotInfo) - }) - } - - _ <- checkpointStorage.setTips(snapshotInfoFromMemPool.tips) - _ <- checkpointStorage.setUsages(snapshotInfoFromMemPool.usages) +// snapshotInfoFromMemPool <- fetchSnapshotInfo(client).flatMap { snapshotInfo => +// C.evalOn(boundedExecutionContext)(F.delay { +// KryoSerializer.deserializeCast[SnapshotInfo](snapshotInfo) +// }) +// } +// +// _ <- checkpointStorage.setTips(snapshotInfoFromMemPool.tips) +// _ <- checkpointStorage.setUsages(snapshotInfoFromMemPool.usages) } yield () }.attemptT } yield () - def checkForAlignmentWithMajoritySnapshot(joiningHeight: Option[Long] = None): F[Unit] = { + def checkForAlignmentWithMajoritySnapshot( + joiningHeight: Option[Long] = None, 
+ isJoiningActivePool: Boolean = false, + lastActivePeers: Option[List[PeerData]] = None + ): F[Unit] = { def wrappedRedownload( shouldRedownload: Boolean, meaningfulAcceptedSnapshots: SnapshotsAtHeight, @@ -274,7 +303,7 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( for { _ <- logger.debug(s"Alignment result: $result") _ <- logger.debug("Redownload needed! Applying the following redownload plan:") - _ <- applyPlan(plan, meaningfulMajorityState).value.flatMap(F.fromEither) + _ <- applyPlan(plan, meaningfulMajorityState, lastActivePeers).value.flatMap(F.fromEither) _ <- metrics.updateMetricAsync[F]("redownload_hasLastRedownloadFailed", 0) } yield () }.handleErrorWith { error => @@ -292,11 +321,15 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( lastMajority.minHeight ) - peers <- clusterStorage.getPeers - ownPeer <- nodeStorage.getOwnJoinedHeight + peers <- lastActivePeers match { + case Some(lastActivePeers) => lastActivePeers.map(pd => pd.peerMetadata.id -> pd).toMap.pure[F] + case None => clusterStorage.getActiveFullPeers() + } + //TODO: or should this be in clusterStorage? + activeBetweenHeights <- clusterStorage.getActiveBetweenHeights //nodeStorage.getActiveBetweenHeights peersCache = peers.map { case (id, peerData) => (id, peerData.majorityHeight) - } ++ Map(nodeId -> NonEmptyList.one(MajorityHeight(ownPeer, None))) + } ++ Map(nodeId -> NonEmptyList.one(activeBetweenHeights)) _ <- logger.debug(s"Peers with majority heights $peersCache") @@ -320,13 +353,25 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( calculatedMajorityWithoutGaps = if (gaps.isEmpty) calculatedMajority else calculatedMajority.removeHeightsAbove(gaps.min) - joinHeight = joiningHeight.getOrElse(ownPeer.getOrElse(calculatedMajorityWithoutGaps.maxHeight)) + //TODO: reverify I'm not sure how joining height and activeBetweenHeights should interact here + joinHeight = joiningHeight.getOrElse( + activeBetweenHeights.joined.getOrElse(calculatedMajorityWithoutGaps.maxHeight) + ) _ <- if (joiningHeight.nonEmpty) logger.debug(s"Join height is ${joinHeight}") else F.unit majorityBeforeCutOff = { val intersect = lastMajority.keySet & calculatedMajorityWithoutGaps.keySet - if (lastMajority.isEmpty || intersect.nonEmpty) + val continuation = { + val maybeLastMax = lastMajority.keySet.toList.maximumOption + val maybeNewMin = calculatedMajorityWithoutGaps.keySet.toList.minimumOption + + (maybeLastMax, maybeNewMin) match { + case (Some(lastMax), Some(newMin)) => newMin - lastMax == heightInterval + case _ => false + } + } + if (lastMajority.isEmpty || intersect.nonEmpty || continuation) calculatedMajorityWithoutGaps else lastMajority @@ -357,7 +402,7 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( meaningfulAcceptedSnapshots, meaningfulMajority, redownloadInterval, - joiningHeight.nonEmpty + joiningHeight.nonEmpty || isJoiningActivePool ) _ <- if (shouldPerformRedownload) { @@ -420,20 +465,137 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( _ <- acceptCheckpointBlocks().value.flatMap(F.fromEither) _ <- if (!shouldPerformRedownload) findAndFetchMissingProposals(peerProposals, peersCache) else F.unit + _ <- logger.debug( + s"activeBetweenHeights: $activeBetweenHeights maxMajorityHeight: ${meaningfulMajority.maxHeight}" + ) + _ <- activeBetweenHeights.left + .exists(_ <= meaningfulMajority.maxHeight) + .pure[F] + .ifM( + notifyNewActivePeersAndLeaveThePool(), + logger.debug(s"Still in the active window!") + ) } yield () nodeStorage.getNodeState.map { 
current => if (joiningHeight.nonEmpty) NodeState.validForDownload.contains(current) else NodeState.validForRedownload.contains(current) - }.ifM( - wrappedCheck, - logger.debug(s"Node state is not valid for redownload, skipping. isDownload=${joiningHeight.nonEmpty}") >> F.unit - ) + }.flatMap(isValidForRedownload => clusterStorage.isAnActiveFullPeer.map(_ && isValidForRedownload)) + .map(_ || isJoiningActivePool) // TODO: Improve - different logs for every reason wrappedCheck won't run + .ifM( + wrappedCheck, + logger + .debug(s"Node state is not valid for redownload, skipping. isDownload=${joiningHeight.nonEmpty}") >> F.unit + ) }.handleErrorWith { error => logger.error(error)("Error during checking alignment with majority snapshot.") >> error.raiseError[F, Unit] } + // TODO: If node is a full node it shouldn't handle any requests here + private def addAndCheckJoinActivePoolCommand(senderId: Id, joinActivePoolCommand: JoinActivePoolCommand): F[Unit] = + for { + _ <- clusterStorage.isAnActiveFullPeer.ifM( + F.raiseError[Unit](new Throwable(s"Full peers don't process join active pool requests.")), + F.unit + ) + currentRequests <- redownloadStorage.addJoinActivePoolCommand(senderId, joinActivePoolCommand) + fullNodes <- clusterStorage.getActiveFullPeers() + _ <- logger.debug(s"Full nodes when joining command processed: ${fullNodes.keySet}") + _ <- clusterStorage.isAnActiveLightPeer.ifM( + if (currentRequests.values.toList.distinct.size == 1 && currentRequests.keySet == fullNodes.keySet) { + redownloadStorage.clearJoinActivePoolCommands() >> + logger.debug( + s"Added joinActivePool request and check if all previous full nodes sent the notification was met!" + ) + } else + F.raiseError[Unit]( + new Throwable( + s"Added joinActivePool request but not all previous full nodes sent the notification. request=$joinActivePoolCommand! currentRequests size = ${currentRequests.keySet} != $fullNodes." + ) + ), + if (currentRequests.values.toList.distinct.size == 1 && currentRequests.size == 3) + redownloadStorage.clearJoinActivePoolCommands() >> + logger.debug(s"Added joinActivePool request and the condition to join active pool was met!") + else + F.raiseError[Unit]( + new Throwable( + s"Added joinActivePool request but the amount of needed requests wasn't met. request=$joinActivePoolCommand! currentRequests size = ${currentRequests.size} != 3." + ) + ) + ) + } yield () + + def redownloadBeforeJoiningActivePeersPool(senderId: Id, joinActivePoolCommand: JoinActivePoolCommand): F[Unit] = { + for { + _ <- addAndCheckJoinActivePoolCommand(senderId, joinActivePoolCommand) + JoinActivePoolCommand(lastActiveFullNodes, lastActiveBetweenHeight) = joinActivePoolCommand + _ <- logger.debug(s"Joining active peers pool! lastActivePeers: ${logIds(lastActiveFullNodes)}") + _ <- logger.debug(s"Joining active peers pool! lastActiveBetweenHeight: $lastActiveBetweenHeight") + peers <- lastActiveFullNodes.toList + .traverse( + clusterStorage.getPeer + ) + .map(_.flatten.map(_.copy(majorityHeight = NonEmptyList.one(lastActiveBetweenHeight)))) // TODO: should we handle the situation when at least one of the last active peers is not found? + _ <- logger.debug(s"Joining active peers pool! peers=$peers") + _ <- clear + _ <- fetchAndUpdatePeersProposals(peers.some) + _ <- checkForAlignmentWithMajoritySnapshot(isJoiningActivePool = true, lastActivePeers = peers.some) + latestMajorityHeight <- redownloadStorage.getLatestMajorityHeight + _ <- logger.debug(s"Joining active peers pool! 
latestMajorityHeight: $latestMajorityHeight") + // TODO: this was in snapshotService + newActivePeers <- snapshotServiceStorage.getNextSnapshotFacilitators + _ <- logger.debug(s"Joining active peers pool! newActivePeers: $newActivePeers") //TODO: logging function + activeBetweenHeights = Cluster.calculateActiveBetweenHeights( + latestMajorityHeight, + activePeersRotationEveryNHeights + ) + _ <- clusterStorage.setActivePeers(newActivePeers, activeBetweenHeights, metrics) + } yield () + }.handleErrorWith { e => + logger.error(e)("Error during joining active peers pool!") + } + + // TODO: notify new light nodes to join the pool also and update your light nodes list + // TODO: we need a consensus over new light and full nodes between current active full nodes + private def notifyNewActivePeersAndLeaveThePool(): F[Unit] = + for { + _ <- logger.debug(s"Notifying new active peers to join the active pool!") + lastActiveFullNodes <- clusterStorage.getActiveFullPeers().map(_.keySet + nodeId) + lastActiveLightNodes <- clusterStorage.getActiveLightPeers().map(_.keySet) + lastActiveBetweenHeights <- clusterStorage.getActiveBetweenHeights + joinActivePoolCommand = JoinActivePoolCommand(lastActiveFullNodes, lastActiveBetweenHeights) + nextActiveNodes <- snapshotServiceStorage.getStoredSnapshot.map(_.snapshot.nextActiveNodes) + _ <- logger.debug(s"LastActiveFullNodes: ${logIds(lastActiveFullNodes)}") + _ <- logger.debug(s"NextActiveFullNodes: ${logIds(nextActiveNodes.full)}") + _ <- logger.debug(s"LastActiveLightNodes: ${logIds(lastActiveLightNodes)}") + _ <- logger.debug(s"NextActiveLightNodes: ${logIds(nextActiveNodes.light)}") + //TODO: what about missing peers + peersToNotify <- (nextActiveNodes.full ++ nextActiveNodes.light ++ lastActiveLightNodes -- lastActiveFullNodes).toList + .traverse(clusterStorage.getPeer) + .map(_.flatten) + .map(_.map(_.peerMetadata.toPeerClientMetadata)) + + _ <- peersToNotify.traverse { peerClientMetadata => + // notify the peers or if everybody does redownload then it's not needed, maybe send the last snapshot + // to the nodes joining the L0 pool explicitly first + apiClient.snapshot.notifyNextActivePeer(joinActivePoolCommand)(peerClientMetadata) + } + _ <- logger.debug(s"Setting next active peers!") + latestMajorityHeight <- redownloadStorage.getLatestMajorityHeight + activeBetweenHeights = Cluster + .calculateActiveBetweenHeights(latestMajorityHeight, activePeersRotationEveryNHeights) + _ <- clusterStorage + .setActivePeers(nextActiveNodes, activeBetweenHeights, metrics) // TODO: or nextActivePeers but then all nodes should redownload + _ <- nextActiveNodes.full + .contains(nodeId) + .pure[F] + .ifM( + F.unit, + logger.debug("Leaving active peers pool!") + ) + } yield () + def removeUnacceptedSnapshotsFromDisk(): EitherT[F, Throwable, Unit] = for { nextSnapshotHash <- snapshotServiceStorage.getNextSnapshotHash.attemptT @@ -643,13 +805,14 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( private[redownload] def applyPlan( plan: RedownloadPlan, - majorityState: SnapshotsAtHeight + majorityState: SnapshotsAtHeight, + lastActivePeers: Option[List[PeerData]] = None ): EitherT[F, Throwable, Unit] = for { _ <- EitherT.liftF(logRedownload(plan, majorityState)) _ <- EitherT.liftF(logger.debug("Fetching and storing missing snapshots on disk.")) - _ <- fetchAndStoreMissingSnapshots(plan.toDownload) + _ <- fetchAndStoreMissingSnapshots(plan.toDownload, lastActivePeers) _ <- EitherT.liftF(logger.debug("Filling missing createdSnapshots by majority state")) _ <- 
redownloadStorage.updateCreatedSnapshots(plan).attemptT @@ -658,7 +821,7 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( _ <- redownloadStorage.updateAcceptedSnapshots(plan).attemptT _ <- EitherT.liftF(logger.debug("Fetching and persisting blocks above majority.")) - _ <- fetchAndPersistBlocksAboveMajority(majorityState) + _ <- fetchAndPersistBlocksAboveMajority(majorityState, lastActivePeers) _ <- EitherT.liftF(logger.debug("RedownloadPlan has been applied successfully.")) @@ -683,13 +846,13 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( _ <- blocksToAccept.toList.traverse { checkpointService.addToAcceptance } } yield ()).attemptT - private[redownload] def updateHighestSnapshotInfo(): EitherT[F, Throwable, Unit] = - for { - highestSnapshotInfo <- redownloadStorage.getAcceptedSnapshots.attemptT - .map(_.maxBy { case (height, _) => height } match { case (_, hash) => hash }) - .flatMap(hash => snapshotInfoStorage.read(hash)) - _ <- snapshotService.setSnapshot(highestSnapshotInfo).attemptT - } yield () +// private[redownload] def updateHighestSnapshotInfo(): EitherT[F, Throwable, Unit] = +// for { +// highestSnapshotInfo <- redownloadStorage.getAcceptedSnapshots.attemptT +// .map(_.maxBy { case (height, _) => height } match { case (_, hash) => hash }) +// .flatMap(hash => snapshotInfoStorage.read(hash)) +// _ <- snapshotService.setSnapshot(highestSnapshotInfo).attemptT +// } yield () private[redownload] def shouldRedownload( acceptedSnapshots: SnapshotsAtHeight, @@ -697,7 +860,7 @@ class RedownloadService[F[_]: NonEmptyParallel: Applicative]( redownloadInterval: Int, isDownload: Boolean = false ): Boolean = - if (majorityState.isEmpty) false + if (majorityState.isEmpty) false // TODO: should it skyrocket when majority isEmpty else isDownload || (getAlignmentResult(acceptedSnapshots, majorityState, redownloadInterval) match { case AlignedWithMajority => false diff --git a/src/main/scala/org/constellation/domain/redownload/RedownloadStorageAlgebra.scala b/src/main/scala/org/constellation/domain/redownload/RedownloadStorageAlgebra.scala index cdc62e64a..fb61aea16 100644 --- a/src/main/scala/org/constellation/domain/redownload/RedownloadStorageAlgebra.scala +++ b/src/main/scala/org/constellation/domain/redownload/RedownloadStorageAlgebra.scala @@ -10,6 +10,7 @@ import org.constellation.domain.redownload.RedownloadService.{ import org.constellation.schema.Id import org.constellation.schema.signature.Signed import org.constellation.schema.snapshot.{FilterData, HeightRange, SnapshotProposal} +import org.constellation.storage.JoinActivePoolCommand trait RedownloadStorageAlgebra[F[_]] { def getCreatedSnapshots: F[SnapshotProposalsAtHeight] @@ -53,4 +54,7 @@ trait RedownloadStorageAlgebra[F[_]] { def resetMajorityStallCount: F[Unit] def incrementMajorityStallCount: F[Unit] + def addJoinActivePoolCommand(senderId: Id, command: JoinActivePoolCommand): F[Map[Id, JoinActivePoolCommand]] + def clearJoinActivePoolCommands(): F[Unit] + } diff --git a/src/main/scala/org/constellation/domain/snapshot/SnapshotStorageAlgebra.scala b/src/main/scala/org/constellation/domain/snapshot/SnapshotStorageAlgebra.scala index c091e3d10..01aac38d1 100644 --- a/src/main/scala/org/constellation/domain/snapshot/SnapshotStorageAlgebra.scala +++ b/src/main/scala/org/constellation/domain/snapshot/SnapshotStorageAlgebra.scala @@ -1,6 +1,6 @@ package org.constellation.domain.snapshot -import org.constellation.schema.snapshot.StoredSnapshot +import org.constellation.schema.snapshot.{NextActiveNodes,
StoredSnapshot} trait SnapshotStorageAlgebra[F[_]] { @@ -15,4 +15,5 @@ trait SnapshotStorageAlgebra[F[_]] { def exists(hash: String): F[Boolean] + def getNextSnapshotFacilitators: F[NextActiveNodes] } diff --git a/src/main/scala/org/constellation/domain/transaction/TransactionGossiping.scala b/src/main/scala/org/constellation/domain/transaction/TransactionGossiping.scala index 0fbbc6d82..f508d9eb1 100644 --- a/src/main/scala/org/constellation/domain/transaction/TransactionGossiping.scala +++ b/src/main/scala/org/constellation/domain/transaction/TransactionGossiping.scala @@ -29,7 +29,7 @@ class TransactionGossiping[F[_]: Concurrent: Clock]( private def getDiffPeers(tx: TransactionCacheData): F[Set[Id]] = for { - all <- clusterStorage.getPeers + all <- clusterStorage.getActivePeers //TODO: not important, not used class used <- getUsedPeers(tx) } yield all.keySet.diff(used) diff --git a/src/main/scala/org/constellation/gossip/sampling/PartitionerPeerSampling.scala b/src/main/scala/org/constellation/gossip/sampling/PartitionerPeerSampling.scala index 89a7e7bca..2ab2d5225 100644 --- a/src/main/scala/org/constellation/gossip/sampling/PartitionerPeerSampling.scala +++ b/src/main/scala/org/constellation/gossip/sampling/PartitionerPeerSampling.scala @@ -14,7 +14,7 @@ import org.constellation.util.Partitioner._ class PartitionerPeerSampling[F[_]: Concurrent]( selfId: Id, - clusterStorage: ClusterStorageAlgebra[F], + peersSelectionFn: () => F[Set[Id]], trustManager: TrustManager[F] ) extends PeerSampling[F] { type Partition = IndexedSeq[Id] @@ -35,10 +35,13 @@ class PartitionerPeerSampling[F[_]: Concurrent]( def repartition(selfTdi: TrustDataInternal, tdi: List[TrustDataInternal]): F[Unit] = for { + subjectPeers <- peersSelectionFn() + filteredSelfTdi = selfTdi.copy(view = selfTdi.view.filterKeys(subjectPeers.contains)) + filteredTdi = tdi.map(t => t.copy(view = t.view.filterKeys(subjectPeers.contains))) partitions <- Sync[F].delay { - calculatePartitions(selfTdi, tdi) + calculatePartitions(filteredSelfTdi, filteredTdi) } - _ <- trustDataInternalCache.modify(_ => (tdi, ())) + _ <- trustDataInternalCache.modify(_ => (filteredTdi, ())) _ <- partitionCache.modify(_ => (partitions, ())) } yield () @@ -47,14 +50,14 @@ class PartitionerPeerSampling[F[_]: Concurrent]( private def cachedPartitionsCoverAllPeers: F[Boolean] = for { - peerIds <- clusterStorage.getNotOfflinePeers.map(_.keySet) + peerIds <- peersSelectionFn() cachedTdi <- trustDataInternalCache.get.map(_.map(_.id).toSet) } yield peerIds == cachedTdi private def repartitionWithDefaults(): F[Unit] = for { selfTdi <- trustManager.getTrustDataInternalSelf - peerIds <- clusterStorage.getNotOfflinePeers.map(_.keySet) + peerIds <- peersSelectionFn() cachedTdi <- trustDataInternalCache.get missingTdi = peerIds -- cachedTdi.map(_.id) _ <- logger.debug( @@ -85,7 +88,7 @@ object PartitionerPeerSampling { def apply[F[_]: Concurrent]( selfId: Id, - clusterStorage: ClusterStorageAlgebra[F], + peersSelectionFn: () => F[Set[Id]], trustManager: TrustManager[F] - ): PartitionerPeerSampling[F] = new PartitionerPeerSampling(selfId, clusterStorage, trustManager) + ): PartitionerPeerSampling[F] = new PartitionerPeerSampling(selfId, peersSelectionFn, trustManager) } diff --git a/src/main/scala/org/constellation/gossip/sampling/RandomPeerSampling.scala b/src/main/scala/org/constellation/gossip/sampling/RandomPeerSampling.scala index 48a6283e5..1bea7791b 100644 --- a/src/main/scala/org/constellation/gossip/sampling/RandomPeerSampling.scala +++ 
b/src/main/scala/org/constellation/gossip/sampling/RandomPeerSampling.scala @@ -47,6 +47,7 @@ class RandomPeerSampling[F[_]](selfId: Id, clusterStorage: ClusterStorageAlgebra .map(IndexedSeq[Id](_: _*)) } + // TODO: in case we start using RandomPeerSampling check what kind of peers should be fetched here private def getPeers: F[Set[Id]] = clusterStorage.getNotOfflinePeers.map(_.keySet) protected def generatePathId: F[String] = FUUID.randomFUUID.map(_.toString) diff --git a/src/main/scala/org/constellation/infrastructure/checkpointBlock/CheckpointStorageInterpreter.scala b/src/main/scala/org/constellation/infrastructure/checkpointBlock/CheckpointStorageInterpreter.scala index 813db70bd..755880fc4 100644 --- a/src/main/scala/org/constellation/infrastructure/checkpointBlock/CheckpointStorageInterpreter.scala +++ b/src/main/scala/org/constellation/infrastructure/checkpointBlock/CheckpointStorageInterpreter.scala @@ -4,6 +4,8 @@ import cats.effect.Concurrent import cats.effect.concurrent.{Ref, Semaphore} import cats.syntax.all._ import constellation.SHA256ByteExt +import io.chrisdavenport.log4cats.SelfAwareStructuredLogger +import io.chrisdavenport.log4cats.slf4j.Slf4jLogger import io.chrisdavenport.mapref.MapRef import org.constellation.concurrency.MapRefUtils import org.constellation.concurrency.MapRefUtils.MapRefOps @@ -20,6 +22,8 @@ import scala.collection.SortedSet class CheckpointStorageInterpreter[F[_]](implicit F: Concurrent[F]) extends CheckpointStorageAlgebra[F] { + implicit val logger: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] + val checkpoints: MapRef[F, String, Option[CheckpointCache]] = MapRefUtils.ofConcurrentHashMap() // Consider cache and time-removal @@ -300,6 +304,13 @@ class CheckpointStorageInterpreter[F[_]](implicit F: Concurrent[F]) extends Chec def countTips: F[Int] = getTips.map(_.size) + def countMissingTips: F[Int] = + tips.get + .flatMap(_.toList.traverse(tip => getCheckpoint(tip).map(tip -> _))) + .map(_.collect { case (str, None) => str }) + .flatTap(missing => logger.debug(s"Found missing tips: ${missing.map(_.take(5))}")) + .map(_.size) + def getMinTipHeight: F[Long] = for { tips <- getTips diff --git a/src/main/scala/org/constellation/infrastructure/cluster/ClusterStorageInterpreter.scala b/src/main/scala/org/constellation/infrastructure/cluster/ClusterStorageInterpreter.scala index 62e951a8f..14fc6c948 100644 --- a/src/main/scala/org/constellation/infrastructure/cluster/ClusterStorageInterpreter.scala +++ b/src/main/scala/org/constellation/infrastructure/cluster/ClusterStorageInterpreter.scala @@ -1,18 +1,30 @@ package org.constellation.infrastructure.cluster +import cats.data.NonEmptyList import cats.effect.concurrent.Ref import cats.effect.{Clock, Sync} import cats.syntax.all._ import org.constellation.domain.cluster.ClusterStorageAlgebra -import org.constellation.p2p.{JoinedHeight, PeerData} +import org.constellation.p2p.Cluster.MissingActivePeers +import org.constellation.p2p.{JoinedHeight, MajorityHeight, PeerData} +import org.constellation.schema.NodeType.{Full, Light} +import org.constellation.schema.snapshot.NextActiveNodes import org.constellation.schema.{Id, NodeState, NodeType} +import org.constellation.util.Metrics -import scala.concurrent.duration._ - -class ClusterStorageInterpreter[F[_]]()(implicit F: Sync[F], C: Clock[F]) extends ClusterStorageAlgebra[F] { +class ClusterStorageInterpreter[F[_]](nodeId: Id, nodeType: NodeType)(implicit F: Sync[F], C: Clock[F]) + extends ClusterStorageAlgebra[F] { private val peers: Ref[F, Map[Id, 
PeerData]] = Ref.unsafe(Map.empty[Id, PeerData]) + // TODO: move initialization of the refs below to somewhere outside - Cluster class startup? + private val authorizedPeers: Ref[F, Set[Id]] = Ref.unsafe[F, Set[Id]](Set.empty) + private val activeFullNodes: Ref[F, Set[Id]] = Ref.unsafe[F, Set[Id]](Set.empty) + private val activeLightNodes: Ref[F, Set[Id]] = Ref.unsafe[F, Set[Id]](Set.empty) + private val isActiveFullPeer: Ref[F, Boolean] = Ref.unsafe[F, Boolean](false) + private val isActiveLightPeer: Ref[F, Boolean] = Ref.unsafe[F, Boolean](false) + private val activeBetweenHeights: Ref[F, Option[MajorityHeight]] = Ref.unsafe[F, Option[MajorityHeight]](None) + def getPeer(host: String): F[Option[PeerData]] = getPeers.map(_.values.find { _.peerMetadata.host == host @@ -34,9 +46,6 @@ class ClusterStorageInterpreter[F[_]]()(implicit F: Sync[F], C: Clock[F]) extend def getLeavingPeers: F[Map[Id, PeerData]] = getPeers.map(_.filter(eqNodeState(Set(NodeState.Leaving)))) - def getReadyAndFullPeers: F[Map[Id, PeerData]] = - getReadyPeers.map(_.filter(eqNodeType(NodeType.Full))) - def getReadyPeers: F[Map[Id, PeerData]] = getPeers.map(_.filter(eqNodeState(NodeState.readyStates))) @@ -83,4 +92,164 @@ class ClusterStorageInterpreter[F[_]]()(implicit F: Sync[F], C: Clock[F]) extend def clearPeers(): F[Unit] = peers.modify(_ => (Map.empty, ())) + + def getActiveBetweenHeights: F[MajorityHeight] = + activeBetweenHeights.get.map(_.getOrElse(MajorityHeight(None, None))) + + def clearActiveBetweenHeights(): F[Unit] = + activeBetweenHeights.modify(_ => (None, ())) + + def getAuthorizedPeers: F[Set[Id]] = authorizedPeers.get + + def setAuthorizedPeers(peers: Set[Id]): F[Unit] = + authorizedPeers.modify(_ => (peers, ())) + + def addAuthorizedPeer(id: Id): F[Unit] = + authorizedPeers.modify(current => (current + id, ())) + + def removeAuthorizedPeer(id: Id): F[Unit] = + authorizedPeers.modify(current => (current - id, ())) + + def isAnActiveFullPeer: F[Boolean] = isActiveFullPeer.get + + def isAnActiveLightPeer: F[Boolean] = isActiveLightPeer.get + + def isAnActivePeer: F[Boolean] = + for { + isActiveFullPeer <- isAnActiveFullPeer + isActiveLightPeer <- isAnActiveLightPeer + } yield isActiveLightPeer || isActiveFullPeer + + def getFullPeers(): F[Map[Id, PeerData]] = + getPeers.map(_.filter { case (_, peerData) => peerData.peerMetadata.nodeType == Full }) + + def getFullPeersIds(withSelfId: Boolean): F[Set[Id]] = + getFullPeers().map(_.keySet ++ (if (nodeType == Full) Set(nodeId) else Set.empty)) + + def getLightPeers(): F[Map[Id, PeerData]] = + getPeers.map(_.filter { case (_, peerData) => peerData.peerMetadata.nodeType == Light }) + + def getLightPeersIds(withSelfId: Boolean): F[Set[Id]] = + getLightPeers().map(_.keySet ++ (if (nodeType == Light) Set(nodeId) else Set.empty)) + + def getActiveFullPeersIds(withSelfId: Boolean): F[Set[Id]] = + for { + activeFullNodes <- activeFullNodes.get + ownId <- isAnActiveFullPeer + .map(_ && withSelfId) + .ifM( + Set(nodeId).pure[F], + Set.empty[Id].pure[F] + ) + } yield activeFullNodes ++ ownId + + def getActiveFullPeers(): F[Map[Id, PeerData]] = + for { + allPeers <- getPeers + activeFullNodes <- getActiveFullPeersIds() + activePeersMaybeData = activeFullNodes + .map(id => id -> allPeers.get(id)) + .toMap + activeBetweenHeights <- getActiveBetweenHeights + activePeersData <- activePeersMaybeData match { + case peers if peers.forall(_._2.nonEmpty) => + peers.mapValues(_.get.copy(majorityHeight = NonEmptyList.one(activeBetweenHeights))).pure[F] + case incompleteActivePeers 
=> + val notFound = incompleteActivePeers.collect { case (id, None) => id }.toSet + F.raiseError(MissingActivePeers(notFound, Full)) + } + } yield activePeersData + + def setActiveFullPeers(peers: Set[Id]): F[Unit] = + activeFullNodes.modify(_ => (peers, ())) + + def getActiveLightPeersIds(withSelfId: Boolean = false): F[Set[Id]] = + for { + activeLightNodes <- activeLightNodes.get + ownId <- isAnActiveLightPeer + .map(_ && withSelfId) + .ifM( + Set(nodeId).pure[F], + Set.empty[Id].pure[F] + ) + } yield activeLightNodes ++ ownId + + def getActiveLightPeers(): F[Map[Id, PeerData]] = + for { + allPeers <- getPeers + activeLightNodes <- getActiveLightPeersIds() + activePeersData = activeLightNodes + .map(id => id -> allPeers.get(id)) + .collect { case (id, Some(peerData)) => id -> peerData } + .toMap + notFound = activeLightNodes -- activePeersData.keySet + peersData <- if (notFound.isEmpty) + activePeersData.pure[F] + else { + F.raiseError(MissingActivePeers(notFound, Light)) + } + } yield peersData + + def getActivePeersIds(withSelfId: Boolean): F[Set[Id]] = + for { + activeLight <- getActiveLightPeersIds(withSelfId) + activeFull <- getActiveFullPeersIds(withSelfId) + } yield activeLight ++ activeFull + + def getActivePeers: F[Map[Id, PeerData]] = + for { + activeLight <- getActiveLightPeers() + activeFull <- getActiveFullPeers() + } yield activeLight ++ activeFull + + def setAsActivePeer(asType: NodeType): F[Unit] = + (nodeType, asType) match { + case (Full, Full) => + isActiveFullPeer.modify(_ => (true, ())) + case (Light, Light) => + isActiveLightPeer.modify(_ => (true, ())) + case (_, _) => + F.raiseError(new Throwable("Asked to join the network as a different node type!")) + } + + def unsetAsActivePeer(): F[Unit] = + isActiveFullPeer.modify(_ => (false, ())) >> + isActiveLightPeer.modify(_ => (false, ())) + + def setActiveBetweenHeights(majorityHeight: MajorityHeight): F[Unit] = + activeBetweenHeights.modify(_ => (majorityHeight.some, ())) + // TODO: move logic below outside clusterStorage +// + def setActiveBetweenHeights(starting: Long): F[Unit] = +// + activeBetweenHeights.modify { _ => +// + val ending = starting + activePeersRotationEveryNHeights +// + // TODO: for sure it can be done better +// + (MajorityHeight(if (starting < 2L ) starting.some else (starting - 2L).some, ending.some).some, ()) +// + } + + def setActivePeers( + nextActiveNodes: NextActiveNodes, + latestMajorityHeight: MajorityHeight, + metrics: Metrics + ): F[Unit] = + if (nextActiveNodes.full.contains(nodeId)) + activeFullNodes.modify(_ => (nextActiveNodes.full - nodeId, ())) >> + activeLightNodes.modify(_ => (nextActiveNodes.light, ())) >> + setActiveBetweenHeights(latestMajorityHeight) >> + setAsActivePeer(Full) >> + metrics.updateMetricAsync("snapshot_isMemberOfFullActivePool", 1) >> + metrics.updateMetricAsync("snapshot_isMemberOfLightActivePool", 0) + else if (nextActiveNodes.light.contains(nodeId)) + activeLightNodes.modify(_ => (nextActiveNodes.light - nodeId, ())) >> + activeFullNodes.modify(_ => (nextActiveNodes.full, ())) >> + setAsActivePeer(Light) >> + metrics.updateMetricAsync("snapshot_isMemberOfFullActivePool", 0) >> + metrics.updateMetricAsync("snapshot_isMemberOfLightActivePool", 1) + else + activeFullNodes.modify(_ => (Set.empty, ())) >> + activeLightNodes.modify(_ => (Set.empty, ())) >> + unsetAsActivePeer() >> + clearActiveBetweenHeights() >> + metrics.updateMetricAsync("snapshot_isMemberOfFullActivePool", 0) >> + metrics.updateMetricAsync("snapshot_isMemberOfLightActivePool", 0) + } diff 
--git a/src/main/scala/org/constellation/infrastructure/configuration/CliConfigParser.scala b/src/main/scala/org/constellation/infrastructure/configuration/CliConfigParser.scala index faec32edd..0a61d25c5 100644 --- a/src/main/scala/org/constellation/infrastructure/configuration/CliConfigParser.scala +++ b/src/main/scala/org/constellation/infrastructure/configuration/CliConfigParser.scala @@ -6,6 +6,7 @@ import cats.syntax.all._ import com.typesafe.config.Config import org.constellation.BuildInfo import org.constellation.domain.configuration.CliConfig +import org.constellation.schema.NodeType.{Full, Light} import org.constellation.util.HostPort import scopt.OParser @@ -47,9 +48,16 @@ object CliConfigParser { opt[Unit]('o', "offline") .action((x, c) => c.copy(startOfflineMode = true)) .text("Start the node in offline mode. Won't connect automatically"), - opt[Unit]('l', "light") - .action((x, c) => c.copy(lightNode = true)) - .text("Start a light node, only validates & stores portions of the graph"), + opt[String]("node-type") + .action( + (x, c) => + x match { + case "light" => c.copy(nodeType = Light) + case "full" => c.copy(nodeType = Full) + case _ => c + } + ) + .text("A node type. Possible values: light,full. Light by default."), opt[Unit]('g', "genesis") .action((x, c) => c.copy(genesisNode = true)) .text("Start in single node genesis mode"), diff --git a/src/main/scala/org/constellation/infrastructure/endpoints/ClusterEndpoints.scala b/src/main/scala/org/constellation/infrastructure/endpoints/ClusterEndpoints.scala index fe4317320..5ee4b7eb6 100644 --- a/src/main/scala/org/constellation/infrastructure/endpoints/ClusterEndpoints.scala +++ b/src/main/scala/org/constellation/infrastructure/endpoints/ClusterEndpoints.scala @@ -18,6 +18,7 @@ import TrustData._ import Id._ import org.constellation.domain.cluster.{ClusterStorageAlgebra, NodeStorageAlgebra} import org.constellation.schema.observation.ObservationEvent +import org.constellation.session.Registration.`X-Id` class ClusterEndpoints[F[_]](implicit F: Concurrent[F]) extends Http4sDsl[F] { @@ -29,7 +30,9 @@ class ClusterEndpoints[F[_]](implicit F: Concurrent[F]) extends Http4sDsl[F] { setNodeStatusEndpoint(cluster, clusterStorage) <+> setJoiningHeightEndpoint(clusterStorage) <+> deregisterEndpoint(cluster) <+> - trustEndpoint(trustManager) + trustEndpoint(trustManager) <+> + getActiveFullNodesEndpoint(clusterStorage) <+> + receiveJoiningNotificationEndpoint(cluster) private def infoEndpoint(cluster: Cluster[F]): HttpRoutes[F] = HttpRoutes.of[F] { @@ -73,6 +76,35 @@ class ClusterEndpoints[F[_]](implicit F: Concurrent[F]) extends Http4sDsl[F] { else TrustData(predicted).pure[F] }.map(_.asJson).flatMap(Ok(_)) } + + private def getActiveFullNodesEndpoint(clusterStorage: ClusterStorageAlgebra[F]): HttpRoutes[F] = HttpRoutes.of[F] { + case GET -> Root / "cluster" / "active-full-nodes" => + clusterStorage.isAnActiveFullPeer + .ifM( + clusterStorage.getActiveFullPeersIds(true).map { + case activeFullNodes if activeFullNodes.isEmpty => none[Set[Id]] + case activeFullNodes => activeFullNodes.some + }, + none[Set[Id]].pure[F] + ) + .flatMap(payload => Ok(payload.asJson)) + } + + private def receiveJoiningNotificationEndpoint(cluster: Cluster[F]): HttpRoutes[F] = HttpRoutes.of[F] { + case req @ POST -> Root / "cluster" / "join-notification" => + for { + maybeId <- F.delay(req.headers.get(`X-Id`).map(_.value).map(Id(_))) + response <- { + maybeId match { + case Some(id) => + cluster + .handleJoiningClusterNotification(id) >> // TODO: shouldn't 
we have observation service available here? + Ok() + case None => BadRequest() + } + } + } yield response + } } object ClusterEndpoints { diff --git a/src/main/scala/org/constellation/infrastructure/endpoints/ConsensusEndpoints.scala b/src/main/scala/org/constellation/infrastructure/endpoints/ConsensusEndpoints.scala index b6f613ca6..3db15031e 100644 --- a/src/main/scala/org/constellation/infrastructure/endpoints/ConsensusEndpoints.scala +++ b/src/main/scala/org/constellation/infrastructure/endpoints/ConsensusEndpoints.scala @@ -14,7 +14,7 @@ import org.constellation.consensus.Consensus.{ SelectedUnionBlock, UnionBlockProposal } -import org.constellation.consensus.ConsensusManager.SnapshotHeightAboveTip +import org.constellation.consensus.ConsensusManager.{NodeNotAnActiveLightNode, SnapshotHeightAboveTip} import org.constellation.consensus.{ConsensusManager, RoundDataRemote} import org.constellation.domain.transaction.TransactionService import org.constellation.p2p.PeerData @@ -87,6 +87,9 @@ class ConsensusEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ex logger .error(s"Error when participating in new round: ${cmd.roundId} cause: ${err.getMessage}") >> BadRequest(err.getMessage) + case err @ NodeNotAnActiveLightNode(_) => + logger.error(s"Error when participating in new round: ${cmd.roundId} cause: ${err.getMessage}") >> + BadRequest(err.getMessage) case err => logger .error(s"Error when participating in new round: ${cmd.roundId} cause: ${err.getMessage}") >> diff --git a/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala b/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala index bd12aafaf..6cc9689a8 100644 --- a/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala +++ b/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala @@ -29,7 +29,7 @@ import org.constellation.schema.snapshot.{ import org.constellation.schema.{Id, NodeState} import org.constellation.serialization.KryoSerializer import org.constellation.session.Registration.`X-Id` -import org.constellation.storage.SnapshotService +import org.constellation.storage.{JoinActivePoolCommand, SnapshotService} import org.constellation.util.Metrics import org.http4s.circe._ import org.http4s.dsl.Http4sDsl @@ -66,6 +66,7 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext snapshotInfoStorage: LocalFileStorage[F, SnapshotInfo], snapshotService: SnapshotService[F], nodeStorage: NodeStorageAlgebra[F], + redownloadService: RedownloadService[F], redownloadStorage: RedownloadStorageAlgebra[F], snapshotProposalGossipService: SnapshotProposalGossipService[F], messageValidator: MessageValidator, @@ -81,7 +82,8 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext getSnapshotInfo(snapshotService, nodeStorage) <+> getSnapshotInfoByHash(snapshotInfoStorage) <+> getLatestMajorityHeight(redownloadStorage) <+> - postSnapshotProposal(snapshotProposalGossipService, redownloadStorage, messageValidator, metrics) + postSnapshotProposal(snapshotProposalGossipService, redownloadStorage, messageValidator, metrics) <+> + redownloadAndJoinActivePeersPool(redownloadService) def ownerEndpoints( nodeId: Id, @@ -273,6 +275,23 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext .map(_.asJson) .flatMap(Ok(_)) } + + private def redownloadAndJoinActivePeersPool(redownloadService: RedownloadService[F]): HttpRoutes[F] = + HttpRoutes.of[F] { + case req 
@ POST -> Root / "join-active-pool" => + for { + maybeId <- F.delay(req.headers.get(`X-Id`).map(_.value).map(Id(_))) + joinActivePoolCommand <- req.decodeJson[JoinActivePoolCommand] + response <- maybeId match { + case Some(senderId) => + F.start(redownloadService.redownloadBeforeJoiningActivePeersPool(senderId, joinActivePoolCommand)) + .void + .flatMap(foo => Ok(foo.asJson)) + case None => + BadRequest() + } + } yield response + } } object SnapshotEndpoints { @@ -291,6 +310,7 @@ object SnapshotEndpoints { snapshotInfoStorage: LocalFileStorage[F, SnapshotInfo], snapshotService: SnapshotService[F], nodeStorage: NodeStorageAlgebra[F], + redownloadService: RedownloadService[F], redownloadStorage: RedownloadStorageAlgebra[F], snapshotProposalGossipService: SnapshotProposalGossipService[F], messageValidator: MessageValidator, @@ -303,6 +323,7 @@ object SnapshotEndpoints { snapshotInfoStorage, snapshotService, nodeStorage, + redownloadService, redownloadStorage, snapshotProposalGossipService, messageValidator, diff --git a/src/main/scala/org/constellation/infrastructure/p2p/client/ClusterClientInterpreter.scala b/src/main/scala/org/constellation/infrastructure/p2p/client/ClusterClientInterpreter.scala index ca157bed9..96b758920 100644 --- a/src/main/scala/org/constellation/infrastructure/p2p/client/ClusterClientInterpreter.scala +++ b/src/main/scala/org/constellation/infrastructure/p2p/client/ClusterClientInterpreter.scala @@ -47,6 +47,17 @@ class ClusterClientInterpreter[F[_]: ContextShift](client: Client[F], sessionTok def getTrust(): PeerResponse[F, TrustData] = PeerResponse[F, TrustData]("trust")(client, sessionTokenService) + + def getActiveFullNodes(): PeerResponse[F, Option[Set[Id]]] = + PeerResponse[F, Option[Set[Id]]]("cluster/active-full-nodes")(client, sessionTokenService) + + def notifyAboutClusterJoin(): PeerResponse[F, Unit] = + PeerResponse[F, Boolean]("cluster/join-notification", POST)(client, sessionTokenService) { (req, c) => + c.successful(req) + }.flatMapF { isSuccess => + if (isSuccess) F.unit + else F.raiseError(new Throwable("Failed to notify active full node about cluster joining!")) + } } object ClusterClientInterpreter { diff --git a/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala b/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala index e78bb43c7..fd54b7bf9 100644 --- a/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala +++ b/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala @@ -12,6 +12,7 @@ import org.constellation.schema.Id import org.constellation.schema.signature.Signed import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposal, SnapshotProposalPayload} import org.constellation.session.SessionTokenService +import org.constellation.storage.JoinActivePoolCommand import org.http4s.Method._ import org.http4s.Status.Successful import org.http4s.circe.CirceEntityDecoder._ @@ -99,6 +100,14 @@ class SnapshotClientInterpreter[F[_]: ContextShift]( ) ) ) + + def notifyNextActivePeer(joinActivePoolCommand: JoinActivePoolCommand): PeerResponse[F, Unit] = + PeerResponse("join-active-pool", POST)(client, sessionTokenService) { (req, c) => + c.successful(req.withEntity(joinActivePoolCommand)) + }.flatMapF { isSuccess => + if (isSuccess) F.unit + else F.raiseError(new Throwable("Failed to notify next active peer about its turn to join the pool!")) + } } object SnapshotClientInterpreter { diff
--git a/src/main/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheck.scala b/src/main/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheck.scala index 8e58891fb..c37811dbb 100644 --- a/src/main/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheck.scala +++ b/src/main/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheck.scala @@ -1,7 +1,8 @@ package org.constellation.infrastructure.redownload import cats.effect.IO -import org.constellation.DAO +import cats.syntax.all._ +import org.constellation.domain.cluster.ClusterStorageAlgebra import org.constellation.domain.redownload.RedownloadService import org.constellation.util.Logging.logThread import org.constellation.util.PeriodicIO @@ -12,13 +13,19 @@ import scala.concurrent.duration._ class RedownloadPeriodicCheck( periodSeconds: Int = 30, unboundedExecutionContext: ExecutionContext, - redownloadService: RedownloadService[IO] + redownloadService: RedownloadService[IO], + clusterStorage: ClusterStorageAlgebra[IO] ) extends PeriodicIO("RedownloadPeriodicCheck", unboundedExecutionContext) { private def triggerRedownloadCheck(): IO[Unit] = - for { - _ <- redownloadService.checkForAlignmentWithMajoritySnapshot() - } yield () + clusterStorage.isAnActiveFullPeer.ifM( + for { + _ <- redownloadService.checkForAlignmentWithMajoritySnapshot() + } yield (), + IO.delay { + logger.debug(s"Node is not an active peer currently! Skipping redownload check!") + } + ) override def trigger(): IO[Unit] = logThread(triggerRedownloadCheck(), "triggerRedownloadCheck", logger) diff --git a/src/main/scala/org/constellation/infrastructure/redownload/RedownloadStorageInterpreter.scala b/src/main/scala/org/constellation/infrastructure/redownload/RedownloadStorageInterpreter.scala index f6c4c4458..3666d56dc 100644 --- a/src/main/scala/org/constellation/infrastructure/redownload/RedownloadStorageInterpreter.scala +++ b/src/main/scala/org/constellation/infrastructure/redownload/RedownloadStorageInterpreter.scala @@ -18,11 +18,12 @@ import org.constellation.schema.Id import org.constellation.schema.signature.Signed import org.constellation.schema.signature.Signed.signed import org.constellation.schema.snapshot.{FilterData, HeightRange, SnapshotProposal} -import java.security.KeyPair +import java.security.KeyPair import org.constellation.collection.MapUtils._ import io.chrisdavenport.log4cats.slf4j.Slf4jLogger import org.constellation.concurrency.cuckoo.{CuckooFilter, MutableCuckooFilter} +import org.constellation.storage.JoinActivePoolCommand import scala.collection.immutable.SortedMap @@ -54,6 +55,8 @@ class RedownloadStorageInterpreter[F[_]]( private val lastMajorityState: Ref[F, SnapshotsAtHeight] = Ref.unsafe(Map.empty) private val lastSentHeight: Ref[F, Long] = Ref.unsafe(-1L) + private val joinActivePoolCommandRequests: Ref[F, Map[Id, JoinActivePoolCommand]] = Ref.unsafe(Map.empty) + private val majorityStallCount: Ref[F, Int] = Ref.unsafe(0) private val localFilter: MutableCuckooFilter[F, ProposalCoordinate] = @@ -232,4 +235,12 @@ class RedownloadStorageInterpreter[F[_]]( implicit val proposalCoordinateToString: ProposalCoordinate => String = { case (id, height) => s"$id:$height" } + + def addJoinActivePoolCommand(senderId: Id, command: JoinActivePoolCommand): F[Map[Id, JoinActivePoolCommand]] = + joinActivePoolCommandRequests.modify { last => + val updated = last + (senderId -> command) + (updated, updated) + } + + def clearJoinActivePoolCommands(): F[Unit] =
joinActivePoolCommandRequests.modify(_ => (Map.empty, ())) } diff --git a/src/main/scala/org/constellation/infrastructure/snapshot/SnapshotStorageInterpreter.scala b/src/main/scala/org/constellation/infrastructure/snapshot/SnapshotStorageInterpreter.scala index f502f3c30..ab97fa47d 100644 --- a/src/main/scala/org/constellation/infrastructure/snapshot/SnapshotStorageInterpreter.scala +++ b/src/main/scala/org/constellation/infrastructure/snapshot/SnapshotStorageInterpreter.scala @@ -6,7 +6,7 @@ import cats.syntax.all._ import org.constellation.domain.snapshot.SnapshotStorageAlgebra import org.constellation.domain.storage.LocalFileStorage import org.constellation.schema.snapshot.Snapshot.snapshotZero -import org.constellation.schema.snapshot.StoredSnapshot +import org.constellation.schema.snapshot.{NextActiveNodes, StoredSnapshot} import org.constellation.concurrency.SetRefUtils.RefOps class SnapshotStorageInterpreter[F[_]]( @@ -69,4 +69,7 @@ class SnapshotStorageInterpreter[F[_]]( def setNextSnapshotHash(hash: String): F[Unit] = nextSnapshotHash.set(hash) + def getNextSnapshotFacilitators: F[NextActiveNodes] = + storedSnapshot.get.map(_.snapshot.nextActiveNodes) + } diff --git a/src/main/scala/org/constellation/p2p/Cluster.scala b/src/main/scala/org/constellation/p2p/Cluster.scala index b39df30aa..cb2460fbe 100644 --- a/src/main/scala/org/constellation/p2p/Cluster.scala +++ b/src/main/scala/org/constellation/p2p/Cluster.scala @@ -20,13 +20,17 @@ import org.constellation.infrastructure.p2p.{ClientInterpreter, PeerResponse} import org.constellation.p2p.Cluster.ClusterNode import org.constellation.rewards.EigenTrust import org.constellation.schema.snapshot.LatestMajorityHeight -import org.constellation.schema.{Id, NodeState, PeerNotification} +import org.constellation.schema.{Id, NodeState, NodeType, PeerNotification} import org.constellation.serialization.KryoSerializer import org.constellation.session.SessionTokenService import org.constellation.util.Logging._ import org.constellation.util._ import org.constellation.collection.MapUtils._ +import org.constellation.domain.healthcheck.HealthCheckLoggingHelper.{logId, logIds} +import org.constellation.domain.observation.ObservationService +import org.constellation.schema.observation.{NodeJoinsTheCluster, Observation} +import java.security.KeyPair import scala.concurrent.duration._ import scala.util.Random @@ -107,10 +111,12 @@ class Cluster[F[_]]( redownloadStorage: RedownloadStorageAlgebra[F], downloadService: DownloadService[F], eigenTrust: EigenTrust[F], + observationService: ObservationService[F], broadcastService: BroadcastService[F], processingConfig: ProcessingConfig, unboundedBlocker: Blocker, nodeId: Id, + keyPair: KeyPair, alias: String, metrics: Metrics, nodeConfig: NodeConfig, @@ -125,6 +131,10 @@ class Cluster[F[_]]( T: Timer[F] ) { + val snapshotHeightInterval: Int = ConfigUtil.constellation.getInt("snapshot.snapshotHeightInterval") + val activePeersRotationInterval: Int = ConfigUtil.constellation.getInt("snapshot.activePeersRotationInterval") + val activePeersRotationEveryNHeights: Int = snapshotHeightInterval * activePeersRotationInterval + private val peerDiscovery = PeerDiscovery[F](apiClient, this, clusterStorage, nodeId, unboundedBlocker) implicit val logger = Slf4jLogger.getLogger[F] @@ -274,7 +284,12 @@ class Cluster[F[_]]( _ <- attemptRegisterPeer(hp) _ <- T.sleep(15.seconds) joiningHeight <- discoverJoiningHeight() - _ <- downloadService.download(joiningHeight) + // TODO: verify if this flow is still correct after refactor 
given joining pool functionality + _ <- clusterStorage.isAnActiveFullPeer.ifM( + downloadService.download(joiningHeight), + notifyCurrentActiveFullNodesAboutJoining() >> + compareAndSet(NodeState.initial, NodeState.Ready).void + ) _ <- setOwnJoinedHeight() _ <- broadcastOwnJoinedHeight() } yield () @@ -321,7 +336,9 @@ class Cluster[F[_]]( height <- nodeStorage.getOwnJoinedHeight _ <- height .map(_.pure[F]) - .getOrElse(discoverJoiningHeight().flatTap { nodeStorage.setOwnJoinedHeight }.flatTap { + .getOrElse(discoverJoiningHeight().flatTap { + nodeStorage.setOwnJoinedHeight + }.flatTap { metrics.updateMetricAsync[F]("cluster_ownJoinedHeight", _) }) } yield () @@ -490,14 +507,15 @@ class Cluster[F[_]]( nodeState = state.nodeState, auxAddresses = state.addresses, resourceInfo = request.resourceInfo, - alias = alias + alias = alias, + nodeType = request.nodeType ) majorityHeight = MajorityHeight(request.majorityHeight) majorityHeights = existingMajorityHeight .map(_.prepend(majorityHeight)) .getOrElse(NonEmptyList.one(majorityHeight)) peerData = PeerData(peerMetadata, majorityHeights) - _ <- updatePeerInfo(peerData) >> updateJoiningHeight >> updateToken >> C.shift >> F.start( + _ <- updatePeerInfo(peerData) /*>> updateJoiningHeight*/ >> updateToken >> C.shift >> F.start( if (withDiscovery) peerDiscovery.discoverFrom(peerMetadata) else F.unit ) } yield () @@ -535,6 +553,7 @@ class Cluster[F[_]]( whitelistingHash = KryoSerializer.serializeAnyRef(nodeConfig.whitelisting).sha256 oOwnToken <- sessionTokenService.getOwnToken() ownToken <- oOwnToken.map(F.pure).getOrElse(F.raiseError(new Throwable("Own token not set!"))) + nodeType = nodeConfig.nodeType _ <- logger.debug( s"Pending registration request: ownHeight=$height peers=$peersSize isInitialFacilitator=$isInitialFacilitator participatedInRollbackFlow=$participatedInRollbackFlow participatedInGenesisFlow=$participatedInGenesisFlow" ) @@ -554,6 +573,7 @@ class Cluster[F[_]]( joinsAsInitialFacilitator = isInitialFacilitator, whitelistingHash, ownToken, + nodeType, isReconciliationJoin ) } @@ -623,6 +643,7 @@ class Cluster[F[_]]( private def broadcastLeaveRequest(majorityHeight: Long): F[Unit] = { def peerUnregister(c: PeerClientMetadata) = PeerUnregister(peerHostPort.host, peerHostPort.port, nodeId, majorityHeight) + broadcastService .broadcast( c => PeerResponse.run(apiClient.cluster.deregister(peerUnregister(c)), unboundedBlocker)(c), @@ -640,6 +661,61 @@ class Cluster[F[_]]( } validWithLoopbackGuard(hp.host) && !hostAlreadyExists } + + private def notifyCurrentActiveFullNodesAboutJoining(): F[Unit] = { + // TODO: is using tailRecM like this tail recursive already? :D + def loopNotifyUntilSuccess(lastActivePeersData: List[PeerData]): F[Unit] = + F.tailRecM(lastActivePeersData) { + case Nil => + F.raiseError(new Throwable("Ran out of active Full nodes when attempting to notify about cluster joining!")) + case peer :: otherPeers => + logger.debug(s"Trying to send joining pool observation to ${peer.peerMetadata.id.medium}") >> + PeerResponse + .run(apiClient.cluster.notifyAboutClusterJoin(), unboundedBlocker)(peer.peerMetadata.toPeerClientMetadata) + .flatMap(_ => logger.debug(s"Success sending joining pool observation to ${peer.peerMetadata.id.medium}")) + .map(_.asRight[List[PeerData]]) + .handleErrorWith { e => + logger.debug(e)( + s"Error sending join notification to active peer ${logId(peer.peerMetadata.id)}.
Trying next peers: ${logIds(otherPeers.map(_.peerMetadata.id).toSet)}" + ) >> + otherPeers.asLeft[Unit].pure[F] + } + } + + for { + fullNodes <- clusterStorage.getFullPeers() + _ <- logger.debug( + s"Full nodes to broadcast request about current active full nodes to. Nodes ${fullNodes.keySet.map(_.medium)}" + ) + lastActivePeers <- broadcastService + .broadcast( + PeerResponse.run(apiClient.cluster.getActiveFullNodes(), unboundedBlocker), + subset = fullNodes.keySet + ) + .map(_.values.toList.separate._2.flatten.toSet) + .flatMap { + case setsOfActive if setsOfActive.size == 1 => + logger.debug(s"Unanimous full active nodes indication! ${setsOfActive.flatten.map(_.medium)}") >> + setsOfActive.flatten.pure[F] + case setsOfActive => + logger.debug(s"Non-unanimous full active nodes indication! ${setsOfActive.map(_.map(_.medium))}") >> + Set.empty[Id].pure[F] + } + + lastActivePeersData <- lastActivePeers.toList + .traverse(clusterStorage.getPeer) + .map(_.flatten) + _ <- loopNotifyUntilSuccess(lastActivePeersData) + } yield () + } + + def handleJoiningClusterNotification(id: Id): F[Unit] = // TODO: pass observation service explicitly to Cluster + observationService + .put( + Observation.create(id, NodeJoinsTheCluster)(keyPair) + ) + .flatMap(obs => logger.debug(s"Registered NodeJoinedTheCluster observation: $obs")) >> + metrics.incrementMetricAsync("processedNodeJoinedTheClusterObservation") } object Cluster { @@ -655,10 +731,12 @@ object Cluster { redownloadStorage: RedownloadStorageAlgebra[F], downloadService: DownloadService[F], eigenTrust: EigenTrust[F], + observationService: ObservationService[F], broadcastService: BroadcastService[F], processingConfig: ProcessingConfig, unboundedBlocker: Blocker, nodeId: Id, + keyPair: KeyPair, alias: String, metrics: Metrics, nodeConfig: NodeConfig, @@ -677,10 +755,12 @@ object Cluster { redownloadStorage, downloadService, eigenTrust, + observationService, broadcastService, processingConfig, unboundedBlocker, nodeId, + keyPair, alias, metrics, nodeConfig, @@ -702,4 +782,13 @@ object Cluster { implicit val clusterNodeDecoder: Decoder[ClusterNode] = deriveDecoder } + case class MissingActivePeers(ids: Set[Id], nodeType: NodeType) + extends Throwable(s"Missing active $nodeType peers: $ids") + + def calculateActiveBetweenHeights(starting: Long, rotationInterval: Long): MajorityHeight = { + val ending = starting + rotationInterval + // TODO: for sure it can be done better + MajorityHeight(if (starting < 2L) starting.some else (starting - 2L).some, ending.some) + } + } diff --git a/src/main/scala/org/constellation/p2p/PeerRegistrationRequest.scala b/src/main/scala/org/constellation/p2p/PeerRegistrationRequest.scala index 6bae4f89c..6ed792796 100644 --- a/src/main/scala/org/constellation/p2p/PeerRegistrationRequest.scala +++ b/src/main/scala/org/constellation/p2p/PeerRegistrationRequest.scala @@ -3,7 +3,7 @@ package org.constellation.p2p import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.{Decoder, Encoder} import org.constellation.ResourceInfo -import org.constellation.schema.Id +import org.constellation.schema.{Id, NodeType} import org.constellation.session.SessionTokenService.Token case class PeerRegistrationRequest( @@ -17,6 +17,7 @@ case class PeerRegistrationRequest( joinsAsInitialFacilitator: Boolean, whitelistingHash: String, token: Token, + nodeType: NodeType, isReconciliationJoin: Boolean ) diff --git a/src/main/scala/org/constellation/rewards/RewardsManager.scala 
b/src/main/scala/org/constellation/rewards/RewardsManager.scala index d9f68c116..fa7832810 100644 --- a/src/main/scala/org/constellation/rewards/RewardsManager.scala +++ b/src/main/scala/org/constellation/rewards/RewardsManager.scala @@ -71,7 +71,7 @@ class RewardsManager[F[_]: Concurrent]( _ <- eigenTrust.retrain(rewardSnapshot.observations) trustMap <- eigenTrust.getTrustForAddresses - peers <- clusterStorage.getPeers + peers <- clusterStorage.getActiveFullPeers() ownJoiningHeight <- nodeStorage.getOwnJoinedHeight ownMajorityHeight = MajorityHeight( diff --git a/src/main/scala/org/constellation/rollback/RollbackService.scala b/src/main/scala/org/constellation/rollback/RollbackService.scala index 6da2a86f8..99b158870 100644 --- a/src/main/scala/org/constellation/rollback/RollbackService.scala +++ b/src/main/scala/org/constellation/rollback/RollbackService.scala @@ -44,15 +44,15 @@ class RollbackService[F[_]]( private val snapshotInfoV1MaxHeight: Long = ConfigUtil.constellation.getLong("schema.v1.snapshotInfo") - def restore(): EitherT[F, Throwable, Unit] = - for { - _ <- logger.debug("Performing rollback by finding the highest snapshot in the cloud.").attemptT - highest <- getHighest() - _ <- logger.debug(s"Max height found: $highest").attemptT - _ <- highest match { - case (height, hash) => restore(height, hash) - } - } yield () +// def restore(): EitherT[F, Throwable, Unit] = +// for { +// _ <- logger.debug("Performing rollback by finding the highest snapshot in the cloud.").attemptT +// highest <- getHighest() +// _ <- logger.debug(s"Max height found: $highest").attemptT +// _ <- highest match { +// case (height, hash) => restore(height, hash) +// } +// } yield () def restore(height: Long, hash: String): EitherT[F, Throwable, Unit] = validate(height, hash).flatMap(restore(_, height)) diff --git a/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala b/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala index 8f0b5495d..c7f5e9721 100644 --- a/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala +++ b/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala @@ -12,6 +12,7 @@ import org.constellation.schema.signature.Signed.signed import org.constellation.schema.snapshot.{SnapshotProposal, SnapshotProposalPayload} import org.constellation.storage.{ HeightIntervalConditionNotMet, + NodeNotPartOfL0FacilitatorsPool, NotEnoughSpace, SnapshotError, SnapshotIllegalState, @@ -63,6 +64,8 @@ class SnapshotTrigger(periodSeconds: Int = 5, unboundedExecutionContext: Executi (IO.shift >> cluster.leave(IO.unit)).start >> handleError(NotEnoughSpace, stateSet) case Left(SnapshotIllegalState) => handleError(SnapshotIllegalState, stateSet) + case Left(err @ NodeNotPartOfL0FacilitatorsPool) => + handleError(err, stateSet) case Left(err @ HeightIntervalConditionNotMet) => resetNodeState(stateSet) >> IO(logger.warn(s"Snapshot attempt: ${err.message}")) >> diff --git a/src/main/scala/org/constellation/storage/SnapshotService.scala b/src/main/scala/org/constellation/storage/SnapshotService.scala index 60b25ceaf..d111e3e9a 100644 --- a/src/main/scala/org/constellation/storage/SnapshotService.scala +++ b/src/main/scala/org/constellation/storage/SnapshotService.scala @@ -7,25 +7,34 @@ import cats.syntax.all._ import constellation.withMetric import io.chrisdavenport.log4cats.SelfAwareStructuredLogger import io.chrisdavenport.log4cats.slf4j.Slf4jLogger +import io.circe.Codec +import io.circe.generic.semiauto.deriveCodec import org.constellation.consensus._ import 
org.constellation.domain.checkpointBlock.CheckpointStorageAlgebra +import org.constellation.domain.cluster.ClusterStorageAlgebra +import org.constellation.domain.configuration.NodeConfig +import org.constellation.domain.healthcheck.HealthCheckLoggingHelper.{logIdShort, logIds} import org.constellation.domain.observation.ObservationService import org.constellation.domain.redownload.RedownloadStorageAlgebra import org.constellation.domain.rewards.StoredRewards import org.constellation.domain.snapshot.SnapshotStorageAlgebra import org.constellation.domain.storage.LocalFileStorage import org.constellation.domain.transaction.TransactionService +import org.constellation.infrastructure.p2p.ClientInterpreter import org.constellation.p2p.DataResolver.DataResolverCheckpointsEnqueue +import org.constellation.p2p.{Cluster, MajorityHeight} import org.constellation.rewards.EigenTrust import org.constellation.schema.Id import org.constellation.schema.checkpoint.{CheckpointBlock, CheckpointCache} -import org.constellation.schema.snapshot.{Snapshot, SnapshotInfo, StoredSnapshot, TotalSupply} +import org.constellation.schema.observation.{NodeMemberOfActivePool, NodeNotMemberOfActivePool, Observation} +import org.constellation.schema.snapshot.{NextActiveNodes, Snapshot, SnapshotInfo, StoredSnapshot, TotalSupply} import org.constellation.schema.transaction.TransactionCacheData import org.constellation.serialization.KryoSerializer import org.constellation.trust.TrustManager import org.constellation.util.Metrics import org.constellation.{ConfigUtil, ProcessingConfig} +import java.security.KeyPair import scala.collection.SortedMap import scala.concurrent.ExecutionContext @@ -40,6 +49,7 @@ class SnapshotService[F[_]: Concurrent]( trustManager: TrustManager[F], snapshotStorage: LocalFileStorage[F, StoredSnapshot], snapshotInfoStorage: LocalFileStorage[F, SnapshotInfo], + clusterStorage: ClusterStorageAlgebra[F], eigenTrustStorage: LocalFileStorage[F, StoredRewards], eigenTrust: EigenTrust[F], redownloadStorage: RedownloadStorageAlgebra[F], @@ -48,7 +58,9 @@ class SnapshotService[F[_]: Concurrent]( unboundedExecutionContext: ExecutionContext, metrics: Metrics, processingConfig: ProcessingConfig, - nodeId: Id + nodeId: Id, + keyPair: KeyPair, + nodeConfig: NodeConfig )(implicit C: ContextShift[F], P: Parallel[F]) { val logger: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] @@ -56,29 +68,36 @@ class SnapshotService[F[_]: Concurrent]( val snapshotHeightInterval: Int = ConfigUtil.constellation.getInt("snapshot.snapshotHeightInterval") val snapshotHeightDelayInterval: Int = ConfigUtil.constellation.getInt("snapshot.snapshotHeightDelayInterval") val distanceFromMajority: Int = ConfigUtil.constellation.getInt("snapshot.distanceFromMajority") + val activePeersRotationInterval: Int = ConfigUtil.constellation.getInt("snapshot.activePeersRotationInterval") + val activePeersRotationEveryNHeights: Int = snapshotHeightInterval * activePeersRotationInterval + + def getNextSnapshotFacilitators: F[NextActiveNodes] = + snapshotServiceStorage.getStoredSnapshot + .map(_.snapshot.nextActiveNodes) def attemptSnapshot(): EitherT[F, SnapshotError, SnapshotCreated] = for { + lastStoredSnapshot <- snapshotServiceStorage.getStoredSnapshot.attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + _ <- checkFullActivePeersPoolMembership(lastStoredSnapshot) + _ <- checkActiveBetweenHeightsCondition() _ <- checkDiskSpace() // _ <- validateMaxAcceptedCBHashesInMemory() - nextHeightInterval <- 
getNextHeightInterval.attemptT.leftMap(SnapshotUnexpectedError).leftWiden[SnapshotError] + nextHeightInterval <- getNextHeightInterval.attemptT.leftMap[SnapshotError](SnapshotUnexpectedError) _ <- validateMaxDistanceFromMajority(nextHeightInterval) minTipHeight <- checkpointStorage.getMinTipHeight.attemptT - .leftMap(SnapshotUnexpectedError) - .leftWiden[SnapshotError] + .leftMap[SnapshotError](SnapshotUnexpectedError) minWaitingHeight <- checkpointStorage.getMinWaitingHeight.attemptT - .leftMap(SnapshotUnexpectedError) - .leftWiden[SnapshotError] + .leftMap[SnapshotError](SnapshotUnexpectedError) _ <- validateSnapshotHeightIntervalCondition(nextHeightInterval, minTipHeight, minWaitingHeight) blocksWithinHeightInterval <- getBlocksWithinHeightInterval(nextHeightInterval).attemptT - .leftMap(SnapshotUnexpectedError) - .leftWiden[SnapshotError] + .leftMap[SnapshotError](SnapshotUnexpectedError) _ <- validateAcceptedCBsSinceSnapshot(blocksWithinHeightInterval.size) @@ -88,11 +107,28 @@ class SnapshotService[F[_]: Concurrent]( hashesForNextSnapshot = allBlocks.map(_.checkpointBlock.soeHash) hashesWithHeightForNextSnapshot = hashesForNextSnapshot.map((_, nextHeightInterval)) publicReputation <- trustManager.getPredictedReputation.attemptT - .leftMap(SnapshotUnexpectedError) - .leftWiden[SnapshotError] - nextSnapshot <- getNextSnapshot(hashesForNextSnapshot, publicReputation).attemptT - .leftMap(SnapshotUnexpectedError) - .leftWiden[SnapshotError] + .leftMap[SnapshotError](SnapshotUnexpectedError) + publicReputationString = publicReputation.map { case (id, rep) => logIdShort(id) + " -> " + rep }.toList.toString + _ <- logger + .debug(s"Snapshot attempt current reputation: $publicReputationString") + .attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + _ <- metrics + .updateMetricAsync("currentSnapshotReputation", publicReputationString) + .attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + + // should trustManager generate the facilitators below? + nextActiveAndAuthorizedNodes <- calculateNextActiveNodes(publicReputation, nextHeightInterval, lastStoredSnapshot).attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + (nextActiveNodes, authorizedPeers) = nextActiveAndAuthorizedNodes + nextSnapshot <- getNextSnapshot( + hashesForNextSnapshot, + publicReputation, + nextActiveNodes, + authorizedPeers + ).attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) _ <- snapshotServiceStorage .setNextSnapshotHash(nextSnapshot.hash) .attemptT @@ -150,11 +186,66 @@ class SnapshotService[F[_]: Concurrent]( nextHeightInterval, publicReputation ) + activeFullNodes <- clusterStorage + .getActiveFullPeersIds(true) + .attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + activeLightNodes <- clusterStorage + .getActiveLightPeersIds(true) //TODO: withSelfId not necessary as Light node will never attemptSnapshot unless it's a Full node??? + .attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + activePeers = activeFullNodes ++ activeLightNodes + inactivePeers <- clusterStorage.getPeers // TODO: take only nodes that successfully sent the Join Cluster Observation? 
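// One option for the TODO above (a sketch only, not implemented in this patch): limiting the set to
// clusterStorage.getAuthorizedPeers, or to peers for which a NodeJoinsTheCluster observation was recorded,
// would keep offline or leaving peers out of the NodeNotMemberOfActivePool observations sent further below.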
+ .map(_.keySet -- activePeers) + .attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + _ <- sendActivePoolObservations(activePeers = activePeers, inactivePeers = inactivePeers).attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) } yield created // TODO //_ <- if (ConfigUtil.isEnabledCloudStorage) cloudStorage.upload(Seq(File(path))).void else Sync[F].unit + private def calculateNextActiveNodes( + publicReputation: Map[Id, Double], + nextHeightInterval: Long, + lastStoredSnapshot: StoredSnapshot + ): F[(NextActiveNodes, Set[Id])] = + for { + fullNodes <- clusterStorage.getFullPeersIds(withSelfId = true) + lightNodes <- clusterStorage.getLightPeersIds(withSelfId = true) + authorizedPeers <- clusterStorage.getAuthorizedPeers + _ <- logger.debug(s"available Full nodes: ${fullNodes.map(_.short)}") + _ <- logger.debug(s"available Light nodes: ${lightNodes.map(_.short)}") + _ <- logger.debug(s"available Authorized nodes: ${authorizedPeers.map(_.short)}") + nextActiveNodes = if (nextHeightInterval % activePeersRotationEveryNHeights == 0) { + val nextFull = publicReputation + .filterKeys(p => fullNodes.contains(p) && authorizedPeers.contains(p)) + .toList + .sortBy { case (_, reputation) => reputation } + .map(_._1) + .reverse + .take(3) + .toSet + + //val lightCandidates = publicReputation.filterKeys(lightNodes.contains) + //val nextLight = (if (lightCandidates.size >= 3) lightCandidates else publicReputation).toList + val nextLight = publicReputation + .filterKeys(p => lightNodes.contains(p) && authorizedPeers.contains(p)) + .toList + .sortBy { case (_, reputation) => reputation } + .map(_._1) + .reverse + .take(3) + .toSet + + NextActiveNodes(light = nextLight, full = nextFull) + } else if (lastStoredSnapshot.snapshot == Snapshot.snapshotZero) + NextActiveNodes(light = Set.empty, full = nodeConfig.initialActiveFullNodes) + else + lastStoredSnapshot.snapshot.nextActiveNodes + } yield (nextActiveNodes, authorizedPeers) + def writeSnapshotInfoToDisk(): EitherT[F, SnapshotInfoIOError, Unit] = getSnapshotInfo().attemptT.flatMap { info => val hash = info.snapshot.snapshot.hash @@ -219,6 +310,12 @@ class SnapshotService[F[_]: Concurrent]( _ <- snapshotServiceStorage.setStoredSnapshot(snapshotInfo.snapshot) _ <- snapshotServiceStorage.setLastSnapshotHeight(snapshotInfo.lastSnapshotHeight) _ <- snapshotServiceStorage.setNextSnapshotHash(snapshotInfo.nextSnapshotHash) + // TODO: find the proper moment in the flow for this update + authorizedNodes = if (snapshotInfo.snapshot.snapshot == Snapshot.snapshotZero) + nodeConfig.initialActiveFullNodes + else + snapshotInfo.snapshot.snapshot.authorizedNodes + _ <- clusterStorage.setAuthorizedPeers(authorizedNodes) _ <- checkpointStorage.setCheckpoints(snapshotInfo.checkpoints) _ <- checkpointStorage.setWaitingForAcceptance(snapshotInfo.waitingForAcceptance) _ <- checkpointStorage.setAccepted(snapshotInfo.accepted) @@ -395,11 +492,22 @@ class SnapshotService[F[_]: Concurrent]( private def getNextSnapshot( hashesForNextSnapshot: Seq[String], - publicReputation: Map[Id, Double] + publicReputation: Map[Id, Double], + nextActiveNodes: NextActiveNodes, + authorizedNodes: Set[Id] ): F[Snapshot] = snapshotServiceStorage.getStoredSnapshot .map(_.snapshot.hash) - .map(hash => Snapshot(hash, hashesForNextSnapshot, SortedMap(publicReputation.toSeq: _*))) + .map( + hash => + Snapshot( + hash, + hashesForNextSnapshot, + SortedMap(publicReputation.toSeq: _*), + nextActiveNodes, + authorizedNodes + ) + ) private[storage] def applySnapshot()(implicit C:
ContextShift[F]): EitherT[F, SnapshotError, Unit] = { val write: Snapshot => EitherT[F, SnapshotError, Unit] = (currentSnapshot: Snapshot) => @@ -476,6 +584,7 @@ class SnapshotService[F[_]: Concurrent]( .flatMap { case maybeBlocks if maybeBlocks.exists( + // TODO: What is this? maybeCache => maybeCache._2.isEmpty || maybeCache._2.isEmpty ) => EitherT { @@ -565,11 +674,83 @@ class SnapshotService[F[_]: Concurrent]( _ <- transactionService.applySnapshotDirect(txs.map(TransactionCacheData(_))) } yield () + + private def isMemberOfFullActivePeersPool(snapshot: Snapshot): Boolean = + snapshot.nextActiveNodes.full.contains(nodeId) // full or light??? + + private def checkFullActivePeersPoolMembership(storedSnapshot: StoredSnapshot): EitherT[F, SnapshotError, Unit] = + for { + checkedSnapshot <- if (storedSnapshot.snapshot == Snapshot.snapshotZero) + EitherT.rightT[F, SnapshotError]( + storedSnapshot.snapshot + .copy(nextActiveNodes = NextActiveNodes(light = Set.empty, nodeConfig.initialActiveFullNodes)) + ) + else { + EitherT.rightT[F, SnapshotError]( + storedSnapshot.snapshot + ) + } + activeBetweenHeights = Cluster.calculateActiveBetweenHeights(0L, activePeersRotationEveryNHeights) + _ <- if (storedSnapshot.snapshot == Snapshot.snapshotZero) + clusterStorage + .setActivePeers(checkedSnapshot.nextActiveNodes, activeBetweenHeights, metrics) + .attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + else + EitherT.rightT[F, SnapshotError](()) + + _ <- if (isMemberOfFullActivePeersPool(checkedSnapshot)) + EitherT.rightT[F, SnapshotError](()) + else + EitherT.leftT[F, Unit](NodeNotPartOfL0FacilitatorsPool.asInstanceOf[SnapshotError]) + + } yield () + + private def checkActiveBetweenHeightsCondition(): EitherT[F, SnapshotError, Unit] = + for { + nextHeight <- getNextHeightInterval.attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + activeBetweenHeights <- clusterStorage.getActiveBetweenHeights.attemptT + .leftMap[SnapshotError](SnapshotUnexpectedError) + result <- if (activeBetweenHeights.joined.forall(_ <= nextHeight) && activeBetweenHeights.left.forall( + _ >= nextHeight + )) + EitherT.rightT[F, SnapshotError](()) + else + EitherT.leftT[F, Unit](ActiveBetweenHeightsConditionNotMet.asInstanceOf[SnapshotError]) //can we do it without asInstanceOf? 
+ } yield result + + //def getTimeInSeconds(): F[Long] = C.monotonic(SECONDS) + + private def sendActivePoolObservations(activePeers: Set[Id], inactivePeers: Set[Id]): F[Unit] = + for { + _ <- logger.debug(s"sending observation for ActivePeers: ${logIds(activePeers)}") + _ <- logger.debug(s"sending observation for InactivePeers: ${logIds(inactivePeers)}") + //currentEpoch <- getTimeInSeconds() + activePeersObservations = activePeers.map { id => + Observation.create(id, NodeMemberOfActivePool /*, currentEpoch*/ )(keyPair) + } + inactivePeersObservations = inactivePeers.map { id => + Observation.create(id, NodeNotMemberOfActivePool /*, currentEpoch*/ )(keyPair) + } + observations = activePeersObservations ++ inactivePeersObservations + // _ <- (activePeersObservations ++ inactivePeersObservations).toList.traverse { observation => + // trustManager.updateStoredReputation(observation) + // } + _ <- observations.toList.traverse( + observationService + .put(_) + .void + .handleErrorWith(e => logger.warn(e)("Error during sending active pool membership observations")) + ) + } yield () } object SnapshotService { def apply[F[_]: Concurrent]( + apiClient: ClientInterpreter[F], + clusterStorage: ClusterStorageAlgebra[F], addressService: AddressService[F], checkpointStorage: CheckpointStorageAlgebra[F], snapshotServiceStorage: SnapshotStorageAlgebra[F], @@ -588,7 +769,9 @@ object SnapshotService { unboundedExecutionContext: ExecutionContext, metrics: Metrics, processingConfig: ProcessingConfig, - nodeId: Id + nodeId: Id, + keyPair: KeyPair, + nodeConfig: NodeConfig )(implicit C: ContextShift[F], P: Parallel[F]) = new SnapshotService[F]( addressService, @@ -601,6 +784,7 @@ object SnapshotService { trustManager, snapshotStorage, snapshotInfoStorage, + clusterStorage, eigenTrustStorage, eigenTrust, redownloadStorage, @@ -609,10 +793,24 @@ object SnapshotService { unboundedExecutionContext, metrics, processingConfig, - nodeId + nodeId, + keyPair, + nodeConfig ) } +case class JoinActivePoolCommand(lastActiveFullNodes: Set[Id], lastActiveBetweenHeight: MajorityHeight) + +object JoinActivePoolCommand { + implicit val joinActivePoolCommandCodec: Codec[JoinActivePoolCommand] = deriveCodec +} + +//sealed trait ActivePoolAction +//case object JoinLightPool extends ActivePoolAction +//case object JoinFullPool extends ActivePoolAction +//case object LeaveLightPool extends ActivePoolAction +//case object LeaveFullPool extends ActivePoolAction + sealed trait SnapshotError extends Throwable { def message: String } @@ -661,3 +859,11 @@ case class EigenTrustIOError(cause: Throwable) extends SnapshotError { } case class SnapshotCreated(hash: String, height: Long, publicReputation: Map[Id, Double]) + +case object NodeNotPartOfL0FacilitatorsPool extends SnapshotError { + def message: String = "Node is not a part of L0 facilitators pool at the current snapshot height!" +} + +case object ActiveBetweenHeightsConditionNotMet extends SnapshotError { + def message: String = "Next snapshot height is not between current active heights range on the given node!" 
+} diff --git a/src/main/scala/org/constellation/trust/SelfAvoidingWalk.scala b/src/main/scala/org/constellation/trust/SelfAvoidingWalk.scala index e22be92c0..15ff14d3d 100644 --- a/src/main/scala/org/constellation/trust/SelfAvoidingWalk.scala +++ b/src/main/scala/org/constellation/trust/SelfAvoidingWalk.scala @@ -172,7 +172,7 @@ object SelfAvoidingWalk extends StrictLogging { iterationNum += 1 walkScores = merged walkProbability = renormalized - println(s"runWalkBatches - Batch number $iterationNum with delta $delta") + logger.debug(s"runWalkBatches - Batch number $iterationNum with delta $delta") } reweightEdges(walkProbability, nodes.map { n => @@ -333,7 +333,8 @@ object SelfAvoidingWalk extends StrictLogging { }.getOrElse(weightedEdgeZero) // TODO: Normalize again - weightedEdgesAll.zipWithIndex.foreach { println } + // it doesn't do anything so I'm commenting it out + //weightedEdgesAll.zipWithIndex.foreach { println } // println(s"n1 id: ${n1.id}") @@ -352,15 +353,15 @@ object SelfAvoidingWalk extends StrictLogging { var nodesCycle = nodes // TODO: Fix stack overflow issue - // if (nodesCycle.size > 2) { //note, need min of 3 nodes - // println(s"runWalkFeedbackUpdateSingleNode nodes ${nodes.toList} for node $selfId") - // (0 until feedbackCycles).foreach { cycle => - // println(s"feedback cycle $cycle for node $selfId") - // nodesCycle = runWalkBatchesFeedback(selfId, nodes, batchIterationSize, epsilon, maxIterations) - // } - // } + if (nodesCycle.size > 2) { //note, need min of 3 nodes + logger.debug(s"runWalkFeedbackUpdateSingleNode nodes ${nodes.toList} for node $selfId") + (0 until feedbackCycles).foreach { cycle => + logger.debug(s"feedback cycle $cycle for node $selfId") + nodesCycle = runWalkBatchesFeedback(selfId, nodes, batchIterationSize, epsilon, maxIterations) + } + } val res: TrustNode = nodesCycle.filter(_.id == selfId).head - println(s"runWalkFeedbackUpdateSingleNode res: TrustNode ${res} for node $selfId") + logger.debug(s"runWalkFeedbackUpdateSingleNode res: TrustNode ${res} for node $selfId") res } diff --git a/src/main/scala/org/constellation/trust/TrustDataPollingScheduler.scala b/src/main/scala/org/constellation/trust/TrustDataPollingScheduler.scala index 83b206977..18225ecf6 100644 --- a/src/main/scala/org/constellation/trust/TrustDataPollingScheduler.scala +++ b/src/main/scala/org/constellation/trust/TrustDataPollingScheduler.scala @@ -20,13 +20,14 @@ class TrustDataPollingScheduler( trustManager: TrustManager[IO], clusterStorage: ClusterStorageAlgebra[IO], apiClient: ClientInterpreter[IO], - partitionerPeerSampling: PartitionerPeerSampling[IO], + partitioners: List[PartitionerPeerSampling[IO]], unboundedExecutionContext: ExecutionContext, metrics: Metrics ) extends PeriodicIO("TrustDataPollingScheduler", unboundedExecutionContext) { override def trigger(): IO[Unit] = - clusterStorage.getPeers + clusterStorage + .getActiveFullPeers() .map(_.filter(t => NodeState.validForLettingOthersDownload.contains(t._2.peerMetadata.nodeState)).values.toList) .flatMap( _.traverse( @@ -45,7 +46,7 @@ class TrustDataPollingScheduler( for { selfTdi <- trustManager.getTrustDataInternalSelf _ <- trustManager.handleTrustScoreUpdate(tdi) - _ <- partitionerPeerSampling.repartition(selfTdi, tdi) + _ <- partitioners.traverse(_.repartition(selfTdi, tdi)) //TODO: error handling? 
} yield () } .flatTap(_ => metrics.incrementMetricAsync[IO]("trustDataPollingRound")) @@ -61,7 +62,7 @@ object TrustDataPollingScheduler { trustManager: TrustManager[IO], clusterStorage: ClusterStorageAlgebra[IO], apiClient: ClientInterpreter[IO], - partitionerPeerSampling: PartitionerPeerSampling[IO], + partitioners: List[PartitionerPeerSampling[IO]], unboundedExecutionContext: ExecutionContext, metrics: Metrics ) = @@ -70,7 +71,7 @@ object TrustDataPollingScheduler { trustManager, clusterStorage, apiClient, - partitionerPeerSampling, + partitioners, unboundedExecutionContext, metrics ) diff --git a/src/main/scala/org/constellation/trust/TrustManager.scala b/src/main/scala/org/constellation/trust/TrustManager.scala index 97b08ecc9..a95b360ab 100644 --- a/src/main/scala/org/constellation/trust/TrustManager.scala +++ b/src/main/scala/org/constellation/trust/TrustManager.scala @@ -21,8 +21,8 @@ class TrustManager[F[_]](nodeId: Id, clusterStorage: ClusterStorageAlgebra[F])(i def getTrustDataInternalSelf: F[TrustDataInternal] = getStoredReputation.map(TrustDataInternal(nodeId, _)) def handleTrustScoreUpdate(peerTrustScores: List[TrustDataInternal]): F[Unit] = - clusterStorage.getPeers.flatMap { peers => - if (peers.size > 2) { + clusterStorage.getActiveFullPeers().flatMap { peers => + if (peers.size > 1) { for { reputation <- getStoredReputation _ <- logger.info(s"Begin handleTrustScoreUpdate for peerTrustScores: ${peerTrustScores.toString()}") @@ -44,7 +44,7 @@ class TrustManager[F[_]](nodeId: Id, clusterStorage: ClusterStorageAlgebra[F])(i trustNodesInternal = scores ++ simulatedTrustDataForMissingNodes trustNodes = TrustManager.calculateTrustNodes(trustNodesInternal.distinct, nodeId, scoringMap) - // _ <- logger.debug(s"TrustManager.calculateTrustNodes for trustNodes: ${trustNodes.toString()}") + _ <- logger.debug(s"TrustManager.calculateTrustNodes for trustNodes: ${trustNodes.toString()}") idMappedScores: Map[Id, Double] = SelfAvoidingWalk .runWalkFeedbackUpdateSingleNode( @@ -69,7 +69,7 @@ class TrustManager[F[_]](nodeId: Id, clusterStorage: ClusterStorageAlgebra[F])(i def filterFn(peers: Map[Id, PeerData])(id: Id): Boolean = !peers.get(id).exists(p => NodeState.offlineStates.contains(p.peerMetadata.nodeState)) - clusterStorage.getPeers.flatMap { peers => + clusterStorage.getPeers.flatMap { peers => // TODO: getPeers or getActivePeerInfo here storedReputation.get.map( reputation => peers.mapValues(_ => 1d) + (nodeId -> 1d) ++ reputation.filterKeys(filterFn(peers)) ) @@ -81,8 +81,7 @@ class TrustManager[F[_]](nodeId: Id, clusterStorage: ClusterStorageAlgebra[F])(i val id = o.signedObservationData.data.id storedReputation.modify { reputation => - // val updated = Math.max(reputation.getOrElse(id, 1d) + score, -1d) - val updated = 1d + val updated = Math.max(reputation.getOrElse(id, 1d) + score, -1d) (reputation + (id -> updated), ()) } } @@ -103,6 +102,8 @@ object TrustManager { case _: RequestTimeoutOnConsensus => -0.1 case _: RequestTimeoutOnResolving => -0.1 case _: CheckpointBlockInvalid => -0.1 + case _ @NodeMemberOfActivePool => -0.05 + case _ @NodeNotMemberOfActivePool => 0.02 case _ => 0d } diff --git a/src/main/scala/org/constellation/util/HealthChecker.scala b/src/main/scala/org/constellation/util/HealthChecker.scala index 1da5f634b..6f5582535 100644 --- a/src/main/scala/org/constellation/util/HealthChecker.scala +++ b/src/main/scala/org/constellation/util/HealthChecker.scala @@ -69,7 +69,7 @@ class HealthChecker[F[_]: Concurrent]( private def collectNextSnapshotHeights(): 
F[Map[Id, Long]] = for { - peers <- clusterStorage.getJoinedPeers + peers <- clusterStorage.getActiveFullPeers() // TODO: was getJoinedPeers before nextSnapshotHeights <- peers.values.toList .map(_.peerMetadata.toPeerClientMetadata) .traverse( diff --git a/src/main/scala/org/constellation/util/MetricsUpdater.scala b/src/main/scala/org/constellation/util/MetricsUpdater.scala index 451d83f9f..6cbd50cc9 100644 --- a/src/main/scala/org/constellation/util/MetricsUpdater.scala +++ b/src/main/scala/org/constellation/util/MetricsUpdater.scala @@ -46,6 +46,9 @@ class MetricsUpdater( _ <- checkpointStorage.countTips >>= { value => metrics.updateMetricAsync[IO]("activeTips", value) } + _ <- checkpointStorage.countMissingTips >>= { value => + metrics.updateMetricAsync[IO]("missingActiveTips", value) + } _ <- checkpointStorage.getMinTipHeight >>= { value => metrics.updateMetricAsync[IO]("minTipHeight", value) } diff --git a/src/test/scala/org/constellation/Fixtures.scala b/src/test/scala/org/constellation/Fixtures.scala index 007ec1ec9..5c08e7523 100644 --- a/src/test/scala/org/constellation/Fixtures.scala +++ b/src/test/scala/org/constellation/Fixtures.scala @@ -4,7 +4,6 @@ import java.io.{BufferedReader, FileInputStream, InputStreamReader} import java.net.InetSocketAddress import java.security.{KeyPair, KeyStore, PrivateKey, PublicKey} import java.util.Random - import cats.effect.{ContextShift, IO} import constellation._ import io.chrisdavenport.log4cats.slf4j.Slf4jLogger @@ -15,6 +14,7 @@ import org.constellation.schema.edge.{Edge, EdgeHashType, ObservationEdge, Signe import org.constellation.schema.signature.{HashSignature, SignatureBatch} import org.constellation.schema.transaction.{LastTransactionRef, Transaction, TransactionEdgeData} import org.constellation.schema.Id +import org.constellation.schema.NodeType.Full import scala.concurrent.ExecutionContext import scala.util.{Random => ScalaRandom} @@ -89,7 +89,8 @@ object Fixtures { val address4: InetSocketAddress = constellation.addressToSocket("localhost:16184") val address5: InetSocketAddress = constellation.addressToSocket("localhost:16185") - val addPeerRequest = PeerMetadata("host:", 1, id: Id, resourceInfo = ResourceInfo(diskUsableBytes = 1073741824)) + val addPeerRequest = + PeerMetadata("host:", 1, id: Id, resourceInfo = ResourceInfo(diskUsableBytes = 1073741824), nodeType = Full) val tempKeySet = Seq(tempKey, tempKey2, tempKey3, tempKey4, tempKey5) diff --git a/src/test/scala/org/constellation/TestHelpers.scala b/src/test/scala/org/constellation/TestHelpers.scala index 2d4b4ae91..030a7a78e 100644 --- a/src/test/scala/org/constellation/TestHelpers.scala +++ b/src/test/scala/org/constellation/TestHelpers.scala @@ -2,7 +2,6 @@ package org.constellation import java.security.KeyPair import java.util.UUID - import better.files.File import cats.effect.IO import com.google.common.hash.Hashing @@ -10,6 +9,7 @@ import com.typesafe.scalalogging.Logger import org.constellation.checkpoint.CheckpointService import org.constellation.consensus.ConsensusRemoteSender import org.constellation.domain.blacklist.BlacklistedAddresses +import org.constellation.domain.cluster.ClusterStorageAlgebra import org.constellation.domain.configuration.NodeConfig import org.constellation.domain.observation.ObservationService import org.constellation.domain.redownload.{DownloadService, RedownloadService} @@ -20,6 +20,7 @@ import org.constellation.infrastructure.p2p.ClientInterpreter import org.constellation.keytool.KeyUtils import 
org.constellation.keytool.KeyUtils.makeKeyPair import org.constellation.p2p.{Cluster, DataResolver, PeerData} +import org.constellation.schema.NodeType.Full import org.constellation.schema.{Id, NodeState} import org.constellation.storage._ import org.constellation.util.Metrics @@ -43,7 +44,8 @@ object TestHelpers extends IdiomaticMockito with IdiomaticMockitoCats with Argum facilitatorId1, timeAdded = System .currentTimeMillis() - (ProcessingConfig().minPeerTimeAddedSeconds * 4000), - resourceInfo = mock[ResourceInfo] + resourceInfo = mock[ResourceInfo], + nodeType = Full ) facilitatorId1 -> peerData1 @@ -129,6 +131,8 @@ object TestHelpers extends IdiomaticMockito with IdiomaticMockitoCats with Argum // val metrics = new Metrics(registry, 600, ExecutionContext.global)(dao) // dao.metrics shouldReturn metrics + dao.clusterStorage shouldReturn mock[ClusterStorageAlgebra[IO]] + val cluster = mock[Cluster[IO]] // cluster.getNodeState shouldReturnF NodeState.Ready dao.cluster shouldReturn cluster diff --git a/src/test/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheckTest.scala b/src/test/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheckTest.scala index 689ea0b49..85ab5afe5 100644 --- a/src/test/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheckTest.scala +++ b/src/test/scala/org/constellation/infrastructure/redownload/RedownloadPeriodicCheckTest.scala @@ -31,7 +31,9 @@ class RedownloadPeriodicCheckTest "triggerRedownloadCheck" - { "calls check for alignment with majority snapshot" in { - val redownloadPeriodicCheck = new RedownloadPeriodicCheck(30, ExecutionContext.global, dao.redownloadService) + dao.clusterStorage.isAnActiveFullPeer shouldReturnF true + val redownloadPeriodicCheck = + new RedownloadPeriodicCheck(30, ExecutionContext.global, dao.redownloadService, dao.clusterStorage) val trigger = redownloadPeriodicCheck.trigger() val cancel = redownloadPeriodicCheck.cancel() diff --git a/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotDiskStorageTest.scala b/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotDiskStorageTest.scala index 2cb13fd96..9503568a3 100644 --- a/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotDiskStorageTest.scala +++ b/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotDiskStorageTest.scala @@ -4,7 +4,7 @@ import better.files._ import cats.effect.{ContextShift, IO} import cats.syntax.all._ import org.constellation.schema.checkpoint.CheckpointCache -import org.constellation.schema.snapshot.{Snapshot, StoredSnapshot} +import org.constellation.schema.snapshot.{NextActiveNodes, Snapshot, StoredSnapshot} import org.constellation.serialization.KryoSerializer import org.scalatest.BeforeAndAfterAll import org.scalatest.freespec.AnyFreeSpec @@ -85,7 +85,8 @@ class SnapshotDiskStorageTest extends AnyFreeSpec with Matchers with BeforeAndAf val snapshotStorage = SnapshotLocalStorage[IO](snapshotsDir.pathAsString) snapshotStorage.createDirectoryIfNotExists().value.unsafeRunSync - val snapshot = Snapshot("lastHash", Seq.empty[String], SortedMap.empty) + val snapshot = + Snapshot("lastHash", Seq.empty[String], SortedMap.empty, NextActiveNodes(Set.empty, Set.empty), Set.empty) val storedSnapshot = StoredSnapshot(snapshot, Seq.empty[CheckpointCache]) val hash = "abc123" diff --git a/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotInfoLocalStorageTest.scala 
b/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotInfoLocalStorageTest.scala index 36271edec..a0399a3d1 100644 --- a/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotInfoLocalStorageTest.scala +++ b/src/test/scala/org/constellation/infrastructure/snapshot/SnapshotInfoLocalStorageTest.scala @@ -4,7 +4,7 @@ import better.files.File import cats.effect.{ContextShift, IO, Timer} import cats.syntax.all._ import org.constellation.schema.checkpoint.CheckpointCache -import org.constellation.schema.snapshot.{Snapshot, SnapshotInfo, StoredSnapshot} +import org.constellation.schema.snapshot.{NextActiveNodes, Snapshot, SnapshotInfo, StoredSnapshot} import org.constellation.serialization.KryoSerializer import org.scalatest.BeforeAndAfter import org.scalatest.freespec.AnyFreeSpec @@ -66,7 +66,8 @@ class SnapshotInfoLocalStorageTest extends AnyFreeSpec with Matchers with Before val snapshotInfoStorage = SnapshotInfoLocalStorage[IO](snapshotInfosDir.pathAsString) snapshotInfoStorage.createDirectoryIfNotExists().value.unsafeRunSync - val snapshot = Snapshot("lastHash", Seq.empty[String], SortedMap.empty) + val snapshot = + Snapshot("lastHash", Seq.empty[String], SortedMap.empty, NextActiveNodes(Set.empty, Set.empty), Set.empty) val storedSnapshot = StoredSnapshot(snapshot, Seq.empty[CheckpointCache]) val snapshotInfo = SnapshotInfo(storedSnapshot)
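
The node-rotation rule introduced in SnapshotService.calculateNextActiveNodes above can be exercised in isolation. The sketch below is illustrative only: the object and helper names are not part of the patch, while the pool size of 3, the restriction to authorized peers, and the modulo-based rotation trigger mirror the diff.

    import org.constellation.schema.Id
    import org.constellation.schema.snapshot.NextActiveNodes

    object ActiveNodeRotationSketch {

      // Hypothetical helper (not in the patch): highest-reputation authorized
      // members of a pool, capped at the pool size of 3 used in the diff.
      def pickTopByReputation(
        publicReputation: Map[Id, Double],
        pool: Set[Id],
        authorized: Set[Id],
        poolSize: Int = 3
      ): Set[Id] =
        publicReputation
          .filterKeys(id => pool.contains(id) && authorized.contains(id))
          .toList
          .sortBy { case (_, reputation) => -reputation } // highest reputation first
          .map { case (id, _) => id }
          .take(poolSize)
          .toSet

      // Rotation only happens on heights divisible by activePeersRotationEveryNHeights;
      // in between, the assignment carried in the last stored snapshot is reused
      // (the genesis case is seeded from nodeConfig.initialActiveFullNodes in the diff).
      def nextActiveNodes(
        nextHeightInterval: Long,
        rotationEveryNHeights: Long,
        publicReputation: Map[Id, Double],
        fullNodes: Set[Id],
        lightNodes: Set[Id],
        authorizedPeers: Set[Id],
        previous: NextActiveNodes
      ): NextActiveNodes =
        if (nextHeightInterval % rotationEveryNHeights == 0)
          NextActiveNodes(
            light = pickTopByReputation(publicReputation, lightNodes, authorizedPeers),
            full = pickTopByReputation(publicReputation, fullNodes, authorizedPeers)
          )
        else previous
    }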
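
The gate added in checkActiveBetweenHeightsCondition reduces to a small predicate over an optional joined/left height window. A minimal sketch, assuming the window carries two Option[Long] bounds (consistent with the Option.forall usage in the diff; the helper name is not part of the patch):

    // A node may attempt the snapshot at nextHeight only when the height is not
    // below the height at which it joined the active pool and not above the height
    // at which it leaves it; an absent bound means "no restriction".
    def withinActiveWindow(joined: Option[Long], left: Option[Long], nextHeight: Long): Boolean =
      joined.forall(_ <= nextHeight) && left.forall(_ >= nextHeight)

    // e.g. withinActiveWindow(joined = Some(4L), left = Some(8L), nextHeight = 6L)  // true
    //      withinActiveWindow(joined = Some(4L), left = Some(8L), nextHeight = 10L) // false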
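
The reputation update re-enabled in TrustManager.updateStoredReputation clamps the running score rather than resetting it. A standalone sketch of just that arithmetic (the function name is illustrative; Ref handling and persistence are omitted):

    import org.constellation.schema.Id

    // Each observation score nudges the stored reputation; unknown peers start
    // from the neutral value 1 and the result is floored at -1.
    def applyObservationScore(reputation: Map[Id, Double], peer: Id, score: Double): Map[Id, Double] = {
      val updated = Math.max(reputation.getOrElse(peer, 1d) + score, -1d)
      reputation + (peer -> updated)
    }

With the entries added to observationScoring in this patch, a NodeMemberOfActivePool observation contributes -0.05 and a NodeNotMemberOfActivePool observation contributes 0.02 per occurrence.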