Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define server plugin interface, move Agent queries code into a built-in plugin #353

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 46 additions & 151 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ var ShareDBError = require('./error');

var ERROR_CODE = ShareDBError.CODES;

/** @typedef {import('./backend')} Backend */
/** @typedef {import('./backend').ServerPlugin<unknown, unknown, unknown>} ServerPlugin */

/**
* Agent deserializes the wire protocol messages received from the stream and
* calls the corresponding functions on its Agent. It uses the return values
Expand All @@ -16,6 +19,7 @@ var ERROR_CODE = ShareDBError.CODES;
* @param {Duplex} stream connection to a client
*/
function Agent(backend, stream) {
/** @type {Backend} */
this.backend = backend;
this.stream = stream;

Expand All @@ -32,8 +36,24 @@ function Agent(backend, stream) {
// map of collection -> id -> stream
this.subscribedDocs = {};

// Map from queryId -> emitter
this.subscribedQueries = {};
/**
* Map of action name (`a` field in requests) to plugin
* @type {{ [action: string]: ServerPlugin }}
*/
var actionToPlugin = this.actionToPlugin = {};
// Map of plugin name -> plugin's agent state
this.pluginStates = {};
for (var i = 0; i < backend.plugins.length; i++) {
var plugin = backend.plugins[i];
for (var action in plugin.requestHandlers) {
if (actionToPlugin[action]) {
throw new Error('Action ' + action + ' in plugin ' + plugin.name +
' conflicts with plugin ' + actionToPlugin[action].name);
}
actionToPlugin[action] = plugin;
}
this.pluginStates[plugin.name] = plugin.createAgentState();
}

// Track which documents are subscribed to presence by the client. This is a
// map of channel -> stream
Expand Down Expand Up @@ -167,43 +187,6 @@ Agent.prototype._subscribeToPresenceStream = function(channel, stream) {
});
};

Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) {
var previous = this.subscribedQueries[queryId];
if (previous) previous.destroy();
this.subscribedQueries[queryId] = emitter;

var agent = this;
emitter.onExtra = function(extra) {
agent.send({a: 'q', id: queryId, extra: extra});
};

emitter.onDiff = function(diff) {
for (var i = 0; i < diff.length; i++) {
var item = diff[i];
if (item.type === 'insert') {
item.values = getResultsData(item.values);
}
}
// Consider stripping the collection out of the data we send here
// if it matches the query's collection.
agent.send({a: 'q', id: queryId, diff: diff});
};

emitter.onError = function(err) {
// Log then silently ignore errors in a subscription stream, since these
// may not be the client's fault, and they were not the result of a
// direct request by the client
logger.error('Query subscription stream error', collection, query, err);
};

emitter.onOp = function(op) {
var id = op.d;
agent._onOp(collection, id, op);
};

emitter._open();
};

Agent.prototype._onOp = function(collection, id, op) {
if (this._isOwnOp(collection, op)) return;

Expand Down Expand Up @@ -379,19 +362,22 @@ Agent.prototype._checkRequest = function(request) {
// Handle an incoming message from the client
Agent.prototype._handleMessage = function(request, callback) {
try {
var plugin = this.actionToPlugin[request.a];

var errMessage = this._checkRequest(request);
if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage));
if (plugin) {
try {
plugin.checkRequest(request);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this method should be optional? Or do we want to enforce that all plugins validate requests?

} catch (err) {
return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, err.message));
}
}

