Skip to content

Commit

Permalink
Merge branch 'main' into feat/optimize-blob-submission-data
Browse files Browse the repository at this point in the history
  • Loading branch information
thedarkjester authored Oct 18, 2024
2 parents 797f4f3 + 9fc7fa6 commit 9e1f7dd
Show file tree
Hide file tree
Showing 17 changed files with 1,173 additions and 107 deletions.
3 changes: 3 additions & 0 deletions jvm-libs/json-rpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ plugins {
dependencies {
implementation project(":jvm-libs:metrics:micrometer")
implementation project(":jvm-libs:future-extensions")
implementation project(":jvm-libs:kotlin-extensions")
implementation project(":jvm-libs:generic:serialization:jackson")
implementation "com.fasterxml.jackson.core:jackson-annotations:${libs.versions.jackson.get()}"
api "com.fasterxml.jackson.core:jackson-databind:${libs.versions.jackson.get()}"
implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${libs.versions.jackson.get()}"
Expand All @@ -22,6 +24,7 @@ dependencies {
testImplementation "io.rest-assured:rest-assured:${libs.versions.restassured.get()}"
testImplementation "io.rest-assured:json-schema-validator:${libs.versions.restassured.get()}"
testImplementation "com.github.tomakehurst:wiremock-jre8:${libs.versions.wiremock.get()}"
testImplementation "net.javacrumbs.json-unit:json-unit-assertj:${libs.versions.jsonUnit.get()}"
}

jar {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.consensys.linea.jsonrpc

import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import com.github.michaelbull.result.Err
import com.github.michaelbull.result.Ok
import com.github.michaelbull.result.Result
Expand All @@ -19,6 +20,7 @@ import io.vertx.core.json.DecodeException
import io.vertx.core.json.Json
import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import io.vertx.core.json.jackson.DatabindCodec
import io.vertx.ext.auth.User
import net.consensys.linea.metrics.micrometer.DynamicTagTimerCapture
import net.consensys.linea.metrics.micrometer.SimpleTimerCapture
Expand Down Expand Up @@ -51,6 +53,9 @@ class JsonRpcMessageProcessor(
private val meterRegistry: MeterRegistry,
private val requestParser: JsonRpcRequestParser = Companion::parseRequest
) : JsonRpcMessageHandler {
init {
DatabindCodec.mapper().registerKotlinModule()
}
private val log: Logger = LogManager.getLogger(this.javaClass)
private val counterBuilder = Counter.builder("jsonrpc.counter")
override fun invoke(user: User?, messageJsonStr: String): Future<String> =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.consensys.linea.jsonrpc

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonProperty
import java.util.StringJoiner

interface JsonRpcRequest {
Expand All @@ -20,11 +19,18 @@ interface JsonRpcRequest {
}
}

internal data class JsonRpcRequestData(
override val jsonrpc: String,
override val id: Any,
override val method: String,
override val params: Any
) : JsonRpcRequest

data class JsonRpcRequestListParams(
@JsonProperty("jsonrpc") override val jsonrpc: String,
@JsonProperty("id") override val id: Any,
@JsonProperty("method") override val method: String,
@JsonProperty("params") override val params: List<Any?>
override val jsonrpc: String,
override val id: Any,
override val method: String,
override val params: List<Any?>
) : JsonRpcRequest {
override fun toString(): String {
return StringJoiner(", ", JsonRpcRequestListParams::class.java.simpleName + "[", "]")
Expand All @@ -37,10 +43,10 @@ data class JsonRpcRequestListParams(
}

data class JsonRpcRequestMapParams(
@JsonProperty("jsonrpc") override val jsonrpc: String,
@JsonProperty("id") override val id: Any,
@JsonProperty("method") override val method: String,
@JsonProperty("params") override val params: Map<String, *>
override val jsonrpc: String,
override val id: Any,
override val method: String,
override val params: Map<String, *>
) : JsonRpcRequest {
override fun toString(): String {
return StringJoiner(", ", JsonRpcRequestMapParams::class.java.simpleName + "[", "]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ data class JsonRpcErrorResponse(
fun internalError(id: Any, data: Any?): JsonRpcErrorResponse {
return JsonRpcErrorResponse(id, JsonRpcError.internalError(data))
}

fun invalidParams(id: Any, message: String?): JsonRpcErrorResponse {
return JsonRpcErrorResponse(id, JsonRpcError.invalidMethodParameter(message))
}
Expand All @@ -74,8 +75,10 @@ data class JsonRpcError(
@JsonProperty("message") val message: String,
@JsonProperty("data") val data: Any? = null
) {
// inlining for better stacktrace
@Suppress("NOTHING_TO_INLINE")
inline fun asException() = JsonRpcErrorResponseException(code, message, data)

fun asException() = JsonRpcErrorException("Code: $code, message: '$message'")
companion object {
@JvmStatic
fun invalidMethodParameter(message: String?): JsonRpcError =
Expand All @@ -88,14 +91,25 @@ data class JsonRpcError(
fun invalidMethodParameter(message: String, data: Any): JsonRpcError =
JsonRpcError(JsonRpcErrorCode.INVALID_PARAMS.code, message, data)

@JvmStatic fun internalError(): JsonRpcError = JsonRpcErrorCode.INTERNAL_ERROR.toErrorObject()
@JvmStatic
fun internalError(): JsonRpcError = JsonRpcErrorCode.INTERNAL_ERROR.toErrorObject()

@JvmStatic
fun internalError(data: Any?): JsonRpcError =
JsonRpcErrorCode.INTERNAL_ERROR.toErrorObject(data)

@JvmStatic fun unauthorized(): JsonRpcError = JsonRpcErrorCode.UNAUTHORIZED.toErrorObject()
@JvmStatic
fun unauthorized(): JsonRpcError = JsonRpcErrorCode.UNAUTHORIZED.toErrorObject()
}
}

class JsonRpcErrorException(override val message: String) : Exception(message)
class JsonRpcErrorException(
override val message: String?,
val httpStatusCode: Int? = null
) : RuntimeException(message)

class JsonRpcErrorResponseException(
val rpcErrorCode: Int,
val rpcErrorMessage: String,
val rpcErrorData: Any? = null
) : RuntimeException("code=$rpcErrorCode message=$rpcErrorMessage errorData=$rpcErrorData")
Original file line number Diff line number Diff line change
@@ -1,27 +1,56 @@
package net.consensys.linea.jsonrpc.client

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeType
import com.github.michaelbull.result.Ok
import com.github.michaelbull.result.Result
import io.vertx.core.Future
import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import net.consensys.linea.jsonrpc.JsonRpcErrorResponse
import net.consensys.linea.jsonrpc.JsonRpcRequest
import net.consensys.linea.jsonrpc.JsonRpcSuccessResponse

fun identityMapper(value: Any?): Any? = value
fun toPrimitiveOrJacksonJsonNode(value: Any?): Any? = value

@Suppress("UNCHECKED_CAST")
fun toPrimitiveOrVertxJson(value: Any?): Any? {
if (value == null) {
return null
}
return when (value) {
is String -> value
is Number -> value
is Boolean -> value
is JsonNode -> {
when (value.nodeType) {
JsonNodeType.STRING, JsonNodeType.NUMBER, JsonNodeType.BOOLEAN, JsonNodeType.NULL ->
value
.toPrimitiveOrJsonNode()

JsonNodeType.OBJECT -> JsonObject(objectMapper.convertValue(value, Map::class.java) as Map<String, Any?>)
JsonNodeType.ARRAY -> JsonArray(objectMapper.convertValue(value, List::class.java) as List<Any?>)
else -> throw IllegalArgumentException("Unsupported JsonNodeType: ${value.nodeType}")
}
}

else -> throw IllegalArgumentException("Unsupported type: ${value::class.java}")
}
}

interface JsonRpcClient {
fun makeRequest(
request: JsonRpcRequest,
resultMapper: (Any?) -> Any? = ::identityMapper
resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson // to keep backward compatibility
): Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>>
}

fun isResultOk(result: Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>): Boolean = result is Ok
fun isResultOk(result: Result<Any?, Any?>): Boolean = result is Ok

interface JsonRpcClientWithRetries : JsonRpcClient {
fun makeRequest(
request: JsonRpcRequest,
resultMapper: (Any?) -> Any? = ::identityMapper,
resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson, // to keep backward compatibility
stopRetriesPredicate: (result: Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>) -> Boolean = ::isResultOk
): Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>>
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class JsonRpcRequestFanOut(

fun fanoutRequest(
request: JsonRpcRequest,
resultMapper: (Any?) -> Any? = ::identityMapper
resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson
): Future<List<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>>> {
return Future
.all(targets.map { it.makeRequest(request, resultMapper) })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class JsonRpcRequestRetryer(
private val vertx: Vertx,
private val delegate: JsonRpcClient,
private val config: Config,
private val requestObjectMapper: ObjectMapper = VertxHttpJsonRpcClient.objectMapper,
private val requestObjectMapper: ObjectMapper = objectMapper,
private val log: Logger = LogManager.getLogger(JsonRpcRequestRetryer::class.java),
private val failuresLogLevel: Level = Level.WARN
) : JsonRpcClientWithRetries {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package net.consensys.linea.jsonrpc.client

import com.fasterxml.jackson.databind.ObjectMapper
import com.github.michaelbull.result.Err
import com.github.michaelbull.result.Ok
import com.github.michaelbull.result.Result
import com.github.michaelbull.result.map
import com.github.michaelbull.result.mapError
import com.github.michaelbull.result.onFailure
import io.vertx.core.Vertx
import net.consensys.linea.async.AsyncRetryer
import net.consensys.linea.async.RetriedExecutionException
import net.consensys.linea.async.toSafeFuture
import net.consensys.linea.jsonrpc.JsonRpcErrorResponse
import net.consensys.linea.jsonrpc.JsonRpcRequest
import net.consensys.linea.jsonrpc.JsonRpcSuccessResponse
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Predicate

class JsonRpcRequestRetryerV2(
private val vertx: Vertx,
private val delegate: JsonRpcClient,
private val requestRetry: RequestRetryConfig,
private val requestObjectMapper: ObjectMapper = objectMapper,
private val shallRetryRequestsClientBasePredicate: Predicate<Result<Any?, Throwable>>,
private val log: Logger = LogManager.getLogger(JsonRpcRequestRetryer::class.java),
private val failuresLogLevel: Level = Level.WARN
) {
fun <T> makeRequest(
request: JsonRpcRequest,
shallRetryRequestPredicate: Predicate<Result<T, Throwable>>,
resultMapper: (Any?) -> T
): SafeFuture<T> {
return makeRequestWithRetryer(request, resultMapper, shallRetryRequestPredicate)
}

private fun shallWarnFailureRetries(retries: Int): Boolean {
return requestRetry.failuresWarningThreshold > 0u &&
retries > 0 &&
(retries % requestRetry.failuresWarningThreshold.toInt()) == 0
}

private fun <T> makeRequestWithRetryer(
request: JsonRpcRequest,
resultMapper: (Any?) -> T,
shallRetryRequestPredicate: Predicate<Result<T, Throwable>>
): SafeFuture<T> {
val lastException = AtomicReference<Throwable>()
val retriesCount = AtomicInteger(0)
val requestPredicate = Predicate<Result<T, Throwable>> { result ->
log.info("result: {}", result)
shallRetryRequestsClientBasePredicate.test(result) || shallRetryRequestPredicate.test(result)
}

return AsyncRetryer.retry(
vertx = vertx,
backoffDelay = requestRetry.backoffDelay,
maxRetries = requestRetry.maxRetries?.toInt(),
timeout = requestRetry.timeout,
stopRetriesPredicate = { result: Result<T, Throwable> ->
result.onFailure(lastException::set)
!requestPredicate.test(result)
}
) {
if (shallWarnFailureRetries(retriesCount.get())) {
log.log(
failuresLogLevel,
"Request '{}' already retried {} times. lastError={}",
requestObjectMapper.writeValueAsString(request),
retriesCount.get(),
lastException.get()
)
}
retriesCount.incrementAndGet()
delegate.makeRequest(request, resultMapper).toSafeFuture().thenApply { unfoldResultValueOrException<T>(it) }
.exceptionally { th ->
if (th is Error || th.cause is Error) {
// Very serious JVM error, we should stop retrying anyway
throw th
} else {
Err(th.cause ?: th)
}
}
}.handleComposed { result, throwable ->
when {
result is Ok -> SafeFuture.completedFuture(result.value)
result is Err -> SafeFuture.failedFuture<T>(result.error)
throwable != null && throwable is RetriedExecutionException -> SafeFuture.failedFuture(lastException.get())
else -> SafeFuture.failedFuture(throwable)
}
}
}

companion object {
fun <T> unfoldResultValueOrException(
response: Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>
): Result<T, Throwable> {
@Suppress("UNCHECKED_CAST")
return response
.map { it.result as T }
.mapError { it.error.asException() }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package net.consensys.linea.jsonrpc.client

import com.github.michaelbull.result.Result
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.function.Predicate

/**
* JSON-RPC client that supports JSON-RPC v2.0.
* It will automatically generate the request id and retry requests when JSON-RPC errors are received.
* Please override default stopRetriesPredicate to customize the retry logic.
*
* JSON-RPC result/error.data serialization is done automatically to Jackson JsonNode or primitive types.
*/
interface JsonRpcV2Client {
/**
* Makes a JSON-RPC request.
* @param method The method to call.
* @param params The parameters to pass to the method. It can be a List<Any?>, a Map<String, *> or a Pojo.
* @param shallRetryRequestPredicate predicate to evaluate request retrying. It defaults to never retrying.
* @param resultMapper Mapper to apply to successful JSON-RPC result.
* the result is primary type (String, Number, Boolean, null) or (jackson's JsonNode or vertx JsonObject/JsonArray)
* The underlying type will depend on the serialization configured on the concrete implementation.
* @return A future that
* - when success - resolves with mapped result
* - when JSON-RPC error - rejects with JsonRpcErrorException with corresponding error code, message and data
* - when other error - rejects with underlying exception
*/
fun <T> makeRequest(
method: String,
params: Any, // List<Any?>, Map<String, Any?>, Pojo
shallRetryRequestPredicate: Predicate<Result<T, Throwable>> = Predicate { false },
resultMapper: (Any?) -> T
): SafeFuture<T>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package net.consensys.linea.jsonrpc.client

import com.github.michaelbull.result.Result
import net.consensys.linea.jsonrpc.JsonRpcRequestData
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.function.Predicate
import java.util.function.Supplier

internal class JsonRpcV2ClientImpl(
private val delegate: JsonRpcRequestRetryerV2,
private val idSupplier: Supplier<Any>
) : JsonRpcV2Client {

override fun <T> makeRequest(
method: String,
params: Any,
shallRetryRequestPredicate: Predicate<Result<T, Throwable>>,
resultMapper: (Any?) -> T
): SafeFuture<T> {
val request = JsonRpcRequestData(jsonrpc = "2.0", id = idSupplier.get(), method, params)

return delegate.makeRequest(
request = request,
shallRetryRequestPredicate = shallRetryRequestPredicate,
resultMapper = resultMapper
)
}
}
Loading

0 comments on commit 9e1f7dd

Please sign in to comment.