Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade rpc router internals #178

Merged
merged 8 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions json_rpc.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ requires "nim >= 1.6.0",
"stew",
"nimcrypto",
"stint",
"chronos",
"httputils",
"chronos#head",
"httputils#head",
"chronicles",
"websock",
"json_serialization",
Expand Down
266 changes: 100 additions & 166 deletions json_rpc/client.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# json-rpc
# Copyright (c) 2019-2023 Status Research & Development GmbH
# Copyright (c) 2019-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand All @@ -8,195 +8,129 @@
# those terms.

import
std/[tables, macros],
std/[json, tables, macros],
chronos,
./jsonmarshal
results,
./private/jrpc_conv,
./private/jrpc_sys,
./private/client_handler_wrapper,
./private/shared_wrapper,
./private/errors

from strutils import toLowerAscii, replace
from strutils import replace

export
chronos, jsonmarshal, tables
chronos,
tables,
jrpc_conv,
RequestParamsTx,
results

type
ClientId* = int64
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [Defect, CatchableError].}
RpcClient* = ref object of RootRef
awaiting*: Table[ClientId, Future[Response]]
lastId: ClientId
methodHandlers: Table[string, MethodHandler]
onDisconnect*: proc() {.gcsafe, raises: [Defect].}
awaiting*: Table[RequestId, Future[StringOfJson]]
lastId: int
onDisconnect*: proc() {.gcsafe, raises: [].}

GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [].}

{.push gcsafe, raises: [].}

# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------

Response* = JsonNode
func requestTxEncode*(name: string, params: RequestParamsTx, id: RequestId): string =
let req = requestTx(name, params, id)
JrpcSys.encode(req)

GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [Defect].}
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

proc getNextId*(client: RpcClient): ClientId =
proc getNextId*(client: RpcClient): RequestId =
client.lastId += 1
client.lastId
RequestId(kind: riNumber, num: client.lastId)

proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode =
%{"jsonrpc": %"2.0", "method": %path, "params": params, "id": %id}
method call*(client: RpcClient, name: string,
params: RequestParamsTx): Future[StringOfJson]
{.base, gcsafe, async.} =
doAssert(false, "`RpcClient.call` not implemented")

method call*(client: RpcClient, name: string,
params: JsonNode): Future[Response] {.base, async.} =
discard
params: JsonNode): Future[StringOfJson]
{.base, gcsafe, async.} =

method close*(client: RpcClient): Future[void] {.base, async.} =
discard
await client.call(name, params.paramsTx)

template `or`(a: JsonNode, b: typed): JsonNode =
if a.isNil: b else: a
method close*(client: RpcClient): Future[void] {.base, gcsafe, async.} =
doAssert(false, "`RpcClient.close` not implemented")

proc processMessage*(self: RpcClient, line: string) =
proc processMessage*(client: RpcClient, line: string): Result[void, string] =
# Note: this doesn't use any transport code so doesn't need to be
# differentiated.
let node = try: parseJson(line)
except CatchableError as exc: raise exc
# TODO https://github.com/status-im/nimbus-eth2/issues/2430
except Exception as exc: raise (ref ValueError)(msg: exc.msg, parent: exc)

if "id" in node:
let id = node{"id"} or newJNull()

var requestFut: Future[Response]
if not self.awaiting.pop(id.getInt(-1), requestFut):
raise newException(ValueError, "Cannot find message id \"" & $id & "\"")

let version = node{"jsonrpc"}.getStr()
if version != "2.0":
requestFut.fail(newException(ValueError,
"Unsupported version of JSON, expected 2.0, received \"" & version & "\""))
else:
let result = node{"result"}
if result.isNil:
let error = node{"error"} or newJNull()
requestFut.fail(newException(ValueError, $error))
else:
requestFut.complete(result)
elif "method" in node:
# This could be subscription notification
let name = node["method"].getStr()
let handler = self.methodHandlers.getOrDefault(name)
if not handler.isNil:
handler(node{"params"} or newJArray())
else:
raise newException(ValueError, "Invalid jsonrpc message: " & $node)
try:
let response = JrpcSys.decode(line, ResponseRx)

# Signature processing
if response.jsonrpc.isNone:
return err("missing or invalid `jsonrpc`")

