From f88b934981a3c2dfbaee2041de88d981e9dc4d9a Mon Sep 17 00:00:00 2001 From: Denis Pingin <2085934+denis-pingin@users.noreply.github.com> Date: Thu, 5 Oct 2023 15:23:35 +0200 Subject: [PATCH] feat: add websocket support (#35) --- package.json | 4 +- src/index.ts | 125 ++++++++++++++++--- src/lib/status/types/index.ts | 8 +- src/lib/types/index.ts | 1 + src/utils/index.ts | 2 + src/utils/isFinalTaskState.ts | 12 ++ src/utils/websocketHandler.ts | 218 ++++++++++++++++++++++++++++++++++ src/utils/websocketMessage.ts | 21 ++++ yarn.lock | 10 ++ 9 files changed, 377 insertions(+), 24 deletions(-) create mode 100644 src/utils/isFinalTaskState.ts create mode 100644 src/utils/websocketHandler.ts create mode 100644 src/utils/websocketMessage.ts diff --git a/package.json b/package.json index 89bbb67..788d72c 100644 --- a/package.json +++ b/package.json @@ -39,7 +39,9 @@ }, "dependencies": { "axios": "0.27.2", - "ethers": "6.7.0" + "ethers": "6.7.0", + "isomorphic-ws": "^5.0.0", + "ws": "^8.5.0" }, "lint-staged": { "*.{js,json,md,ts}": "yarn format", diff --git a/src/index.ts b/src/index.ts index a0a3adb..7e41dce 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,6 +29,7 @@ import { GELATO_RELAY_CONCURRENT_ERC2771_ZKSYNC_ADDRESS, GELATO_RELAY_1BALANCE_CONCURRENT_ERC2771_ZKSYNC_ADDRESS, } from "./constants"; +import { WebsocketHandler } from "./utils/index.js"; export { CallWithSyncFeeRequest, @@ -47,11 +48,14 @@ export { Config, SignerOrProvider, }; + export class GelatoRelay { #config: Config; + readonly #websocketHandler: WebsocketHandler; constructor(config?: Partial) { this.#config = this._getConfiguration(config); + this.#websocketHandler = new WebsocketHandler(this.#config.websocketUrl); } /** @@ -62,8 +66,10 @@ export class GelatoRelay { }; private _getConfiguration = (config?: Partial): Config => { + const url = config?.url ?? GELATO_RELAY_URL; return { - url: config?.url ?? GELATO_RELAY_URL, + url, + websocketUrl: url.replace(/^http/, "ws"), contract: { relayERC2771: config?.contract?.relayERC2771 ?? GELATO_RELAY_ERC2771_ADDRESS, @@ -99,12 +105,22 @@ export class GelatoRelay { * @returns {Promise} Response object with taskId parameter * */ - callWithSyncFee = ( + callWithSyncFee = async ( request: CallWithSyncFeeRequest, options?: RelayRequestOptions, sponsorApiKey?: string - ): Promise => - library.relayWithSyncFee({ request, sponsorApiKey, options }, this.#config); + ): Promise => { + const response = await library.relayWithSyncFee( + { request, sponsorApiKey, options }, + this.#config + ); + + if (this.#websocketHandler.hasHandlers()) { + await this.#websocketHandler.subscribe(response.taskId); + } + + return response; + }; /** * @param {CallWithSyncFeeERC2771Request | CallWithSyncFeeConcurrentERC2771Request} request - Call with sync fee: Sequential ERC2771 or Concurrent ERC2771 request to be relayed by Gelato Executors @@ -114,15 +130,15 @@ export class GelatoRelay { * @returns {Promise} Response object with taskId parameter * */ - callWithSyncFeeERC2771 = ( + callWithSyncFeeERC2771 = async ( request: | CallWithSyncFeeERC2771Request | CallWithSyncFeeConcurrentERC2771Request, signerOrProvider: SignerOrProvider, options?: RelayRequestOptions, sponsorApiKey?: string - ): Promise => - library.relayWithCallWithSyncFeeERC2771( + ): Promise => { + const response = await library.relayWithCallWithSyncFeeERC2771( { request, signerOrProvider, @@ -132,6 +148,13 @@ export class GelatoRelay { this.#config ); + if (this.#websocketHandler.hasHandlers()) { + await this.#websocketHandler.subscribe(response.taskId); + } + + return response; + }; + /** * @param {SponsoredCallRequest} request SponsoredCallRequest to be relayed by the Gelato Executors. * @param {string} sponsorApiKey Sponsor API key to be used for the call @@ -139,16 +162,23 @@ export class GelatoRelay { * @returns {Promise} Response object with taskId parameter * */ - sponsoredCall = ( + sponsoredCall = async ( request: SponsoredCallRequest, sponsorApiKey: string, options?: RelayRequestOptions - ): Promise => - library.relayWithSponsoredCall( + ): Promise => { + const response = await library.relayWithSponsoredCall( { request, sponsorApiKey, options }, this.#config ); + if (this.#websocketHandler.hasHandlers()) { + await this.#websocketHandler.subscribe(response.taskId); + } + + return response; + }; + /** * @param {CallWithERC2771Request | CallWithConcurrentERC2771Request} request - Sponsored: Sequential ERC2771 or Concurrent ERC2771 request to be relayed by Gelato Executors * @param {SignerOrProvider} signerOrProvider - BrowserProvider [front-end] or Signer [back-end] to sign the payload @@ -157,13 +187,13 @@ export class GelatoRelay { * @returns {Promise} Response object with taskId parameter * */ - sponsoredCallERC2771 = ( + sponsoredCallERC2771 = async ( request: CallWithERC2771Request | CallWithConcurrentERC2771Request, signerOrProvider: SignerOrProvider, sponsorApiKey: string, options?: RelayRequestOptions - ): Promise => - library.relayWithSponsoredCallERC2771( + ): Promise => { + const response = await library.relayWithSponsoredCallERC2771( { request, signerOrProvider, @@ -173,6 +203,13 @@ export class GelatoRelay { this.#config ); + if (this.#websocketHandler.hasHandlers()) { + await this.#websocketHandler.subscribe(response.taskId); + } + + return response; + }; + /** * @param {CallWithERC2771Request | CallWithConcurrentERC2771Request} request - Sequential ERC2771 or Concurrent ERC2771 request to be relayed by Gelato Executors * @param {SignerOrProvider} signerOrProvider - BrowserProvider [front-end] or Signer [back-end] to sign the payload @@ -215,13 +252,13 @@ export class GelatoRelay { * @returns {Promise} Response object with taskId parameter * */ - sponsoredCallERC2771WithSignature = ( + sponsoredCallERC2771WithSignature = async ( struct: SignatureData["struct"], signature: SignatureData["signature"], sponsorApiKey: string, options?: RelayRequestOptions - ): Promise => - library.sponsoredCallERC2771WithSignature( + ): Promise => { + const response = await library.sponsoredCallERC2771WithSignature( { struct, signature, @@ -231,6 +268,13 @@ export class GelatoRelay { this.#config ); + if (this.#websocketHandler.hasHandlers()) { + await this.#websocketHandler.subscribe(response.taskId); + } + + return response; + }; + /** * @param {SignatureData["struct"]} struct - Struct that can be obtained from getSignatureDataERC2771 * @param {BaseCallWithSyncFeeParams} syncFeeParams - Call with Sync Fee parameters @@ -240,14 +284,14 @@ export class GelatoRelay { * @returns {Promise} Response object with taskId parameter * */ - callWithSyncFeeERC2771WithSignature = ( + callWithSyncFeeERC2771WithSignature = async ( struct: SignatureData["struct"], syncFeeParams: BaseCallWithSyncFeeParams, signature: SignatureData["signature"], options?: RelayRequestOptions, sponsorApiKey?: string - ): Promise => - library.callWithSyncFeeERC2771WithSignature( + ): Promise => { + const response = await library.callWithSyncFeeERC2771WithSignature( { struct, syncFeeParams, @@ -258,6 +302,13 @@ export class GelatoRelay { this.#config ); + if (this.#websocketHandler.hasHandlers()) { + await this.#websocketHandler.subscribe(response.taskId); + } + + return response; + }; + /** * @param {bigint} chainId - Chain Id * @returns {Promise} Boolean to demonstrate if Relay V2 is supported on the provided chain @@ -322,4 +373,40 @@ export class GelatoRelay { taskId: string ): Promise => library.getTaskStatus({ taskId }, this.#config); + + /** + * @param {callback} handler - Callback function to be called on every task status update + * + */ + onTaskStatusUpdate = ( + handler: (taskStatus: TransactionStatusResponse) => void + ): void => { + this.#websocketHandler.onUpdate(handler); + }; + + /** + * @param {callback} handler - Callback function to be unregistered from task status updates + * + */ + offTaskStatusUpdate = ( + handler: (taskStatus: TransactionStatusResponse) => void + ): void => { + this.#websocketHandler.offUpdate(handler); + }; + + /** + * @param {callback} handler - Callback function to be called on error + * + */ + onError = (handler: (error: Error) => void): void => { + this.#websocketHandler.onError(handler); + }; + + /** + * @param {callback} handler - Callback function to be unregistered as an error handler + * + */ + offError = (handler: (error: Error) => void): void => { + this.#websocketHandler.offError(handler); + }; } diff --git a/src/lib/status/types/index.ts b/src/lib/status/types/index.ts index 68bf0f7..c625434 100644 --- a/src/lib/status/types/index.ts +++ b/src/lib/status/types/index.ts @@ -8,15 +8,15 @@ export type TransactionStatusResponse = { transactionHash?: string; blockNumber?: number; executionDate?: string; + gasUsed?: string; + effectiveGasPrice?: string; }; -enum TaskState { +export enum TaskState { CheckPending = "CheckPending", ExecPending = "ExecPending", + WaitingForConfirmation = "WaitingForConfirmation", ExecSuccess = "ExecSuccess", ExecReverted = "ExecReverted", - WaitingForConfirmation = "WaitingForConfirmation", - Blacklisted = "Blacklisted", Cancelled = "Cancelled", - NotFound = "NotFound", } diff --git a/src/lib/types/index.ts b/src/lib/types/index.ts index afb4e00..af1ad32 100644 --- a/src/lib/types/index.ts +++ b/src/lib/types/index.ts @@ -55,6 +55,7 @@ export type BaseCallWithSyncFeeParams = { export type Config = { url: string; + websocketUrl: string; contract: { relayERC2771: string; relay1BalanceERC2771: string; diff --git a/src/utils/index.ts b/src/utils/index.ts index 54ff20a..705485f 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -13,3 +13,5 @@ export * from "./axios"; export * from "./isConcurrentStruct"; export * from "./isConcurrentRequest"; export * from "./generateSalt"; +export * from "./isFinalTaskState"; +export * from "./websocketHandler"; diff --git a/src/utils/isFinalTaskState.ts b/src/utils/isFinalTaskState.ts new file mode 100644 index 0000000..bafdb47 --- /dev/null +++ b/src/utils/isFinalTaskState.ts @@ -0,0 +1,12 @@ +import { TaskState } from "../lib/status/types/index.js"; + +export const isFinalTaskState = (taskState: TaskState): boolean => { + switch (taskState) { + case TaskState.ExecSuccess: + case TaskState.ExecReverted: + case TaskState.Cancelled: + return true; + default: + return false; + } +}; diff --git a/src/utils/websocketHandler.ts b/src/utils/websocketHandler.ts new file mode 100644 index 0000000..f57f4d9 --- /dev/null +++ b/src/utils/websocketHandler.ts @@ -0,0 +1,218 @@ +import WebSocket from "isomorphic-ws"; + +import { TransactionStatusResponse } from "../lib/status/types/index.js"; + +import { isFinalTaskState } from "./isFinalTaskState.js"; +import { + ErrorWebsocketMessage, + UpdateWebsocketMessage, + WebsocketEvent, + WebsocketMessage, +} from "./websocketMessage.js"; + +export class WebsocketHandler { + readonly #url: string; + readonly #subscriptions: Set = new Set(); + #updateHandlers: ((taskStatus: TransactionStatusResponse) => void)[] = []; + #errorHandlers: ((error: Error) => void)[] = []; + #websocket?: WebSocket; + readonly #reconnectIntervalMillis = 1000; + readonly #connectTimeoutMillis = 10000; + + constructor(url: string) { + this.#url = `${url}/tasks/ws/status`; + } + + public onUpdate( + handler: (taskStatus: TransactionStatusResponse) => void + ): void { + if (!handler) { + throw new Error("Callback handler is not provided"); + } + + this.#updateHandlers.push(handler); + + this._connect(); + } + + public offUpdate( + handler: (taskStatus: TransactionStatusResponse) => void + ): void { + if (!handler) { + throw new Error("Callback handler is not provided"); + } + + this.#updateHandlers = this.#updateHandlers.filter( + (element) => element !== handler + ); + + this._disconnectIfUnused(); + } + + public onError(handler: (error: Error) => void): void { + if (!handler) { + throw new Error("Callback handler is not provided"); + } + + this.#errorHandlers.push(handler); + + this._connect(); + } + + public offError(handler: (error: Error) => void): void { + if (!handler) { + throw new Error("Callback handler is not provided"); + } + + this.#errorHandlers = this.#errorHandlers.filter( + (element) => element !== handler + ); + + this._disconnectIfUnused(); + } + + public async subscribe(taskId: string) { + if (this.#subscriptions.has(taskId)) { + return; + } + + this.#subscriptions.add(taskId); + + await this._sendWebsocketMessage({ + action: "subscribe", + taskId, + }); + } + + public async unsubscribe(taskId: string) { + if (!this.#subscriptions.has(taskId)) { + return; + } + + this.#subscriptions.delete(taskId); + + await this._sendWebsocketMessage({ + action: "unsubscribe", + taskId, + }); + } + + public hasHandlers(): boolean { + return this.#updateHandlers.length > 0 || this.#errorHandlers.length > 0; + } + + private _connect() { + if (this.#websocket) { + return; + } + + this.#websocket = new WebSocket(this.#url); + + this.#websocket.onopen = async () => { + this.#subscriptions.forEach((taskId) => { + this._sendWebsocketMessage({ + action: "subscribe", + taskId, + }); + }); + }; + + this.#websocket.onclose = () => { + setTimeout(() => { + this._reconnect(); + }, this.#reconnectIntervalMillis); + }; + + this.#websocket.onmessage = async (data: WebSocket.MessageEvent) => { + const message = JSON.parse( + data.data.toString() + ) as WebsocketMessage; + + switch (message.event) { + case WebsocketEvent.ERROR: { + const errorWebsocketMessage = message as ErrorWebsocketMessage; + const error: Error = errorWebsocketMessage.payload; + + this.#errorHandlers.forEach((handler) => { + handler(error); + }); + break; + } + case WebsocketEvent.UPDATE: { + const updateWebsocketMessage = message as UpdateWebsocketMessage; + const taskStatus: TransactionStatusResponse = + updateWebsocketMessage.payload; + + this.#updateHandlers.forEach((handler) => { + handler(taskStatus); + }); + + if (isFinalTaskState(taskStatus.taskState)) { + await this.unsubscribe(taskStatus.taskId); + } + break; + } + default: { + break; + } + } + }; + } + + private async _sendWebsocketMessage(message: unknown): Promise { + const isConnected = await this._ensureIsConnected(); + if (isConnected) { + this.#websocket.send(JSON.stringify(message)); + } + } + + private _disconnectIfUnused(): void { + if ( + this.#updateHandlers.length === 0 && + this.#errorHandlers.length === 0 && + this.#websocket + ) { + this._disconnect(); + } + } + + private _disconnect(): void { + if (this.#websocket) { + this.#websocket.close(); + this.#websocket = undefined; + } + } + + private _reconnect(): void { + this._disconnect(); + this._connect(); + } + + private async _ensureIsConnected(): Promise { + if (!this.#websocket) { + this._connect(); + } else if (this.#websocket.readyState !== WebSocket.OPEN) { + this._reconnect(); + } + return await this._awaitConnection(); + } + + private async _awaitConnection(): Promise { + const start = Date.now(); + while (!this.#websocket || this.#websocket.readyState !== WebSocket.OPEN) { + const elapsed = Date.now() - start; + if (elapsed > this.#connectTimeoutMillis) { + const error = new Error( + `Timeout connecting to ${this.#url} after ${elapsed}ms` + ); + this.#errorHandlers.forEach((handler) => { + handler(error); + }); + return false; + } + + await new Promise((resolve) => setTimeout(resolve, 10)); + } + return true; + } +} diff --git a/src/utils/websocketMessage.ts b/src/utils/websocketMessage.ts new file mode 100644 index 0000000..d7551ab --- /dev/null +++ b/src/utils/websocketMessage.ts @@ -0,0 +1,21 @@ +import { TransactionStatusResponse } from "../lib/status/types/index.js"; + +export enum WebsocketEvent { + ERROR = "error", + UPDATE = "update", +} + +export interface WebsocketMessage { + event: WebsocketEvent; + payload: T; +} + +export interface UpdateWebsocketMessage { + event: WebsocketEvent.UPDATE; + payload: TransactionStatusResponse; +} + +export interface ErrorWebsocketMessage { + event: WebsocketEvent.ERROR; + payload: Error; +} diff --git a/yarn.lock b/yarn.lock index 23a08d9..99d9590 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1573,6 +1573,11 @@ isexe@^2.0.0: resolved "https://registry.yarnpkg.com/isexe/-/isexe-2.0.0.tgz#e8fbf374dc556ff8947a10dcb0572d633f2cfa10" integrity sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw== +isomorphic-ws@^5.0.0: + version "5.0.0" + resolved "https://registry.yarnpkg.com/isomorphic-ws/-/isomorphic-ws-5.0.0.tgz#e5529148912ecb9b451b46ed44d53dae1ce04bbf" + integrity sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw== + js-yaml@^4.1.0: version "4.1.0" resolved "https://registry.yarnpkg.com/js-yaml/-/js-yaml-4.1.0.tgz#c1fb65f8f5017901cdd2c951864ba18458a10602" @@ -2463,6 +2468,11 @@ ws@8.5.0: resolved "https://registry.yarnpkg.com/ws/-/ws-8.5.0.tgz#bfb4be96600757fe5382de12c670dab984a1ed4f" integrity sha512-BWX0SWVgLPzYwF8lTzEy1egjhS4S4OEAHfsO8o65WOVsrnSRGaSiUaa9e0ggGlkMTtBlmOpEXiie9RUcBO86qg== +ws@^8.5.0: + version "8.14.2" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.14.2.tgz#6c249a806eb2db7a20d26d51e7709eab7b2e6c7f" + integrity sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g== + yallist@^4.0.0: version "4.0.0" resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72"