diff --git a/js/package.json b/js/package.json index c3516a19b..c696d4360 100644 --- a/js/package.json +++ b/js/package.json @@ -27,6 +27,7 @@ "css-loader": "^3.0.0", "file-loader": "^4.0.0", "npm-run-all": "^4.1.5", + "promise-semaphore": "^0.2.8", "style-loader": "^0.23.1", "url-loader": "^1.0.0", "webpack": "^4.29.3", diff --git a/js/src/manager.js b/js/src/manager.js index 40aa79eaf..86c74fedd 100644 --- a/js/src/manager.js +++ b/js/src/manager.js @@ -16,6 +16,7 @@ import * as controls from '@jupyter-widgets/controls'; import * as PhosphorWidget from '@phosphor/widgets'; import { requireLoader } from './loader'; +import { batchRateMap } from './utils'; if (typeof window !== "undefined" && typeof window.define !== "undefined") { window.define("@jupyter-widgets/base", base); @@ -117,10 +118,17 @@ export class WidgetManager extends JupyterLabManager { async _build_models() { const comm_ids = await this._get_comm_info(); const models = {}; - const widgets_info = await Promise.all(Object.keys(comm_ids).map(async (comm_id) => { + /** + * For the classical notebook, iopub_msg_rate_limit=1000 (default) + * And for zmq, we are affected by the default ZMQ_SNDHWM setting of 1000 + * See https://github.com/voila-dashboards/voila/issues/534 for a discussion + */ + const maxMessagesInTransit = 100; // really save limit compared to ZMQ_SNDHWM + const maxMessagesPerSecond = 500; // lets be on the save side, in case the kernel sends more msg'es + const widgets_info = await Promise.all(batchRateMap(Object.keys(comm_ids), async (comm_id) => { const comm = await this._create_comm(this.comm_target_name, comm_id); return this._update_comm(comm); - })); + }, {room: maxMessagesInTransit, rate: maxMessagesPerSecond})); await Promise.all(widgets_info.map(async (widget_info) => { const state = widget_info.msg.content.data.state; diff --git a/js/src/utils.js b/js/src/utils.js new file mode 100644 index 000000000..05a42e187 --- /dev/null +++ b/js/src/utils.js @@ -0,0 +1,31 @@ +import PSemaphore from 'promise-semaphore'; + +const delay = (sec) => new Promise((resolve) => setTimeout(resolve, sec*1000)); + +/** + * Map a function onto a list where fn is being called at a limit of 'rate' number of calls per second. + * and 'room' number of parallel calls. + * Note that the minimum window at which rate is respected is room/rate seconds. + */ +export +const batchRateMap = (list, fn, {room, rate}) => { + var queue = new PSemaphore({rooms: room}) + return list.map(async (value) => { + return new Promise((valueResolve, reject) => { + queue.add(() => { + // We may not want to start the next job directly, we want to respect the + // throttling/rate, e.g.: + // If we have room for 10 parallel jobs, at a max rate of 100/second, each job + // should take at least 10/100=0.1 seconds. + // If we have room for 100 parallel jobs, and a max rate of 10/second, each + // job should take 100/10=10 seconds. But it will have a burst of 100 calls. + const throttlePromise = delay(room/rate); + // If the job is done, resolve the promise immediately, don't want for the throttle Promise + // This means that if we have room for 10 parallel jobs + // and just 9 jobs, we will never have to wait for the throttlePromise + const resultPromise = fn(value).then(valueResolve); + return Promise.all([resultPromise, throttlePromise]); + }); + }); + }); +}