Skip to content

Commit

Permalink
Restore widgets in batches to not exceed the zmq high water mark mess…
Browse files Browse the repository at this point in the history
…age limit in the kernel.

Current ZMQ by default limits the kernel’s iopub message send queue to at most 1000 messages (and the real limit can be much lower, see ZMQ_SNDHWM at http://api.zeromq.org/4-3:zmq-setsockopt). We now request comm state in batches to avoid this limit.


See voila-dashboards/voila#534 for more details, including where this was causing real problems.
  • Loading branch information
jasongrout authored and jtpio committed Mar 26, 2021
1 parent 24628f0 commit 8c008d4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 35 deletions.
16 changes: 11 additions & 5 deletions jupyterlab_widgets/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import {
import {
ManagerBase,
serialize_state,
IStateOptions
IStateOptions,
mapBatch
} from '@jupyter-widgets/base-manager';

import { IDisposable } from '@lumino/disposable';
Expand Down Expand Up @@ -106,9 +107,14 @@ export abstract class LabWidgetManager extends ManagerBase
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async comm_id => {
// For each comm id that we do not know about, create the comm, and
// request the state. We must do this in batches to make sure we do not
// exceed the ZMQ high water mark limiting messages from the kernel. See
// https://github.com/voila-dashboards/voila/issues/534 for more details.
const widgets_info = await mapBatch(
Object.keys(comm_ids),
100,
async comm_id => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
Expand Down Expand Up @@ -147,7 +153,7 @@ export abstract class LabWidgetManager extends ManagerBase

return info.promise;
}
})
}
);

// We put in a synchronization barrier here so that we don't have to
Expand Down
19 changes: 19 additions & 0 deletions packages/base-manager/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,22 @@ export function bufferToBase64(buffer: ArrayBuffer): string {
export function base64ToBuffer(base64: string): ArrayBuffer {
return toByteArray(base64).buffer;
}

/**
* Map a function onto a list in batches, resolving each batch of returned
* promises before moving to the next batch.
*/
export async function mapBatch<T, U>(
list: T[],
step: number,
fn: (value: T, index: number, array: T[]) => Promise<U> | U,
thisArg?: any
): Promise<U[]> {
const results = [];
for (let i = 0; i < list.length; i += step) {
results.push(
...(await Promise.all(list.slice(i, i + step).map(fn, thisArg)))
);
}
return results;
}
61 changes: 31 additions & 30 deletions widgetsnbextension/src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
'use strict';

var base = require('@jupyter-widgets/base');
var ManagerBase = require('@jupyter-widgets/base-manager').ManagerBase;
var baseManager = require('@jupyter-widgets/base-manager');
var widgets = require('@jupyter-widgets/controls');
var outputWidgets = require('./widget_output');
var saveState = require('./save_state');
Expand Down Expand Up @@ -74,7 +74,7 @@ function new_comm(
// WidgetManager class
//--------------------------------------------------------------------

export class WidgetManager extends ManagerBase {
export class WidgetManager extends baseManager.ManagerBase {
constructor(comm_manager, notebook) {
super();
// Managers are stored in *reverse* order, so that _managers[0] is the most recent.
Expand Down Expand Up @@ -111,34 +111,32 @@ export class WidgetManager extends ManagerBase {
// for the responses (2).
return Promise.all(comm_promises)
.then(function(comms) {
return Promise.all(
comms.map(function(comm) {
var update_promise = new Promise(function(resolve, reject) {
comm.on_msg(function(msg) {
base.put_buffers(
msg.content.data.state,
msg.content.data.buffer_paths,
msg.buffers
);
// A suspected response was received, check to see if
// it's a state update. If so, resolve.
if (msg.content.data.method === 'update') {
resolve({
comm: comm,
msg: msg
});
}
});
return baseManager.mapBatch(comms, 100, function(comm) {
var update_promise = new Promise(function(resolve, reject) {
comm.on_msg(function(msg) {
base.put_buffers(
msg.content.data.state,
msg.content.data.buffer_paths,
msg.buffers
);
// A suspected response was received, check to see if
// it's a state update. If so, resolve.
if (msg.content.data.method === 'update') {
resolve({
comm: comm,
msg: msg
});
}
});
comm.send(
{
method: 'request_state'
},
that.callbacks()
);
return update_promise;
})
);
});
comm.send(
{
method: 'request_state'
},
that.callbacks()
);
return update_promise;
});
})
.then(function(widgets_info) {
return Promise.all(
Expand Down Expand Up @@ -407,7 +405,10 @@ export class WidgetManager extends ManagerBase {
* Callback handlers for a specific view
*/
callbacks(view) {
var callbacks = ManagerBase.prototype.callbacks.call(this, view);
var callbacks = baseManager.ManagerBase.prototype.callbacks.call(
this,
view
);
if (view && view.options.iopub_callbacks) {
callbacks.iopub = view.options.iopub_callbacks;
}
Expand Down

0 comments on commit 8c008d4

Please sign in to comment.