From c55fdefdf93e211a316b9581c701528ece1e8dac Mon Sep 17 00:00:00 2001 From: ccellado Date: Thu, 8 Feb 2024 18:27:54 +0400 Subject: [PATCH 1/2] Refactor Message class Add changes to ergo-core readme --- ergo-core/README.md | 4 +- .../network/message/MessageBase.scala | 48 +++++++++++++++++++ .../network/message/Message.scala | 43 +++-------------- .../network/message/MessageSerializer.scala | 32 ++++++++----- 4 files changed, 76 insertions(+), 51 deletions(-) create mode 100644 ergo-core/src/main/scala/org/ergoplatform/network/message/MessageBase.scala diff --git a/ergo-core/README.md b/ergo-core/README.md index b07bce00ef..f86714cbad 100644 --- a/ergo-core/README.md +++ b/ergo-core/README.md @@ -72,7 +72,9 @@ val handshakeMessageSerialized = HandshakeSerializer.toBytes(handshakeMessage) Serialize the message and send it. If the message arrived successfully, start communicating with the peer node. -All communication is wrapped with Message headers, format described [here](https://docs.ergoplatform.com/dev/p2p/network/#message-format). +All communication is wrapped with message headers. +Format described [here](https://docs.ergoplatform.com/dev/p2p/network/#message-format). +[MessageBase](src/main/scala/org/ergoplatform/network/message/MessageBase.scala) interface to implement. ## Syncing with the node diff --git a/ergo-core/src/main/scala/org/ergoplatform/network/message/MessageBase.scala b/ergo-core/src/main/scala/org/ergoplatform/network/message/MessageBase.scala new file mode 100644 index 0000000000..d348cbfb63 --- /dev/null +++ b/ergo-core/src/main/scala/org/ergoplatform/network/message/MessageBase.scala @@ -0,0 +1,48 @@ +package org.ergoplatform.network.message + +import org.ergoplatform.network.message.MessageConstants._ + +import scala.util.{Success, Try} + +/** + * Trait for a ergo network message + * + * @param spec - message specification + * @param input - message being wrapped, whether in byte-array form (if from outside), + * or structured data (if formed locally) + * @tparam Content - message data type + */ +trait MessageBase[Content] { + val spec: MessageSpec[Content] + val input: Either[Array[Byte], Content] + + /** + * Message data bytes + */ + lazy val dataBytes: Array[Byte] = input match { + case Left(db) => db + case Right(d) => spec.toBytes(d) + } + + /** + * Structured message content + */ + lazy val data: Try[Content] = input match { + case Left(db) => spec.parseBytesTry(db) + case Right(d) => Success(d) + } + + lazy val dataLength: Int = dataBytes.length + + /** + * @return serialized message length in bytes + */ + def messageLength: Int = { + if (dataLength > 0) { + HeaderLength + ChecksumLength + dataLength + } else { + HeaderLength + } + } + +} diff --git a/src/main/scala/org/ergoplatform/network/message/Message.scala b/src/main/scala/org/ergoplatform/network/message/Message.scala index 994e01ddeb..f24f3ebec8 100644 --- a/src/main/scala/org/ergoplatform/network/message/Message.scala +++ b/src/main/scala/org/ergoplatform/network/message/Message.scala @@ -1,10 +1,7 @@ package org.ergoplatform.network.message - import akka.actor.DeadLetterSuppression import scorex.core.network.ConnectedPeer -import scala.util.{Success, Try} -import org.ergoplatform.network.message.MessageConstants._ /** * Wrapper for a network message, whether come from external peer or generated locally @@ -15,38 +12,10 @@ import org.ergoplatform.network.message.MessageConstants._ * @param source - source peer, if the message is from outside * @tparam Content - message data type */ -case class Message[Content](spec: MessageSpec[Content], - input: Either[Array[Byte], Content], - source: Option[ConnectedPeer]) - extends DeadLetterSuppression { - - /** - * Message data bytes - */ - lazy val dataBytes: Array[Byte] = input match { - case Left(db) => db - case Right(d) => spec.toBytes(d) - } - - /** - * Structured message content - */ - lazy val data: Try[Content] = input match { - case Left(db) => spec.parseBytesTry(db) - case Right(d) => Success(d) - } - - lazy val dataLength: Int = dataBytes.length - - /** - * @return serialized message length in bytes - */ - def messageLength: Int = { - if (dataLength > 0) { - HeaderLength + ChecksumLength + dataLength - } else { - HeaderLength - } - } -} +case class Message[Content]( + spec: MessageSpec[Content], + input: Either[Array[Byte], Content], + source: Option[ConnectedPeer] +) extends MessageBase[Content] + with DeadLetterSuppression diff --git a/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala b/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala index d98e662e9c..165a50cefa 100644 --- a/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala +++ b/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala @@ -1,25 +1,23 @@ package org.ergoplatform.network.message import java.nio.ByteOrder - import akka.util.ByteString import scorex.core.network.{ConnectedPeer, MaliciousBehaviorException} import scorex.crypto.hash.Blake2b256 import scala.util.Try - class MessageSerializer(specs: Seq[MessageSpec[_]], magicBytes: Array[Byte]) { import MessageConstants.{ChecksumLength, HeaderLength, MagicLength} import scala.language.existentials - private implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN + implicit private val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN private val specsMap = Map(specs.map(s => s.messageCode -> s): _*) .ensuring(m => m.size == specs.size, "Duplicate message codes") - def serialize(obj: Message[_]): ByteString = { + def serialize[A <: MessageBase[_]](obj: A): ByteString = { val builder = ByteString.createBuilder .putBytes(magicBytes) .putByte(obj.spec.messageCode) @@ -34,14 +32,17 @@ class MessageSerializer(specs: Seq[MessageSpec[_]], magicBytes: Array[Byte]) { } //MAGIC ++ Array(spec.messageCode) ++ Ints.toByteArray(dataLength) ++ dataWithChecksum - def deserialize(byteString: ByteString, sourceOpt: Option[ConnectedPeer]): Try[Option[Message[_]]] = Try { + def deserialize( + byteString: ByteString, + sourceOpt: Option[ConnectedPeer] + ): Try[Option[Message[_]]] = Try { if (byteString.length < HeaderLength) { None } else { - val it = byteString.iterator - val magic = it.getBytes(MagicLength) + val it = byteString.iterator + val magic = it.getBytes(MagicLength) val msgCode = it.getByte - val length = it.getInt + val length = it.getInt //peer is trying to cause buffer overflow or breaking the parsing if (length < 0) { @@ -53,17 +54,22 @@ class MessageSerializer(specs: Seq[MessageSpec[_]], magicBytes: Array[Byte]) { } else { //peer is from another network if (!java.util.Arrays.equals(magic, magicBytes)) { - throw MaliciousBehaviorException(s"Wrong magic bytes, expected ${magicBytes.mkString}, got ${magic.mkString} in : ${byteString.utf8String}") + throw MaliciousBehaviorException( + s"Wrong magic bytes, expected ${magicBytes.mkString}, got ${magic.mkString} in : ${byteString.utf8String}" + ) } - val spec = specsMap.getOrElse(msgCode, throw new Error(s"No message handler found for $msgCode")) + val spec = specsMap + .getOrElse(msgCode, throw new Error(s"No message handler found for $msgCode")) val msgData = if (length > 0) { val checksum = it.getBytes(ChecksumLength) - val data = it.getBytes(length) - val digest = Blake2b256.hash(data).take(ChecksumLength) + val data = it.getBytes(length) + val digest = Blake2b256.hash(data).take(ChecksumLength) //peer reported incorrect checksum if (!java.util.Arrays.equals(checksum, digest)) { - throw MaliciousBehaviorException(s"Wrong checksum, expected ${digest.mkString}, got ${checksum.mkString}") + throw MaliciousBehaviorException( + s"Wrong checksum, expected ${digest.mkString}, got ${checksum.mkString}" + ) } data } else { From 810d70805a48b044db50becde0d07951effdcf8f Mon Sep 17 00:00:00 2001 From: ccellado Date: Thu, 8 Feb 2024 18:36:56 +0400 Subject: [PATCH 2/2] Refactor Message class Add changes to ergo-core readme --- .../org/ergoplatform/network/message/MessageSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala b/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala index 165a50cefa..a94f91f92b 100644 --- a/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala +++ b/src/main/scala/org/ergoplatform/network/message/MessageSerializer.scala @@ -12,7 +12,7 @@ class MessageSerializer(specs: Seq[MessageSpec[_]], magicBytes: Array[Byte]) { import scala.language.existentials - implicit private val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN + private implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN private val specsMap = Map(specs.map(s => s.messageCode -> s): _*) .ensuring(m => m.size == specs.size, "Duplicate message codes")