Skip to content

Commit

Permalink
feat: add support to graqphql_transport_ws (#593)
Browse files Browse the repository at this point in the history
* feat: add support to graqphql_transport_ws

* chore: make protocol data type more consistent

* feat: add protocol option to subscription client

* docs: gateway subscription protocol example
  • Loading branch information
brainrepo authored Oct 5, 2021
1 parent 41be4dd commit d4fda97
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 82 deletions.
5 changes: 4 additions & 1 deletion examples/gateway-subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ async function start () {
}, {
name: 'comment',
url: 'http://localhost:4003/graphql',
wsUrl: 'ws://localhost:4003/graphql'
wsUrl: 'ws://localhost:4003/graphql',
wsConnectionParams: {
protocols: ['graphql-transport-ws'] // optional, if not set, will use the default protocol (graphql-ws)
}
}]
}
})
Expand Down
46 changes: 24 additions & 22 deletions lib/subscription-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,10 @@ const sJSON = require('secure-json-parse')

const WebSocket = require('ws')
const {
GQL_CONNECTION_INIT,
GQL_CONNECTION_ACK,
GQL_CONNECTION_ERROR,
GQL_CONNECTION_KEEP_ALIVE,
GQL_START,
GQL_DATA,
GQL_ERROR,
GQL_COMPLETE,
GQL_STOP,
GRAPHQL_WS
GRAPHQL_WS,
getProtocolByName
} = require('./subscription-protocol')
const { MER_ERR_GQL_SUBSCRIPTION_MESSAGE_INVALID, MER_ERR_GQL_SUBSCRIPTION_CONNECTION_NOT_READY } = require('./errors')
const { MER_ERR_GQL_SUBSCRIPTION_MESSAGE_INVALID, MER_ERR_GQL_SUBSCRIPTION_CONNECTION_NOT_READY, MER_ERR_INVALID_OPTS } = require('./errors')