if response.id.isNone:
return err("missing or invalid response id")

proc createRpcProc(procName, parameters, callBody: NimNode): NimNode =
# parameters come as a tree
var paramList = newSeq[NimNode]()
for p in parameters: paramList.add(p)

let body = quote do:
{.gcsafe.}:
`callBody`

# build proc
result = newProc(procName, paramList, body)

# make proc async
result.addPragma ident"async"
# export this proc
result[0] = nnkPostfix.newTree(ident"*", newIdentNode($procName))

proc toJsonArray(parameters: NimNode): NimNode =
# outputs an array of jsonified parameters
# ie; %[%a, %b, %c]
parameters.expectKind nnkFormalParams
var items = newNimNode(nnkBracket)
for i in 2 ..< parameters.len:
let curParam = parameters[i][0]
if curParam.kind != nnkEmpty:
items.add(nnkPrefix.newTree(ident"%", curParam))
result = nnkPrefix.newTree(bindSym("%", brForceOpen), items)

proc createRpcFromSig*(clientType, rpcDecl: NimNode): NimNode =
# Each input parameter in the rpc signature is converted
# to json with `%`.
# Return types are then converted back to native Nim types.
let iJsonNode = newIdentNode("JsonNode")

var parameters = rpcDecl.findChild(it.kind == nnkFormalParams).copy
# ensure we have at least space for a return parameter
if parameters.isNil or parameters.kind == nnkEmpty or parameters.len == 0:
parameters = nnkFormalParams.newTree(iJsonNode)

let
procName = rpcDecl.name
pathStr = $procName
returnType =
# if no return type specified, defaults to JsonNode
if parameters[0].kind == nnkEmpty: iJsonNode
else: parameters[0]
customReturnType = returnType != iJsonNode

# insert rpc client as first parameter
parameters.insert(1, nnkIdentDefs.newTree(ident"client", ident($clientType),
newEmptyNode()))

let
# variable used to send json to the server
jsonParamIdent = genSym(nskVar, "jsonParam")
# json array of marshalled parameters
jsonParamArray = parameters.toJsonArray()
var
# populate json params - even rpcs with no parameters have an empty json
# array node sent
callBody = newStmtList().add(quote do:
var `jsonParamIdent` = `jsonParamArray`
)

# convert return type to Future
parameters[0] = nnkBracketExpr.newTree(ident"Future", returnType)

let
# temporary variable to hold `Response` from rpc call
rpcResult = genSym(nskLet, "res")
clientIdent = newIdentNode("client")
# proc return variable
procRes = ident"result"

# perform rpc call
callBody.add(quote do:
# `rpcResult` is of type `Response`
let `rpcResult` = await `clientIdent`.call(`pathStr`, `jsonParamIdent`)
)

if customReturnType:
# marshal json to native Nim type
callBody.add(jsonToNim(procRes, returnType, rpcResult, "result"))
else:
# native json expected so no work
callBody.add quote do:
`procRes` = if `rpcResult`.isNil:
newJNull()
else:
`rpcResult`

# create rpc proc
result = createRpcProc(procName, parameters, callBody)
when defined(nimDumpRpcs):
echo pathStr, ":\n", result.repr

proc processRpcSigs(clientType, parsedCode: NimNode): NimNode =
result = newStmtList()

for line in parsedCode:
if line.kind == nnkProcDef:
var procDef = createRpcFromSig(clientType, line)
result.add(procDef)

proc setMethodHandler*(cl: RpcClient, name: string, callback: MethodHandler) =
cl.methodHandlers[name] = callback

proc delMethodHandler*(cl: RpcClient, name: string) =
cl.methodHandlers.del(name)
var requestFut: Future[StringOfJson]
let id = response.id.get
if not client.awaiting.pop(id, requestFut):
return err("Cannot find message id \"" & $id & "\"")

if response.error.isSome:
let error = JrpcSys.encode(response.error.get)
requestFut.fail(newException(JsonRpcError, error))
return ok()

if response.result.isNone:
return err("missing or invalid response result")

requestFut.complete(response.result.get)
return ok()

except CatchableError as exc:
return err(exc.msg)

# ------------------------------------------------------------------------------
# Signature processing
# ------------------------------------------------------------------------------

