Add basic sharding support
rhansen committed Mar 25, 2022
1 parent 639c424 commit b260dd5
Showing 3 changed files with 241 additions and 45 deletions.
25 changes: 25 additions & 0 deletions README.md
@@ -185,6 +185,31 @@ Example:
 },
 ```
+
+#### Horizontally scaled TURN servers
+
+To spread load across multiple TURN services, you can enable sharding:
+
+```json
+"ep_webrtc": {
+  "iceServers": [
+    {"urls": ["stun:shard0.example.com", "turn:shard0.example.com"]},
+    {"urls": ["stun:shard1.example.com", "turn:shard1.example.com"]},
+    {"urls": ["stun:shard2.example.com", "turn:shard2.example.com"]},
+    {"urls": ["stun:shard3.example.com", "turn:shard3.example.com"]},
+  ],
+  "shardIceServers": true
+},
+```
+
+When `shardIceServers` is `false` (the default), all clients receive all
+RTCIceServer objects in the `iceServers` list and it's up to the browser to
+figure out how to use them to connect with peers. When `true`, this plugin
+assigns a single entry from `iceServers` to each pad and gives out only that
+assigned entry to users that connect to the pad. The intention is to provide a
+better guarantee of load distribution across a set of TURN servers, and to avoid
+an unnecessary network hop when both peers are configured to force the use of
+TURN.

 ### Microphone Settings

 The microphone can be configured by setting `audio.constraints` to any [audio
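
The per-pad assignment described in the README hunk above can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the plugin's code: `pickShard` is a hypothetical helper name, the pad ID `my-pad` is made up, and the shard hostnames are the placeholder values from the README example. The keyed-hash approach (HMAC-SHA256 of the pad ID, reduced modulo the number of entries) follows the index.js change in this commit.

```javascript
'use strict';

const crypto = require('crypto');

// Hypothetical helper (not exported by the plugin): returns a one-element
// array holding the iceServers entry assigned to the given pad.
const pickShard = (iceServers, padId, secret) => {
  // With zero or one entry there is nothing to shard.
  if (iceServers.length <= 1) return iceServers;
  const digest = crypto.createHmac('sha256', secret).update(padId).digest('hex');
  // A 256-bit digest does not fit in a Number, so reduce it as a BigInt.
  const i = Number(BigInt(`0x${digest}`) % BigInt(iceServers.length));
  return iceServers.slice(i, i + 1);
};

// Placeholder shards taken from the README example.
const shards = [0, 1, 2, 3].map((n) => ({
  urls: [`stun:shard${n}.example.com`, `turn:shard${n}.example.com`],
}));

// Assumption: one random secret per server start.
const secret = crypto.randomBytes(16);

// Two users opening the same pad receive the same single entry.
const first = pickShard(shards, 'my-pad', secret);
const second = pickShard(shards, 'my-pad', secret);
console.log(first[0].urls, second[0] === first[0]);
```

Because the choice depends only on the pad ID and the secret, no per-pad state has to be stored; the trade-off is that a new secret may move a pad to a different entry.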
131 changes: 86 additions & 45 deletions index.js
@@ -49,8 +49,10 @@ const defaultSettings = {
   iceServers: [{urls: ['stun:stun.l.google.com:19302']}],
   listenClass: null,
   moreInfoUrl: {},
+  shardIceServers: false,
 };
 let settings = null;
+let shardIceServersHmacSecret;
 let socketio;

 const addContextToError = (err, pfx) => {
@@ -139,54 +141,90 @@ const fetchJson = async (url, opts = {}) => {
   return await res.json();
 };

-exports.clientVars = async (hookName, {clientVars: {userId: authorId}}) => ({ep_webrtc: {
-  ...settings,
-  iceServers: await Promise.all(settings.iceServers.map(async (server) => {
-    switch (server.credentialType) {
-      case 'coturn ephemeral password': {
-        const {lifetime = 60 * 60 * 12 /* seconds */} = server;
-        const username = `${Math.floor(Date.now() / 1000) + lifetime}:${authorId}`;
-        const hmac = crypto.createHmac('sha1', server.credential);
-        hmac.update(username);
-        const credential = hmac.digest('base64');
-        return {urls: server.urls, username, credential};
-      }
-      case 'xirsys ephemeral credentials': {
-        const {
-          url,
-          username,
-          credential,
-          lifetime: expire = 12 * 60 * 60, // seconds
-          method = 'PUT',
-          headers: h = {},
-          jsonBody: b = {},
-        } = server;
-        // Can't set default values for the Content-Type and Authorization headers by using an
-        // object literal with spread (e.g., `{'content-type': 'foo', ...h}`) because the Headers
-        // constructor uses `.append()` internally instead of `.set()`. This matters if a header is
-        // repeated multiple times by using different mixes of upper- and lower-case letters.
-        const headers = new globalThis.Headers(h);
-        if (!headers.has('content-type')) headers.set('content-type', 'application/json');
-        if (username && !headers.has('authorization')) {
-          headers.set('authorization',
-              `Basic ${Buffer.from(`${username}:${credential}`).toString('base64')}`);
+exports.clientVars = async (hookName, {clientVars: {userId: authorId}, pad: {id: padId}}) => {
+  let iceServers = settings.iceServers;
+  if (settings.shardIceServers && iceServers.length > 1) {
+    // We could simply hash the pad ID, but we include some randomness to make it slightly harder
+    // for a malicious user to overload a particular shard by picking pad IDs that all use the same
+    // shard. (The randomness forces malicious users to try multiple pad IDs and keep the ones that
+    // use the same shard.) The randomness also helps avoid chronic imbalance due to unlucky
+    // assignments; generating a new secret will reassign the shards.
+    //
+    // The secret is generated at startup, so all users visiting the same pad will get the same HMAC
+    // value (and thus the same shard) until Etherpad is restarted. Users that connect after
+    // Etherpad restarts might be assigned a different shard from the users on the pad that received
+    // their clientVars before Etherpad restarted. This doesn't affect protocol correctness, but it
+    // might result in three network hops instead of two: client A sends to TURN A which relays to
+    // TURN B which relays to client B, instead of client A sends to TURN AB which relays to
+    // localhost (TURN AB) which relays to client B. This should be rare because it will only happen
+    // if all of the following are true:
+    //
+    // * Both users have configured their browsers to force relay.
+    // * One user loaded the pad before Etherpad restarted and the other loaded after.
+    // * The new random value caused the pad to be assigned to a different shard.
+    //
+    // TODO: Convey ICE servers via a message that is sent every time a user connects. (CLIENT_VARS
+    // is only sent on initial connection, so if a client reconnects due to Etherpad restarting, a
+    // new CLIENT_VARS is not sent.) This will allow the server to select a different shard for a
+    // pad when it restarts, and all clients (old and new) will use the new shard for new sessions.
+    //
+    // TODO: Select the shard for the pad when the first user joins the pad and forget that
+    // selection once all users have left. This would enable alternative load balancing schemes such
+    // as true random or least loaded.
+    const hmac = crypto.createHmac('sha256', shardIceServersHmacSecret);
+    hmac.update(padId);
+    const i = Number(BigInt(`0x${hmac.digest('hex')}`) % BigInt(iceServers.length));
+    iceServers = iceServers.slice(i, i + 1);
+  }
+  return {ep_webrtc: {
+    ...settings,
+    iceServers: await Promise.all(iceServers.map(async (server) => {
+      switch (server.credentialType) {
+        case 'coturn ephemeral password': {
+          const {lifetime = 60 * 60 * 12 /* seconds */} = server;
+          const username = `${Math.floor(Date.now() / 1000) + lifetime}:${authorId}`;
+          const hmac = crypto.createHmac('sha1', server.credential);
+          hmac.update(username);
+          const credential = hmac.digest('base64');
+          return {urls: server.urls, username, credential};
         }
-        const body =
-            JSON.stringify(b && typeof b === 'object' ? {format: 'urls', expire, ...b} : b);
-        try {
-          const {v, s} = await fetchJson(url, {method, headers, body});
-          if (s !== 'ok') throw new Error(`API error: ${v}`);
-          return v.iceServers;
-        } catch (err) {
-          const newErr = addContextToError(err, 'failed to get TURN credentials: ');
-          logger.error(newErr.stack || newErr.toString());
-          throw newErr;
+        case 'xirsys ephemeral credentials': {
+          const {
+            url,
+            username,
+            credential,
+            lifetime: expire = 12 * 60 * 60, // seconds
+            method = 'PUT',
+            headers: h = {},
+            jsonBody: b = {},
+          } = server;
+          // Can't set default values for the Content-Type and Authorization headers by using an
+          // object literal with spread (e.g., `{'content-type': 'foo', ...h}`) because the Headers
+          // constructor uses `.append()` internally instead of `.set()`. This matters if a header
+          // is repeated multiple times by using different mixes of upper- and lower-case letters.
+          const headers = new globalThis.Headers(h);
+          if (!headers.has('content-type')) headers.set('content-type', 'application/json');
+          if (username && !headers.has('authorization')) {
+            headers.set('authorization',
+                `Basic ${Buffer.from(`${username}:${credential}`).toString('base64')}`);
+          }
+          const body =
+              JSON.stringify(b && typeof b === 'object' ? {format: 'urls', expire, ...b} : b);
+          try {
+            const {v, s} = await fetchJson(url, {method, headers, body});
+            if (s !== 'ok') throw new Error(`API error: ${v}`);
+            return v.iceServers;
+          } catch (err) {
+            const newErr = addContextToError(err, 'failed to get TURN credentials: ');
+            logger.error(newErr.stack || newErr.toString());
+            throw newErr;
+          }
         }
+        default: return server;
       }
-      default: return server;
-    }
-  })),
-}});
+    })),
+  }};
+};

 exports.handleMessage = async (hookName, {message, socket}) => {
   if (message.type === 'COLLABROOM' && message.data.type === 'RTC_MESSAGE') {
@@ -243,6 +281,9 @@ exports.loadSettings = async (hookName, {settings: {ep_webrtc: s = {}}}) => {
     }
     return false;
   })();
+  if (settings.shardIceServers && settings.iceServers.length > 1) {
+    shardIceServersHmacSecret = await util.promisify(crypto.randomBytes.bind(crypto))(16);
+  }
   logger.info('configured:', util.inspect({
     ...settings,
     iceServers: settings.iceServers.map((s) => s.credential ? {...s, credential: '*****'} : s),
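
The code comment in the `clientVars` hunk says that generating a new secret reassigns the shards. The sketch below illustrates that claim with made-up data. It is not part of the commit: `shardIndex` and `countPerShard` are hypothetical helper names, the `pad-N` IDs are invented, and two calls to `crypto.randomBytes(16)` stand in for the secrets before and after a restart.

```javascript
'use strict';

const crypto = require('crypto');

// Hypothetical helper mirroring the index computation in the clientVars hunk.
const shardIndex = (padId, secret, shardCount) => {
  const digest = crypto.createHmac('sha256', secret).update(padId).digest('hex');
  return Number(BigInt(`0x${digest}`) % BigInt(shardCount));
};

// Made-up pad IDs; real pads have arbitrary user-chosen names.
const padIds = [...Array(1000).keys()].map((n) => `pad-${n}`);
const shardCount = 4;

// Tally how many pads land on each shard for a given secret.
const countPerShard = (secret) => {
  const counts = Array(shardCount).fill(0);
  for (const padId of padIds) counts[shardIndex(padId, secret, shardCount)]++;
  return counts;
};

const oldSecret = crypto.randomBytes(16);
const newSecret = crypto.randomBytes(16); // stands in for a restart

// Count the pads whose shard differs between the two secrets.
const moved = padIds.filter((padId) =>
  shardIndex(padId, oldSecret, shardCount) !== shardIndex(padId, newSecret, shardCount)).length;

console.log('pads per shard:', countPerShard(oldSecret));
console.log('pads reassigned after new secret:', moved);
```

With four shards, a pad keeps its shard under a fresh secret with probability of about one in four, so most pads should move. The exact counts vary from run to run because both secrets are random.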
130 changes: 130 additions & 0 deletions static/tests/backend/specs/sharding.js
@@ -0,0 +1,130 @@
+'use strict';
+
+const assert = require('assert').strict;
+const common = require('ep_etherpad-lite/tests/backend/common');
+const init = require('../init');
+const plugin = require('../../../../index');
+const settings = require('ep_etherpad-lite/node/utils/Settings');
+
+describe(__filename, function () {
+  let agent;
+  const backup = {settings: {...settings}};
+  const iceServers = [...Array(1000).keys()].map((i) => ({urls: [`turn:turn${i}.example.com`]}));
+
+  const reload = async (settings = {}) => {
+    await plugin.loadSettings('loadSettings', {settings: {ep_webrtc: {iceServers, ...settings}}});
+  };
+
+  const getIceServers = async (padId = common.randomString()) => {
+    while (getIceServers._busy != null) {
+      await getIceServers._busy;
+    }
+    if (++getIceServers._active >= getIceServers._limit) {
+      getIceServers._busy = new Promise((resolve) => getIceServers._resolve = resolve);
+    }
+    try {
+      const res = await agent.get(`/p/${padId}`).expect(200);
+      const socket = await common.connect(res);
+      try {
+        const {type, data: clientVars} = await common.handshake(socket, padId);
+        assert.equal(type, 'CLIENT_VARS');
+        return clientVars.ep_webrtc.iceServers;
+      } finally {
+        socket.close();
+      }
+    } finally {
+      if (--getIceServers._active < getIceServers._limit && getIceServers._busy != null) {
+        getIceServers._resolve();
+        getIceServers._busy = null;
+      }
+    }
+  };
+  getIceServers._limit = 5; // Avoid timeouts caused by overload.
+  getIceServers._active = 0;
+  getIceServers._resolve = () => {};
+
+  before(async function () {
+    settings.requireAuthentication = false;
+    agent = await init();
+  });
+
+  after(async function () {
+    Object.assign(settings, backup.settings);
+    await plugin.loadSettings('loadSettings', {settings});
+  });
+
+  it('defaults to disabled', async function () {
+    await reload();
+    const got = await getIceServers();
+    assert.deepEqual(got, iceServers);
+  });
+
+  it('explicitly disabled', async function () {
+    await reload({shardIceServers: false});
+    const got = await getIceServers();
+    assert.deepEqual(got, iceServers);
+  });
+
+  it('enabled, zero entries', async function () {
+    await reload({iceServers: [], shardIceServers: true});
+    assert.deepEqual(await getIceServers(), []);
+  });
+
+  it('enabled, one entry', async function () {
+    const entries = [{urls: ['turn:turn.example.com']}];
+    await reload({iceServers: entries, shardIceServers: true});
+    assert.deepEqual(await getIceServers(), entries);
+  });
+
+  describe('enabled, multiple entries', function () {
+    beforeEach(async function () {
+      await reload({shardIceServers: true});
+    });
+
+    it('only gives one entry to each client', async function () {
+      const got = await getIceServers();
+      assert.equal(got.length, 1);
+      assert(iceServers.some((s) => {
+        try {
+          assert.deepEqual(got[0], s);
+          return true;
+        } catch (err) {
+          return false;
+        }
+      }));
+    });
+
+    it('same pad gets same entry', async function () {
+      this.timeout(60000);
+      const assignments = new Map(await Promise.all([...Array(10).keys()].map(async () => {
+        const padId = common.randomString();
+        return [padId, await getIceServers(padId)];
+      })));
+      await Promise.all([...assignments].map(async ([padId, want]) => {
+        const got = await getIceServers(padId);
+        assert.deepEqual(got, want);
+      }));
+    });
+
+    it('randomizes assignments on reload', async function () {
+      this.timeout(60000);
+      const oldAssignments = new Map(await Promise.all([...Array(10).keys()].map(async () => {
+        const padId = common.randomString();
+        return [padId, await getIceServers(padId)];
+      })));
+      await reload({shardIceServers: true});
+      const newAssignments = new Map(await Promise.all(
+          [...oldAssignments.keys()].map(async (padId) => [padId, await getIceServers(padId)])));
+      // With 10 pad IDs and 1000 ICE servers, the probability that every new assignment exactly
+      // matches the old assignment is effectively zero.
+      assert([...newAssignments].some(([padId, newAssignment]) => {
+        try {
+          assert.deepEqual(newAssignment, oldAssignments.get(padId));
+          return false;
+        } catch (err) {
+          return true;
+        }
+      }));
+    });
+  });
+});