class SubscriptionClient {
constructor (uri, config) {
Expand All @@ -38,7 +30,6 @@ class SubscriptionClient {
rewriteConnectionInitPayload
} = config

this.protocols = [GRAPHQL_WS, ...protocols]
this.tryReconnect = reconnect
this.maxReconnectAttempts = maxReconnectAttempts
this.serviceName = serviceName
Expand All @@ -49,6 +40,17 @@ class SubscriptionClient {
this.connectionInitPayload = connectionInitPayload
this.rewriteConnectionInitPayload = rewriteConnectionInitPayload

if (Array.isArray(protocols) && protocols.length > 0) {
this.protocols = protocols
} else {
this.protocols = [GRAPHQL_WS]
}

this.protocolMessageTypes = getProtocolByName(this.protocols[0])

if (this.protocolMessageTypes === null) {
throw new MER_ERR_INVALID_OPTS(`${this.protocols[0]} is not a valid gateway subscription protocol`)
}
this.connect()
}

Expand All @@ -62,7 +64,7 @@ class SubscriptionClient {
const payload = typeof this.connectionInitPayload === 'function'
? await this.connectionInitPayload()
: this.connectionInitPayload
this.sendMessage(null, GQL_CONNECTION_INIT, payload)
this.sendMessage(null, this.protocolMessageTypes.GQL_CONNECTION_INIT, payload)
} catch (err) {
this.close(this.tryReconnect, false)
}
Expand Down Expand Up @@ -141,7 +143,7 @@ class SubscriptionClient {
count--

if (count === 0 || forceUnsubscribe) {
this.sendMessage(operationId, GQL_STOP, null)
this.sendMessage(operationId, this.protocolMessageTypes.GQL_STOP, null)
this.operationsCount[operationId] = 0
} else {
this.operationsCount[operationId] = count
Expand Down Expand Up @@ -183,7 +185,7 @@ class SubscriptionClient {
}

switch (data.type) {
case GQL_CONNECTION_ACK:
case this.protocolMessageTypes.GQL_CONNECTION_ACK:
this.reconnecting = false
this.ready = true
this.reconnectAttempts = 0
Expand All @@ -197,35 +199,35 @@ class SubscriptionClient {
}

break
case GQL_DATA:
case this.protocolMessageTypes.GQL_DATA:
/* istanbul ignore else */
if (operation) {
operation.handler(data.payload.data)
}
break
case GQL_ERROR:
case this.protocolMessageTypes.GQL_ERROR:
/* istanbul ignore else */
if (operation) {
operation.handler(null)
this.operations.delete(operationId)
this.sendMessage(operationId, GQL_ERROR, data.payload)
this.sendMessage(operationId, this.protocolMessageTypes.GQL_ERROR, data.payload)
}
break
case GQL_COMPLETE:
case this.protocolMessageTypes.GQL_COMPLETE:
/* istanbul ignore else */
if (operation) {
operation.handler(null)
this.operations.delete(operationId)
}

break
case GQL_CONNECTION_ERROR:
case this.protocolMessageTypes.GQL_CONNECTION_ERROR:
this.close(this.tryReconnect, false)
if (this.failedConnectionCallback) {
await this.failedConnectionCallback(data.payload)
}
break
case GQL_CONNECTION_KEEP_ALIVE:
case this.protocolMessageTypes.GQL_CONNECTION_KEEP_ALIVE:
break
/* istanbul ignore next */
default:
Expand All @@ -241,7 +243,7 @@ class SubscriptionClient {
throw new MER_ERR_GQL_SUBSCRIPTION_CONNECTION_NOT_READY()
}
this.operations.set(operationId, { started: true, options, handler, extensions })
this.sendMessage(operationId, GQL_START, options, extensions)
this.sendMessage(operationId, this.protocolMessageTypes.GQL_START, options, extensions)
}
}

Expand Down
39 changes: 14 additions & 25 deletions lib/subscription-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,10 @@ const { subscribe, parse } = require('graphql')
const { SubscriptionContext } = require('./subscriber')
const { kEntityResolvers } = require('./gateway/make-resolver')
const sJSON = require('secure-json-parse')
const {
GQL_CONNECTION_INIT,
GQL_CONNECTION_ERROR,
GQL_CONNECTION_ACK,
GQL_CONNECTION_TERMINATE,
GQL_START,
GQL_DATA,
GQL_ERROR,
GQL_COMPLETE,
GQL_STOP
} = require('./subscription-protocol')
const { MER_ERR_GQL_SUBSCRIPTION_FORBIDDEN, MER_ERR_GQL_SUBSCRIPTION_UNKNOWN_EXTENSION } = require('./errors')
const { preSubscriptionParsingHandler, onSubscriptionResolutionHandler, preSubscriptionExecutionHandler, onSubscriptionEndHandler } = require('./handlers')
const { kSubscriptionFactory, kLoaders } = require('./symbols')
const { getProtocolByName } = require('./subscription-protocol')

module.exports = class SubscriptionConnection {
constructor (socket, {
Expand All @@ -43,7 +33,7 @@ module.exports = class SubscriptionConnection {
this.context = context
this.isReady = false
this.resolveContext = resolveContext

this.protocolMessageTypes = getProtocolByName(socket.protocol)
this.socket.on('error', this.handleConnectionClose.bind(this))
this.handleConnection()
}
Expand All @@ -64,35 +54,34 @@ module.exports = class SubscriptionConnection {
try {
data = sJSON.parse(isBinary ? message : message.toString())
} catch (e) {
this.sendMessage(GQL_ERROR, null, 'Message must be a JSON string')
this.sendMessage(this.protocolMessageTypes.GQL_ERROR, null, 'Message must be a JSON string')
return
}

const { id, type } = data

switch (type) {
case GQL_CONNECTION_INIT:
case this.protocolMessageTypes.GQL_CONNECTION_INIT:
await this.handleConnectionInit(data)
break
case GQL_CONNECTION_TERMINATE:
case this.protocolMessageTypes.GQL_CONNECTION_TERMINATE:
this.handleConnectionClose()
break
case GQL_START: {
case this.protocolMessageTypes.GQL_START: {
if (this.isReady) {
this.handleGQLStart(data).catch(e => {
this.sendMessage(GQL_ERROR, id, e.message)
this.sendMessage(this.protocolMessageTypes.GQL_ERROR, id, e.message)
})
} else {
this.sendMessage(GQL_CONNECTION_ERROR, undefined, { message: 'Connection has not been established yet.' })
this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ERROR, undefined, { message: 'Connection has not been established yet.' })
return this.handleConnectionClose()
}
break
}
case GQL_STOP:
case this.protocolMessageTypes.GQL_STOP:
await this.handleGQLStop(data)
break
default:
this.sendMessage(GQL_ERROR, id, 'Invalid payload type')
this.sendMessage(this.protocolMessageTypes.GQL_ERROR, id, 'Invalid payload type')
}
}

Expand All @@ -119,7 +108,7 @@ module.exports = class SubscriptionConnection {
this.fastify.log.error(e)
}
if (!authorize) {
this.sendMessage(GQL_CONNECTION_ERROR, undefined, { message: 'Forbidden' })
this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ERROR, undefined, { message: 'Forbidden' })
return this.handleConnectionClose()
}

Expand All @@ -134,7 +123,7 @@ module.exports = class SubscriptionConnection {

this.context._connectionInit = data.payload

this.sendMessage(GQL_CONNECTION_ACK)
this.sendMessage(this.protocolMessageTypes.GQL_CONNECTION_ACK)
this.isReady = true
}

Expand Down Expand Up @@ -223,10 +212,10 @@ module.exports = class SubscriptionConnection {
return this.handleConnectionClose()
}
}
this.sendMessage(GQL_DATA, data.id, value)
this.sendMessage(this.protocolMessageTypes.GQL_DATA, data.id, value)
}

this.sendMessage(GQL_COMPLETE, data.id, null)
this.sendMessage(this.protocolMessageTypes.GQL_COMPLETE, data.id, null)
}

async handleGQLStop (data) {
Expand Down
46 changes: 34 additions & 12 deletions lib/subscription-protocol.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
'use strict'

module.exports.GQL_CONNECTION_INIT = 'connection_init' // Client -> Server
module.exports.GQL_CONNECTION_ACK = 'connection_ack' // Server -> Client
module.exports.GQL_CONNECTION_ERROR = 'connection_error' // Server -> Client
module.exports.GQL_CONNECTION_KEEP_ALIVE = 'ka' // Server -> Client
const GRAPHQL_WS = 'graphql-ws'
module.exports.GRAPHQL_WS = GRAPHQL_WS

module.exports.GQL_CONNECTION_TERMINATE = 'connection_terminate' // Client -> Server
module.exports.GQL_START = 'start' // Client -> Server
module.exports.GQL_DATA = 'data' // Server -> Client
module.exports.GQL_ERROR = 'error' // Server -> Client
module.exports.GQL_COMPLETE = 'complete' // Server -> Client
module.exports.GQL_STOP = 'stop' // Client -> Server

module.exports.GRAPHQL_WS = 'graphql-ws'
module.exports.getProtocolByName = function (name) {
switch (true) {
case (name.indexOf('graphql-transport-ws') !== -1):
return {
GQL_CONNECTION_INIT: 'connection_init', // Client -> Server
GQL_CONNECTION_ACK: 'connection_ack', // Server -> Client
GQL_CONNECTION_ERROR: 'connection_error', // Server -> Client
GQL_CONNECTION_KEEP_ALIVE: 'ka', // Server -> Client
GQL_CONNECTION_TERMINATE: 'connection_terminate', // Client -> Server
GQL_START: 'subscribe', // Client -> Server
GQL_DATA: 'next', // Server -> Client
GQL_ERROR: 'error', // Server -> Client
GQL_COMPLETE: 'complete', // Server -> Client
GQL_STOP: 'stop' // Client -> Server
}
case (name.indexOf(GRAPHQL_WS) !== -1):
return {
GQL_CONNECTION_INIT: 'connection_init', // Client -> Server
GQL_CONNECTION_ACK: 'connection_ack', // Server -> Client
GQL_CONNECTION_ERROR: 'connection_error', // Server -> Client
GQL_CONNECTION_KEEP_ALIVE: 'ka', // Server -> Client
GQL_CONNECTION_TERMINATE: 'connection_terminate', // Client -> Server
GQL_START: 'start', // Client -> Server
GQL_DATA: 'data', // Server -> Client
GQL_ERROR: 'error', // Server -> Client
GQL_COMPLETE: 'complete', // Server -> Client
GQL_STOP: 'stop' // Client -> Server
}
default:
return null
}
}
6 changes: 3 additions & 3 deletions lib/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ const fastifyWebsocket = require('fastify-websocket')
const { assignLifeCycleHooksToContext, Hooks } = require('./hooks')
const { kHooks } = require('./symbols')
const SubscriptionConnection = require('./subscription-connection')
const GRAPHQL_WS = 'graphql-ws'
const { getProtocolByName } = require('./subscription-protocol')

function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, lruGatewayResolvers, entityResolversFactory, subscriptionContextFn }) {
return async (connection, request) => {
const { socket } = connection
if (socket.protocol === undefined ||
(socket.protocol.indexOf(GRAPHQL_WS) === -1)) {

if (socket.protocol === undefined || getProtocolByName(socket.protocol) === null) {
// Close the connection with an error code, ws v2 ensures that the
// connection is cleaned up even when the closing handshake fails.
// 1002: protocol error
Expand Down
4 changes: 2 additions & 2 deletions static/main.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* global React:false ReactDOM:false GraphiQL:false SubscriptionsTransportWs: false */
/* global React:false ReactDOM:false GraphiQL:false */

const importer = {
url: (url) => {
Expand Down Expand Up @@ -26,7 +26,7 @@ function render () {

const fetcher = GraphiQL.createFetcher({
url,
legacyClient: new SubscriptionsTransportWs.SubscriptionClient(subscriptionUrl)
subscriptionUrl
})

ReactDOM.render(
Expand Down
44 changes: 44 additions & 0 deletions test/subscription-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,50 @@ const FakeTimers = require('@sinonjs/fake-timers')
const SubscriptionClient = require('../lib/subscription-client')
const WS = require('ws')

test('subscription client initialization fails when a not supported protocol is in the options', (t) => {
t.plan(1)
t.throws(() => new SubscriptionClient('ws://localhost:1234', {
protocols: ['unsupported-protocol'],
serviceName: 'test-service'
}), 'Invalid options: unsupported-protocol is not a valid gateway subscription protocol')
})

test('subscription client calls the publish method with the correct payload', (t) => {
const server = new WS.Server({ port: 0 })
const port = server.address().port

server.on('connection', function connection (ws) {
ws.on('message', function incoming (message, isBinary) {
const data = JSON.parse(isBinary ? message : message.toString())
if (data.type === 'connection_init') {
ws.send(JSON.stringify({ id: undefined, type: 'connection_ack' }))
} else if (data.type === 'subscribe') {
ws.send(JSON.stringify({ id: '1', type: 'next', payload: { data: { foo: 'bar' } } }))
}
})
})

const client = new SubscriptionClient(`ws://localhost:${port}`, {
reconnect: true,
maxReconnectAttempts: 10,
serviceName: 'test-service',
protocols: ['graphql-transport-ws'],
connectionCallback: () => {
client.createSubscription('query', {}, (data) => {
t.same(data, {
topic: 'test-service_1',
payload: {
foo: 'bar'
}
})
client.close()
server.close()
t.end()
})
}
})
})

test('subscription client calls the publish method with the correct payload', (t) => {
const server = new WS.Server({ port: 0 })
const port = server.address().port
Expand Down
Loading

0 comments on commit d4fda97

Please sign in to comment.