diff --git a/jupyterlab_widgets/src/manager.ts b/jupyterlab_widgets/src/manager.ts index 12050aac3e..da33511be4 100644 --- a/jupyterlab_widgets/src/manager.ts +++ b/jupyterlab_widgets/src/manager.ts @@ -16,7 +16,8 @@ import { import { ManagerBase, serialize_state, - IStateOptions + IStateOptions, + mapBatch } from '@jupyter-widgets/base-manager'; import { IDisposable } from '@lumino/disposable'; @@ -106,9 +107,14 @@ export abstract class LabWidgetManager extends ManagerBase } const comm_ids = await this._get_comm_info(); - // For each comm id that we do not know about, create the comm, and request the state. - const widgets_info = await Promise.all( - Object.keys(comm_ids).map(async comm_id => { + // For each comm id that we do not know about, create the comm, and + // request the state. We must do this in batches to make sure we do not + // exceed the ZMQ high water mark limiting messages from the kernel. See + // https://github.com/voila-dashboards/voila/issues/534 for more details. + const widgets_info = await mapBatch( + Object.keys(comm_ids), + 100, + async comm_id => { try { await this.get_model(comm_id); // If we successfully get the model, do no more. @@ -147,7 +153,7 @@ export abstract class LabWidgetManager extends ManagerBase return info.promise; } - }) + } ); // We put in a synchronization barrier here so that we don't have to diff --git a/packages/base-manager/src/utils.ts b/packages/base-manager/src/utils.ts index 0e0c7f59ba..c49f3fc369 100644 --- a/packages/base-manager/src/utils.ts +++ b/packages/base-manager/src/utils.ts @@ -298,3 +298,22 @@ export function bufferToBase64(buffer: ArrayBuffer): string { export function base64ToBuffer(base64: string): ArrayBuffer { return toByteArray(base64).buffer; } + +/** + * Map a function onto a list in batches, resolving each batch of returned + * promises before moving to the next batch. + */ +export async function mapBatch( + list: T[], + step: number, + fn: (value: T, index: number, array: T[]) => Promise | U, + thisArg?: any +): Promise { + const results = []; + for (let i = 0; i < list.length; i += step) { + results.push( + ...(await Promise.all(list.slice(i, i + step).map(fn, thisArg))) + ); + } + return results; +} diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index 5bbc446b59..5989b723f2 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -3,7 +3,7 @@ 'use strict'; var base = require('@jupyter-widgets/base'); -var ManagerBase = require('@jupyter-widgets/base-manager').ManagerBase; +var baseManager = require('@jupyter-widgets/base-manager'); var widgets = require('@jupyter-widgets/controls'); var outputWidgets = require('./widget_output'); var saveState = require('./save_state'); @@ -74,7 +74,7 @@ function new_comm( // WidgetManager class //-------------------------------------------------------------------- -export class WidgetManager extends ManagerBase { +export class WidgetManager extends baseManager.ManagerBase { constructor(comm_manager, notebook) { super(); // Managers are stored in *reverse* order, so that _managers[0] is the most recent. @@ -111,34 +111,32 @@ export class WidgetManager extends ManagerBase { // for the responses (2). return Promise.all(comm_promises) .then(function(comms) { - return Promise.all( - comms.map(function(comm) { - var update_promise = new Promise(function(resolve, reject) { - comm.on_msg(function(msg) { - base.put_buffers( - msg.content.data.state, - msg.content.data.buffer_paths, - msg.buffers - ); - // A suspected response was received, check to see if - // it's a state update. If so, resolve. - if (msg.content.data.method === 'update') { - resolve({ - comm: comm, - msg: msg - }); - } - }); + return baseManager.mapBatch(comms, 100, function(comm) { + var update_promise = new Promise(function(resolve, reject) { + comm.on_msg(function(msg) { + base.put_buffers( + msg.content.data.state, + msg.content.data.buffer_paths, + msg.buffers + ); + // A suspected response was received, check to see if + // it's a state update. If so, resolve. + if (msg.content.data.method === 'update') { + resolve({ + comm: comm, + msg: msg + }); + } }); - comm.send( - { - method: 'request_state' - }, - that.callbacks() - ); - return update_promise; - }) - ); + }); + comm.send( + { + method: 'request_state' + }, + that.callbacks() + ); + return update_promise; + }); }) .then(function(widgets_info) { return Promise.all( @@ -407,7 +405,10 @@ export class WidgetManager extends ManagerBase { * Callback handlers for a specific view */ callbacks(view) { - var callbacks = ManagerBase.prototype.callbacks.call(this, view); + var callbacks = baseManager.ManagerBase.prototype.callbacks.call( + this, + view + ); if (view && view.options.iopub_callbacks) { callbacks.iopub = view.options.iopub_callbacks; }