Skip to content

Commit

Permalink
feat: add periodic ip limit checks
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Apr 4, 2024
1 parent f447049 commit ce6b599
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
4 changes: 3 additions & 1 deletion src/lib/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Server } from 'node:http';
import { initRedisClient } from './redis/client.js';
import { adoptedProbes, initWsServer } from './ws/server.js';
import { adoptedProbes, probeIpLimit, initWsServer } from './ws/server.js';
import { getMetricsAgent } from './metrics.js';
import { populateMemList as populateMemMalwareList } from './malware/client.js';
import { populateMemList as populateMemIpRangesList } from './ip-ranges.js';
Expand Down Expand Up @@ -33,6 +33,8 @@ export const createServer = async (): Promise<Server> => {
await auth.syncTokens();
auth.scheduleSync();

probeIpLimit.scheduleSync();

reconnectProbes();

const { getWsServer } = await import('./ws/server.js');
Expand Down
63 changes: 53 additions & 10 deletions src/lib/ws/helper/probe-ip-limit.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,62 @@
import { fetchProbes } from '../server.js';
import _ from 'lodash';
import config from 'config';
import type { fetchProbes as serverFetchProbes, fetchRawSockets as serverFetchRawSockets } from '../server.js';
import { scopedLogger } from '../../logger.js';
import { ProbeError } from '../../probe-error.js';

const numberOfProcesses = config.get<number>('server.processes');

const logger = scopedLogger('ws:limit');

export const verifyIpLimit = async (ip: string, socketId: string): Promise<void> => {
if (process.env['FAKE_PROBE_IP'] || process.env['TEST_MODE'] === 'unit') {
return;
export class ProbeIpLimit {
private timer: NodeJS.Timeout | undefined;

constructor (
private readonly fetchProbes: typeof serverFetchProbes,
private readonly fetchRawSockets: typeof serverFetchRawSockets,
) {}

scheduleSync () {
clearTimeout(this.timer);

this.timer = setTimeout(() => {
this.syncIpLimit()
.finally(() => this.scheduleSync())
.catch(error => logger.error(error));
}, 60_000 * 2 * numberOfProcesses * Math.random()).unref();
}

const probes = await fetchProbes({ allowStale: false });
const previousSocket = probes.find(p => p.ipAddress === ip && p.client !== socketId);
async syncIpLimit () {
const sockets = await this.fetchRawSockets();
// Sorting sockets by id, so all workers will treat the same socket as "first".
const sortedSockets = _.sortBy(sockets, [ 'id' ]);

const uniqIpToSocketId = new Map<string, string>();

for (const socket of sortedSockets) {
const prevSocketId = uniqIpToSocketId.get(socket.data.probe.ipAddress);

if (previousSocket) {
logger.info(`ws client ${socketId} has reached the concurrent IP limit.`, { message: previousSocket.ipAddress });
throw new ProbeError('ip limit');
if (prevSocketId && prevSocketId !== socket.id) {
logger.warn(`Probe ip duplication occured (${socket.data.probe.ipAddress}). First socket id: ${prevSocketId}, socket id to disconnect: ${socket.id}`);
socket.disconnect();
} else {
uniqIpToSocketId.set(socket.data.probe.ipAddress, socket.id);
}
}
}
};

async verifyIpLimit (ip: string, socketId: string): Promise<void> {
if (process.env['FAKE_PROBE_IP'] || process.env['TEST_MODE'] === 'unit') {
return;
}

const probes = await this.fetchProbes({ allowStale: false });
const previousProbe = probes.find(p => p.ipAddress === ip && p.client !== socketId);

if (previousProbe) {
logger.info(`ws client ${socketId} has reached the concurrent IP limit.`, { message: previousProbe.ipAddress });
throw new ProbeError('ip limit');
}
}
}

3 changes: 3 additions & 0 deletions src/lib/ws/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { getRedisClient } from '../redis/client.js';
import { SyncedProbeList } from './synced-probe-list.js';
import { client } from '../sql/client.js';
import { AdoptedProbes } from '../adopted-probes.js';
import { ProbeIpLimit } from './helper/probe-ip-limit.js';

export type SocketData = {
probe: Probe;
Expand Down Expand Up @@ -90,3 +91,5 @@ export const fetchRawProbes = async (): Promise<Probe[]> => {
};

export const adoptedProbes = new AdoptedProbes(client, fetchRawProbes);

export const probeIpLimit = new ProbeIpLimit(fetchProbes, fetchRawSockets);
4 changes: 2 additions & 2 deletions src/probe/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type GeoipClient from '../lib/geoip/client.js';
import getProbeIp from '../lib/get-probe-ip.js';
import { getRegion } from '../lib/ip-ranges.js';
import type { Probe, ProbeLocation, Tag } from './types.js';
import { verifyIpLimit } from '../lib/ws/helper/probe-ip-limit.js';
import { probeIpLimit } from '../lib/ws/server.js';
import { fakeLookup } from '../lib/geoip/fake-client.js';

let geoipClient: GeoipClient;
Expand Down Expand Up @@ -62,7 +62,7 @@ export const buildProbe = async (socket: Socket): Promise<Probe> => {
throw new Error(`couldn't detect probe location for ip ${ip}`);
}

await verifyIpLimit(ip, socket.id);
await probeIpLimit.verifyIpLimit(ip, socket.id);

const location = getLocation(ipInfo);

Expand Down

0 comments on commit ce6b599

Please sign in to comment.