Skip to content

Commit

Permalink
feat(fuel-streams-ws): Introduce FuelStreamsClient for Websocket Proxy
Browse files Browse the repository at this point in the history
Introduced in a `fuel-streams-ws` namespace to allow updating the
fuel-streams progressively
  • Loading branch information
Jurshsmith committed Dec 20, 2024
1 parent d5f30f4 commit 6e2f99f
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 0 deletions.
25 changes: 25 additions & 0 deletions packages/fuel-streams-ws/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "@fuels/streams-ws",
"version": "1.0.0",
"description": "Official data streaming Typescript library for Fuel Network",
"author": "Fuel Labs <[email protected]> (https://fuel.network/)",
"homepage": "https://github.com/FuelLabs/fuel-streams-js",
"main": "./src/index.ts",
"scripts": {
"build": "tsup",
"build:watch": "tsup --watch",
"ts:check": "tsc --noEmit",
"test": "vitest run"
},
"keywords": [],
"license": "ISC",
"dependencies": {
"isomorphic-ws": "5.0.0"
},
"devDependencies": {
"cross-fetch": "4.0.0",
"mock-socket": "9.3.1",
"vitest": "~2.1.6",
"vitest-websocket-mock": "0.4.0"
}
}
174 changes: 174 additions & 0 deletions packages/fuel-streams-ws/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import fetch from 'cross-fetch';
import { FuelStreamsClient } from 'src/index';
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';

// Mock `fetch` globally
global.fetch = fetch;

vi.mock('cross-fetch');

const MOCK_NETWORK_URL = 'https://example.com'; // Network URL
const MOCK_WS_URL = 'wss://example.com/api/v1/ws'; // WebSocket URL
const MOCK_USERNAME = 'testUser';
const MOCK_PASSWORD = 'testPass';
const MOCK_JWT = 'mock-jwt-token';

describe('FuelStreamsClient', () => {
let client: FuelStreamsClient;

beforeEach(() => {
// Mock JWT fetch
vi.resetAllMocks();
vi.mocked(fetch).mockImplementation(async (url) => {
if (url === `${MOCK_NETWORK_URL}/api/v1/jwt`) {
return {
ok: true,
json: async () => ({ jwt_token: MOCK_JWT }),
} as Response;
}
throw new Error(`Unexpected fetch URL: ${url}`);
});

// Initialize FuelStreamsClient and inject mock WebSocket
client = new FuelStreamsClient(
MOCK_NETWORK_URL,
MOCK_USERNAME,
MOCK_PASSWORD,
);
client.setWebsocketModule(MockWebSocket); // Inject mock WebSocket
});

afterEach(() => {
vi.clearAllMocks();
});

test('should fetch JWT via refreshJwt', async () => {
await client.refreshJwt();
expect(fetch).toHaveBeenCalledWith(`${MOCK_NETWORK_URL}/api/v1/jwt`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
username: MOCK_USERNAME,
password: MOCK_PASSWORD,
}),
});
});

test('should connect to WebSocket server', async () => {
await client.connect();

expect(client.ws).toBeInstanceOf(MockWebSocket);
expect(client.ws?.url).toBe(MOCK_WS_URL);
});

test('should send subscription message to WebSocket server', async () => {
await client.connect();
const ws = client.ws as MockWebSocket;

const messageSpy = vi.fn();
ws.onmessage = messageSpy;

client.subscribe('example.topic', { type: 'All' });

expect(messageSpy).toHaveBeenCalledWith(
expect.objectContaining({
data: JSON.stringify({
type: 'Subscribe',
payload: {
wildcard: 'example.topic',
deliverPolicy: { type: 'All' },
},
}),
}),
);
});

test('should send unsubscription message to WebSocket server', async () => {
await client.connect();
const ws = client.ws as MockWebSocket;

const messageSpy = vi.fn();
ws.onmessage = messageSpy;

client.unsubscribe('example.topic'); // No deliverPolicy specified, defaults to { type: 'All' }

expect(messageSpy).toHaveBeenCalledWith(
expect.objectContaining({
data: JSON.stringify({
type: 'Unsubscribe',
payload: {
wildcard: 'example.topic',
deliverPolicy: { type: 'All' },
},
}),
}),
);
});

test('should handle incoming messages from WebSocket server', async () => {
const mockMessage = { type: 'TestMessage', data: { foo: 'bar' } };

await client.connect();
const ws = client.ws as MockWebSocket;

const onMessage = vi.fn();
client.listen(onMessage);

ws.triggerMessage(JSON.stringify(mockMessage)); // Trigger incoming message

expect(onMessage).toHaveBeenCalledWith(mockMessage);
});

test('should disconnect from WebSocket server', async () => {
await client.connect();
const ws = client.ws as MockWebSocket;

ws.close = vi.fn();
client.disconnect();

expect(ws.close).toHaveBeenCalled();
expect(client.ws).toBeNull();
});
});

