diff --git a/README.md b/README.md index 63649993..1a622329 100644 --- a/README.md +++ b/README.md @@ -185,6 +185,31 @@ Example: }, ``` +#### Horizontally scaled TURN servers + +To spread load across multiple TURN services, you can enable sharding: + +```json + "ep_webrtc": { + "iceServers": [ + {"urls": ["stun:shard0.example.com", "turn:shard0.example.com"]}, + {"urls": ["stun:shard1.example.com", "turn:shard1.example.com"]}, + {"urls": ["stun:shard2.example.com", "turn:shard2.example.com"]}, + {"urls": ["stun:shard3.example.com", "turn:shard3.example.com"]}, + ], + "shardIceServers": true + }, +``` + +When `shardIceServers` is `false` (the default), all clients receive all +RTCIceServer objects in the `iceServers` list and it's up to the browser to +figure out how to use them to connect with peers. When `true`, this plugin +assigns a single entry from `iceServers` to each pad and gives out only that +assigned entry to users that connect to the pad. The intention is to provide a +better guarantee of load distribution across a set of TURN servers, and to avoid +an unnecessary network hop when both peers are configured to force the use of +TURN. + ### Microphone Settings The microphone can be configured by setting `audio.constraints` to any [audio diff --git a/index.js b/index.js index 631253ff..76a051b0 100644 --- a/index.js +++ b/index.js @@ -49,8 +49,10 @@ const defaultSettings = { iceServers: [{urls: ['stun:stun.l.google.com:19302']}], listenClass: null, moreInfoUrl: {}, + shardIceServers: false, }; let settings = null; +let shardIceServersHmacSecret; let socketio; const addContextToError = (err, pfx) => { @@ -139,54 +141,90 @@ const fetchJson = async (url, opts = {}) => { return await res.json(); }; -exports.clientVars = async (hookName, {clientVars: {userId: authorId}}) => ({ep_webrtc: { - ...settings, - iceServers: await Promise.all(settings.iceServers.map(async (server) => { - switch (server.credentialType) { - case 'coturn ephemeral password': { - const {lifetime = 60 * 60 * 12 /* seconds */} = server; - const username = `${Math.floor(Date.now() / 1000) + lifetime}:${authorId}`; - const hmac = crypto.createHmac('sha1', server.credential); - hmac.update(username); - const credential = hmac.digest('base64'); - return {urls: server.urls, username, credential}; - } - case 'xirsys ephemeral credentials': { - const { - url, - username, - credential, - lifetime: expire = 12 * 60 * 60, // seconds - method = 'PUT', - headers: h = {}, - jsonBody: b = {}, - } = server; - // Can't set default values for the Content-Type and Authorization headers by using an - // object literal with spread (e.g., `{'content-type': 'foo', ...h}`) because the Headers - // constructor uses `.append()` internally instead of `.set()`. This matters if a header is - // repeated multiple times by using different mixes of upper- and lower-case letters. - const headers = new globalThis.Headers(h); - if (!headers.has('content-type')) headers.set('content-type', 'application/json'); - if (username && !headers.has('authorization')) { - headers.set('authorization', - `Basic ${Buffer.from(`${username}:${credential}`).toString('base64')}`); +exports.clientVars = async (hookName, {clientVars: {userId: authorId}, pad: {id: padId}}) => { + let iceServers = settings.iceServers; + if (settings.shardIceServers && iceServers.length > 1) { + // We could simply hash the pad ID, but we include some randomness to make it slightly harder + // for a malicious user to overload a particular shard by picking pad IDs that all use the same + // shard. (The randomness forces malicious users to try multiple pad IDs and keep the ones that + // use the same shard.) The randomness also helps avoid chronic imbalance due to unlucky + // assignments; generating a new secret will reassign the shards. + // + // The secret is generated at startup, so all users visiting the same pad will get the same HMAC + // value (and thus the same shard) until Etherpad is restarted. Users that connect after + // Etherpad restarts might be assigned a different shard from the users on the pad that received + // their clientVars before Etherpad restarted. This doesn't affect protocol correctness, but it + // might result in three network hops instead of two: client A sends to TURN A which relays to + // TURN B which relays to client B, instead of client A sends to TURN AB which relays to + // localhost (TURN AB) which relays to client B. This should be rare because it will only happen + // if all of the following are true: + // + // * Both users have configured their browsers to force relay. + // * One user loaded the pad before Etherpad restarted and the other loaded after. + // * The new random value caused the pad to be assigned to a different shard. + // + // TODO: Convey ICE servers via a message that is sent every time a user connects. (CLIENT_VARS + // is only sent on initial connection, so if a client reconnects due to Etherpad restarting, a + // new CLIENT_VARS is not sent.) This will allow the server to select a different shard for a + // pad when it restarts, and all clients (old and new) will use the new shard for new sessions. + // + // TODO: Select the shard for the pad when the first user joins the pad and forget that + // selection once all users have left. This would enable alternative load balancing schemes such + // as true random or least loaded. + const hmac = crypto.createHmac('sha256', shardIceServersHmacSecret); + hmac.update(padId); + const i = Number(BigInt(`0x${hmac.digest('hex')}`) % BigInt(iceServers.length)); + iceServers = iceServers.slice(i, i + 1); + } + return {ep_webrtc: { + ...settings, + iceServers: await Promise.all(iceServers.map(async (server) => { + switch (server.credentialType) { + case 'coturn ephemeral password': { + const {lifetime = 60 * 60 * 12 /* seconds */} = server; + const username = `${Math.floor(Date.now() / 1000) + lifetime}:${authorId}`; + const hmac = crypto.createHmac('sha1', server.credential); + hmac.update(username); + const credential = hmac.digest('base64'); + return {urls: server.urls, username, credential}; } - const body = - JSON.stringify(b && typeof b === 'object' ? {format: 'urls', expire, ...b} : b); - try { - const {v, s} = await fetchJson(url, {method, headers, body}); - if (s !== 'ok') throw new Error(`API error: ${v}`); - return v.iceServers; - } catch (err) { - const newErr = addContextToError(err, 'failed to get TURN credentials: '); - logger.error(newErr.stack || newErr.toString()); - throw newErr; + case 'xirsys ephemeral credentials': { + const { + url, + username, + credential, + lifetime: expire = 12 * 60 * 60, // seconds + method = 'PUT', + headers: h = {}, + jsonBody: b = {}, + } = server; + // Can't set default values for the Content-Type and Authorization headers by using an + // object literal with spread (e.g., `{'content-type': 'foo', ...h}`) because the Headers + // constructor uses `.append()` internally instead of `.set()`. This matters if a header + // is repeated multiple times by using different mixes of upper- and lower-case letters. + const headers = new globalThis.Headers(h); + if (!headers.has('content-type')) headers.set('content-type', 'application/json'); + if (username && !headers.has('authorization')) { + headers.set('authorization', + `Basic ${Buffer.from(`${username}:${credential}`).toString('base64')}`); + } + const body = + JSON.stringify(b && typeof b === 'object' ? {format: 'urls', expire, ...b} : b); + try { + const {v, s} = await fetchJson(url, {method, headers, body}); + if (s !== 'ok') throw new Error(`API error: ${v}`); + return v.iceServers; + } catch (err) { + const newErr = addContextToError(err, 'failed to get TURN credentials: '); + logger.error(newErr.stack || newErr.toString()); + throw newErr; + } } + default: return server; } - default: return server; - } - })), -}}); + })), + }}; +}; exports.handleMessage = async (hookName, {message, socket}) => { if (message.type === 'COLLABROOM' && message.data.type === 'RTC_MESSAGE') { @@ -243,6 +281,9 @@ exports.loadSettings = async (hookName, {settings: {ep_webrtc: s = {}}}) => { } return false; })(); + if (settings.shardIceServers && settings.iceServers.length > 1) { + shardIceServersHmacSecret = await util.promisify(crypto.randomBytes.bind(crypto))(16); + } logger.info('configured:', util.inspect({ ...settings, iceServers: settings.iceServers.map((s) => s.credential ? {...s, credential: '*****'} : s), diff --git a/static/tests/backend/specs/sharding.js b/static/tests/backend/specs/sharding.js new file mode 100644 index 00000000..b51e014f --- /dev/null +++ b/static/tests/backend/specs/sharding.js @@ -0,0 +1,130 @@ +'use strict'; + +const assert = require('assert').strict; +const common = require('ep_etherpad-lite/tests/backend/common'); +const init = require('../init'); +const plugin = require('../../../../index'); +const settings = require('ep_etherpad-lite/node/utils/Settings'); + +describe(__filename, function () { + let agent; + const backup = {settings: {...settings}}; + const iceServers = [...Array(1000).keys()].map((i) => ({urls: [`turn:turn${i}.example.com`]})); + + const reload = async (settings = {}) => { + await plugin.loadSettings('loadSettings', {settings: {ep_webrtc: {iceServers, ...settings}}}); + }; + + const getIceServers = async (padId = common.randomString()) => { + while (getIceServers._busy != null) { + await getIceServers._busy; + } + if (++getIceServers._active >= getIceServers._limit) { + getIceServers._busy = new Promise((resolve) => getIceServers._resolve = resolve); + } + try { + const res = await agent.get(`/p/${padId}`).expect(200); + const socket = await common.connect(res); + try { + const {type, data: clientVars} = await common.handshake(socket, padId); + assert.equal(type, 'CLIENT_VARS'); + return clientVars.ep_webrtc.iceServers; + } finally { + socket.close(); + } + } finally { + if (--getIceServers._active < getIceServers._limit && getIceServers._busy != null) { + getIceServers._resolve(); + getIceServers._busy = null; + } + } + }; + getIceServers._limit = 5; // Avoid timeouts caused by overload. + getIceServers._active = 0; + getIceServers._resolve = () => {}; + + before(async function () { + settings.requireAuthentication = false; + agent = await init(); + }); + + after(async function () { + Object.assign(settings, backup.settings); + await plugin.loadSettings('loadSettings', {settings}); + }); + + it('defaults to disabled', async function () { + await reload(); + const got = await getIceServers(); + assert.deepEqual(got, iceServers); + }); + + it('explicitly disabled', async function () { + await reload({shardIceServers: false}); + const got = await getIceServers(); + assert.deepEqual(got, iceServers); + }); + + it('enabled, zero entries', async function () { + await reload({iceServers: [], shardIceServers: true}); + assert.deepEqual(await getIceServers(), []); + }); + + it('enabled, one entry', async function () { + const entries = [{urls: ['turn:turn.example.com']}]; + await reload({iceServers: entries, shardIceServers: true}); + assert.deepEqual(await getIceServers(), entries); + }); + + describe('enabled, multiple entries', function () { + beforeEach(async function () { + await reload({shardIceServers: true}); + }); + + it('only gives one entry to each client', async function () { + const got = await getIceServers(); + assert.equal(got.length, 1); + assert(iceServers.some((s) => { + try { + assert.deepEqual(got[0], s); + return true; + } catch (err) { + return false; + } + })); + }); + + it('same pad gets same entry', async function () { + this.timeout(60000); + const assignments = new Map(await Promise.all([...Array(10).keys()].map(async () => { + const padId = common.randomString(); + return [padId, await getIceServers(padId)]; + }))); + await Promise.all([...assignments].map(async ([padId, want]) => { + const got = await getIceServers(padId); + assert.deepEqual(got, want); + })); + }); + + it('randomizes assignments on reload', async function () { + this.timeout(60000); + const oldAssignments = new Map(await Promise.all([...Array(10).keys()].map(async () => { + const padId = common.randomString(); + return [padId, await getIceServers(padId)]; + }))); + await reload({shardIceServers: true}); + const newAssignments = new Map(await Promise.all( + [...oldAssignments.keys()].map(async (padId) => [padId, await getIceServers(padId)]))); + // With 10 pad IDs and 1000 ICE servers, the probability that every new assignment exactly + // matches the old assignment is effectively zero. + assert([...newAssignments].some(([padId, newAssignment]) => { + try { + assert.deepEqual(newAssignment, oldAssignments.get(padId)); + return false; + } catch (err) { + return true; + } + })); + }); + }); +});