switch (request.a) {
case 'hs':
if (request.id) this.src = request.id;
return callback(null, this._initMessage('hs'));
case 'qf':
return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback);
case 'qs':
return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback);
case 'qu':
return this._queryUnsubscribe(request.id, callback);
case 'bf':
return this._fetchBulk(request.c, request.b, callback);
case 'bs':
Expand Down Expand Up @@ -435,109 +421,23 @@ Agent.prototype._handleMessage = function(request, callback) {
case 'pu':
return this._unsubscribePresence(request.ch, request.seq, callback);
default:
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
}
} catch (err) {
callback(err);
}
};
function getQueryOptions(request) {
var results = request.r;
var ids;
var fetch;
var fetchOps;
if (results) {
ids = [];
for (var i = 0; i < results.length; i++) {
var result = results[i];
var id = result[0];
var version = result[1];
ids.push(id);
if (version == null) {
if (fetch) {
fetch.push(id);
if (plugin) {
return plugin.requestHandlers[request.a](request, {
agent: this,
backend: this.backend,
agentState: this.pluginStates[plugin.name]
}, callback);
} else {
fetch = [id];
callback(
new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')
);
}
} else {
if (!fetchOps) fetchOps = {};
fetchOps[id] = version;
}
}
} catch (err) {
callback(err);
}
var options = request.o || {};
options.ids = ids;
options.fetch = fetch;
options.fetchOps = fetchOps;
return options;
}

Agent.prototype._queryFetch = function(queryId, collection, query, options, callback) {
// Fetch the results of a query once
this.backend.queryFetch(this, collection, query, options, function(err, results, extra) {
if (err) return callback(err);
var message = {
data: getResultsData(results),
extra: extra
};
callback(null, message);
});
};

Agent.prototype._querySubscribe = function(queryId, collection, query, options, callback) {
// Subscribe to a query. The client is sent the query results and its
// notified whenever there's a change
var agent = this;
var wait = 1;
var message;
function finish(err) {
if (err) return callback(err);
if (--wait) return;
callback(null, message);
}
if (options.fetch) {
wait++;
this.backend.fetchBulk(this, collection, options.fetch, function(err, snapshotMap) {
if (err) return finish(err);
message = getMapResult(snapshotMap);
finish();
});
}
if (options.fetchOps) {
wait++;
this._fetchBulkOps(collection, options.fetchOps, finish);
}
this.backend.querySubscribe(this, collection, query, options, function(err, emitter, results, extra) {
if (err) return finish(err);
if (this.closed) return emitter.destroy();

agent._subscribeToQuery(emitter, queryId, collection, query);
// No results are returned when ids are passed in as an option. Instead,
// want to re-poll the entire query once we've established listeners to
// emit any diff in results
if (!results) {
emitter.queryPoll(finish);
return;
}
message = {
data: getResultsData(results),
extra: extra
};
finish();
});
};

function getResultsData(results) {
var items = [];
for (var i = 0; i < results.length; i++) {
var result = results[i];
var item = getSnapshotData(result);
item.d = result.id;
items.push(item);
}
return items;
}

function getMapResult(snapshotMap) {
var data = {};
for (var id in snapshotMap) {
Expand All @@ -553,6 +453,8 @@ function getMapResult(snapshotMap) {
}
return {data: data};
}
// Exported for use in core plugins
Agent._getMapResult = getMapResult;

function getSnapshotData(snapshot) {
var data = {
Expand All @@ -564,15 +466,8 @@ function getSnapshotData(snapshot) {
}
return data;
}

Agent.prototype._queryUnsubscribe = function(queryId, callback) {
var emitter = this.subscribedQueries[queryId];
if (emitter) {
emitter.destroy();
delete this.subscribedQueries[queryId];
}
process.nextTick(callback);
};
// Exported for use in core plugins
Agent._getSnapshotData = getSnapshotData;

Agent.prototype._fetch = function(collection, id, version, callback) {
if (version == null) {
Expand Down
63 changes: 63 additions & 0 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var MemoryPubSub = require('./pubsub/memory');
var ot = require('./ot');
var projections = require('./projections');
var QueryEmitter = require('./query-emitter');
var QueryServerPlugin = require('./query-server-plugin').Plugin;
var ShareDBError = require('./error');
var Snapshot = require('./snapshot');
var StreamSocket = require('./stream-socket');
Expand Down Expand Up @@ -37,6 +38,10 @@ function Backend(options) {
// Map from event name to a list of middleware
this.middleware = {};

/** @type {Array<ServerPlugin<unknown, unknown, unknown>>} */
this.plugins = [];
this._registerServerPlugin(new QueryServerPlugin());

// The number of open agents for monitoring and testing memory leaks
this.agentsCount = 0;
this.remoteAgentsCount = 0;
Expand Down Expand Up @@ -82,6 +87,64 @@ Backend.prototype.SNAPSHOT_TYPES = {
byTimestamp: 'byTimestamp'
};

/**
* @param {ServerPlugin<unknown, unknown, unknown>} plugin
*/
Backend.prototype._registerServerPlugin = function(plugin) {
this.plugins.push(plugin);
};

/**
* @typedef {object} ServerPlugin
*
* @property {string} name Unique plugin name, usually based on the plugin's package name
* @property {{[action: string]: RequestHandler<TReq>}} requestHandlers
* @property {(callback: (error?: Error) => void) => void} close Function called when
* `Backend#close` is called. The close function should shut down database connections and then
* call the callback.
* @property {() => S} createAgentState Function to create an object that contains custom
* per-agent state that the plugin needs
* @property {(agentState: S) => void} destroyAgentState Function to tear down the plugin's state
* for an agent
* @property {(request: unknown) => request is TReq} checkRequest Function to check the format of
* an incoming message from a client, throwing an error if the message is invalid
*
* @template TReq - Type for request data from a client
* @template TResp - Type for response data sent back to a client
* @template S - Type for custom per-agent state that the plugin needs to keep
*/

/**
* @callback RequestHandler
*
* Function that handles an incoming message from a client
*
* @param {TReq} request Request message from a client
* @param {RequestHandlerContext<S>} context
* @param {(err?: Error | null, reply?: TResp) => void} callback Callback to be called with the
* reply message
* @returns {void}
*
* @template TReq - Type for request data from a client
* @template TResp - Type for response data sent back to a client
* @template S - Type for custom per-agent state that the plugin needs to keep
*/

/**
* @typedef {object} RequestHandlerContext
*
* @property {import('./agent')} agent
* @property {import('./backend')} backend
* @property {S} agentState
*
* @template S
*/

/**
* Closes the backend and all its database connections.
*
* @param {(error?: Error) => void} callback
*/
Backend.prototype.close = function(callback) {
var wait = 4;
var backend = this;
Expand Down
Loading