// Define the mock WebSocket class
class MockWebSocket {
static OPEN = 1;
static CLOSED = 3;
readyState = MockWebSocket.CLOSED;
onopen: (() => void) | null = null;
onmessage: ((event: { data: string }) => void) | null = null;
onclose: (() => void) | null = null;
onerror: ((error: ErrorEvent) => void) | null = null;

constructor(
public url: string,
public protocols?: string[],
) {
// Automatically trigger `onopen` after construction to simulate connection
setTimeout(() => {
this.readyState = MockWebSocket.OPEN;
if (this.onopen) this.onopen();
}, 0);
}

send(data: string) {
if (this.readyState !== MockWebSocket.OPEN) {
throw new Error('WebSocket is not open');
}
if (this.onmessage) {
this.onmessage({ data });
}
}

close() {
this.readyState = MockWebSocket.CLOSED;
if (this.onclose) this.onclose();
}

triggerMessage(data: string) {
if (this.onmessage) {
this.onmessage({ data });
}
}
}
159 changes: 159 additions & 0 deletions packages/fuel-streams-ws/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import fetch from 'cross-fetch';
import WebSocket from 'isomorphic-ws';

interface LoginResponse {
jwt_token: string;
}

// Define DeliverPolicy as per the Rust model
type DeliverPolicy =
| { type: 'All' }
| { type: 'Last' }
| { type: 'New' }
| { type: 'ByStartSequence'; startSequence: number }
| { type: 'ByStartTime'; startTime: string }
| { type: 'LastPerSubject' };

// SubscriptionPayload matches the Rust structure
interface SubscriptionPayload {
wildcard: string;
deliverPolicy: DeliverPolicy;
}

// ClientMessage matches the Rust model
type ClientMessage =
| { type: 'Subscribe'; payload: SubscriptionPayload }
| { type: 'Unsubscribe'; payload: SubscriptionPayload };

// ServerMessage matches the Rust model
type ServerMessage =
| { type: 'Subscribed'; payload: SubscriptionPayload }
| { type: 'Unsubscribed'; payload: SubscriptionPayload }
| { type: 'Update'; payload: Uint8Array }
| { type: 'Error'; payload: string };

export class FuelStreamsClient {
private wsUrl: string;
private jwtToken: string | null = null;
private ws: WebSocket | null = null;

private websocketModule = WebSocket;

constructor(
private networkUrl: string,
private username: string,
private password: string,
) {
this.wsUrl = `${networkUrl.replace('http', 'ws')}/api/v1/ws`;
console.log('WebSocket URL:', this.wsUrl);
}

/** @internal */
public setWebsocketModule(module: WebSocket): void {
this.websocketModule = module;
}

public async refreshJwt(): Promise<void> {
this.jwtToken = await this.fetchJWT();
}

// Connect to WebSocket
public async connect(): Promise<void> {
if (!this.jwtToken) {
this.jwtToken = await this.fetchJWT();
}

return new Promise((resolve, reject) => {
this.ws = new this.websocketModule(this.wsUrl, {
headers: {
Authorization: `Bearer ${this.jwtToken}`,
},
});

this.ws.onopen = () => {
console.log('Connected to WebSocket');
resolve();
};

this.ws.onerror = (error: Error) => {
console.error('WebSocket error:', error);
reject(error);
};

this.ws.onclose = ({
code,
reason,
}: { code: string; reason: string }) => {
console.log(`WebSocket closed: ${code} - ${reason}`);
};
});
}

// Fetch JWT (works in both environments with cross-fetch)
private async fetchJWT(): Promise<string> {
const response = await fetch(`${this.networkUrl}/api/v1/jwt`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
username: this.username,
password: this.password,
}),
});

if (!response.ok) {
throw new Error(`Failed to fetch JWT: ${response.statusText}`);
}

const data: LoginResponse = await response.json();
return data.jwt_token;
}

// Send a message to WebSocket
private sendMessage(message: ClientMessage): void {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error('WebSocket is not connected');
}

const serializedMessage = JSON.stringify(message);
this.ws.send(serializedMessage);
}

// Subscribe to a topic
public subscribe(wildcard: string, deliverPolicy: DeliverPolicy): void {
const payload: SubscriptionPayload = { wildcard, deliverPolicy };
const message: ClientMessage = { type: 'Subscribe', payload };
this.sendMessage(message);
}

// Unsubscribe from a topic
public unsubscribe(
wildcard: string,
deliverPolicy: DeliverPolicy = { type: 'All' },
): void {
const payload: SubscriptionPayload = { wildcard, deliverPolicy };
const message: ClientMessage = { type: 'Unsubscribe', payload };
this.sendMessage(message);
}

// Listen for server messages
public listen(callback: (message: ServerMessage) => void): void {
if (!this.ws) {
throw new Error('WebSocket is not connected');
}

this.ws.onmessage = ({ data }: { data: string }) => {
const message: ServerMessage = JSON.parse(data);
callback(message);
};
}

// Disconnect from WebSocket
public disconnect(): void {
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
}
11 changes: 11 additions & 0 deletions packages/fuel-streams-ws/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"tsBuildInfoFile": "./node_modules/.tmp/tsconfig.tsbuildinfo",
"outDir": "./dist",
"baseUrl": ".",
"incremental": true
},
"exclude": ["dist"],
"include": ["src"]
}
12 changes: 12 additions & 0 deletions packages/fuel-streams-ws/tsup.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import baseConfig from '@fuels/tsup-config';
import { defineConfig } from 'tsup';

export default defineConfig((options) => ({
...baseConfig(options, { withReact: false }),
entry: ['src/index.ts'],
external: [],
minify: 'terser',
dts: true,
splitting: true,
metafile: true,
}));
Loading

0 comments on commit 6e2f99f

Please sign in to comment.