From d4fda97070c0b999cf68a6ba45a519c64f85b7bc Mon Sep 17 00:00:00 2001 From: Mauro Murru Date: Tue, 5 Oct 2021 18:20:46 +0200 Subject: [PATCH] feat: add support to graqphql_transport_ws (#593) * feat: add support to graqphql_transport_ws * chore: make protocol data type more consistent * feat: add protocol option to subscription client * docs: gateway subscription protocol example --- examples/gateway-subscription.js | 5 +++- lib/subscription-client.js | 46 ++++++++++++++-------------- lib/subscription-connection.js | 39 +++++++++--------------- lib/subscription-protocol.js | 46 ++++++++++++++++++++-------- lib/subscription.js | 6 ++-- static/main.js | 4 +-- test/subscription-client.js | 44 +++++++++++++++++++++++++++ test/subscription-connection.js | 51 +++++++++++++++++++++----------- test/subscription-protocol.js | 32 ++++++++++++++++++++ 9 files changed, 191 insertions(+), 82 deletions(-) create mode 100644 test/subscription-protocol.js diff --git a/examples/gateway-subscription.js b/examples/gateway-subscription.js index 72a40099..bdcdef6f 100644 --- a/examples/gateway-subscription.js +++ b/examples/gateway-subscription.js @@ -267,7 +267,10 @@ async function start () { }, { name: 'comment', url: 'http://localhost:4003/graphql', - wsUrl: 'ws://localhost:4003/graphql' + wsUrl: 'ws://localhost:4003/graphql', + wsConnectionParams: { + protocols: ['graphql-transport-ws'] // optional, if not set, will use the default protocol (graphql-ws) + } }] } }) diff --git a/lib/subscription-client.js b/lib/subscription-client.js index aadc13bc..f588df18 100644 --- a/lib/subscription-client.js +++ b/lib/subscription-client.js @@ -4,18 +4,10 @@ const sJSON = require('secure-json-parse') const WebSocket = require('ws') const { - GQL_CONNECTION_INIT, - GQL_CONNECTION_ACK, - GQL_CONNECTION_ERROR, - GQL_CONNECTION_KEEP_ALIVE, - GQL_START, - GQL_DATA, - GQL_ERROR, - GQL_COMPLETE, - GQL_STOP, - GRAPHQL_WS + GRAPHQL_WS, + getProtocolByName } = require('./subscription-protocol') -const { MER_ERR_GQL_SUBSCRIPTION_MESSAGE_INVALID, MER_ERR_GQL_SUBSCRIPTION_CONNECTION_NOT_READY } = require('./errors') +const { MER_ERR_GQL_SUBSCRIPTION_MESSAGE_INVALID, MER_ERR_GQL_SUBSCRIPTION_CONNECTION_NOT_READY, MER_ERR_INVALID_OPTS } = require('./errors') class SubscriptionClient { constructor (uri, config) { @@ -38,7 +30,6 @@ class SubscriptionClient { rewriteConnectionInitPayload } = config - this.protocols = [GRAPHQL_WS, ...protocols] this.tryReconnect = reconnect this.maxReconnectAttempts = maxReconnectAttempts this.serviceName = serviceName @@ -49,6 +40,17 @@ class SubscriptionClient { this.connectionInitPayload = connectionInitPayload this.rewriteConnectionInitPayload = rewriteConnectionInitPayload + if (Array.isArray(protocols) && protocols.length > 0) { + this.protocols = protocols + } else { + this.protocols = [GRAPHQL_WS] + } + + this.protocolMessageTypes = getProtocolByName(this.protocols[0]) + + if (this.protocolMessageTypes === null) { + throw new MER_ERR_INVALID_OPTS(`${this.protocols[0]} is not a valid gateway subscription protocol`) + } this.connect() } @@ -62,7 +64,7 @@ class SubscriptionClient { const payload = typeof this.connectionInitPayload === 'function' ? await this.connectionInitPayload() : this.connectionInitPayload - this.sendMessage(null, GQL_CONNECTION_INIT, payload) + this.sendMessage(null, this.protocolMessageTypes.GQL_CONNECTION_INIT, payload) } catch (err) { this.close(this.tryReconnect, false) } @@ -141,7 +143,7 @@ class SubscriptionClient { count-- if (count === 0 || forceUnsubscribe) { - this.sendMessage(operationId, GQL_STOP, null) + this.sendMessage(operationId, this.protocolMessageTypes.GQL_STOP, null) this.operationsCount[operationId] = 0 } else { this.operationsCount[operationId] = count @@ -183,7 +185,7 @@ class SubscriptionClient { } switch (data.type) { - case GQL_CONNECTION_ACK: + case this.protocolMessageTypes.GQL_CONNECTION_ACK: this.reconnecting = false this.ready = true this.reconnectAttempts = 0 @@ -197,21 +199,21 @@ class SubscriptionClient { } break - case GQL_DATA: + case this.protocolMessageTypes.GQL_DATA: /* istanbul ignore else */ if (operation) { operation.handler(data.payload.data) } break - case GQL_ERROR: + case this.protocolMessageTypes.GQL_ERROR: /* istanbul ignore else */ if (operation) { operation.handler(null) this.operations.delete(operationId) - this.sendMessage(operationId, GQL_ERROR, data.payload) + this.sendMessage(operationId, this.protocolMessageTypes.GQL_ERROR, data.payload) } break - case GQL_COMPLETE: + case this.protocolMessageTypes.GQL_COMPLETE: /* istanbul ignore else */ if (operation) { operation.handler(null) @@ -219,13 +221,13 @@ class SubscriptionClient { } break - case GQL_CONNECTION_ERROR: + case this.protocolMessageTypes.GQL_CONNECTION_ERROR: this.close(this.tryReconnect, false) if (this.failedConnectionCallback) { await this.failedConnectionCallback(data.payload) } break - case GQL_CONNECTION_KEEP_ALIVE: + case this.protocolMessageTypes.GQL_CONNECTION_KEEP_ALIVE: break /* istanbul ignore next */ default: @@ -241,7 +243,7 @@ class SubscriptionClient { throw new MER_ERR_GQL_SUBSCRIPTION_CONNECTION_NOT_READY() } this.operations.set(operationId, { started: true, options, handler, extensions }) - this.sendMessage(operationId, GQL_START, options, extensions) + this.sendMessage(operationId, this.protocolMessageTypes.GQL_START, options, extensions) } } diff --git a/lib/subscription-connection.js b/lib/subscription-connection.js index 34136242..7305b48b 100644 --- a/lib/subscription-connection.js +++ b/lib/subscription-connection.js @@ -5,20 +5,10 @@ const { subscribe, parse } = require('graphql') const { SubscriptionContext } = require('./subscriber') const { kEntityResolvers } = require('./gateway/make-resolver') const sJSON = require('secure-json-parse') -const { - GQL_CONNECTION_INIT, - GQL_CONNECTION_ERROR, - GQL_CONNECTION_ACK, - GQL_CONNECTION_TERMINATE, - GQL_START, - GQL_DATA, - GQL_ERROR, - GQL_COMPLETE, - GQL_STOP -} = require('./subscription-protocol') const { MER_ERR_GQL_SUBSCRIPTION_FORBIDDEN, MER_ERR_GQL_SUBSCRIPTION_UNKNOWN_EXTENSION } = require('./errors') const { preSubscriptionParsingHandler, onSubscriptionResolutionHandler, preSubscriptionExecutionHandler, onSubscriptionEndHandler } = require('./handlers') const { kSubscriptionFactory, kLoaders } = require('./symbols') +const { getProtocolByName } = require('./subscription-protocol') module.exports = class SubscriptionConnection { constructor (socket, { @@ -43,7 +33,7 @@ module.exports = class SubscriptionConnection { this.context = context this.isReady = false this.resolveContext = resolveContext - + this.protocolMessageTypes = getProtocolByName(socket.protocol) this.socket.on('error', this.handleConnectionClose.bind(this)) this.handleConnection() } @@ -64,35 +54,34 @@ module.exports = class SubscriptionConnection { try { data = sJSON.parse(isBinary ? message : message.toString()) } catch (e) { - this.sendMessage(GQL_ERROR, null, 'Message must be a JSON string') + this.sendMessage(this.protocolMessageTypes.GQL_ERROR, null, 'Message must be a JSON string') return } const { id, type } = data - switch (type) { - case GQL_CONNECTION_INIT: + case this.protocolMessageTypes.GQL_CONNECTION_INIT: await this.handleConnectionInit(data) break - case GQL_CONNECTION_TERMINATE: + case this.protocolMessageTypes.GQL_CONNECTION_TERMINATE: this.handleConnectionClose() break - case GQL_START: { + case this.protocolMessageTypes.GQL_START: { if (this.isReady) { this.handleGQLStart(data).catch(e => { - this.sendMessage(GQL_ERROR, id, e.message) + this.sendMessage(this.protocolMessageTypes.GQL_ERROR, id, e.message) }) } else { - this.sendMessage(GQL_CONNECTION_ERROR, undefined, { message: 'Connection has not been established yet.' }) + this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ERROR, undefined, { message: 'Connection has not been established yet.' }) return this.handleConnectionClose() } break } - case GQL_STOP: + case this.protocolMessageTypes.GQL_STOP: await this.handleGQLStop(data) break default: - this.sendMessage(GQL_ERROR, id, 'Invalid payload type') + this.sendMessage(this.protocolMessageTypes.GQL_ERROR, id, 'Invalid payload type') } } @@ -119,7 +108,7 @@ module.exports = class SubscriptionConnection { this.fastify.log.error(e) } if (!authorize) { - this.sendMessage(GQL_CONNECTION_ERROR, undefined, { message: 'Forbidden' }) + this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ERROR, undefined, { message: 'Forbidden' }) return this.handleConnectionClose() } @@ -134,7 +123,7 @@ module.exports = class SubscriptionConnection { this.context._connectionInit = data.payload - this.sendMessage(GQL_CONNECTION_ACK) + this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ACK) this.isReady = true } @@ -223,10 +212,10 @@ module.exports = class SubscriptionConnection { return this.handleConnectionClose() } } - this.sendMessage(GQL_DATA, data.id, value) + this.sendMessage(this.protocolMessageTypes.GQL_DATA, data.id, value) } - this.sendMessage(GQL_COMPLETE, data.id, null) + this.sendMessage(this.protocolMessageTypes.GQL_COMPLETE, data.id, null) } async handleGQLStop (data) { diff --git a/lib/subscription-protocol.js b/lib/subscription-protocol.js index e93f5d4e..a480555a 100644 --- a/lib/subscription-protocol.js +++ b/lib/subscription-protocol.js @@ -1,15 +1,37 @@ 'use strict' -module.exports.GQL_CONNECTION_INIT = 'connection_init' // Client -> Server -module.exports.GQL_CONNECTION_ACK = 'connection_ack' // Server -> Client -module.exports.GQL_CONNECTION_ERROR = 'connection_error' // Server -> Client -module.exports.GQL_CONNECTION_KEEP_ALIVE = 'ka' // Server -> Client +const GRAPHQL_WS = 'graphql-ws' +module.exports.GRAPHQL_WS = GRAPHQL_WS -module.exports.GQL_CONNECTION_TERMINATE = 'connection_terminate' // Client -> Server -module.exports.GQL_START = 'start' // Client -> Server -module.exports.GQL_DATA = 'data' // Server -> Client -module.exports.GQL_ERROR = 'error' // Server -> Client -module.exports.GQL_COMPLETE = 'complete' // Server -> Client -module.exports.GQL_STOP = 'stop' // Client -> Server - -module.exports.GRAPHQL_WS = 'graphql-ws' +module.exports.getProtocolByName = function (name) { + switch (true) { + case (name.indexOf('graphql-transport-ws') !== -1): + return { + GQL_CONNECTION_INIT: 'connection_init', // Client -> Server + GQL_CONNECTION_ACK: 'connection_ack', // Server -> Client + GQL_CONNECTION_ERROR: 'connection_error', // Server -> Client + GQL_CONNECTION_KEEP_ALIVE: 'ka', // Server -> Client + GQL_CONNECTION_TERMINATE: 'connection_terminate', // Client -> Server + GQL_START: 'subscribe', // Client -> Server + GQL_DATA: 'next', // Server -> Client + GQL_ERROR: 'error', // Server -> Client + GQL_COMPLETE: 'complete', // Server -> Client + GQL_STOP: 'stop' // Client -> Server + } + case (name.indexOf(GRAPHQL_WS) !== -1): + return { + GQL_CONNECTION_INIT: 'connection_init', // Client -> Server + GQL_CONNECTION_ACK: 'connection_ack', // Server -> Client + GQL_CONNECTION_ERROR: 'connection_error', // Server -> Client + GQL_CONNECTION_KEEP_ALIVE: 'ka', // Server -> Client + GQL_CONNECTION_TERMINATE: 'connection_terminate', // Client -> Server + GQL_START: 'start', // Client -> Server + GQL_DATA: 'data', // Server -> Client + GQL_ERROR: 'error', // Server -> Client + GQL_COMPLETE: 'complete', // Server -> Client + GQL_STOP: 'stop' // Client -> Server + } + default: + return null + } +} diff --git a/lib/subscription.js b/lib/subscription.js index 61502e70..b0026e9d 100644 --- a/lib/subscription.js +++ b/lib/subscription.js @@ -4,13 +4,13 @@ const fastifyWebsocket = require('fastify-websocket') const { assignLifeCycleHooksToContext, Hooks } = require('./hooks') const { kHooks } = require('./symbols') const SubscriptionConnection = require('./subscription-connection') -const GRAPHQL_WS = 'graphql-ws' +const { getProtocolByName } = require('./subscription-protocol') function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn }) { return async (connection, request) => { const { socket } = connection - if (socket.protocol === undefined || - (socket.protocol.indexOf(GRAPHQL_WS) === -1)) { + + if (socket.protocol === undefined || getProtocolByName(socket.protocol) === null) { // Close the connection with an error code, ws v2 ensures that the // connection is cleaned up even when the closing handshake fails. // 1002: protocol error diff --git a/static/main.js b/static/main.js index 2de7bdb1..32bb79d7 100644 --- a/static/main.js +++ b/static/main.js @@ -1,4 +1,4 @@ -/* global React:false ReactDOM:false GraphiQL:false SubscriptionsTransportWs: false */ +/* global React:false ReactDOM:false GraphiQL:false */ const importer = { url: (url) => { @@ -26,7 +26,7 @@ function render () { const fetcher = GraphiQL.createFetcher({ url, - legacyClient: new SubscriptionsTransportWs.SubscriptionClient(subscriptionUrl) + subscriptionUrl }) ReactDOM.render( diff --git a/test/subscription-client.js b/test/subscription-client.js index 25103839..107d2edc 100644 --- a/test/subscription-client.js +++ b/test/subscription-client.js @@ -6,6 +6,50 @@ const FakeTimers = require('@sinonjs/fake-timers') const SubscriptionClient = require('../lib/subscription-client') const WS = require('ws') +test('subscription client initialization fails when a not supported protocol is in the options', (t) => { + t.plan(1) + t.throws(() => new SubscriptionClient('ws://localhost:1234', { + protocols: ['unsupported-protocol'], + serviceName: 'test-service' + }), 'Invalid options: unsupported-protocol is not a valid gateway subscription protocol') +}) + +test('subscription client calls the publish method with the correct payload', (t) => { + const server = new WS.Server({ port: 0 }) + const port = server.address().port + + server.on('connection', function connection (ws) { + ws.on('message', function incoming (message, isBinary) { + const data = JSON.parse(isBinary ? message : message.toString()) + if (data.type === 'connection_init') { + ws.send(JSON.stringify({ id: undefined, type: 'connection_ack' })) + } else if (data.type === 'subscribe') { + ws.send(JSON.stringify({ id: '1', type: 'next', payload: { data: { foo: 'bar' } } })) + } + }) + }) + + const client = new SubscriptionClient(`ws://localhost:${port}`, { + reconnect: true, + maxReconnectAttempts: 10, + serviceName: 'test-service', + protocols: ['graphql-transport-ws'], + connectionCallback: () => { + client.createSubscription('query', {}, (data) => { + t.same(data, { + topic: 'test-service_1', + payload: { + foo: 'bar' + } + }) + client.close() + server.close() + t.end() + }) + } + }) +}) + test('subscription client calls the publish method with the correct payload', (t) => { const server = new WS.Server({ port: 0 }) const port = server.address().port diff --git a/test/subscription-connection.js b/test/subscription-connection.js index b7de7afd..99b8bbef 100644 --- a/test/subscription-connection.js +++ b/test/subscription-connection.js @@ -6,6 +6,7 @@ const fastify = require('fastify') const mq = require('mqemitter') const SubscriptionConnection = require('../lib/subscription-connection') const { PubSub } = require('../lib/subscriber') +const { GRAPHQL_WS } = require('../lib/subscription-protocol') test('socket is closed on unhandled promise rejection in handleMessage', t => { t.plan(1) @@ -72,7 +73,8 @@ test('subscription connection sends error message when message is not json strin id: null, payload: 'Message must be a JSON string' }), message) - } + }, + protocol: GRAPHQL_WS }, {}) await sc.handleMessage('invalid json string') @@ -83,7 +85,8 @@ test('subscription connection handles GQL_CONNECTION_TERMINATE message correctly const sc = new SubscriptionConnection({ on () {}, close () { t.pass() }, - send (message) {} + send (message) {}, + protocol: GRAPHQL_WS }, {}) await sc.handleMessage(JSON.stringify({ @@ -97,7 +100,8 @@ test('subscription connection closes context on GQL_STOP message correctly', asy const sc = new SubscriptionConnection({ on () {}, close () {}, - send (message) {} + send (message) {}, + protocol: GRAPHQL_WS }, {}) sc.subscriptionContexts = new Map() @@ -120,7 +124,8 @@ test('subscription connection completes resolver iterator on GQL_STOP message co const sc = new SubscriptionConnection({ on () {}, close () {}, - send (message) {} + send (message) {}, + protocol: GRAPHQL_WS }, {}) sc.subscriptionIters = new Map() @@ -149,7 +154,8 @@ test('handles error in send and closes connection', async t => { close () { t.pass() }, - on () {} + on () {}, + protocol: GRAPHQL_WS }, {} ) @@ -161,7 +167,8 @@ test('subscription connection handles GQL_STOP message correctly, with no data', const sc = new SubscriptionConnection({ on () {}, close () {}, - send (message) {} + send (message) {}, + protocol: GRAPHQL_WS }, {}) await sc.handleMessage(JSON.stringify({ @@ -184,7 +191,8 @@ test('subscription connection send error message when GQL_START handler errs', a id: 1, payload: 'handleGQLStart error' }), message) - } + }, + protocol: GRAPHQL_WS }, {}) sc.isReady = true @@ -212,7 +220,8 @@ test('subscription connection send error message when client message type is inv id: 1, payload: 'Invalid payload type' }), message) - } + }, + protocol: GRAPHQL_WS }, {}) await sc.handleMessage(JSON.stringify({ @@ -232,7 +241,8 @@ test('subscription connection handles GQL_START message correctly, when payload. t.equal(JSON.stringify( { type: 'error', id: 1, payload: 'Must provide document.' } ), message) - } + }, + protocol: GRAPHQL_WS }, {}) sc.isReady = true @@ -254,7 +264,8 @@ test('subscription connection handles when GQL_START is called before GQL_INIT', t.equal(JSON.stringify( { type: 'connection_error', payload: { message: 'Connection has not been established yet.' } } ), message) - } + }, + protocol: GRAPHQL_WS }, {}) await sc.handleMessage(JSON.stringify({ @@ -276,7 +287,8 @@ test('subscription connection extends context with onConnect return value', asyn close () {}, send (message) { t.equal(JSON.stringify({ type: 'connection_ack' }), message) - } + }, + protocol: GRAPHQL_WS }, { context, onConnect: function () { @@ -301,7 +313,8 @@ test('subscription connection send GQL_ERROR message if connectionInit extension type: 'error', payload: 'Forbidden' }) - } + }, + protocol: GRAPHQL_WS }, { onConnect: function () { throw new Error('Not allowed') @@ -328,7 +341,8 @@ test('subscription connection does not create subscription if connectionInit ext const sc = new SubscriptionConnection({ on () { }, close () { }, - send (message) {} + send (message) {}, + protocol: GRAPHQL_WS }, { onConnect: function () { throw new Error('Not allowed') @@ -365,7 +379,8 @@ test('subscription connection send GQL_ERROR on unknown extension', async (t) => type: 'error', payload: 'Unknown extension unknown' }) - } + }, + protocol: GRAPHQL_WS }, { }) sc.isReady = true @@ -388,7 +403,8 @@ test('subscription connection handleConnectionInitExtension returns the onConnec const sc = new SubscriptionConnection({ on () { }, close () { }, - send (message) { } + send (message) { }, + protocol: GRAPHQL_WS }, { onConnect: function () { return onConnectResult @@ -406,14 +422,15 @@ test('subscription connection handleConnectionInitExtension returns the onConnec t.same(res, onConnectResult) }) -test('subscription connection externds the context with the connection_init payload', async (t) => { +test('subscription connection extends the context with the connection_init payload', async (t) => { const connectionInitPayload = { hello: 'world' } const sc = new SubscriptionConnection({ on () { }, close () { }, - send (message) { } + send (message) { }, + protocol: GRAPHQL_WS }, {}) await sc.handleConnectionInit({ type: 'connection_init', payload: connectionInitPayload }) diff --git a/test/subscription-protocol.js b/test/subscription-protocol.js new file mode 100644 index 00000000..b7c8472c --- /dev/null +++ b/test/subscription-protocol.js @@ -0,0 +1,32 @@ +'use strict' +const { test } = require('tap') +const { getProtocolByName } = require('../lib/subscription-protocol') + +test('getProtocolByName returns correct protocol message types', t => { + t.plan(3) + t.same(getProtocolByName('graphql-ws'), { + GQL_CONNECTION_INIT: 'connection_init', + GQL_CONNECTION_ACK: 'connection_ack', + GQL_CONNECTION_ERROR: 'connection_error', + GQL_CONNECTION_KEEP_ALIVE: 'ka', + GQL_CONNECTION_TERMINATE: 'connection_terminate', + GQL_START: 'start', + GQL_DATA: 'data', + GQL_ERROR: 'error', + GQL_COMPLETE: 'complete', + GQL_STOP: 'stop' + }) + t.same(getProtocolByName('graphql-transport-ws'), { + GQL_CONNECTION_INIT: 'connection_init', + GQL_CONNECTION_ACK: 'connection_ack', + GQL_CONNECTION_ERROR: 'connection_error', + GQL_CONNECTION_KEEP_ALIVE: 'ka', + GQL_CONNECTION_TERMINATE: 'connection_terminate', + GQL_START: 'subscribe', + GQL_DATA: 'next', + GQL_ERROR: 'error', + GQL_COMPLETE: 'complete', + GQL_STOP: 'stop' + }) + t.equal(getProtocolByName('unsupported-protocol'), null) +})