Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update rate limit logic #371

Merged
merged 18 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Node CI

on:
push:
branches: [ master ]
branches: [ "*" ]
pull_request:
branches: [ master ]
branches: [ "*" ]

jobs:
build:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<p align="center">
<b>Better understand your network routing, fix anycast issues, monitor your CDN and DNS performance,<br/>
do uptime monitoring and build your own network tools for personal or public use. </b>
<br />
<br/>
<br />
</p>

Expand Down
5 changes: 3 additions & 2 deletions config/default.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module.exports = {
},
geoip: {
cache: {
ttl: 3 * 24 * 60 * 60 * 1000, // 24hrs
ttl: 3 * 24 * 60 * 60 * 1000, // 3 days
},
},
maxmind: {
Expand All @@ -29,7 +29,8 @@ module.exports = {
fetchSocketsCacheTTL: 1000,
},
measurement: {
rateLimit: 300,
rateLimit: 100000,
rateLimitReset: 3600,
maxInProgressProbes: 5,
// Timeout after which measurement will be marked as finished even if not all probes respond
timeout: 30, // 30 seconds
Expand Down
5 changes: 5 additions & 0 deletions config/test.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ module.exports = {
admin: {
key: 'admin',
},
geoip: {
cache: {
ttl: 1, // 1 ms ttl here to disable redis cache in tests
},
},
ws: {
fetchSocketsCacheTTL: 0,
},
Expand Down
2 changes: 1 addition & 1 deletion src/lib/cache/redis-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export default class RedisCache implements CacheInterface {
constructor (private readonly redis: RedisClient) {}

async set (key: string, value: unknown, ttl?: number): Promise<void> {
await this.redis.set(this.buildCacheKey(key), JSON.stringify(value), { EX: ttl ? ttl / 1000 : 0 });
await this.redis.set(this.buildCacheKey(key), JSON.stringify(value), { PX: ttl ?? 0 });
}

async get<T = unknown> (key: string): Promise<T | undefined> {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/geoip/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export default class GeoipClient {

const resultsWithCities = results.filter(s => s.city);

if (resultsWithCities.length < 2 && resultsWithCities[0]?.provider === 'fastly') {
if (resultsWithCities.length === 0 || (resultsWithCities.length === 1 && resultsWithCities[0]?.provider === 'fastly')) {
throw new InternalError(`unresolvable geoip: ${addr}`, true);
}

Expand Down
5 changes: 3 additions & 2 deletions src/lib/geoip/providers/maxmind.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export const isMaxmindError = (error: unknown): error is WebServiceClientError =

const query = async (addr: string, retryCounter = 0): Promise<City> => {
try {
return await client.city(addr);
const city = await client.city(addr);
return city;
} catch (error: unknown) {
if (isMaxmindError(error)) {
if (error.code === 'SERVER_ERROR' && retryCounter < 3) {
Expand All @@ -27,7 +28,7 @@ const query = async (addr: string, retryCounter = 0): Promise<City> => {
}
}

throw new Error('no maxmind data');
throw error;
}
};

Expand Down
35 changes: 21 additions & 14 deletions src/lib/http/middleware/ratelimit.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,40 @@
import type { Context, Next } from 'koa';
import requestIp from 'request-ip';
import type { RateLimiterRes } from 'rate-limiter-flexible';

import rateLimiter from '../../ratelimiter.js';
import createHttpError from 'http-errors';
import rateLimiter, { defaultState } from '../../ratelimiter.js';
import type { MeasurementRequest } from '../../../measurement/types.js';

const setResponseHeaders = (ctx: Context, response: RateLimiterRes) => {
ctx.set('X-RateLimit-Reset', `${Math.round(response.msBeforeNext / 1000)}`);
ctx.set('X-RateLimit-Limit', `${rateLimiter.points}`);
ctx.set('X-RateLimit-Remaining', `${response.remainingPoints}`);
};

const methodsWhitelist = new Set([ 'GET', 'HEAD', 'OPTIONS' ]);

export const rateLimitHandler = () => async (ctx: Context, next: Next) => {
const { method, isAdmin } = ctx;
const { isAdmin } = ctx;
const clientIp = requestIp.getClientIp(ctx.req) ?? '';
const request = ctx.request.body as MeasurementRequest;
const limit = request.locations.some(l => l.limit) ? request.locations.reduce((sum, { limit = 1 }) => sum + limit, 0) : request.limit;

if (methodsWhitelist.has(method) || isAdmin) {
if (isAdmin) {
return next();
}

try {
const response = await rateLimiter.consume(requestIp.getClientIp(ctx.req) ?? '');
setResponseHeaders(ctx, response);
} catch (error: unknown) { // Ts requires 'unknown' for errors
setResponseHeaders(ctx, error as RateLimiterRes);
ctx.status = 429;
ctx.body = 'Too Many Requests';
return;
const currentState = await rateLimiter.get(clientIp) ?? defaultState as RateLimiterRes;

if (currentState.remainingPoints < limit) {
setResponseHeaders(ctx, currentState);
throw createHttpError(429, 'Too Many Probes Requested', { type: 'too_many_probes' });
}

await next();
const response = ctx.response.body as object & { probesCount?: number };

if (!('probesCount' in response) || typeof response.probesCount !== 'number') {
throw new Error('Missing probesCount field in response object');
}

const newState = await rateLimiter.penalty(clientIp, response.probesCount);
setResponseHeaders(ctx, newState);
};
4 changes: 1 addition & 3 deletions src/lib/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { registerGetMeasurementRoute } from '../../measurement/route/get-measure
import { registerCreateMeasurementRoute } from '../../measurement/route/create-measurement.js';
import { registerHealthRoute } from '../../health/route/get.js';
import { errorHandler } from './error-handler.js';
import { rateLimitHandler } from './middleware/ratelimit.js';
import { errorHandlerMw } from './middleware/error-handler.js';
import { corsHandler } from './middleware/cors.js';
import { isAdminMw } from './middleware/is-admin.js';
Expand All @@ -40,8 +39,7 @@ rootRouter.get('/', (ctx) => {
const apiRouter = new Router({ strict: true, sensitive: true });

apiRouter.prefix('/v1')
.use(isAdminMw)
.use(rateLimitHandler());
.use(isAdminMw);

// POST /measurements
registerCreateMeasurementRoute(apiRouter);
Expand Down
11 changes: 9 additions & 2 deletions src/lib/ratelimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ import { createRedisClient } from './redis/client.js';

const redisClient = await createRedisClient({ legacyMode: true });

export const rateLimiter = new RateLimiterRedis({
export const defaultState = {
remainingPoints: config.get<number>('measurement.rateLimit'),
msBeforeNext: config.get<number>('measurement.rateLimitReset') * 1000,
consumedPoints: 0,
isFirstInDuration: true,
};

const rateLimiter = new RateLimiterRedis({
storeClient: redisClient,
keyPrefix: 'rate',
points: config.get<number>('measurement.rateLimit'),
duration: 60,
duration: config.get<number>('measurement.rateLimitReset'),
});

export default rateLimiter;
8 changes: 4 additions & 4 deletions src/lib/redis/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import type { MeasurementRecord, MeasurementResultMessage } from '../../measurem
type CountScript = {
NUMBER_OF_KEYS: number;
SCRIPT: string;
transformArguments (this: void, key: string): Array<string>;
transformReply (this: void, reply: number): number;
transformArguments (key: string): Array<string>;
transformReply (reply: number): number;
} & {
SHA1: string;
};

export type RecordResultScript = {
NUMBER_OF_KEYS: number;
SCRIPT: string;
transformArguments (this: void, measurementId: string, testId: string, data: MeasurementResultMessage['result']): string[];
transformReply (this: void, reply: string): MeasurementRecord | null;
transformArguments (measurementId: string, testId: string, data: MeasurementResultMessage['result']): string[];
transformReply (reply: string): MeasurementRecord | null;
} & {
SHA1: string;
};
Expand Down
2 changes: 1 addition & 1 deletion src/lib/ws/helper/error-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type NextArgument = NextConnectArgument | NextMwArgument;
const isError = (error: unknown): error is Error => Boolean(error as Error['message']);

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const errorHandler = (next: NextArgument) => (socket: Socket, mwNext?: (error?: any) => void | undefined) => {
export const errorHandler = (next: NextArgument) => (socket: Socket, mwNext?: (error?: any) => void) => {
next(socket, mwNext!).catch((error) => { // eslint-disable-line @typescript-eslint/no-non-null-assertion
const clientIp = getProbeIp(socket.request) ?? '';
const reason = isError(error) ? error.message : 'unknown';
Expand Down
4 changes: 2 additions & 2 deletions src/lib/ws/helper/reconnect-probes.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { fetchSockets } from '../server.js';
import type { ThrottledFetchSockets } from '../server';

const TIME_TO_RECONNECT_PROBES = 60_000;

export const reconnectProbes = async () => {
export const reconnectProbes = async (fetchSockets: ThrottledFetchSockets) => { // passing fetchSockets in arguments to avoid cycle dependency
const sockets = await fetchSockets();

for (const socket of sockets) {
Expand Down
4 changes: 3 additions & 1 deletion src/lib/ws/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export const initWsServer = async () => {
);

setTimeout(() => {
reconnectProbes().catch(error => logger.error(error));
reconnectProbes(fetchSockets).catch(error => logger.error(error));
}, TIME_UNTIL_VM_BECOMES_HEALTHY);
};

Expand All @@ -67,3 +67,5 @@ export const fetchSockets = async () => {

return sockets;
};

export type ThrottledFetchSockets = typeof fetchSockets;
3 changes: 2 additions & 1 deletion src/measurement/route/create-measurement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { MeasurementRequest } from '../types.js';
import { bodyParser } from '../../lib/http/middleware/body-parser.js';
import { validate } from '../../lib/http/middleware/validate.js';
import { schema } from '../schema/global-schema.js';
import { rateLimitHandler } from '../../lib/http/middleware/ratelimit.js';

const hostConfig = config.get<string>('server.host');
const runner = getMeasurementRunner();
Expand All @@ -24,5 +25,5 @@ const handle = async (ctx: Context): Promise<void> => {
};

export const registerCreateMeasurementRoute = (router: Router): void => {
router.post('/measurements', bodyParser(), validate(schema), handle);
router.post('/measurements', bodyParser(), validate(schema), rateLimitHandler(), handle);
};
1 change: 1 addition & 0 deletions src/measurement/schema/location-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ export const schema = Joi.array().items(Joi.object().keys({
limit: Joi.number().min(1).max(measurementConfig.limits.location).when(Joi.ref('/limit'), {
is: Joi.exist(),
then: Joi.forbidden().messages({ 'any.unknown': 'limit per location is not allowed when a global limit is set' }),
otherwise: Joi.number().default(1),
}),
}).or('continent', 'region', 'country', 'state', 'city', 'network', 'asn', 'magic', 'tags')).default(GLOBAL_DEFAULTS.locations);
26 changes: 14 additions & 12 deletions src/probe/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ import getProbeIp from '../lib/get-probe-ip.js';
import { getRegion } from '../lib/ip-ranges.js';
import type { Probe, ProbeLocation, Tag } from './types.js';

const fakeIpForDebug = () => _.sample([
'18.200.0.1', // aws-eu-west-1
'34.140.0.10', // gcp-europe-west1
'95.155.94.127',
'65.49.22.66',
'185.229.226.83',
'51.158.22.211',
'131.255.7.26',
'213.136.174.80',
'94.214.253.78',
'79.205.97.254',
]);
const fakeIpForDebug = () => {
return _.sample([
'18.200.0.1', // aws-eu-west-1
'34.140.0.10', // gcp-europe-west1
'95.155.94.127',
'65.49.22.66',
'185.229.226.83',
'51.158.22.211',
'131.255.7.26',
'213.136.174.80',
'94.214.253.78',
'79.205.97.254',
]);
};

const geoipClient = createGeoipClient();

Expand Down
2 changes: 1 addition & 1 deletion src/probe/handler/dns.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Probe } from '../../probe/types.js';
import type { Probe } from '../types.js';

export const handleDnsUpdate = (probe: Probe) => (list: string[]): void => {
probe.resolvers = list;
Expand Down
5 changes: 1 addition & 4 deletions src/probe/handler/stats.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import type {
Probe,
ProbeStats,
} from '../../probe/types.js';
import type { Probe, ProbeStats } from '../types.js';

export const handleStatsReport = (probe: Probe) => (report: ProbeStats): void => {
probe.stats = report;
Expand Down
2 changes: 1 addition & 1 deletion src/probe/handler/status.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Probe } from '../../probe/types.js';
import type { Probe } from '../types.js';

export const handleStatusUpdate = (probe: Probe) => (status: Probe['status']): void => {
probe.status = status;
Expand Down
5 changes: 0 additions & 5 deletions test/mocks/redis-cache.ts

This file was deleted.

2 changes: 0 additions & 2 deletions test/tests/integration/measurement/create-measurement.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import request, { type SuperTest, type Test } from 'supertest';
import * as td from 'testdouble';
import nock from 'nock';
import type { Socket } from 'socket.io-client';
import RedisCacheMock from '../../../mocks/redis-cache.js';

const nockMocks = JSON.parse(fs.readFileSync('./test/mocks/nock-geoip.json').toString()) as Record<string, any>;

Expand All @@ -16,7 +15,6 @@ describe('Create measurement', () => {
let requestAgent: SuperTest<Test>;

before(async () => {
await td.replaceEsm('../../../../src/lib/cache/redis-cache.ts', {}, RedisCacheMock);
await td.replaceEsm('../../../../src/lib/ip-ranges.ts', { getRegion: () => 'gcp-us-west4', populateMemList: () => Promise.resolve() });
({ getTestServer, addFakeProbe, deleteFakeProbe } = await import('../../../utils/server.js'));
const app = await getTestServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import nock from 'nock';
import type { Socket } from 'socket.io-client';
import * as sinon from 'sinon';
import { expect } from 'chai';
import RedisCacheMock from '../../../mocks/redis-cache.js';

const nockMocks = JSON.parse(fs.readFileSync('./test/mocks/nock-geoip.json').toString()) as Record<string, any>;

Expand All @@ -22,7 +21,6 @@ describe('Create measurement request', () => {

before(async () => {
await td.replaceEsm('crypto-random-string', {}, cryptoRandomString);
await td.replaceEsm('../../../../src/lib/cache/redis-cache.ts', {}, RedisCacheMock);
await td.replaceEsm('../../../../src/lib/ip-ranges.ts', { getRegion: () => 'gcp-us-west4', populateMemList: () => Promise.resolve() });
({ getTestServer, addFakeProbe, deleteFakeProbe } = await import('../../../utils/server.js'));
const app = await getTestServer();
Expand Down
2 changes: 0 additions & 2 deletions test/tests/integration/measurement/timeout-result.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import nock from 'nock';
import type { Socket } from 'socket.io-client';
import * as sinon from 'sinon';
import { expect } from 'chai';
import RedisCacheMock from '../../../mocks/redis-cache.js';

const nockMocks = JSON.parse(fs.readFileSync('./test/mocks/nock-geoip.json').toString()) as Record<string, any>;

Expand All @@ -23,7 +22,6 @@ describe('Timeout results', () => {
sandbox = sinon.createSandbox({ useFakeTimers: true });
await td.replaceEsm('@jcoreio/async-throttle', null, (f: any) => f);
await td.replaceEsm('crypto-random-string', {}, cryptoRandomString);
await td.replaceEsm('../../../../src/lib/cache/redis-cache.ts', {}, RedisCacheMock);
await td.replaceEsm('../../../../src/lib/ip-ranges.ts', { getRegion: () => 'gcp-us-west4', populateMemList: () => Promise.resolve() });
({ getTestServer, addFakeProbe, deleteFakeProbe } = await import('../../../utils/server.js'));
const app = await getTestServer();
Expand Down
Loading