Skip to content

Commit

Permalink
feat: refactor error message api-probe communication (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh authored Jul 19, 2023
1 parent 944801d commit dcaf37f
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 93 deletions.
3 changes: 0 additions & 3 deletions src/lib/ws/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { handleStatsReport } from '../../probe/handler/stats.js';
import { scopedLogger } from '../logger.js';
import { getWsServer, PROBES_NAMESPACE } from './server.js';
import { probeMetadata } from './middleware/probe-metadata.js';
import { verifyIpLimit } from './helper/probe-ip-limit.js';
import { errorHandler } from './helper/error-handler.js';
import { subscribeWithHandler } from './helper/subscribe-handler.js';

Expand All @@ -22,8 +21,6 @@ io
.of(PROBES_NAMESPACE)
.use(probeMetadata)
.on('connect', errorHandler(async (socket: Socket) => {
await verifyIpLimit(socket);

const probe = socket.data['probe'] as Probe;
socket.emit('api:connect:location', probe.location);
logger.info(`ws client ${socket.id} connected from ${probe.location.city}, ${probe.location.country} [${probe.ipAddress} - ${probe.location.network}]`);
Expand Down
5 changes: 0 additions & 5 deletions src/lib/ws/helper/error-handler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Socket } from 'socket.io';
import getProbeIp from '../../get-probe-ip.js';
import { scopedLogger } from '../../logger.js';
import { WsError } from '../ws-error.js';

const logger = scopedLogger('ws:error');

Expand All @@ -27,10 +26,6 @@ export const errorHandler = (next: NextArgument) => (socket: Socket, mwNext?: (e
logger.info(`disconnecting client ${socket.id} for (${reason}) [${clientIp}]`);
logger.debug(error);

if (error instanceof WsError) {
socket.emit('api:error', error.toJson());
}

if (mwNext) {
mwNext(error);
}
Expand Down
22 changes: 5 additions & 17 deletions src/lib/ws/helper/probe-ip-limit.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
import * as process from 'node:process';
import type { Socket } from 'socket.io';
import type { Probe } from '../../../probe/types.js';
import { WsError } from '../ws-error.js';
import { fetchSockets } from '../server.js';
import { scopedLogger } from '../../logger.js';
import { InternalError } from '../../internal-error.js';

const logger = scopedLogger('ws:limit');

export const verifyIpLimit = async (socket: Socket): Promise<void> => {
export const verifyIpLimit = async (ip: string, socketId: string): Promise<void> => {
if (process.env['FAKE_PROBE_IP']) {
return;
}

const probe = socket.data['probe'] as Probe;

const socketList = await fetchSockets();
const previousSocket = socketList.find(s => s.data.probe.ipAddress === probe.ipAddress && s.id !== socket.id);
const previousSocket = socketList.find(s => s.data.probe.ipAddress === ip && s.id !== socketId);

if (previousSocket) {
logger.info(`ws client ${socket.id} has reached the concurrent IP limit.`, { message: previousSocket.data.probe.ipAddress });
throw new WsError(
'IP Limit',
{
code: 'ip_limit',
socketId: socket.id,
probe,
ipAddress: probe.ipAddress,
},
);
logger.info(`ws client ${socketId} has reached the concurrent IP limit.`, { message: previousSocket.data.probe.ipAddress });
throw new InternalError('ip limit', true);
}
};
1 change: 0 additions & 1 deletion src/lib/ws/middleware/probe-metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export const probeMetadata = errorHandler(async (socket: Socket, next: (error?:
}

throw new WsError(message, {
socketId: socket.id,
ipAddress: clientIp ?? '',
});
}
Expand Down
27 changes: 4 additions & 23 deletions src/lib/ws/ws-error.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,12 @@
import type { Probe } from '../../probe/types.js';

type Info = {
socketId: string;
type Data = {
ipAddress: string;
// A string used for message parsing
code?: string;
probe?: Probe;
cause?: unknown;
};

type JsonResponse = {
message: string;
info: Info;
};

export class WsError extends Error {
info: Info;
constructor (message: string, info: Info) {
data: Data;
constructor (message: string, data: Data) {
super(message);
this.info = info;
}

toJson (): JsonResponse {
return {
message: this.message,
info: this.info,
};
this.data = data;
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/probe/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { createGeoipClient } from '../lib/geoip/client.js';
import getProbeIp from '../lib/get-probe-ip.js';
import { getRegion } from '../lib/ip-ranges.js';
import type { Probe, ProbeLocation, Tag } from './types.js';
import { verifyIpLimit } from '../lib/ws/helper/probe-ip-limit.js';

const fakeIpForDebug = () => {
return _.sample([
Expand Down Expand Up @@ -62,6 +63,8 @@ export const buildProbe = async (socket: Socket): Promise<Probe> => {
throw new Error(`couldn't detect probe location for ip ${clientIp}`);
}

await verifyIpLimit(clientIp, socket.id);

const location = getLocation(ipInfo);

const tags = getTags(clientIp);
Expand Down
56 changes: 12 additions & 44 deletions test/tests/unit/ws/error-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,43 +36,8 @@ describe('ws error', () => {
sandbox.restore();
});

describe('connect', () => {
it('should catch error and disconnect socket', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;

const testMethod = async (socket: Socket): Promise<void> => {
// Prevent unused variable err
socket.emit('connect', '');
throw new Error('abc');
};

errorHandler(testMethod)(socket as Socket);
await sandbox.clock.nextAsync();

expect(socket.isConnected).to.equal(false);
});

it('should catch error and emit api:error event ', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;

const testMethod = async (socket: Socket): Promise<void> => {
// Prevent unused variable err
socket.emit('connect', '');
throw new WsError('abc', { socketId: socket.id, ipAddress: '' });
};

errorHandler(testMethod)(socket as Socket);
await sandbox.clock.nextAsync();

const storeError = socket.store.find(m => m.event === 'api:error');
expect(socket.isConnected).to.equal(false);
expect(storeError).to.exist;
expect(storeError?.payload?.message).to.equal('abc');
});
});

describe('middleware', () => {
it('should catch error and execute cb', async () => {
describe('ws error handler', () => {
it('should catch Error and execute cb', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;
let cbError: Error | null = null;

Expand All @@ -87,36 +52,39 @@ describe('ws error', () => {
};

errorHandler(testMethod)(socket as Socket, testCb);

expect(socket.isConnected).to.equal(true);
await sandbox.clock.nextAsync();

const apiError = socket.store.find(m => m.event === 'api:error');
expect(socket.isConnected).to.equal(false);
expect(cbError).to.not.be.null;
expect(cbError).to.be.instanceof(Error);
expect(apiError).to.not.exist;
expect(cbError!.toString()).to.equal('Error: abc');
});

it('should catch error, execute cb and emit api:error event', async () => {
it('should catch WsError and execute cb', async () => {
const socket = new MockSocket('abc') as BundledMockSocket;
let cbError: Error | null = null;

const testMethod = async (socket: Socket): Promise<void> => {
// Prevent unused variable err
socket.emit('connect', '');
throw new WsError('vpn detected', { socketId: socket.id, ipAddress: '' });
throw new WsError('vpn detected', { ipAddress: '' });
};

const testCb = (error: Error) => {
cbError = error;
};

errorHandler(testMethod)(socket as Socket, testCb);

expect(socket.isConnected).to.equal(true);
await sandbox.clock.nextAsync();

const apiError = socket.store.find(m => m.event === 'api:error');
expect(socket.isConnected).to.equal(false);
expect(cbError).to.not.be.null;
expect(cbError).to.be.instanceof(WsError);
expect(apiError).to.exist;
expect(apiError?.payload.message).to.equal('vpn detected');
expect(cbError!.toString()).to.equal('Error: vpn detected');
});
});
});

0 comments on commit dcaf37f

Please sign in to comment.