diff --git a/jvm-libs/json-rpc/build.gradle b/jvm-libs/json-rpc/build.gradle index 16d2ef7dd..9a06d380d 100644 --- a/jvm-libs/json-rpc/build.gradle +++ b/jvm-libs/json-rpc/build.gradle @@ -6,6 +6,8 @@ plugins { dependencies { implementation project(":jvm-libs:metrics:micrometer") implementation project(":jvm-libs:future-extensions") + implementation project(":jvm-libs:kotlin-extensions") + implementation project(":jvm-libs:generic:serialization:jackson") implementation "com.fasterxml.jackson.core:jackson-annotations:${libs.versions.jackson.get()}" api "com.fasterxml.jackson.core:jackson-databind:${libs.versions.jackson.get()}" implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${libs.versions.jackson.get()}" @@ -22,6 +24,7 @@ dependencies { testImplementation "io.rest-assured:rest-assured:${libs.versions.restassured.get()}" testImplementation "io.rest-assured:json-schema-validator:${libs.versions.restassured.get()}" testImplementation "com.github.tomakehurst:wiremock-jre8:${libs.versions.wiremock.get()}" + testImplementation "net.javacrumbs.json-unit:json-unit-assertj:${libs.versions.jsonUnit.get()}" } jar { diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcMessageProcessor.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcMessageProcessor.kt index 0b708a5c0..ebe6c8109 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcMessageProcessor.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcMessageProcessor.kt @@ -1,5 +1,6 @@ package net.consensys.linea.jsonrpc +import com.fasterxml.jackson.module.kotlin.registerKotlinModule import com.github.michaelbull.result.Err import com.github.michaelbull.result.Ok import com.github.michaelbull.result.Result @@ -19,6 +20,7 @@ import io.vertx.core.json.DecodeException import io.vertx.core.json.Json import io.vertx.core.json.JsonArray import io.vertx.core.json.JsonObject +import io.vertx.core.json.jackson.DatabindCodec import io.vertx.ext.auth.User import net.consensys.linea.metrics.micrometer.DynamicTagTimerCapture import net.consensys.linea.metrics.micrometer.SimpleTimerCapture @@ -51,6 +53,9 @@ class JsonRpcMessageProcessor( private val meterRegistry: MeterRegistry, private val requestParser: JsonRpcRequestParser = Companion::parseRequest ) : JsonRpcMessageHandler { + init { + DatabindCodec.mapper().registerKotlinModule() + } private val log: Logger = LogManager.getLogger(this.javaClass) private val counterBuilder = Counter.builder("jsonrpc.counter") override fun invoke(user: User?, messageJsonStr: String): Future = diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcRequest.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcRequest.kt index ba68dfebc..1ac494b21 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcRequest.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcRequest.kt @@ -1,7 +1,6 @@ package net.consensys.linea.jsonrpc import com.fasterxml.jackson.annotation.JsonIgnore -import com.fasterxml.jackson.annotation.JsonProperty import java.util.StringJoiner interface JsonRpcRequest { @@ -20,11 +19,18 @@ interface JsonRpcRequest { } } +internal data class JsonRpcRequestData( + override val jsonrpc: String, + override val id: Any, + override val method: String, + override val params: Any +) : JsonRpcRequest + data class JsonRpcRequestListParams( - @JsonProperty("jsonrpc") override val jsonrpc: String, - @JsonProperty("id") override val id: Any, - @JsonProperty("method") override val method: String, - @JsonProperty("params") override val params: List + override val jsonrpc: String, + override val id: Any, + override val method: String, + override val params: List ) : JsonRpcRequest { override fun toString(): String { return StringJoiner(", ", JsonRpcRequestListParams::class.java.simpleName + "[", "]") @@ -37,10 +43,10 @@ data class JsonRpcRequestListParams( } data class JsonRpcRequestMapParams( - @JsonProperty("jsonrpc") override val jsonrpc: String, - @JsonProperty("id") override val id: Any, - @JsonProperty("method") override val method: String, - @JsonProperty("params") override val params: Map + override val jsonrpc: String, + override val id: Any, + override val method: String, + override val params: Map ) : JsonRpcRequest { override fun toString(): String { return StringJoiner(", ", JsonRpcRequestMapParams::class.java.simpleName + "[", "]") diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcResponse.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcResponse.kt index c5ec83f52..6fbebe302 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcResponse.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/JsonRpcResponse.kt @@ -61,6 +61,7 @@ data class JsonRpcErrorResponse( fun internalError(id: Any, data: Any?): JsonRpcErrorResponse { return JsonRpcErrorResponse(id, JsonRpcError.internalError(data)) } + fun invalidParams(id: Any, message: String?): JsonRpcErrorResponse { return JsonRpcErrorResponse(id, JsonRpcError.invalidMethodParameter(message)) } @@ -74,8 +75,10 @@ data class JsonRpcError( @JsonProperty("message") val message: String, @JsonProperty("data") val data: Any? = null ) { + // inlining for better stacktrace + @Suppress("NOTHING_TO_INLINE") + inline fun asException() = JsonRpcErrorResponseException(code, message, data) - fun asException() = JsonRpcErrorException("Code: $code, message: '$message'") companion object { @JvmStatic fun invalidMethodParameter(message: String?): JsonRpcError = @@ -88,14 +91,25 @@ data class JsonRpcError( fun invalidMethodParameter(message: String, data: Any): JsonRpcError = JsonRpcError(JsonRpcErrorCode.INVALID_PARAMS.code, message, data) - @JvmStatic fun internalError(): JsonRpcError = JsonRpcErrorCode.INTERNAL_ERROR.toErrorObject() + @JvmStatic + fun internalError(): JsonRpcError = JsonRpcErrorCode.INTERNAL_ERROR.toErrorObject() @JvmStatic fun internalError(data: Any?): JsonRpcError = JsonRpcErrorCode.INTERNAL_ERROR.toErrorObject(data) - @JvmStatic fun unauthorized(): JsonRpcError = JsonRpcErrorCode.UNAUTHORIZED.toErrorObject() + @JvmStatic + fun unauthorized(): JsonRpcError = JsonRpcErrorCode.UNAUTHORIZED.toErrorObject() } } -class JsonRpcErrorException(override val message: String) : Exception(message) +class JsonRpcErrorException( + override val message: String?, + val httpStatusCode: Int? = null +) : RuntimeException(message) + +class JsonRpcErrorResponseException( + val rpcErrorCode: Int, + val rpcErrorMessage: String, + val rpcErrorData: Any? = null +) : RuntimeException("code=$rpcErrorCode message=$rpcErrorMessage errorData=$rpcErrorData") diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcClient.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcClient.kt index 0edb93bc8..3cfcddaf9 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcClient.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcClient.kt @@ -1,27 +1,56 @@ package net.consensys.linea.jsonrpc.client +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.JsonNodeType import com.github.michaelbull.result.Ok import com.github.michaelbull.result.Result import io.vertx.core.Future +import io.vertx.core.json.JsonArray +import io.vertx.core.json.JsonObject import net.consensys.linea.jsonrpc.JsonRpcErrorResponse import net.consensys.linea.jsonrpc.JsonRpcRequest import net.consensys.linea.jsonrpc.JsonRpcSuccessResponse -fun identityMapper(value: Any?): Any? = value +fun toPrimitiveOrJacksonJsonNode(value: Any?): Any? = value + +@Suppress("UNCHECKED_CAST") +fun toPrimitiveOrVertxJson(value: Any?): Any? { + if (value == null) { + return null + } + return when (value) { + is String -> value + is Number -> value + is Boolean -> value + is JsonNode -> { + when (value.nodeType) { + JsonNodeType.STRING, JsonNodeType.NUMBER, JsonNodeType.BOOLEAN, JsonNodeType.NULL -> + value + .toPrimitiveOrJsonNode() + + JsonNodeType.OBJECT -> JsonObject(objectMapper.convertValue(value, Map::class.java) as Map) + JsonNodeType.ARRAY -> JsonArray(objectMapper.convertValue(value, List::class.java) as List) + else -> throw IllegalArgumentException("Unsupported JsonNodeType: ${value.nodeType}") + } + } + + else -> throw IllegalArgumentException("Unsupported type: ${value::class.java}") + } +} interface JsonRpcClient { fun makeRequest( request: JsonRpcRequest, - resultMapper: (Any?) -> Any? = ::identityMapper + resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson // to keep backward compatibility ): Future> } -fun isResultOk(result: Result): Boolean = result is Ok +fun isResultOk(result: Result): Boolean = result is Ok interface JsonRpcClientWithRetries : JsonRpcClient { fun makeRequest( request: JsonRpcRequest, - resultMapper: (Any?) -> Any? = ::identityMapper, + resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson, // to keep backward compatibility stopRetriesPredicate: (result: Result) -> Boolean = ::isResultOk ): Future> } diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestFanOut.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestFanOut.kt index 15bbca706..589205ad1 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestFanOut.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestFanOut.kt @@ -39,7 +39,7 @@ class JsonRpcRequestFanOut( fun fanoutRequest( request: JsonRpcRequest, - resultMapper: (Any?) -> Any? = ::identityMapper + resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson ): Future>> { return Future .all(targets.map { it.makeRequest(request, resultMapper) }) diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryer.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryer.kt index 12b15142b..0bb637823 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryer.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryer.kt @@ -45,7 +45,7 @@ class JsonRpcRequestRetryer( private val vertx: Vertx, private val delegate: JsonRpcClient, private val config: Config, - private val requestObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, + private val requestObjectMapper: ObjectMapper = objectMapper, private val log: Logger = LogManager.getLogger(JsonRpcRequestRetryer::class.java), private val failuresLogLevel: Level = Level.WARN ) : JsonRpcClientWithRetries { diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryerV2.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryerV2.kt new file mode 100644 index 000000000..0182324b0 --- /dev/null +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryerV2.kt @@ -0,0 +1,109 @@ +package net.consensys.linea.jsonrpc.client + +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.michaelbull.result.Err +import com.github.michaelbull.result.Ok +import com.github.michaelbull.result.Result +import com.github.michaelbull.result.map +import com.github.michaelbull.result.mapError +import com.github.michaelbull.result.onFailure +import io.vertx.core.Vertx +import net.consensys.linea.async.AsyncRetryer +import net.consensys.linea.async.RetriedExecutionException +import net.consensys.linea.async.toSafeFuture +import net.consensys.linea.jsonrpc.JsonRpcErrorResponse +import net.consensys.linea.jsonrpc.JsonRpcRequest +import net.consensys.linea.jsonrpc.JsonRpcSuccessResponse +import org.apache.logging.log4j.Level +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import java.util.function.Predicate + +class JsonRpcRequestRetryerV2( + private val vertx: Vertx, + private val delegate: JsonRpcClient, + private val requestRetry: RequestRetryConfig, + private val requestObjectMapper: ObjectMapper = objectMapper, + private val shallRetryRequestsClientBasePredicate: Predicate>, + private val log: Logger = LogManager.getLogger(JsonRpcRequestRetryer::class.java), + private val failuresLogLevel: Level = Level.WARN +) { + fun makeRequest( + request: JsonRpcRequest, + shallRetryRequestPredicate: Predicate>, + resultMapper: (Any?) -> T + ): SafeFuture { + return makeRequestWithRetryer(request, resultMapper, shallRetryRequestPredicate) + } + + private fun shallWarnFailureRetries(retries: Int): Boolean { + return requestRetry.failuresWarningThreshold > 0u && + retries > 0 && + (retries % requestRetry.failuresWarningThreshold.toInt()) == 0 + } + + private fun makeRequestWithRetryer( + request: JsonRpcRequest, + resultMapper: (Any?) -> T, + shallRetryRequestPredicate: Predicate> + ): SafeFuture { + val lastException = AtomicReference() + val retriesCount = AtomicInteger(0) + val requestPredicate = Predicate> { result -> + log.info("result: {}", result) + shallRetryRequestsClientBasePredicate.test(result) || shallRetryRequestPredicate.test(result) + } + + return AsyncRetryer.retry( + vertx = vertx, + backoffDelay = requestRetry.backoffDelay, + maxRetries = requestRetry.maxRetries?.toInt(), + timeout = requestRetry.timeout, + stopRetriesPredicate = { result: Result -> + result.onFailure(lastException::set) + !requestPredicate.test(result) + } + ) { + if (shallWarnFailureRetries(retriesCount.get())) { + log.log( + failuresLogLevel, + "Request '{}' already retried {} times. lastError={}", + requestObjectMapper.writeValueAsString(request), + retriesCount.get(), + lastException.get() + ) + } + retriesCount.incrementAndGet() + delegate.makeRequest(request, resultMapper).toSafeFuture().thenApply { unfoldResultValueOrException(it) } + .exceptionally { th -> + if (th is Error || th.cause is Error) { + // Very serious JVM error, we should stop retrying anyway + throw th + } else { + Err(th.cause ?: th) + } + } + }.handleComposed { result, throwable -> + when { + result is Ok -> SafeFuture.completedFuture(result.value) + result is Err -> SafeFuture.failedFuture(result.error) + throwable != null && throwable is RetriedExecutionException -> SafeFuture.failedFuture(lastException.get()) + else -> SafeFuture.failedFuture(throwable) + } + } + } + + companion object { + fun unfoldResultValueOrException( + response: Result + ): Result { + @Suppress("UNCHECKED_CAST") + return response + .map { it.result as T } + .mapError { it.error.asException() } + } + } +} diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2Client.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2Client.kt new file mode 100644 index 000000000..20fef8440 --- /dev/null +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2Client.kt @@ -0,0 +1,34 @@ +package net.consensys.linea.jsonrpc.client + +import com.github.michaelbull.result.Result +import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.util.function.Predicate + +/** + * JSON-RPC client that supports JSON-RPC v2.0. + * It will automatically generate the request id and retry requests when JSON-RPC errors are received. + * Please override default stopRetriesPredicate to customize the retry logic. + * + * JSON-RPC result/error.data serialization is done automatically to Jackson JsonNode or primitive types. + */ +interface JsonRpcV2Client { + /** + * Makes a JSON-RPC request. + * @param method The method to call. + * @param params The parameters to pass to the method. It can be a List, a Map or a Pojo. + * @param shallRetryRequestPredicate predicate to evaluate request retrying. It defaults to never retrying. + * @param resultMapper Mapper to apply to successful JSON-RPC result. + * the result is primary type (String, Number, Boolean, null) or (jackson's JsonNode or vertx JsonObject/JsonArray) + * The underlying type will depend on the serialization configured on the concrete implementation. + * @return A future that + * - when success - resolves with mapped result + * - when JSON-RPC error - rejects with JsonRpcErrorException with corresponding error code, message and data + * - when other error - rejects with underlying exception + */ + fun makeRequest( + method: String, + params: Any, // List, Map, Pojo + shallRetryRequestPredicate: Predicate> = Predicate { false }, + resultMapper: (Any?) -> T + ): SafeFuture +} diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2ClientImpl.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2ClientImpl.kt new file mode 100644 index 000000000..ac73bc1c2 --- /dev/null +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2ClientImpl.kt @@ -0,0 +1,28 @@ +package net.consensys.linea.jsonrpc.client + +import com.github.michaelbull.result.Result +import net.consensys.linea.jsonrpc.JsonRpcRequestData +import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.util.function.Predicate +import java.util.function.Supplier + +internal class JsonRpcV2ClientImpl( + private val delegate: JsonRpcRequestRetryerV2, + private val idSupplier: Supplier +) : JsonRpcV2Client { + + override fun makeRequest( + method: String, + params: Any, + shallRetryRequestPredicate: Predicate>, + resultMapper: (Any?) -> T + ): SafeFuture { + val request = JsonRpcRequestData(jsonrpc = "2.0", id = idSupplier.get(), method, params) + + return delegate.makeRequest( + request = request, + shallRetryRequestPredicate = shallRetryRequestPredicate, + resultMapper = resultMapper + ) + } +} diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/ObjectMappers.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/ObjectMappers.kt new file mode 100644 index 000000000..f718cf0f0 --- /dev/null +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/ObjectMappers.kt @@ -0,0 +1,26 @@ +package net.consensys.linea.jsonrpc.client + +import build.linea.s11n.jackson.ethByteAsHexSerialisersModule +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.JsonNodeType +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.vertx.core.json.jackson.VertxModule + +internal val objectMapper = jacksonObjectMapper() + .registerModules(VertxModule()) + .registerModules(ethByteAsHexSerialisersModule) + +fun JsonNodeType.isPrimitive(): Boolean { + return when (this) { + JsonNodeType.STRING, JsonNodeType.NUMBER, JsonNodeType.BOOLEAN, JsonNodeType.NULL -> true + else -> false + } +} + +fun JsonNode.toPrimitiveOrJsonNode(): Any? { + return if (this.nodeType.isPrimitive()) { + objectMapper.convertValue(this, Any::class.java) + } else { + this + } +} diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/SequentialIdSupplier.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/SequentialIdSupplier.kt new file mode 100644 index 000000000..338d0b302 --- /dev/null +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/SequentialIdSupplier.kt @@ -0,0 +1,19 @@ +package net.consensys.linea.jsonrpc.client + +import java.util.concurrent.atomic.AtomicLong +import java.util.function.Supplier +import javax.annotation.concurrent.ThreadSafe + +@ThreadSafe +class SequentialIdSupplier : Supplier { + private var id = AtomicLong(0) + + // if application makes 1_000 requests per second, it will take 292,277,026,596 years of uptime to overflow + override fun get(): Any { + return id.incrementAndGet() + } + + companion object { + val singleton = SequentialIdSupplier() + } +} diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClient.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClient.kt index 7bd5cf0b2..42eb78843 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClient.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClient.kt @@ -1,10 +1,7 @@ package net.consensys.linea.jsonrpc.client -import com.fasterxml.jackson.core.JsonGenerator -import com.fasterxml.jackson.databind.JsonSerializer import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.databind.SerializerProvider -import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.module.kotlin.contains import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.github.michaelbull.result.Err import com.github.michaelbull.result.Ok @@ -16,24 +13,24 @@ import io.vertx.core.http.HttpClient import io.vertx.core.http.HttpClientResponse import io.vertx.core.http.HttpMethod import io.vertx.core.http.RequestOptions -import io.vertx.core.json.JsonObject -import io.vertx.core.json.jackson.VertxModule import net.consensys.linea.jsonrpc.JsonRpcError +import net.consensys.linea.jsonrpc.JsonRpcErrorException import net.consensys.linea.jsonrpc.JsonRpcErrorResponse import net.consensys.linea.jsonrpc.JsonRpcRequest +import net.consensys.linea.jsonrpc.JsonRpcRequestData import net.consensys.linea.jsonrpc.JsonRpcSuccessResponse import net.consensys.linea.metrics.micrometer.SimpleTimerCapture import org.apache.logging.log4j.Level import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger import java.net.URL -import java.util.HexFormat +@Suppress("UNCHECKED_CAST") class VertxHttpJsonRpcClient( private val httpClient: HttpClient, private val endpoint: URL, private val meterRegistry: MeterRegistry, - private val requestObjectMapper: ObjectMapper = objectMapper, + private val requestParamsObjectMapper: ObjectMapper = objectMapper, private val responseObjectMapper: ObjectMapper = objectMapper, private val log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java), private val requestResponseLogLevel: Level = Level.TRACE, @@ -44,11 +41,22 @@ class VertxHttpJsonRpcClient( setAbsoluteURI(endpoint) } + private fun serializeRequest(request: JsonRpcRequest): String { + return requestEnvelopeObjectMapper.writeValueAsString( + JsonRpcRequestData( + jsonrpc = request.jsonrpc, + id = request.id, + method = request.method, + params = requestParamsObjectMapper.valueToTree(request.params) + ) + ) + } + override fun makeRequest( request: JsonRpcRequest, resultMapper: (Any?) -> Any? ): Future> { - val json = requestObjectMapper.writeValueAsString(request) + val json = serializeRequest(request) return httpClient.request(requestOptions).flatMap { httpClientRequest -> httpClientRequest.putHeader("Content-Type", "application/json") @@ -67,8 +75,10 @@ class VertxHttpJsonRpcClient( responseBody = bodyBuffer.toString().lines().firstOrNull() ?: "" ) Future.failedFuture( - Exception( - "HTTP errorCode=${response.statusCode()}, message=${response.statusMessage()}" + JsonRpcErrorException( + message = + "HTTP errorCode=${response.statusCode()}, message=${response.statusMessage()}", + httpStatusCode = response.statusCode() ) ) } @@ -99,37 +109,41 @@ class VertxHttpJsonRpcClient( .flatMap { bodyBuffer: Buffer -> responseBody = bodyBuffer.toString() try { - @Suppress("UNCHECKED_CAST") - val jsonResponse = (responseObjectMapper.readValue(responseBody, Map::class.java) as Map) - .let(::JsonObject) + val jsonResponse = responseObjectMapper.readTree(responseBody) + val responseId = responseObjectMapper.convertValue(jsonResponse.get("id"), Any::class.java) val response = when { - jsonResponse.containsKey("result") -> + jsonResponse.contains("result") -> { Ok( JsonRpcSuccessResponse( - jsonResponse.getValue("id"), - resultMapper(jsonResponse.getValue("result")) + responseId, + resultMapper(jsonResponse.get("result").toPrimitiveOrJsonNode()) ) ) + } - jsonResponse.containsKey("error") -> { + jsonResponse.contains("error") -> { isError = true - Err( - JsonRpcErrorResponse( - jsonResponse.getValue("id"), - jsonResponse.getJsonObject("error").mapTo(JsonRpcError::class.java) - ) + val errorResponse = JsonRpcErrorResponse( + responseId, + responseObjectMapper.treeToValue(jsonResponse["error"], JsonRpcError::class.java) ) + Err(errorResponse) } else -> throw IllegalArgumentException("Invalid JSON-RPC response without result or error") } - Future.succeededFuture(response) + Future.succeededFuture>(response) } catch (e: Throwable) { isError = true when (e) { is IllegalArgumentException -> Future.failedFuture(e) - else -> Future.failedFuture(IllegalArgumentException("Invalid JSON-RPC response.", e)) + else -> Future.failedFuture( + IllegalArgumentException( + "Error parsing JSON-RPC response: message=${e.message}", + e + ) + ) } } } @@ -185,19 +199,6 @@ class VertxHttpJsonRpcClient( } companion object { - val objectMapper = jacksonObjectMapper() - .registerModules(VertxModule()) - .registerModules( - SimpleModule().apply { - this.addSerializer(ByteArray::class.java, ByteArrayToHexStringSerializer()) - } - ) - } -} - -class ByteArrayToHexStringSerializer : JsonSerializer() { - private val hexFormatter = HexFormat.of() - override fun serialize(value: ByteArray?, gen: JsonGenerator?, serializers: SerializerProvider?) { - gen?.writeString(value?.let { "0x" + hexFormatter.formatHex(it) }) + private val requestEnvelopeObjectMapper: ObjectMapper = jacksonObjectMapper() } } diff --git a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientFactory.kt b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientFactory.kt index ccdaa9873..21687a62d 100644 --- a/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientFactory.kt +++ b/jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientFactory.kt @@ -1,6 +1,8 @@ package net.consensys.linea.jsonrpc.client import com.fasterxml.jackson.databind.ObjectMapper +import com.github.michaelbull.result.Err +import com.github.michaelbull.result.Result import io.micrometer.core.instrument.MeterRegistry import io.vertx.core.Vertx import io.vertx.core.http.HttpClientOptions @@ -9,19 +11,22 @@ import org.apache.logging.log4j.Level import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger import java.net.URL +import java.util.function.Predicate +import java.util.function.Supplier class VertxHttpJsonRpcClientFactory( private val vertx: Vertx, private val meterRegistry: MeterRegistry, private val requestResponseLogLevel: Level = Level.TRACE, - private val failuresLogLevel: Level = Level.DEBUG + private val failuresLogLevel: Level = Level.DEBUG, + private val requestIdSupplier: Supplier = SequentialIdSupplier.singleton ) { fun create( endpoint: URL, maxPoolSize: Int? = null, httpVersion: HttpVersion? = null, - requestObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, - responseObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, + requestObjectMapper: ObjectMapper = objectMapper, + responseObjectMapper: ObjectMapper = objectMapper, log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java), requestResponseLogLevel: Level = this.requestResponseLogLevel, failuresLogLevel: Level = this.failuresLogLevel @@ -39,7 +44,7 @@ class VertxHttpJsonRpcClientFactory( endpoint, meterRegistry, log = log, - requestObjectMapper = requestObjectMapper, + requestParamsObjectMapper = requestObjectMapper, responseObjectMapper = responseObjectMapper, requestResponseLogLevel = requestResponseLogLevel, failuresLogLevel = failuresLogLevel @@ -50,8 +55,8 @@ class VertxHttpJsonRpcClientFactory( endpoints: Set, maxInflightRequestsPerClient: UInt, httpVersion: HttpVersion? = null, - requestObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, - responseObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, + requestObjectMapper: ObjectMapper = objectMapper, + responseObjectMapper: ObjectMapper = objectMapper, log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java), requestResponseLogLevel: Level = this.requestResponseLogLevel, failuresLogLevel: Level = this.failuresLogLevel @@ -79,8 +84,8 @@ class VertxHttpJsonRpcClientFactory( retryConfig: RequestRetryConfig, methodsToRetry: Set, httpVersion: HttpVersion? = null, - requestObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, - responseObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, + requestObjectMapper: ObjectMapper = objectMapper, + responseObjectMapper: ObjectMapper = objectMapper, log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java), requestResponseLogLevel: Level = this.requestResponseLogLevel, failuresLogLevel: Level = this.failuresLogLevel @@ -114,8 +119,8 @@ class VertxHttpJsonRpcClientFactory( retryConfig: RequestRetryConfig, methodsToRetry: Set, httpVersion: HttpVersion? = null, - requestObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, - responseObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper, + requestObjectMapper: ObjectMapper = objectMapper, + responseObjectMapper: ObjectMapper = objectMapper, log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java), requestResponseLogLevel: Level = this.requestResponseLogLevel, failuresLogLevel: Level = this.failuresLogLevel @@ -143,4 +148,60 @@ class VertxHttpJsonRpcClientFactory( log = log ) } + + fun createV2( + vertx: Vertx, + endpoints: Set, + maxInflightRequestsPerClient: UInt? = null, + retryConfig: RequestRetryConfig, + httpVersion: HttpVersion? = null, + requestObjectMapper: ObjectMapper = objectMapper, + responseObjectMapper: ObjectMapper = objectMapper, + shallRetryRequestsClientBasePredicate: Predicate> = Predicate { it is Err }, + log: Logger = LogManager.getLogger(VertxHttpJsonRpcClient::class.java), + requestResponseLogLevel: Level = this.requestResponseLogLevel, + failuresLogLevel: Level = this.failuresLogLevel + ): JsonRpcV2Client { + assert(endpoints.isNotEmpty()) { "endpoints set is empty " } + // create base client + return if (maxInflightRequestsPerClient != null || endpoints.size > 1) { + createWithLoadBalancing( + endpoints = endpoints, + maxInflightRequestsPerClient = maxInflightRequestsPerClient!!, + httpVersion = httpVersion, + requestObjectMapper = requestObjectMapper, + responseObjectMapper = responseObjectMapper, + log = log, + requestResponseLogLevel = requestResponseLogLevel, + failuresLogLevel = failuresLogLevel + ) + } else { + create( + endpoint = endpoints.first(), + httpVersion = httpVersion, + requestObjectMapper = requestObjectMapper, + responseObjectMapper = responseObjectMapper, + log = log, + requestResponseLogLevel = requestResponseLogLevel, + failuresLogLevel = failuresLogLevel + ) + }.let { + // Wrap the client with a retryer + JsonRpcRequestRetryerV2( + vertx = vertx, + delegate = it, + requestRetry = retryConfig, + requestObjectMapper = requestObjectMapper, + shallRetryRequestsClientBasePredicate = shallRetryRequestsClientBasePredicate, + failuresLogLevel = failuresLogLevel, + log = log + ) + }.let { + // Wrap the client with a v2 client helper + JsonRpcV2ClientImpl( + delegate = it, + idSupplier = requestIdSupplier + ) + } + } } diff --git a/jvm-libs/json-rpc/src/test/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2ClientImplTest.kt b/jvm-libs/json-rpc/src/test/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2ClientImplTest.kt new file mode 100644 index 000000000..c6a008993 --- /dev/null +++ b/jvm-libs/json-rpc/src/test/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2ClientImplTest.kt @@ -0,0 +1,672 @@ +package net.consensys.linea.jsonrpc.client + +import build.linea.s11n.jackson.BigIntegerToHexSerializer +import build.linea.s11n.jackson.ByteArrayToHexSerializer +import build.linea.s11n.jackson.JIntegerToHexSerializer +import build.linea.s11n.jackson.ULongToHexSerializer +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.databind.node.ObjectNode +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.github.michaelbull.result.Ok +import com.github.michaelbull.result.Result +import com.github.michaelbull.result.getOr +import com.github.michaelbull.result.orElseThrow +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock.containing +import com.github.tomakehurst.wiremock.client.WireMock.post +import com.github.tomakehurst.wiremock.client.WireMock.status +import com.github.tomakehurst.wiremock.core.WireMockConfiguration +import com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.vertx.core.Vertx +import io.vertx.core.json.JsonObject +import io.vertx.junit5.VertxExtension +import net.consensys.decodeHex +import net.consensys.linea.jsonrpc.JsonRpcErrorResponseException +import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.math.BigInteger +import java.net.ConnectException +import java.net.URI +import java.net.URL +import java.util.concurrent.ExecutionException +import java.util.function.Predicate +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds +import kotlin.time.toJavaDuration + +@ExtendWith(VertxExtension::class) +class JsonRpcV2ClientImplTest { + private lateinit var vertx: Vertx + private lateinit var factory: VertxHttpJsonRpcClientFactory + private lateinit var client: JsonRpcV2Client + private lateinit var wiremock: WireMockServer + private val path = "/api/v1?appKey=1234" + private lateinit var meterRegistry: SimpleMeterRegistry + private lateinit var endpoint: URL + private val defaultRetryConfig = retryConfig(maxRetries = 2u, timeout = 8.seconds, backoffDelay = 5.milliseconds) + + private val defaultObjectMapper = jacksonObjectMapper() + private val objectMapperBytesAsHex = jacksonObjectMapper() + .registerModules( + SimpleModule().apply { + this.addSerializer(ByteArray::class.java, ByteArrayToHexSerializer) + this.addSerializer(ULong::class.java, ULongToHexSerializer) + } + ) + private val jsonRpcResultOk = """{"jsonrpc": "2.0", "id": 1, "result": "OK"}""" + + private fun retryConfig( + maxRetries: UInt = 2u, + timeout: Duration = 8.seconds, // bellow 2s we may have flacky tests when running whole test suite in parallel + backoffDelay: Duration = 5.milliseconds + ) = RequestRetryConfig( + maxRetries = maxRetries, + timeout = timeout, + backoffDelay = backoffDelay + ) + + private fun createClientAndSetupWireMockServer( + vertx: Vertx, + responseObjectMapper: ObjectMapper = defaultObjectMapper, + requestObjectMapper: ObjectMapper = defaultObjectMapper, + retryConfig: RequestRetryConfig = defaultRetryConfig, + shallRetryRequestsClientBasePredicate: Predicate> = Predicate { false } + ): JsonRpcV2Client { + wiremock = WireMockServer(WireMockConfiguration.options().dynamicPort()) + wiremock.start() + endpoint = URI(wiremock.baseUrl() + path).toURL() + + return factory.createV2( + vertx = vertx, + endpoints = setOf(endpoint), + retryConfig = retryConfig, + requestObjectMapper = requestObjectMapper, + responseObjectMapper = responseObjectMapper, + shallRetryRequestsClientBasePredicate = shallRetryRequestsClientBasePredicate + ) + } + + @BeforeEach + fun beforeEach(vertx: Vertx) { + this.vertx = vertx + this.meterRegistry = SimpleMeterRegistry() + this.factory = VertxHttpJsonRpcClientFactory(vertx, meterRegistry) + this.client = createClientAndSetupWireMockServer(vertx) + } + + @AfterEach + fun tearDown() { + wiremock.stop() + } + + private fun WireMockServer.jsonRequest(requestIndex: Int = 0): String { + return this.serveEvents.serveEvents[requestIndex].request.bodyAsString + } + + @Test + fun `when request is a list of params shall serialize to json array`() { + replyRequestWith(200, jsonRpcResultOk) + + client.makeRequest( + method = "someMethod", + params = listOf("superUser", "Alice"), + resultMapper = { it } + ).get() + + assertThatJson(wiremock.jsonRequest()).isEqualTo( + """ + { + "jsonrpc": "2.0", + "id":"${'$'}{json-unit.any-number}", + "method": "someMethod", + "params": ["superUser", "Alice"] + } + """ + ) + } + + @Test + fun `when request is a map of params shall serialize to json object`() { + replyRequestWith(200, jsonRpcResultOk) + + client.makeRequest( + method = "someMethod", + params = mapOf("superUser" to "Alice"), + resultMapper = { it } + ).get() + + assertThatJson(wiremock.jsonRequest()).isEqualTo( + """ + { + "jsonrpc": "2.0", + "id":"${'$'}{json-unit.any-number}", + "method": "someMethod", + "params": {"superUser":"Alice"} + } + """ + ) + } + + private data class User( + val name: String, + val email: String, + val address: ByteArray, + val value: ULong + ) + + @Test + fun `when request is Pojo object shall serialize to json object`() { + replyRequestWith(200, jsonRpcResultOk) + + client.makeRequest( + method = "someMethod", + params = User(name = "John", email = "email@example.com", address = "0x01ffbb".decodeHex(), value = 987UL), + resultMapper = { it } + ).get() + // 0x01ffbb -> "Af+7" in Base64, jackon's default encoding for ByteArray + assertThatJson(wiremock.jsonRequest()).isEqualTo( + """ + { + "jsonrpc": "2.0", + "id":"${'$'}{json-unit.any-number}", + "method": "someMethod", + "params": {"name":"John", "email":"email@example.com", "address":"Af+7", "value":987} + } + """ + ) + } + + @Test + fun `request params shall use defined objectMapper and not affect json-rpc envelope`() { + val obj = User(name = "John", email = "email@example.com", address = "0x01ffbb".decodeHex(), value = 987UL) + + createClientAndSetupWireMockServer(vertx, requestObjectMapper = defaultObjectMapper).also { client -> + replyRequestWith(200, jsonRpcResultOk) + client.makeRequest( + method = "someMethod", + params = obj, + resultMapper = { it } + ).get() + + assertThatJson(wiremock.jsonRequest()).isEqualTo( + """ + { + "jsonrpc": "2.0", + "id":"${'$'}{json-unit.any-number}", + "method": "someMethod", + "params": {"name":"John", "email":"email@example.com", "address":"Af+7", "value":987} + } + """ + ) + wiremock.stop() + } + + val objMapperWithNumbersAsHex = jacksonObjectMapper() + .registerModules( + SimpleModule().apply { + this.addSerializer(ByteArray::class.java, ByteArrayToHexSerializer) + this.addSerializer(ULong::class.java, ULongToHexSerializer) + this.addSerializer(Integer::class.java, JIntegerToHexSerializer) + this.addSerializer(BigInteger::class.java, BigIntegerToHexSerializer) + } + ) + + createClientAndSetupWireMockServer(vertx, requestObjectMapper = objMapperWithNumbersAsHex).also { client -> + replyRequestWith(200, jsonRpcResultOk) + client.makeRequest( + method = "someMethod", + params = obj, + resultMapper = { it } + ).get() + + assertThatJson(wiremock.jsonRequest()).isEqualTo( + """ + { + "jsonrpc": "2.0", + "id":"${'$'}{json-unit.any-number}", + "method": "someMethod", + "params": {"name":"John", "email":"email@example.com", "address":"0x01ffbb", "value": "0x3db"} + } + """ + ) + } + } + + @Test + fun `when multiple requests are made, each request shall have an unique id`() { + replyRequestWith(200, jsonRpcResultOk) + val numberOfRequests = 20 + val requestsPromises = IntRange(1, numberOfRequests).map { index -> + client.makeRequest( + method = "someMethod", + params = listOf(index), + resultMapper = { it } + ) + } + SafeFuture.collectAll(requestsPromises.stream()).get() + assertThat(wiremock.serveEvents.serveEvents.size).isEqualTo(numberOfRequests) + + val ids = wiremock.serveEvents.serveEvents.fold(mutableSetOf()) { acc, event -> + val index = JsonObject(event.request.bodyAsString).getString("id") + acc.add(index) + acc + } + assertThat(ids.size).isEqualTo(numberOfRequests) + } + + @Test + fun `when result is null resolves with null`() { + replyRequestWith(200, """{"jsonrpc": "2.0", "id": 1, "result": null}""") + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + .get() + .also { response -> + assertThat(response).isNull() + } + } + + @Test + fun `when result is string resolves with String`() { + replyRequestWith(200, """{"jsonrpc": "2.0", "id": 1, "result": "hello :)"}""") + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + .get() + .also { response -> + assertThat(response).isEqualTo("hello :)") + } + } + + @Test + fun `when result is Int Number resolves with Int`() { + replyRequestWith(200, """{"jsonrpc": "2.0", "id": 1, "result": 42}""") + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + .get() + .also { response -> + assertThat(response).isEqualTo(42) + } + } + + @Test + fun `when result is Long Number resolves with Long`() { + replyRequestWith(200, """{"jsonrpc": "2.0", "id": 1, "result": ${Long.MAX_VALUE}}""") + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + .get() + .also { response -> + assertThat(response).isEqualTo(Long.MAX_VALUE) + } + } + + @Test + fun `when result is Floating point Number resolves with Double`() { + replyRequestWith(200, """{"jsonrpc": "2.0", "id": 1, "result": 3.14}""") + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + .get() + .also { response -> + assertThat(response).isEqualTo(3.14) + } + } + + @Test + fun `when result is an Object, returns JsonNode`() { + replyRequestWith( + 200, + """{"jsonrpc": "2.0", "id": 1, "result": {"name": "Alice", "age": 23}}""" + ) + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + .get() + .also { response -> + val expectedObj: ObjectNode = + objectMapperBytesAsHex.readTree("""{"name": "Alice", "age": 23}""") as ObjectNode + assertThat(response).isEqualTo(expectedObj) + } + } + + @Test + fun `when result is an Array, return JsonNode`() { + replyRequestWith(200, """{"jsonrpc": "2.0", "id": 1, "result": [1, 2, 3]}""") + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + .get() + .also { response -> + val expectedArray: ArrayNode = objectMapperBytesAsHex.readTree("[1, 2, 3]") as ArrayNode + assertThat(response).isEqualTo(expectedArray) + } + } + + @Test + fun `when result transforms result shall return it`() { + data class SimpleUser(val name: String, val age: Int) + replyRequestWith(200, """{"jsonrpc": "2.0", "id": 1, "result": {"name": "Alice", "age": 23}}""") + + val expectedUser = SimpleUser("Alice", 23) + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { + it as JsonNode + SimpleUser(it.get("name").asText(), it.get("age").asInt()) + } + ) + .get() + .also { response -> + assertThat(response).isEqualTo(expectedUser) + } + + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { + it as JsonNode + defaultObjectMapper.treeToValue(it, SimpleUser::class.java) + } + ) + .get() + .also { response -> + assertThat(response).isEqualTo(expectedUser) + } + } + + @Test + fun `when it gets a json-prc error rejects with JsonRpcErrorException cause with error code and message`() { + replyRequestWith( + 200, + """{ + "jsonrpc": "2.0", + "id": 1, + "error": { + "code": -32602, + "message": "Invalid params", + "data": { + "field1": {"key1": "value1", "key2": 20, "key3": [1, 2, 3], "key4": null} + } + } + } + """.trimMargin() + ) + + assertThat( + client.makeRequest( + method = "someMethod", + params = emptyList(), + resultMapper = { it } + ) + ).failsWithin(10.seconds.toJavaDuration()) + .withThrowableThat() + .isInstanceOfSatisfying(ExecutionException::class.java) { + assertThat(it.cause).isInstanceOf(JsonRpcErrorResponseException::class.java) + val cause = it.cause as JsonRpcErrorResponseException + assertThat(cause.rpcErrorCode).isEqualTo(-32602) + assertThat(cause.rpcErrorMessage).isEqualTo("Invalid params") + val expectedData = mapOf( + "field1" to mapOf( + "key1" to "value1", + "key2" to 20, + "key3" to listOf(1, 2, 3), + "key4" to null + ) + ) + assertThat(cause.rpcErrorData).isEqualTo(expectedData) + } + } + + @Test + fun `when it gets an error propagates to shallRetryRequestPredicate and retries while is true`() { + createClientAndSetupWireMockServer( + vertx, + retryConfig = retryConfig(maxRetries = 10u) + ).also { client -> + val responses = listOf( + 500 to "Internal Error", + 200 to "Invalid Json", + 200 to """{"jsonrpc": "2.0", "id": 1, "error": {"code": -32602, "message": "Invalid params"}}""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": null }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "some result" }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "expected result" }""" + ) + replyRequestsWith(responses = responses) + val retryPredicateCalls = mutableListOf>() + + client.makeRequest( + method = "someMethod", + params = emptyList(), + shallRetryRequestPredicate = { + retryPredicateCalls.add(it) + it != Ok("EXPECTED RESULT") + }, + resultMapper = { + it as String? + it?.uppercase() + } + ).get() + + assertThat(wiremock.serveEvents.serveEvents).hasSize(responses.size) + assertThat(retryPredicateCalls).hasSize(responses.size) + assertThatThrownBy { retryPredicateCalls[0].orElseThrow() } + .isInstanceOfSatisfying(Exception::class.java) { + assertThat(it.message).contains("HTTP errorCode=500, message=Server Error") + } + assertThatThrownBy { retryPredicateCalls[1].orElseThrow() } + .isInstanceOfSatisfying(IllegalArgumentException::class.java) { + assertThat(it.message).contains("Error parsing JSON-RPC response") + } + assertThatThrownBy { retryPredicateCalls[2].orElseThrow() } + .isInstanceOfSatisfying(JsonRpcErrorResponseException::class.java) { + assertThat(it.rpcErrorCode).isEqualTo(-32602) + assertThat(it.rpcErrorMessage).isEqualTo("Invalid params") + } + assertThat(retryPredicateCalls[3]).isEqualTo(Ok(value = null)) + assertThat(retryPredicateCalls[4]).isEqualTo(Ok(value = "SOME RESULT")) + assertThat(retryPredicateCalls[5]).isEqualTo(Ok(value = "EXPECTED RESULT")) + } + } + + @Test + fun `when it has connection error propagates to shallRetryRequestPredicate and retries while is true`() { + createClientAndSetupWireMockServer( + vertx, + retryConfig = retryConfig(maxRetries = 10u) + ).also { client -> + // stop the server to simulate connection error + wiremock.stop() + + val retryPredicateCalls = mutableListOf>() + + val reqFuture = client.makeRequest( + method = "someMethod", + params = emptyList(), + shallRetryRequestPredicate = { + retryPredicateCalls.add(it) + retryPredicateCalls.size < 2 + }, + resultMapper = { it as String? } + ) + + assertThatThrownBy { reqFuture.get() } + .isInstanceOfSatisfying(ExecutionException::class.java) { + assertThat(it.cause).isInstanceOfSatisfying(ConnectException::class.java) { + assertThat(it.message).contains("Connection refused: localhost/127.0.0.1:") + } + } + + assertThat(retryPredicateCalls.size).isEqualTo(2) + assertThatThrownBy { retryPredicateCalls[0].orElseThrow() } + .isInstanceOfSatisfying(ConnectException::class.java) { + assertThat(it.message).contains("Connection refused: localhost/127.0.0.1:") + } + } + } + + @Test + fun `when it has connection error propagates to shallRetryRequestPredicate and retries until retry config elapses`() { + createClientAndSetupWireMockServer( + vertx, + retryConfig = retryConfig(maxRetries = 2u, timeout = 8.seconds, backoffDelay = 5.milliseconds) + ).also { client -> + // stop the server to simulate connection error + wiremock.stop() + + val retryPredicateCalls = mutableListOf>() + + val reqFuture = client.makeRequest( + method = "someMethod", + params = emptyList(), + shallRetryRequestPredicate = { + retryPredicateCalls.add(it) + true // keep retrying + }, + resultMapper = { it as String? } + ) + + assertThatThrownBy { reqFuture.get() } + .isInstanceOfSatisfying(ExecutionException::class.java) { + assertThat(it.cause).isInstanceOfSatisfying(ConnectException::class.java) { + assertThat(it.message).contains("Connection refused: localhost/127.0.0.1:") + } + } + assertThat(retryPredicateCalls).hasSizeBetween(1, 3) + } + } + + @Test + fun `when shared predicate is defined shall retry when any of them returns true`() { + val baseRetryPredicateCalls = mutableListOf>() + val baseRetryPredicate = Predicate> { + baseRetryPredicateCalls.add(it) + it as Ok + (it.value as String).startsWith("retry_a") + } + createClientAndSetupWireMockServer( + vertx, + retryConfig = RequestRetryConfig( + maxRetries = 10u, + timeout = 5.minutes, + backoffDelay = 1.milliseconds + ), + shallRetryRequestsClientBasePredicate = baseRetryPredicate + ).also { client -> + replyRequestsWith( + listOf( + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "retry_a_0" }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "retry_a_1" }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "retry_a_2" }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "retry_b_3" }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "retry_b_4" }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "retry_b_5" }""", + 200 to """{"jsonrpc": "2.0", "id": 1, "result": "some_result" }""" + ) + ) + val retryPredicateCalls = mutableListOf>() + val reqFuture = client.makeRequest( + method = "someMethod", + params = emptyList(), + shallRetryRequestPredicate = { + retryPredicateCalls.add(it) + it.getOr("").startsWith("retry_b") + }, + resultMapper = { it as String } + ) + + assertThat(reqFuture.get()).isEqualTo("some_result") + assertThat(baseRetryPredicateCalls).hasSize(7) + // this extra assertion is not necessary for correctness. + // however, if it breaks raises awareness that 2nd predicate may be evaluated without need + assertThat(retryPredicateCalls.map { it.getOr(-1) }).isEqualTo( + listOf( + "retry_b_3", + "retry_b_4", + "retry_b_5", + "some_result" + ) + ) + } + } + + private fun replyRequestWith(statusCode: Int, body: String?) { + wiremock.stubFor( + post(path) + .withHeader("Content-Type", containing("application/json")) + .willReturn( + status(statusCode) + .withHeader("Content-type", "text/plain") + .apply { if (body != null) withBody(body) } + ) + ) + } + + private fun replyRequestsWith(responses: List>) { + val (firstResponseStatus, firstResponseBody) = responses.first() + wiremock.stubFor( + post(path) + .withHeader("Content-Type", containing("application/json")) + .inScenario("retry") + .whenScenarioStateIs(STARTED) + .willReturn( + status(firstResponseStatus) + .withHeader("Content-type", "text/plain") + .apply { if (firstResponseBody != null) withBody(firstResponseBody) } + ) + .willSetStateTo("req_0") + ) + + responses + .drop(1) + .forEachIndexed { index, (statusCode, body) -> + wiremock.stubFor( + post(path) + .withHeader("Content-Type", containing("application/json")) + .inScenario("retry") + .whenScenarioStateIs("req_$index") + .willReturn( + status(statusCode) + .withHeader("Content-type", "text/plain") + .apply { if (body != null) withBody(body) } + ) + .willSetStateTo("req_${index + 1}") + ) + } + } +} diff --git a/jvm-libs/json-rpc/src/test/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientTest.kt b/jvm-libs/json-rpc/src/test/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientTest.kt index c080675a1..b7506bfbd 100644 --- a/jvm-libs/json-rpc/src/test/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientTest.kt +++ b/jvm-libs/json-rpc/src/test/kotlin/net/consensys/linea/jsonrpc/client/VertxHttpJsonRpcClientTest.kt @@ -16,6 +16,7 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry import io.vertx.core.Vertx import io.vertx.core.http.HttpClientOptions import io.vertx.core.http.HttpVersion +import io.vertx.core.json.JsonArray import io.vertx.core.json.JsonObject import net.consensys.linea.async.get import net.consensys.linea.async.toSafeFuture @@ -115,27 +116,37 @@ class VertxHttpJsonRpcClientTest { } @Test - fun makesRequest_successNullResult() { + fun makesRequest_success_result_is_null() { replyRequestWith(JsonObject().put("jsonrpc", "2.0").put("id", "1").put("result", null)) - val response = - client.makeRequest(JsonRpcRequestListParams("2.0", 1, "eth_blockNumber", emptyList())).get() - - assertThat(response).isEqualTo(Ok(JsonRpcSuccessResponse("1", null))) + client.makeRequest(JsonRpcRequestListParams("2.0", 1, "eth_blockNumber", emptyList())).get() + .also { response -> + assertThat(response).isEqualTo(Ok(JsonRpcSuccessResponse("1", null))) + } } @Test - fun makesRequest_successSingleValue() { + fun makesRequest_success_result_is_number() { replyRequestWith(JsonObject().put("jsonrpc", "2.0").put("id", "1").put("result", 3)) - val response = - client.makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumber", emptyList())).get() + client.makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumber", emptyList())).get() + .also { response -> + assertThat(response).isEqualTo(Ok(JsonRpcSuccessResponse("1", 3))) + } + } - assertThat(response).isEqualTo(Ok(JsonRpcSuccessResponse("1", 3))) + @Test + fun makesRequest_success_result_is_string() { + replyRequestWith(JsonObject().put("jsonrpc", "2.0").put("id", "1").put("result", "0x1234")) + + client.makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumber", emptyList())).get() + .also { response -> + assertThat(response).isEqualTo(Ok(JsonRpcSuccessResponse("1", "0x1234"))) + } } @Test - fun makesRequest_successJsonObject() { + fun makesRequest_success_result_is_Object() { replyRequestWith( JsonObject() .put("jsonrpc", "2.0") @@ -143,11 +154,60 @@ class VertxHttpJsonRpcClientTest { .put("result", JsonObject().put("odd", 23).put("even", 10)) ) - val response = - client.makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumbers", emptyList())).get() + client + .makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumbers", emptyList())) + .get() + .also { response -> + val expectedJsonNode = JsonObject("""{"odd":23,"even":10}""") + assertThat(response) + .isEqualTo(Ok(JsonRpcSuccessResponse("1", expectedJsonNode))) + } + + client + .makeRequest( + request = JsonRpcRequestListParams("2.0", 1, "randomNumbers", emptyList()), + resultMapper = ::toPrimitiveOrJacksonJsonNode + ) + .get() + .also { response -> + val expectedJsonNode = objectMapper.readTree("""{"odd":23,"even":10}""") + assertThat(response) + .isEqualTo(Ok(JsonRpcSuccessResponse("1", expectedJsonNode))) + } + } - assertThat(response) - .isEqualTo(Ok(JsonRpcSuccessResponse("1", JsonObject().put("odd", 23).put("even", 10)))) + @Test + fun makesRequest_success_result_is_array() { + replyRequestWith( + statusCode = 200, + """{ + |"jsonrpc": "2.0", + |"id": "1", + |"result": ["a", 2, "c", 4] + |} + """.trimMargin() + ) + + client + .makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumbers", emptyList())) + .get() + .also { response -> + val expectedJsonNode = JsonArray("""["a", 2, "c", 4]""") + assertThat(response) + .isEqualTo(Ok(JsonRpcSuccessResponse("1", expectedJsonNode))) + } + + client + .makeRequest( + request = JsonRpcRequestListParams("2.0", 1, "randomNumbers", emptyList()), + resultMapper = ::toPrimitiveOrJacksonJsonNode + ) + .get() + .also { response -> + val expectedJsonNode = objectMapper.readTree("""["a", 2, "c", 4]""") + assertThat(response) + .isEqualTo(Ok(JsonRpcSuccessResponse("1", expectedJsonNode))) + } } @Test @@ -156,12 +216,13 @@ class VertxHttpJsonRpcClientTest { JsonObject().put("jsonrpc", "2.0").put("id", "1").put("result", "some_random_value") ) val resultMapper = { value: Any? -> (value as String).uppercase() } - val response = - client - .makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumbers", emptyList()), resultMapper) - .get() - assertThat(response).isEqualTo(Ok(JsonRpcSuccessResponse("1", "SOME_RANDOM_VALUE"))) + client + .makeRequest(JsonRpcRequestListParams("2.0", 1, "randomNumbers", emptyList()), resultMapper) + .get() + .also { response -> + assertThat(response).isEqualTo(Ok(JsonRpcSuccessResponse("1", "SOME_RANDOM_VALUE"))) + } } @Test diff --git a/traces-api-facade/conflation/src/main/kotlin/net/consensys/linea/traces/RawJsonTracesConflator.kt b/traces-api-facade/conflation/src/main/kotlin/net/consensys/linea/traces/RawJsonTracesConflator.kt index 1d553c424..c6bfc7e79 100644 --- a/traces-api-facade/conflation/src/main/kotlin/net/consensys/linea/traces/RawJsonTracesConflator.kt +++ b/traces-api-facade/conflation/src/main/kotlin/net/consensys/linea/traces/RawJsonTracesConflator.kt @@ -1,5 +1,7 @@ package net.consensys.linea.traces +import com.fasterxml.jackson.databind.MapperFeature +import com.fasterxml.jackson.databind.json.JsonMapper import com.github.michaelbull.result.Err import com.github.michaelbull.result.Ok import com.github.michaelbull.result.Result @@ -1028,22 +1030,18 @@ class ConflatedTrace : ConflatedTraceStorage() { fun reAssembleRom() { fun hiLoToAddr(hi: String, lo: String): BigInteger { val addrShift = 256.toBigInteger().pow(16) - return hi - .toBigInteger() - .multiply(addrShift) - .add(lo.toBigInteger()) + return hi.toBigInteger().multiply(addrShift).add(lo.toBigInteger()) } val idxs = (0 until this.rom.PC.size).toList() - val sortedIdxs = - idxs.sortedWith( - compareBy( - { hiLoToAddr(this.rom.SC_ADDRESS_HI[it], this.rom.SC_ADDRESS_LO[it]) }, - // we want the initcode *FIRST* - { -this.rom.IS_INITCODE[it].toInt() }, - { this.rom.PC[it].toInt() } - ) + val sortedIdxs = idxs.sortedWith( + compareBy( + { hiLoToAddr(this.rom.SC_ADDRESS_HI[it], this.rom.SC_ADDRESS_LO[it]) }, + // we want the initcode *FIRST* + { -this.rom.IS_INITCODE[it].toInt() }, + { this.rom.PC[it].toInt() } ) + ) // Awfully suboptimal, but it just works val copiedRom = this.rom.copy() @@ -1054,9 +1052,8 @@ class ConflatedTrace : ConflatedTraceStorage() { if (i > 0) { for (column in Rom::class.memberProperties) { if (column != Rom::ADDRESS_INDEX && column != Rom::CODE_FRAGMENT_INDEX) { - duplicate = duplicate && - getColumn(copiedRom, column)[sortedIdxs[i]].toBigInteger() - .equals(getColumn(this.rom, column).last().toBigInteger()) + duplicate = duplicate && getColumn(copiedRom, column)[sortedIdxs[i]].toBigInteger() + .equals(getColumn(this.rom, column).last().toBigInteger()) } } } else { @@ -1163,7 +1160,9 @@ class ConflatedTrace : ConflatedTraceStorage() { } } -class RawJsonTracesConflator(val tracesEngineVersion: String) : TracesConflator { +class RawJsonTracesConflator(private val tracesEngineVersion: String) : TracesConflator { + private val objectMapper: JsonMapper = JsonMapper.builder().disable(MapperFeature.USE_GETTERS_AS_SETTERS).build() + private val log: Logger = LogManager.getLogger(this::class.java) override fun conflateTraces( @@ -1181,12 +1180,11 @@ class RawJsonTracesConflator(val tracesEngineVersion: String) : TracesConflator log.trace("Parsing trace: {}", jsonPath) trace.getTrace(jsonPath)?.let { if (!it.isEmpty) { - ax.add(it.mapTo(klass)) + ax.add(objectMapper.convertValue(it, klass)) } + } ?: run { + log.warn("Could not parse object with path: '{}'", jsonPath.joinToString(".")) } - ?: run { - log.warn("Could not parse object with path: '{}'", jsonPath.joinToString(".")) - } } }