Skip to content
This repository has been archived by the owner on Dec 22, 2024. It is now read-only.

Add connection closures cases, add effectiveGasPrice for getTransactionReceipt, change gasPrice to user input for getTransactionByHash #100

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/routes/evm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ export default async function (fastify: FastifyInstance, opts: TelosEvmConfig) {
} else {
const hexBlockNum = removeLeftZeros(blockHex);
const hexGas = removeLeftZeros(numToHex(receipt['gas_limit']));
const hexGasPrice = removeLeftZeros(numToHex(receipt['charged_gas_price']));
const hexGasPrice = removeLeftZeros(numToHex(receipt['gas_price']));
const hexNonce = removeLeftZeros(numToHex(receipt['nonce']));
const hexTransactionIndex = removeLeftZeros(numToHex(receipt['trx_index']));
const hexValue = addHexPrefix(receipt['value']);
Expand Down Expand Up @@ -1144,6 +1144,7 @@ export default async function (fastify: FastifyInstance, opts: TelosEvmConfig) {
blockNumber: removeLeftZeros(numToHex(receipt['block'])),
contractAddress: toChecksumAddress(_contractAddr)?.toLowerCase(),
cumulativeGasUsed: removeLeftZeros(_gas),
effectiveGasPrice: removeLeftZeros(numToHex(receipt['charged_gas_price'])),
from: toChecksumAddress(receipt['from'])?.toLowerCase(),
gasUsed: removeLeftZeros(_gas),
logsBloom: _logsBloom,
Expand Down Expand Up @@ -1188,7 +1189,7 @@ export default async function (fastify: FastifyInstance, opts: TelosEvmConfig) {
blockNumber: removeLeftZeros(_blockNum),
from: toChecksumAddress(receipt['from']).toLowerCase(),
gas: removeLeftZeros(numToHex(receipt.gas_limit)),
gasPrice: removeLeftZeros(numToHex(receipt.charged_gas_price)),
gasPrice: removeLeftZeros(numToHex(receipt.gas_price)),
hash: receipt['hash'],
input: receipt['input_data'],
nonce: removeLeftZeros(numToHex(receipt['nonce'])),
Expand Down
72 changes: 50 additions & 22 deletions src/ws/WebsocketRPC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {TelosEvmConfig} from "../types";
import Subscription from "./Subscription";
import LogSubscription from "./LogSubscription";
import {createLogger} from "../util/logger";
import {BigNumber} from 'ethers';

const NEW_HEADS_SUBSCRIPTION = "0x9ce59a13059e417087c02d3236a0b1cd"
const logger = createLogger('telos-evm-rpc-ws');
Expand Down Expand Up @@ -40,16 +39,16 @@ export default class WebsocketRPC {
initUWS(): void {
const host = this.config.rpcWebsocketHost;
const port = this.config.rpcWebsocketPort;
let ip, origin;
let ip: String, origin : String;
this.websocketRPC = uWS.App({}).ws('/evm', {
compression: 0,
maxPayloadLength: 16 * 1024 * 1024,
idleTimeout: 30,
upgrade: (res: uWS.HttpResponse, req: uWS.HttpRequest, context: uWS.us_socket_context_t) => {
const tRef = process.hrtime.bigint();
const buffer = Buffer.from(res.getRemoteAddressAsText());
const string = buffer.toString();
ip = req.getHeader('x-forwarded-for') || string || '';
const remoteAddress = buffer.toString();
ip = req.getHeader('x-forwarded-for') || remoteAddress || '';

if (req.getHeader('origin') === 'chrome-extension://nkbihfbeogaeaoehlefnkodbefgpgknn') {
origin = 'MetaMask';
Expand Down Expand Up @@ -78,9 +77,13 @@ export default class WebsocketRPC {
drain: () => {
},
close: (ws: uWS.WebSocket) => {
console.log(`WSCLOSE: ${new Date().toISOString()} - ${ip} (0/0) - ${ws.clientInfo.origin} - close`);
if(ws.readyState !== ws.CLOSED){
ws.close();
}
this.headSubscription.removeWs(ws, true);
for (const [subId, sub] of this.logSubscriptions)
sub.removeWs(ws, true)
sub.removeWs(ws, true);
},
}).listen(host, port, (token: uWS.us_listen_socket) => {
if (token) {
Expand All @@ -91,21 +94,26 @@ export default class WebsocketRPC {
});
}

makeResponse(result, originalMessage) {
makeResponse(result: any, originalMessage: any) {
return {"jsonrpc": "2.0", result, id: originalMessage.id};
}

makeError(message, id=null, code=-32600) {
makeError(message: String, id=null, code=-32600) {
return {"jsonrpc": "2.0", "error": {code, message}, id};
}

async handleMessage(ws, msg, ip, origin) {
async handleMessage(ws: WebSocket, msg: ArrayBuffer, ip: String, origin: String) {
const tRef = process.hrtime.bigint();
const buffer = Buffer.from(msg);
const string = buffer.toString();
try {
if(string === "CLOSE" || string === "INVALID_DATA"){
ws.close();
return;
}
const msgObj = JSON.parse(string);
if (!msgObj.method) {
this.attemptSend(ws, JSON.stringify(this.makeError("Invalid Request, no method specified", msgObj.id ? msgObj.id : null)));
ws.send(this.makeError("Invalid Request, no method specified", msgObj.id ? msgObj.id : null));
return;
}
Expand All @@ -117,7 +125,7 @@ export default class WebsocketRPC {
return;
} else if (method === "eth_unsubscribe") {
if (!msgObj?.params?.length) {
ws.send(JSON.stringify(this.makeError("Subscription ID should be provided as first parameter", msgObj.id)))
this.attemptSend(ws, JSON.stringify(this.makeError("Subscription ID should be provided as first parameter", msgObj.id)))
return;
}
const subscriptionId = msgObj.params[0];
Expand All @@ -132,7 +140,7 @@ export default class WebsocketRPC {
this.logSubscriptions.delete(sub.getId());
});
}
ws.send(JSON.stringify(this.makeResponse(true, msgObj)));
this.attemptSend(ws, JSON.stringify(this.makeResponse(true, msgObj)));
const duration = ((Number(process.hrtime.bigint()) - Number(tRef)) / 1000).toFixed(3);
console.log(`WSUNSUBSCRIBE: ${new Date().toISOString()} - ${duration} μs - ${ip} (0/0) - ${origin} - ${msgObj.params[0]}`);
return;
Expand All @@ -143,22 +151,41 @@ export default class WebsocketRPC {
if (!sub.hasClients())
this.logSubscriptions.delete(sub.getId());
});
ws.send(JSON.stringify(this.makeResponse(true, msgObj)));
this.attemptSend(ws, JSON.stringify(this.makeResponse(true, msgObj)));
const duration = ((Number(process.hrtime.bigint()) - Number(tRef)) / 1000).toFixed(3);
console.log(`WSUNSUBSCRIBE: ${new Date().toISOString()} - ${duration} μs - ${ip} (0/0) - ${origin} - all`);
return;
}

const rpcResponse = await this.rpcHandlerContainer.handler(msgObj, ws.clientInfo);
ws.send(JSON.stringify(rpcResponse));
this.attemptSend(ws, JSON.stringify(rpcResponse));
const duration = ((Number(process.hrtime.bigint()) - Number(tRef)) / 1000).toFixed(3);
console.log(`WS: ${new Date().toISOString()} - ${duration} μs - ${ip} (0/0) - ${origin} - ${msgObj.method}`);
} catch (e) {
console.error(`Failed to parse websocket message: ${string} error: ${e.message}`);
}
}

async handleSubscription(ws, msgObj): Promise<void> {
async attemptSend(ws: WebSocket, message: string, retries = 0, maxRetries = 30) {
if (ws.readyState !== ws.OPEN) {
console.log('WebSocket is not open. Cannot send message.');
return;
}

const success = ws.send(message);
if (!success) {
console.log('Failed to send message');
if (retries < maxRetries) {
retries++;
console.log(`Failed to send message. Retrying... Attempt ${retries} of ${maxRetries}`);
const self = this;
setTimeout(async () => await self.attemptSend(ws, message, retries, maxRetries), 1000); // Wait 1 second before retrying
} else {
console.log('Failed to send message. Max retries reached. Closing connection.');
ws.close(1011, 'Repeated send failures');
}
}
}
async handleSubscription(ws: WebSocket, msgObj: any): Promise<void> {
switch (msgObj.params[0]) {
case 'logs':
this.handleLogSubscription(ws, msgObj);
Expand All @@ -167,28 +194,29 @@ export default class WebsocketRPC {
this.handleNewHeadsSubscription(ws, msgObj);
break;
default:
ws.send(JSON.stringify(this.makeError(`Subscription type ${msgObj.params[0]} is not supported`, msgObj.id)));
await this.attemptSend(ws, JSON.stringify(this.makeError(`Subscription type ${msgObj.params[0]} is not supported`, msgObj.id)));
break;
}
}

async handleLogSubscription(ws, msgObj): Promise<void> {
async handleLogSubscription(ws: WebSocket, msgObj: any): Promise<void> {
const filter = msgObj.params[1];
if(!filter?.address){
ws.send(JSON.stringify(this.makeError("address should be provided in params", msgObj.id)));
await this.attemptSend(ws, JSON.stringify(this.makeError("address should be provided in params", msgObj.id)));
}
const subscriptionId = LogSubscription.makeId(filter);
if (!this.logSubscriptions.has(subscriptionId)) {
this.logSubscriptions.set(subscriptionId, new LogSubscription(this.websocketRPC, subscriptionId, filter, this.config.debug))
}

this.logSubscriptions.get(subscriptionId).addWs(ws);
ws.send(JSON.stringify(this.makeResponse(subscriptionId, msgObj)));

await this.attemptSend(ws, JSON.stringify(this.makeResponse(subscriptionId, msgObj)));
}

async handleNewHeadsSubscription(ws, msgObj): Promise<void> {
async handleNewHeadsSubscription(ws: WebSocket, msgObj: Object): Promise<void> {
this.headSubscription.addWs(ws);
ws.send(JSON.stringify(this.makeResponse(this.headSubscription.getId(), msgObj)));
await this.attemptSend(ws, JSON.stringify(this.makeResponse(this.headSubscription.getId(), msgObj)));
}

handleIndexerMessage(data): void{
Expand All @@ -208,7 +236,7 @@ export default class WebsocketRPC {
}
}

handleRawMessage(data): void {
handleRawMessage(data: any): void {
for (const [subId, sub] of this.logSubscriptions) {
const tRef = process.hrtime.bigint();
sub.handleRawAction(data);
Expand All @@ -219,7 +247,7 @@ export default class WebsocketRPC {
}
}

handleHeadMessage(head): void {
handleHeadMessage(head: any): void {
const tRef = process.hrtime.bigint();
if(this.headSubscription.hasClients()){
const headMessage = {
Expand Down