Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pending join pool #1587

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.idea/
.vscode/
.metals/
target/
*.iml
.DS_STORE
Expand All @@ -18,8 +20,7 @@ terraform/aws/whitelisting-*
.dag
*.log
/logs
.metals/
.bloop/
project/metals.sbt
metals.sbt
keys/
integration-tests/src/test/resources/terraform-output.json
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ case class CheckpointBlockWithMissingSoe(checkpointBaseHash: String) extends Obs
case class RequestTimeoutOnConsensus(roundId: RoundId) extends ObservationEvent
case class RequestTimeoutOnResolving(hashes: List[String]) extends ObservationEvent
case class CheckpointBlockInvalid(checkpointBaseHash: String, reason: String) extends ObservationEvent
// Observation events describing a peer's relationship to the active node pool and
// cluster membership. Each is a parameterless singleton so it can be Kryo-registered
// by its class (see the SchemaKryoRegistrar entries 207-211 in this same change set).
// NOTE(review): precise emitting conditions inferred from names only — the sites that
// raise these events are not visible in this diff; confirm against ObservationService.
case object NodeMemberOfActivePool extends ObservationEvent
case object NodeNotMemberOfActivePool extends ObservationEvent
case object NodeJoinsTheCluster extends ObservationEvent
case object NodeLeavesTheCluster extends ObservationEvent
case object NodeKickedOutByHealthcheck extends ObservationEvent