macro createRpcSigs*(clientType: untyped, filePath: static[string]): untyped =
## Takes a file of forward declarations in Nim and builds them into RPC
## calls, based on their parameters.
## Inputs are marshalled to json, and results are put into the signature's
## Nim type.
result = processRpcSigs(clientType, staticRead($filePath.replace('\\', '/')).parseStmt())
cresteSignaturesFromString(clientType, staticRead($filePath.replace('\\', '/')))

macro createRpcSigsFromString*(clientType: untyped, sigString: static[string]): untyped =
## Takes a string of forward declarations in Nim and builds them into RPC
## calls, based on their parameters.
## Inputs are marshalled to json, and results are put into the signature's
## Nim type.
cresteSignaturesFromString(clientType, sigString)

macro createSingleRpcSig*(clientType: untyped, alias: static[string], procDecl: typed): untyped =
## Takes a single forward declarations in Nim and builds them into RPC
## calls, based on their parameters.
## Inputs are marshalled to json, and results are put into the signature's
## Nim type.
doAssert procDecl.len == 1, "Only accept single proc definition"
let procDecl = procDecl[0]
procDecl.expectKind nnkProcDef
result = createRpcFromSig(clientType, procDecl, ident(alias))

macro createRpcSigsFromNim*(clientType: untyped, procList: typed): untyped =
## Takes a list of forward declarations in Nim and builds them into RPC
## calls, based on their parameters.
## Inputs are marshalled to json, and results are put into the signature's
## Nim type.
processRpcSigs(clientType, procList)

{.pop.}

30 changes: 17 additions & 13 deletions json_rpc/clients/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@

import
std/[tables, uri],
stew/[byteutils, results],
stew/byteutils,
results,
chronos/apps/http/httpclient as chronosHttpClient,
chronicles, httputils, json_serialization/std/net,
".."/[client, errors]
../client,
../private/errors,
../private/jrpc_sys

export
client, HttpClientFlag, HttpClientFlags

{.push raises: [Defect].}

logScope:
topics = "JSONRPC-HTTP-CLIENT"

Expand All @@ -35,6 +36,8 @@ type
const
MaxHttpRequestSize = 128 * 1024 * 1024 # maximum size of HTTP body in octets

{.push gcsafe, raises: [].}

proc new(
T: type RpcHttpClient, maxBodySize = MaxHttpRequestSize, secure = false,
getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T =
Expand All @@ -51,7 +54,7 @@ proc newRpcHttpClient*(
RpcHttpClient.new(maxBodySize, secure, getHeaders, flags)

method call*(client: RpcHttpClient, name: string,
params: JsonNode): Future[Response]
params: RequestParamsTx): Future[StringOfJson]
{.async, gcsafe.} =
doAssert client.httpSession != nil
if client.httpAddress.isErr:
Expand All @@ -66,7 +69,7 @@ method call*(client: RpcHttpClient, name: string,

let
id = client.getNextId()
reqBody = $rpcCallNode(name, params, id)
reqBody = requestTxEncode(name, params, id)

var req: HttpClientRequestRef
var res: HttpClientResponseRef
Expand Down Expand Up @@ -128,19 +131,18 @@ method call*(client: RpcHttpClient, name: string,

# completed by processMessage - the flow is quite weird here to accomodate
# socket and ws clients, but could use a more thorough refactoring
var newFut = newFuture[Response]()
var newFut = newFuture[StringOfJson]()
# add to awaiting responses
client.awaiting[id] = newFut

try:
# Might raise for all kinds of reasons
client.processMessage(resText)
except CatchableError as e:
# Might error for all kinds of reasons
let msgRes = client.processMessage(resText)
if msgRes.isErr:
# Need to clean up in case the answer was invalid
debug "Failed to process POST Response for JSON-RPC", e = e.msg
debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error
client.awaiting.del(id)
closeRefs()
raise e
raise newException(JsonRpcError, msgRes.error)

client.awaiting.del(id)

Expand Down Expand Up @@ -175,3 +177,5 @@ proc connect*(client: RpcHttpClient, address: string, port: Port, secure: bool)
method close*(client: RpcHttpClient) {.async.} =
if not client.httpSession.isNil:
await client.httpSession.closeWait()

{.pop.}
Loading