From a1b66e51ff2ba0c79ac8f231de381dc072c8e519 Mon Sep 17 00:00:00 2001 From: Sameer Srivastava Date: Thu, 3 Feb 2022 16:41:47 +0530 Subject: [PATCH] Retry failed services (#706) * chore: retry failed services * chore: only retry mandatory services * chore: pr fixes * chore: fix tests * chore: add test for onGatewayReplaceSchemaHandler * chore: fix lint issue * chore: fix tests * chore: improve tests, update docs * chore: fix tests * chore: no longer a breaking change * chore: fix ts tests * chore: fix tests * chore: fix tests * chore: fix types * chore: wait time increased * chore: retry count changed * chore: failed service names added --- docs/api/options.md | 2 + index.d.ts | 2 +- index.js | 43 ++ lib/errors.js | 5 + lib/gateway.js | 25 +- lib/gateway/service-map.js | 10 +- .../test/gateway/remote-services.js.test.cjs | 4 + test/gateway/remote-services.js | 2 +- test/gateway/retry-failed-services.js | 514 ++++++++++++++++++ 9 files changed, 598 insertions(+), 9 deletions(-) create mode 100644 test/gateway/retry-failed-services.js diff --git a/docs/api/options.md b/docs/api/options.md index b2302298..10eea271 100644 --- a/docs/api/options.md +++ b/docs/api/options.md @@ -85,6 +85,8 @@ - `wsConnectionParams.failedConnectionCallback`: `Function` A function called after a `connection_error` message is received, the first argument contains the message payload. - `wsConnectionParams.failedReconnectCallback`: `Function` A function called if reconnect is enabled and maxReconnectAttempts is reached. - `wsConnectionParams.rewriteConnectionInitPayload`: `Function` A function that gets the original `connection_init` payload along with the context as a parameter and returns an object that replaces the original `connection_init` payload before forwarding it to the federated service + - `gateway.retryServicesCount`: `Number` Specifies the maximum number of retries when a service fails to start on gateway initialization. 
(Default: 10) + - `gateway.retryServicesInterval`: `Number` The amount of time (in milliseconds) between service retry attempts in case a service fails to start on gateway initialization. (Default: 3000) - `persistedQueries`: A hash/query map to resolve the full query text using it's unique hash. Overrides `persistedQueryProvider`. - `onlyPersisted`: Boolean. Flag to control whether to allow graphql queries other than persisted. When `true`, it'll make the server reject any queries that are not present in the `persistedQueries` option above. It will also disable any ide available (graphiql). Requires `persistedQueries` to be set, and overrides `persistedQueryProvider`. diff --git a/index.d.ts b/index.d.ts index dabe1a14..234cad91 100644 --- a/index.d.ts +++ b/index.d.ts @@ -227,7 +227,7 @@ interface ServiceConfig { } interface Gateway { - refresh: () => Promise; + refresh: (isRetry?: boolean) => Promise; serviceMap: Record; } diff --git a/index.js b/index.js index 0e4d11f7..56c17f9d 100644 --- a/index.js +++ b/index.js @@ -175,9 +175,49 @@ const plugin = fp(async function (app, opts) { } let entityResolversFactory + let gatewayRetryIntervalTimer = null + const retryServicesCount = gateway && gateway.retryServicesCount ? 
gateway.retryServicesCount : 10 + + const retryServices = (interval) => { + let retryCount = 0 + let isRetry = true + + return setInterval(async () => { + try { + if (retryCount === retryServicesCount) { + clearInterval(gatewayRetryIntervalTimer) + isRetry = false + } + retryCount++ + + const context = assignApplicationLifecycleHooksToContext({}, fastifyGraphQl[kHooks]) + const schema = await gateway.refresh(isRetry) + if (schema !== null) { + clearInterval(gatewayRetryIntervalTimer) + // Trigger onGatewayReplaceSchema hook + if (context.onGatewayReplaceSchema !== null) { + await onGatewayReplaceSchemaHandler(context, { instance: app, schema }) + } + fastifyGraphQl.replaceSchema(schema) + } + } catch (error) { + app.log.error(error) + } + }, interval) + } + if (gateway) { + const retryInterval = gateway.retryServicesInterval || 3000 gateway = await buildGateway(gateway, app) + const serviceMap = Object.values(gateway.serviceMap) + const failedMandatoryServices = serviceMap.filter(service => !!service.error && service.mandatory) + + if (failedMandatoryServices.length) { + gatewayRetryIntervalTimer = retryServices(retryInterval) + gatewayRetryIntervalTimer.unref() + } + schema = gateway.schema entityResolversFactory = gateway.entityResolversFactory @@ -210,6 +250,9 @@ const plugin = fp(async function (app, opts) { if (gatewayInterval !== null) { clearInterval(gatewayInterval) } + if (gatewayRetryIntervalTimer !== null) { + clearInterval(gatewayRetryIntervalTimer) + } setImmediate(next) }) diff --git a/lib/errors.js b/lib/errors.js index b7888a0b..cdf7c686 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -224,6 +224,11 @@ const errors = { 'MER_ERR_HOOK_UNSUPPORTED_HOOK', '%s hook not supported!', 500 + ), + MER_ERR_SERVICE_RETRY_FAILED: createError( + 'MER_ERR_SERVICE_RETRY_FAILED', + 'Mandatory services retry failed - [%s]', + 500 ) } diff --git a/lib/gateway.js b/lib/gateway.js index 68469d04..e6d4fbc9 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -16,7 +16,7 
@@ const { createEntityReferenceResolverOperation, kEntityResolvers } = require('./gateway/make-resolver') -const { MER_ERR_GQL_GATEWAY_REFRESH, MER_ERR_GQL_GATEWAY_INIT } = require('./errors') +const { MER_ERR_GQL_GATEWAY_REFRESH, MER_ERR_GQL_GATEWAY_INIT, MER_ERR_SERVICE_RETRY_FAILED } = require('./errors') const findValueTypes = require('./gateway/find-value-types') const getQueryResult = require('./gateway/get-query-result') const allSettled = require('promise.allsettled') @@ -257,7 +257,7 @@ function defaultErrorHandler (error, service) { async function buildGateway (gatewayOpts, app) { const { services, errorHandler = defaultErrorHandler } = gatewayOpts - const serviceMap = await buildServiceMap(services, errorHandler) + const serviceMap = await buildServiceMap(services, errorHandler, app.log) const serviceSDLs = Object.entries(serviceMap).reduce((acc, [name, value]) => { const { schemaDefinition, error } = value @@ -379,7 +379,8 @@ async function buildGateway (gatewayOpts, app) { serviceMap, entityResolversFactory: factory, pollingInterval: gatewayOpts.pollingInterval, - async refresh () { + async refresh (isRetry) { + const failedMandatoryServices = [] if (this._serviceSDLs === undefined) { this._serviceSDLs = serviceSDLs.join('') } @@ -387,11 +388,27 @@ async function buildGateway (gatewayOpts, app) { const $refreshResult = await allSettled( Object.values(serviceMap).map((service) => service.refresh().catch((err) => { - errorHandler(err, service) + // If non-mandatory service or if retry count has exceeded for mandatory service then throw + if (!service.mandatory || !isRetry) { + errorHandler(err, service) + } + + // If service is mandatory and retry count has not exceeded then add to service to + // failedMandatoryServices so it can be returned for retrying + if (service.mandatory) { + failedMandatoryServices.push(service) + } }) ) ) + if (failedMandatoryServices.length > 0) { + const serviceNames = failedMandatoryServices.map(service => service.name) + 
const err = new MER_ERR_SERVICE_RETRY_FAILED(serviceNames.join(', ')) + err.failedServices = serviceNames + throw err + } + const rejectedResults = $refreshResult .filter(({ status }) => status === 'rejected') .map(({ reason }) => reason) diff --git a/lib/gateway/service-map.js b/lib/gateway/service-map.js index 82000455..7a2ee18a 100644 --- a/lib/gateway/service-map.js +++ b/lib/gateway/service-map.js @@ -6,7 +6,8 @@ const { isTypeDefinitionNode, parse, buildSchema, - GraphQLSchema + GraphQLSchema, + GraphQLError } = require('graphql') const { buildRequest, sendRequest } = require('./request') @@ -109,7 +110,7 @@ function safeBuildSchema (schemaDefinition) { } } -async function buildServiceMap (services, errorHandler) { +async function buildServiceMap (services, errorHandler, log) { const serviceMap = {} for (const service of services) { @@ -201,7 +202,10 @@ async function buildServiceMap (services, errorHandler) { serviceConfig = await serviceMap[service.name].init() } catch (err) { serviceConfigErr = err - errorHandler(err, service) + if (!service.mandatory || err instanceof GraphQLError) { + log.warn(`Initializing service "${service.name}" failed with message: "${err.message}"`) + errorHandler(err, service) + } } if (serviceConfig) { diff --git a/tap-snapshots/test/gateway/remote-services.js.test.cjs b/tap-snapshots/test/gateway/remote-services.js.test.cjs index f5365fa2..4237fc17 100644 --- a/tap-snapshots/test/gateway/remote-services.js.test.cjs +++ b/tap-snapshots/test/gateway/remote-services.js.test.cjs @@ -8,3 +8,7 @@ exports['test/gateway/remote-services.js TAP Does not error if at least one service schema is valid > must match snapshot 1'] = ` Initializing service "not-working" failed with message: "Unknown type "World"." ` + +exports['test/gateway/remote-services.js TAP Does not error if at least one service schema is valid > must match snapshot 2'] = ` +Initializing service "not-working" failed with message: "Unknown type "World"." 
+` diff --git a/test/gateway/remote-services.js b/test/gateway/remote-services.js index 6491e07a..969475c4 100644 --- a/test/gateway/remote-services.js +++ b/test/gateway/remote-services.js @@ -120,5 +120,5 @@ test('Does not error if at least one service schema is valid', async (t) => { } catch (err) { t.error(err) } - t.equal(warnCalled, 1, 'Warning is called') + t.equal(warnCalled, 2, 'Warning is called') }) diff --git a/test/gateway/retry-failed-services.js b/test/gateway/retry-failed-services.js new file mode 100644 index 00000000..74f1cce2 --- /dev/null +++ b/test/gateway/retry-failed-services.js @@ -0,0 +1,514 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const { GraphQLSchema } = require('graphql') +const GQL = require('../..') +const FakeTimers = require('@sinonjs/fake-timers') + +async function createTestService (port, schema, resolvers = {}) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true + }) + await service.listen(port) + return service +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +const userService = { + schema: ` + extend type Query { + me: User + } + + type User @key(fields: "id") { + id: ID! + name: String! + } + `, + resolvers: { + Query: { + me: () => { + return users.u1 + } + }, + User: { + __resolveReference: user => { + return users[user.id] + } + } + } +} + +const postService = { + schema: ` + type Post @key(fields: "pid") { + pid: ID! 
+ title: String + content: String + author: User @requires(fields: "pid title") + } + + type User @key(fields: "id") @extends { + id: ID! @external + name: String @external + posts(count: Int): [Post] + } +`, + resolvers: { + Post: { + author: post => { + return { + __typename: 'User', + id: post.authorId + } + } + }, + User: { + posts: (user, { count }) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + } + } +} + +test('gateway - retry mandatory failed services on startup', async (t) => { + t.plan(5) + const clock = FakeTimers.install({ + shouldAdvanceTime: true, + advanceTimeDelta: 100 + }) + + const service1 = await createTestService(5001, userService.schema, userService.resolvers) + + let service2 = null + setTimeout(async () => { + service2 = await createTestService(5002, postService.schema, postService.resolvers) + }, 5000) + + const app = Fastify() + t.teardown(async () => { + await app.close() + await service1.close() + await service2.close() + clock.uninstall() + }) + + await app.register(GQL, { + jit: 1, + gateway: { + services: [ + { + name: 'user', + url: 'http://localhost:5001/graphql', + mandatory: false + }, + { + name: 'post', + url: 'http://localhost:5002/graphql', + mandatory: true + } + ] + } + }) + + app.graphql.addHook('onGatewayReplaceSchema', async (instance, schema) => { + t.type(instance, 'object') + t.type(schema, GraphQLSchema) + t.ok('should be called') + }) + + await app.ready() + + const query = ` + query { + user: me { + id + name + posts(count: 1) { + pid + } + } + }` + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + errors: [ + { + message: 'Cannot query field "posts" on type "User".', + locations: [{ line: 6, column: 9 }] + } + ], + data: null + }) + + for (let i = 0; i < 10; i++) { + await clock.tickAsync(2000) + } + + const res1 = await 
app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res1.body), { + data: { + user: { + id: 'u1', + name: 'John', + posts: [ + { + pid: 'p1' + } + ] + } + } + }) +}) + +test('gateway - should not call onGatewayReplaceSchemaHandler if the hook is not specified', async (t) => { + t.plan(2) + const clock = FakeTimers.install({ + shouldAdvanceTime: true, + advanceTimeDelta: 100 + }) + + const service1 = await createTestService(5001, userService.schema, userService.resolvers) + + let service2 = null + setTimeout(async () => { + service2 = await createTestService(5002, postService.schema, postService.resolvers) + }, 5000) + + const app = Fastify() + t.teardown(async () => { + await app.close() + await service1.close() + await service2.close() + clock.uninstall() + }) + + await app.register(GQL, { + jit: 1, + gateway: { + services: [ + { + name: 'user', + url: 'http://localhost:5001/graphql', + mandatory: false + }, + { + name: 'post', + url: 'http://localhost:5002/graphql', + mandatory: true + } + ], + retryServicesCount: 10, + retryServicesInterval: 2000 + } + }) + + await app.ready() + + const query = ` + query { + user: me { + id + name + posts(count: 1) { + pid + } + } + }` + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + errors: [ + { + message: 'Cannot query field "posts" on type "User".', + locations: [{ line: 6, column: 9 }] + } + ], + data: null + }) + + for (let i = 0; i < 10; i++) { + await clock.tickAsync(1000) + } + + const res1 = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res1.body), { + data: { + user: { + id: 'u1', + name: 'John', + posts: [ + { + pid: 'p1' + } + ] + } + } + }) 
+}) + +test('gateway - dont retry non-mandatory failed services on startup', async (t) => { + t.plan(2) + const clock = FakeTimers.install({ + shouldAdvanceTime: true, + advanceTimeDelta: 100 + }) + + const service1 = await createTestService(5001, userService.schema, userService.resolvers) + + const app = Fastify() + t.teardown(async () => { + await app.close() + await service1.close() + clock.uninstall() + }) + + app.register(GQL, { + jit: 1, + gateway: { + services: [ + { + name: 'user', + url: 'http://localhost:5001/graphql', + mandatory: false + }, + { + name: 'post', + url: 'http://localhost:5002/graphql', + mandatory: false + } + ], + pollingInterval: 2000 + } + }) + + await app.ready() + + const query = ` + query { + user: me { + id + name + posts(count: 1) { + pid + } + } + }` + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + errors: [ + { + message: 'Cannot query field "posts" on type "User".', + locations: [{ line: 6, column: 9 }] + } + ], + data: null + }) + + for (let i = 0; i < 10; i++) { + await clock.tickAsync(1500) + } + + const res1 = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res1.body), { + errors: [ + { + message: 'Cannot query field "posts" on type "User".', + locations: [{ line: 6, column: 9 }] + } + ], + data: null + }) +}) + +test('gateway - should log error if retry throws', async (t) => { + t.plan(1) + const clock = FakeTimers.install({ + shouldAdvanceTime: true, + advanceTimeDelta: 100 + }) + + const service1 = await createTestService(5001, userService.schema, userService.resolvers) + + let service2 = null + setTimeout(async () => { + service2 = await createTestService(5002, postService.schema, postService.resolvers) + }, 2000) + + const app = Fastify() + + let errCount 
= 0 + app.log.error = (error) => { + if (error.message.includes('kaboom') && errCount === 0) { + errCount++ + t.pass() + } + } + + t.teardown(async () => { + await app.close() + await service1.close() + await service2.close() + clock.uninstall() + }) + + await app.register(GQL, { + jit: 1, + gateway: { + services: [ + { + name: 'user', + url: 'http://localhost:5001/graphql', + mandatory: false + }, + { + name: 'post', + url: 'http://localhost:5002/graphql', + mandatory: true + } + ], + retryServicesCount: 1, + retryServicesInterval: 2000 + } + }) + + app.graphql.addHook('onGatewayReplaceSchema', async () => { + throw new Error('kaboom') + }) + + await app.ready() + + for (let i = 0; i < 10; i++) { + await clock.tickAsync(1000) + } +}) + +test('gateway - stop retrying after no. of retries exceeded', async (t) => { + t.plan(1) + const clock = FakeTimers.install({ + shouldAdvanceTime: true, + advanceTimeDelta: 100 + }) + + const service1 = await createTestService(5001, userService.schema, userService.resolvers) + + const app = Fastify() + + let errCount = 0 + app.log.error = (error) => { + if (error.code === 'MER_ERR_GQL_GATEWAY_REFRESH' && errCount === 0) { + errCount++ + t.pass() + } + } + + t.teardown(async () => { + await app.close() + await service1.close() + clock.uninstall() + }) + + await app.register(GQL, { + jit: 1, + gateway: { + services: [ + { + name: 'user', + url: 'http://localhost:5001/graphql', + mandatory: false + }, + { + name: 'post', + url: 'http://localhost:5002/graphql', + mandatory: true + } + ], + retryServicesCount: 1, + retryServicesInterval: 2000 + } + }) + + await app.ready() + + for (let i = 0; i < 10; i++) { + await clock.tickAsync(1500) + } +})