object ObservationEvent {
implicit val encodeEvent: Encoder[ObservationEvent] = deriveEncoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.constellation.schema.signature.{HashSignature, SignatureBatch, Signed
import org.constellation.schema.snapshot.{
FilterData,
HeightRange,
NextActiveNodes,
Snapshot,
SnapshotInfo,
SnapshotInfoV1,
Expand Down Expand Up @@ -113,6 +114,12 @@ object SchemaKryoRegistrar
(classOf[HeightRange], 204, DefaultSerializer),
(classOf[CheckpointBlockPayload], 205, DefaultSerializer),
(classOf[FinishedCheckpointBlock], 206, DefaultSerializer),
(NodeMemberOfActivePool.getClass, 207, DefaultSerializer),
(NodeNotMemberOfActivePool.getClass, 208, DefaultSerializer),
(NodeJoinsTheCluster.getClass, 209, DefaultSerializer),
(NodeLeavesTheCluster.getClass, 210, DefaultSerializer),
(NodeKickedOutByHealthcheck.getClass, 211, DefaultSerializer),
(classOf[NextActiveNodes], 212, DefaultSerializer),
(classOf[CheckpointCache], 1034, CompatibleSerializer),
(classOf[StoredSnapshot], 1035, CompatibleSerializer),
(classOf[SnapshotInfo], 1036, CompatibleSerializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,22 @@ import org.constellation.schema.signature.Signable

import scala.collection.SortedMap

case class Snapshot(lastSnapshot: String, checkpointBlocks: Seq[String], publicReputation: SortedMap[Id, Double])
extends Signable {
override def toEncode = checkpointBlocks :+ lastSnapshot
case class Snapshot(
lastSnapshot: String,
checkpointBlocks: Seq[String],
publicReputation: SortedMap[Id, Double],
nextActiveNodes: NextActiveNodes,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we will make a rollback and there are no such nodes in the cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rollback case may not be handled yet. Essentially during rollback the NextActiveNodes should be ignored and we should start from the initial nodes - currently based on the whitelisting file. I mentioned it on the daily sometime ago.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it will be then NextActiveNodes(Set.empty, Set.empty) but @TheMMaciek please confirm

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During rollback these will be set to some nodes, but we need to ignore them during rollback and use the initialFullNodes -> currently these are first 3 nodes in the whitelisting file. The initial nodes logic can be changed of course, especially if we will remove whitelisting.

authorizedNodes: Set[Id]
) extends Signable {
override def toEncode: Seq[String] =
(checkpointBlocks :+ lastSnapshot) ++
nextActiveNodes.light.toSeq.map(_.hex).sorted ++
nextActiveNodes.full.toSeq.map(_.hex).sorted ++
authorizedNodes.toSeq.map(_.hex).sorted
}

object Snapshot {
val snapshotZero: Snapshot = Snapshot("", Seq(), SortedMap.empty)
val snapshotZero: Snapshot = Snapshot("", Seq(), SortedMap.empty, NextActiveNodes(Set.empty, Set.empty), Set.empty)
}

case class NextActiveNodes(light: Set[Id], full: Set[Id])
1 change: 1 addition & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ constellation {
stallCountThreshold = 2
proposalLookupLimit = 6
distanceFromMajority = 8
activePeersRotationInterval = 100
}
schema {
v1 {
Expand Down
10 changes: 5 additions & 5 deletions src/main/resources/logback-dev.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<configuration>
<jmxConfigurator/>

<appender name="metrics" class="io.prometheus.client.logback.InstrumentedAppender" />
<appender name="metrics" class="io.prometheus.client.logback.InstrumentedAppender"/>

<property name="HOME_LOG" value="logs/app.log"/>
<property name="HOME_ROLLING_LOG" value="logs/archived/app.%d{yyyy-MM-dd}.%i.log.gz"/>
Expand Down Expand Up @@ -38,12 +38,12 @@
</appender>

<logger name="org.constellation" level="debug">
<appender-ref ref="rollingFile" />
<appender-ref ref="jsonFile" />
<appender-ref ref="rollingFile"/>
<appender-ref ref="jsonFile"/>
</logger>

<root level="info">
<appender-ref ref="metrics" />
<appender-ref ref="metrics"/>
</root>

</configuration>
</configuration>
4 changes: 2 additions & 2 deletions src/main/resources/logback-prod.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<configuration>
<jmxConfigurator/>

<appender name="metrics" class="io.prometheus.client.logback.InstrumentedAppender" />
<appender name="metrics" class="io.prometheus.client.logback.InstrumentedAppender"/>

<property name="HOME_LOG" value="logs/app.log"/>
<property name="HOME_ROLLING_LOG" value="logs/archived/app.%d{yyyy-MM-dd}.%i.log.gz"/>
Expand Down Expand Up @@ -50,4 +50,4 @@
<appender-ref ref="metrics"/>
</root>

</configuration>
</configuration>
6 changes: 3 additions & 3 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<configuration>
<jmxConfigurator/>

<appender name="metrics" class="io.prometheus.client.logback.InstrumentedAppender" />
<appender name="metrics" class="io.prometheus.client.logback.InstrumentedAppender"/>

<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
Expand Down Expand Up @@ -32,10 +32,10 @@
</logger>

<logger name="org.http4s" level="info">
<appender-ref ref="apiConsole" />
<appender-ref ref="apiConsole"/>
</logger>

<root level="info">
<appender-ref ref="metrics" />
<appender-ref ref="metrics"/>
</root>
</configuration>
2 changes: 1 addition & 1 deletion src/main/scala/org/constellation/API.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ case class PeerMetadata(
timeAdded: Long = System.currentTimeMillis(),
auxHost: String = "",
auxAddresses: Seq[String] = Seq(), // for testing multi key address partitioning
nodeType: NodeType = NodeType.Full,
nodeType: NodeType,
resourceInfo: ResourceInfo
) {
def toPeerClientMetadata: PeerClientMetadata = PeerClientMetadata(host, httpPort, id)
Expand Down
49 changes: 40 additions & 9 deletions src/main/scala/org/constellation/ConstellationNode$.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import org.constellation.infrastructure.p2p.ClientInterpreter
import org.constellation.infrastructure.p2p.PeerResponse.PeerClientMetadata
import org.constellation.keytool.KeyStoreUtils
import org.constellation.p2p.DataResolver
import org.constellation.schema.NodeType.Full
import org.constellation.schema.checkpoint.{CheckpointBlock, CheckpointCache}
import org.constellation.schema.snapshot.NextActiveNodes
import org.constellation.schema.{GenesisObservation, Id, NodeState}
import org.constellation.serialization.KryoSerializer
import org.constellation.session.SessionTokenService
Expand All @@ -47,6 +49,7 @@ import org.slf4j.MDC
import pureconfig._
import pureconfig.generic.auto._

import scala.collection.immutable.ListMap
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.io.Source
Expand Down Expand Up @@ -166,6 +169,7 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext {
dao.snapshotInfoStorage,
dao.snapshotService,
dao.nodeStorage,
dao.redownloadService,
dao.redownloadStorage,
dao.snapshotProposalGossipService,
dao.messageValidator,
Expand Down Expand Up @@ -300,9 +304,30 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext {

ClientInterpreter[IO](client, sessionTokenService)(ce, cs)
}
clusterStorage = new ClusterStorageInterpreter[IO]()
keyPair = nodeConfig.primaryKeyPair
id = keyPair.getPublic.toId
clusterStorage = new ClusterStorageInterpreter[IO](id, nodeConfig.nodeType)
_ <- Stream.eval {
val authorizedPeers =
if (nodeConfig.initialActiveFullNodes.contains(id)) nodeConfig.initialActiveFullNodes
else Set.empty[Id]

clusterStorage.setAuthorizedPeers(authorizedPeers)
}
_ <- Stream.eval {
val activeFullPeers =
if (nodeConfig.initialActiveFullNodes.contains(id))
nodeConfig.initialActiveFullNodes - id
else
Set.empty[Id]

clusterStorage.setActiveFullPeers(activeFullPeers)
}
_ <- Stream.eval {
if (nodeConfig.initialActiveFullNodes.contains(id))
clusterStorage.setAsActivePeer(Full)
else IO.unit
}
metrics = new Metrics(
registry,
clusterStorage,
Expand All @@ -320,7 +345,7 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext {
transactionChainService = TransactionChainService[IO]
transactionService = TransactionService[IO](transactionChainService, rateLimiting, metrics)
trustManager = TrustManager[IO](id, clusterStorage)
observationService = new ObservationService[IO](trustManager, metrics)
observationService = new ObservationService[IO](trustManager, clusterStorage, metrics)

dataResolver = DataResolver[IO](
keyPair,
Expand Down Expand Up @@ -467,18 +492,18 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext {
}
}

private def getWhitelisting[F[_]: Sync](cliConfig: CliConfig): F[Map[Id, Option[String]]] =
private def getWhitelisting[F[_]: Sync](cliConfig: CliConfig): F[ListMap[Id, Option[String]]] =
for {
source <- Sync[F].delay {
Source.fromFile(cliConfig.whitelisting)
}
lines = source.getLines().filter(_.nonEmpty).toList
values = lines.map(_.split(",").map(_.trim).toList)
mappedValues = values.map {
case id :: alias :: Nil => Map(Id(id) -> Some(alias))
case id :: Nil => Map(Id(id) -> None)
case _ => Map.empty[Id, Option[String]]
}.fold(Map.empty[Id, Option[String]])(_ ++ _)
case id :: alias :: Nil => ListMap(Id(id) -> Some(alias))
case id :: Nil => ListMap(Id(id) -> None)
case _ => ListMap.empty[Id, Option[String]]
}.fold(ListMap.empty[Id, Option[String]])(_ ++ _)
_ <- Sync[F].delay {
source.close()
}
Expand Down Expand Up @@ -527,11 +552,16 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext {

whitelisting <- getWhitelisting(cliConfig)

initialActiveFullNodes <- whitelisting match {
case ids if ids.size >= 3 => ids.take(3).keySet.pure[F]
case _ => Sync[F].raiseError(new Throwable(s"Not enough peers whitelisted to pick initial active peers!"))
}

nodeConfig = NodeConfig(
seeds = Seq.empty[HostPort],
primaryKeyPair = keyPair,
isGenesisNode = cliConfig.genesisNode,
isLightNode = cliConfig.lightNode,
nodeType = cliConfig.nodeType,
isRollbackNode = cliConfig.rollbackNode,
rollbackHeight = cliConfig.rollbackHeight,
rollbackHash = cliConfig.rollbackHash,
Expand All @@ -548,7 +578,8 @@ object ConstellationNode$ extends IOApp with IOApp.WithContext {
dataPollingManagerOn = config.getBoolean("constellation.dataPollingManagerOn"),
allocAccountBalances = allocAccountBalances,
whitelisting = whitelisting,
minRequiredSpace = constellationConfig.getInt("min-required-space")
minRequiredSpace = constellationConfig.getInt("min-required-space"),
initialActiveFullNodes = initialActiveFullNodes
)
} yield nodeConfig
}
Expand Down
Loading