From 6e2f99f20bb0d31f11795a2bcc3a1ae8bb577914 Mon Sep 17 00:00:00 2001 From: AJ Date: Fri, 20 Dec 2024 12:56:34 +0100 Subject: [PATCH] feat(fuel-streams-ws): Introduce FuelStreamsClient for Websocket Proxy Introduced in a `fuel-streams-ws` namespace to allow updating the fuel-streams progressively --- packages/fuel-streams-ws/package.json | 25 +++ .../src/__tests__/index.test.ts | 174 ++++++++++++++++++ packages/fuel-streams-ws/src/index.ts | 159 ++++++++++++++++ packages/fuel-streams-ws/tsconfig.json | 11 ++ packages/fuel-streams-ws/tsup.config.js | 12 ++ pnpm-lock.yaml | 45 +++++ 6 files changed, 426 insertions(+) create mode 100644 packages/fuel-streams-ws/package.json create mode 100644 packages/fuel-streams-ws/src/__tests__/index.test.ts create mode 100644 packages/fuel-streams-ws/src/index.ts create mode 100644 packages/fuel-streams-ws/tsconfig.json create mode 100644 packages/fuel-streams-ws/tsup.config.js diff --git a/packages/fuel-streams-ws/package.json b/packages/fuel-streams-ws/package.json new file mode 100644 index 0000000..e5bc6e3 --- /dev/null +++ b/packages/fuel-streams-ws/package.json @@ -0,0 +1,25 @@ +{ + "name": "@fuels/streams-ws", + "version": "1.0.0", + "description": "Official data streaming Typescript library for Fuel Network", + "author": "Fuel Labs (https://fuel.network/)", + "homepage": "https://github.com/FuelLabs/fuel-streams-js", + "main": "./src/index.ts", + "scripts": { + "build": "tsup", + "build:watch": "tsup --watch", + "ts:check": "tsc --noEmit", + "test": "vitest run" + }, + "keywords": [], + "license": "ISC", + "dependencies": { + "isomorphic-ws": "5.0.0" + }, + "devDependencies": { + "cross-fetch": "4.0.0", + "mock-socket": "9.3.1", + "vitest": "~2.1.6", + "vitest-websocket-mock": "0.4.0" + } +} diff --git a/packages/fuel-streams-ws/src/__tests__/index.test.ts b/packages/fuel-streams-ws/src/__tests__/index.test.ts new file mode 100644 index 0000000..f8dfeaf --- /dev/null +++ b/packages/fuel-streams-ws/src/__tests__/index.test.ts @@ -0,0 +1,174 @@ +import fetch from 'cross-fetch'; +import { FuelStreamsClient } from 'src/index'; +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; + +// Mock `fetch` globally +global.fetch = fetch; + +vi.mock('cross-fetch'); + +const MOCK_NETWORK_URL = 'https://example.com'; // Network URL +const MOCK_WS_URL = 'wss://example.com/api/v1/ws'; // WebSocket URL +const MOCK_USERNAME = 'testUser'; +const MOCK_PASSWORD = 'testPass'; +const MOCK_JWT = 'mock-jwt-token'; + +describe('FuelStreamsClient', () => { + let client: FuelStreamsClient; + + beforeEach(() => { + // Mock JWT fetch + vi.resetAllMocks(); + vi.mocked(fetch).mockImplementation(async (url) => { + if (url === `${MOCK_NETWORK_URL}/api/v1/jwt`) { + return { + ok: true, + json: async () => ({ jwt_token: MOCK_JWT }), + } as Response; + } + throw new Error(`Unexpected fetch URL: ${url}`); + }); + + // Initialize FuelStreamsClient and inject mock WebSocket + client = new FuelStreamsClient( + MOCK_NETWORK_URL, + MOCK_USERNAME, + MOCK_PASSWORD, + ); + client.setWebsocketModule(MockWebSocket); // Inject mock WebSocket + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + test('should fetch JWT via refreshJwt', async () => { + await client.refreshJwt(); + expect(fetch).toHaveBeenCalledWith(`${MOCK_NETWORK_URL}/api/v1/jwt`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + username: MOCK_USERNAME, + password: MOCK_PASSWORD, + }), + }); + }); + + test('should connect to WebSocket server', async () => { + await client.connect(); + + expect(client.ws).toBeInstanceOf(MockWebSocket); + expect(client.ws?.url).toBe(MOCK_WS_URL); + }); + + test('should send subscription message to WebSocket server', async () => { + await client.connect(); + const ws = client.ws as MockWebSocket; + + const messageSpy = vi.fn(); + ws.onmessage = messageSpy; + + client.subscribe('example.topic', { type: 'All' }); + + expect(messageSpy).toHaveBeenCalledWith( + expect.objectContaining({ + data: JSON.stringify({ + type: 'Subscribe', + payload: { + wildcard: 'example.topic', + deliverPolicy: { type: 'All' }, + }, + }), + }), + ); + }); + + test('should send unsubscription message to WebSocket server', async () => { + await client.connect(); + const ws = client.ws as MockWebSocket; + + const messageSpy = vi.fn(); + ws.onmessage = messageSpy; + + client.unsubscribe('example.topic'); // No deliverPolicy specified, defaults to { type: 'All' } + + expect(messageSpy).toHaveBeenCalledWith( + expect.objectContaining({ + data: JSON.stringify({ + type: 'Unsubscribe', + payload: { + wildcard: 'example.topic', + deliverPolicy: { type: 'All' }, + }, + }), + }), + ); + }); + + test('should handle incoming messages from WebSocket server', async () => { + const mockMessage = { type: 'TestMessage', data: { foo: 'bar' } }; + + await client.connect(); + const ws = client.ws as MockWebSocket; + + const onMessage = vi.fn(); + client.listen(onMessage); + + ws.triggerMessage(JSON.stringify(mockMessage)); // Trigger incoming message + + expect(onMessage).toHaveBeenCalledWith(mockMessage); + }); + + test('should disconnect from WebSocket server', async () => { + await client.connect(); + const ws = client.ws as MockWebSocket; + + ws.close = vi.fn(); + client.disconnect(); + + expect(ws.close).toHaveBeenCalled(); + expect(client.ws).toBeNull(); + }); +}); + +// Define the mock WebSocket class +class MockWebSocket { + static OPEN = 1; + static CLOSED = 3; + readyState = MockWebSocket.CLOSED; + onopen: (() => void) | null = null; + onmessage: ((event: { data: string }) => void) | null = null; + onclose: (() => void) | null = null; + onerror: ((error: ErrorEvent) => void) | null = null; + + constructor( + public url: string, + public protocols?: string[], + ) { + // Automatically trigger `onopen` after construction to simulate connection + setTimeout(() => { + this.readyState = MockWebSocket.OPEN; + if (this.onopen) this.onopen(); + }, 0); + } + + send(data: string) { + if (this.readyState !== MockWebSocket.OPEN) { + throw new Error('WebSocket is not open'); + } + if (this.onmessage) { + this.onmessage({ data }); + } + } + + close() { + this.readyState = MockWebSocket.CLOSED; + if (this.onclose) this.onclose(); + } + + triggerMessage(data: string) { + if (this.onmessage) { + this.onmessage({ data }); + } + } +} diff --git a/packages/fuel-streams-ws/src/index.ts b/packages/fuel-streams-ws/src/index.ts new file mode 100644 index 0000000..254e072 --- /dev/null +++ b/packages/fuel-streams-ws/src/index.ts @@ -0,0 +1,159 @@ +import fetch from 'cross-fetch'; +import WebSocket from 'isomorphic-ws'; + +interface LoginResponse { + jwt_token: string; +} + +// Define DeliverPolicy as per the Rust model +type DeliverPolicy = + | { type: 'All' } + | { type: 'Last' } + | { type: 'New' } + | { type: 'ByStartSequence'; startSequence: number } + | { type: 'ByStartTime'; startTime: string } + | { type: 'LastPerSubject' }; + +// SubscriptionPayload matches the Rust structure +interface SubscriptionPayload { + wildcard: string; + deliverPolicy: DeliverPolicy; +} + +// ClientMessage matches the Rust model +type ClientMessage = + | { type: 'Subscribe'; payload: SubscriptionPayload } + | { type: 'Unsubscribe'; payload: SubscriptionPayload }; + +// ServerMessage matches the Rust model +type ServerMessage = + | { type: 'Subscribed'; payload: SubscriptionPayload } + | { type: 'Unsubscribed'; payload: SubscriptionPayload } + | { type: 'Update'; payload: Uint8Array } + | { type: 'Error'; payload: string }; + +export class FuelStreamsClient { + private wsUrl: string; + private jwtToken: string | null = null; + private ws: WebSocket | null = null; + + private websocketModule = WebSocket; + + constructor( + private networkUrl: string, + private username: string, + private password: string, + ) { + this.wsUrl = `${networkUrl.replace('http', 'ws')}/api/v1/ws`; + console.log('WebSocket URL:', this.wsUrl); + } + + /** @internal */ + public setWebsocketModule(module: WebSocket): void { + this.websocketModule = module; + } + + public async refreshJwt(): Promise { + this.jwtToken = await this.fetchJWT(); + } + + // Connect to WebSocket + public async connect(): Promise { + if (!this.jwtToken) { + this.jwtToken = await this.fetchJWT(); + } + + return new Promise((resolve, reject) => { + this.ws = new this.websocketModule(this.wsUrl, { + headers: { + Authorization: `Bearer ${this.jwtToken}`, + }, + }); + + this.ws.onopen = () => { + console.log('Connected to WebSocket'); + resolve(); + }; + + this.ws.onerror = (error: Error) => { + console.error('WebSocket error:', error); + reject(error); + }; + + this.ws.onclose = ({ + code, + reason, + }: { code: string; reason: string }) => { + console.log(`WebSocket closed: ${code} - ${reason}`); + }; + }); + } + + // Fetch JWT (works in both environments with cross-fetch) + private async fetchJWT(): Promise { + const response = await fetch(`${this.networkUrl}/api/v1/jwt`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + username: this.username, + password: this.password, + }), + }); + + if (!response.ok) { + throw new Error(`Failed to fetch JWT: ${response.statusText}`); + } + + const data: LoginResponse = await response.json(); + return data.jwt_token; + } + + // Send a message to WebSocket + private sendMessage(message: ClientMessage): void { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + throw new Error('WebSocket is not connected'); + } + + const serializedMessage = JSON.stringify(message); + this.ws.send(serializedMessage); + } + + // Subscribe to a topic + public subscribe(wildcard: string, deliverPolicy: DeliverPolicy): void { + const payload: SubscriptionPayload = { wildcard, deliverPolicy }; + const message: ClientMessage = { type: 'Subscribe', payload }; + this.sendMessage(message); + } + + // Unsubscribe from a topic + public unsubscribe( + wildcard: string, + deliverPolicy: DeliverPolicy = { type: 'All' }, + ): void { + const payload: SubscriptionPayload = { wildcard, deliverPolicy }; + const message: ClientMessage = { type: 'Unsubscribe', payload }; + this.sendMessage(message); + } + + // Listen for server messages + public listen(callback: (message: ServerMessage) => void): void { + if (!this.ws) { + throw new Error('WebSocket is not connected'); + } + + this.ws.onmessage = ({ data }: { data: string }) => { + const message: ServerMessage = JSON.parse(data); + callback(message); + }; + } + + // Disconnect from WebSocket + public disconnect(): void { + if (this.ws) { + this.ws.close(); + this.ws = null; + } + } +} diff --git a/packages/fuel-streams-ws/tsconfig.json b/packages/fuel-streams-ws/tsconfig.json new file mode 100644 index 0000000..c200a37 --- /dev/null +++ b/packages/fuel-streams-ws/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "tsBuildInfoFile": "./node_modules/.tmp/tsconfig.tsbuildinfo", + "outDir": "./dist", + "baseUrl": ".", + "incremental": true + }, + "exclude": ["dist"], + "include": ["src"] +} diff --git a/packages/fuel-streams-ws/tsup.config.js b/packages/fuel-streams-ws/tsup.config.js new file mode 100644 index 0000000..dd28800 --- /dev/null +++ b/packages/fuel-streams-ws/tsup.config.js @@ -0,0 +1,12 @@ +import baseConfig from '@fuels/tsup-config'; +import { defineConfig } from 'tsup'; + +export default defineConfig((options) => ({ + ...baseConfig(options, { withReact: false }), + entry: ['src/index.ts'], + external: [], + minify: 'terser', + dts: true, + splitting: true, + metafile: true, +})); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 99f2e1f..73a25fe 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -158,6 +158,25 @@ importers: specifier: 0.30.2 version: 0.30.2 + packages/fuel-streams-ws: + dependencies: + isomorphic-ws: + specifier: 5.0.0 + version: 5.0.0(ws@8.18.0(bufferutil@4.0.8)(utf-8-validate@5.0.10)) + devDependencies: + cross-fetch: + specifier: 4.0.0 + version: 4.0.0(encoding@0.1.13) + mock-socket: + specifier: 9.3.1 + version: 9.3.1 + vitest: + specifier: ~2.1.6 + version: 2.1.6(@types/node@22.10.0)(jiti@1.21.6)(jsdom@24.0.0(bufferutil@4.0.8)(utf-8-validate@5.0.10))(terser@5.36.0)(tsx@4.19.2)(yaml@2.5.0) + vitest-websocket-mock: + specifier: 0.4.0 + version: 0.4.0(vitest@2.1.6(@types/node@22.10.0)(jiti@1.21.6)(jsdom@24.0.0(bufferutil@4.0.8)(utf-8-validate@5.0.10))(terser@5.36.0)(tsx@4.19.2)(yaml@2.5.0)) + packages/simple-app: dependencies: '@fuels/streams': @@ -2097,6 +2116,9 @@ packages: cross-fetch@3.1.8: resolution: {integrity: sha512-cvA+JwZoU0Xq+h6WkMvAUqPEYy92Obet6UdKLfW60qn99ftItKjB5T+BkyWOFWe2pUyfQ+IJHmpOTznqk1M6Kg==} + cross-fetch@4.0.0: + resolution: {integrity: sha512-e4a5N8lVvuLgAWgnCrLr2PP0YyDOTHa9H/Rj54dirp61qXnNq46m82bRhNqIA5VccJtWBvPTFRV3TtvHUKPB1g==} + cross-spawn@7.0.3: resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==} engines: {node: '>= 8'} @@ -2870,6 +2892,10 @@ packages: engines: {node: '>=10'} hasBin: true + mock-socket@9.3.1: + resolution: {integrity: sha512-qxBgB7Qa2sEQgHFjj0dSigq7fX4k6Saisd5Nelwp2q8mlbAFh5dHV9JTTlF8viYJLSSWgMCZFUom8PJcMNBoJw==} + engines: {node: '>= 8'} + mri@1.2.0: resolution: {integrity: sha512-tzzskb3bG8LvYGFF/mDTpq3jpI6Q9wc3LEmBaghu+DdCssd1FakN7Bc0hVNmEyGq1bq3RgfkCb3cmQLpNPOroA==} engines: {node: '>=4'} @@ -3913,6 +3939,11 @@ packages: yaml: optional: true + vitest-websocket-mock@0.4.0: + resolution: {integrity: sha512-tGnOwE2nC8jfioQXDrX+lZ8EVrF+IO2NVqe1vV9h945W/hlR0S6ZYbMqCJGG3Nyd//c5XSe1IGLD2ZgE2D1I7Q==} + peerDependencies: + vitest: '>=2' + vitest@2.1.6: resolution: {integrity: sha512-isUCkvPL30J4c5O5hgONeFRsDmlw6kzFEdLQHLezmDdKQHy8Ke/B/dgdTMEgU0vm+iZ0TjW8GuK83DiahBoKWQ==} engines: {node: ^18.0.0 || ^20.0.0 || >=22.0.0} @@ -5966,6 +5997,12 @@ snapshots: transitivePeerDependencies: - encoding + cross-fetch@4.0.0(encoding@0.1.13): + dependencies: + node-fetch: 2.7.0(encoding@0.1.13) + transitivePeerDependencies: + - encoding + cross-spawn@7.0.3: dependencies: path-key: 3.1.1 @@ -6832,6 +6869,8 @@ snapshots: mkdirp@3.0.1: {} + mock-socket@9.3.1: {} + mri@1.2.0: {} ms@2.1.2: {} @@ -7836,6 +7875,12 @@ snapshots: tsx: 4.19.2 yaml: 2.5.0 + vitest-websocket-mock@0.4.0(vitest@2.1.6(@types/node@22.10.0)(jiti@1.21.6)(jsdom@24.0.0(bufferutil@4.0.8)(utf-8-validate@5.0.10))(terser@5.36.0)(tsx@4.19.2)(yaml@2.5.0)): + dependencies: + '@vitest/utils': 2.1.6 + mock-socket: 9.3.1 + vitest: 2.1.6(@types/node@22.10.0)(jiti@1.21.6)(jsdom@24.0.0(bufferutil@4.0.8)(utf-8-validate@5.0.10))(terser@5.36.0)(tsx@4.19.2)(yaml@2.5.0) + vitest@2.1.6(@types/node@22.10.0)(jiti@1.21.6)(jsdom@24.0.0(bufferutil@4.0.8)(utf-8-validate@5.0.10))(terser@5.36.0)(tsx@4.19.2)(yaml@2.5.0): dependencies: '@vitest/expect': 2.1.6