diff --git a/.changeset/cuddly-lemons-dance.md b/.changeset/cuddly-lemons-dance.md new file mode 100644 index 00000000000..482da74c89e --- /dev/null +++ b/.changeset/cuddly-lemons-dance.md @@ -0,0 +1,23 @@ +--- +'@apollo/server': minor +--- + +Introduce new `ApolloServerPluginSubscriptionCallback` plugin. This plugin implements the [subscription callback protocol](https://github.com/apollographql/router/blob/dev/dev-docs/callback_protocol.md) which is used by Apollo Router. This feature implements subscriptions over HTTP via a callback URL which Apollo Router registers with Apollo Server. This feature is currently in preview and is subject to change. + +You can enable callback subscriptions like so: +```ts +import { ApolloServerPluginSubscriptionCallback } from '@apollo/server/plugin/subscriptionCallback'; +import { ApolloServer } from '@apollo/server'; + +const server = new ApolloServer({ + // ... + plugins: [ + ApolloServerPluginSubscriptionCallback(), + ], +}); +``` + +Note that there is currently no tracing or metrics mechanism in place for callback subscriptions. Additionally, this plugin "intercepts" callback subscription requests and bypasses some of Apollo Server's internals. The result of this is that certain plugin hooks (notably `executionDidStart` and `willResolveField`) will not be called when handling callback subscription requests or when sending subscription events. + +For more information on the subscription callback protocol, visit the docs: +https://www.apollographql.com/docs/router/executing-operations/subscription-callback-protocol/ diff --git a/package-lock.json b/package-lock.json index f100663a0ef..982f0438f1d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -55,6 +55,7 @@ "eslint-plugin-import": "2.27.5", "express": "4.18.2", "graphql": "16.7.1", + "graphql-subscriptions": "2.0.0", "graphql-tag": "2.12.6", "jest": "29.6.1", "jest-config": "29.6.1", @@ -8571,6 +8572,18 @@ "url": "https://github.com/sponsors/jaydenseric" } }, + "node_modules/graphql-subscriptions": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/graphql-subscriptions/-/graphql-subscriptions-2.0.0.tgz", + "integrity": "sha512-s6k2b8mmt9gF9pEfkxsaO1lTxaySfKoEJzEfmwguBbQ//Oq23hIXCfR1hm4kdh5hnR20RdwB+s3BCb+0duHSZA==", + "dev": true, + "dependencies": { + "iterall": "^1.3.0" + }, + "peerDependencies": { + "graphql": "^15.7.2 || ^16.0.0" + } + }, "node_modules/graphql-tag": { "version": "2.12.6", "license": "MIT", @@ -9586,6 +9599,12 @@ "node": ">=8" } }, + "node_modules/iterall": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/iterall/-/iterall-1.3.0.tgz", + "integrity": "sha512-QZ9qOMdF+QLHxy1QIpUHUU1D5pS2CG2P69LF6L6CPjPYA/XMOmKV3PZpawHoAjHNyB0swdVTRxdYT4tbBbxqwg==", + "dev": true + }, "node_modules/jest": { "version": "29.6.1", "resolved": "https://registry.npmjs.org/jest/-/jest-29.6.1.tgz", @@ -21036,6 +21055,15 @@ } } }, + "graphql-subscriptions": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/graphql-subscriptions/-/graphql-subscriptions-2.0.0.tgz", + "integrity": "sha512-s6k2b8mmt9gF9pEfkxsaO1lTxaySfKoEJzEfmwguBbQ//Oq23hIXCfR1hm4kdh5hnR20RdwB+s3BCb+0duHSZA==", + "dev": true, + "requires": { + "iterall": "^1.3.0" + } + }, "graphql-tag": { "version": "2.12.6", "requires": { @@ -21719,6 +21747,12 @@ "istanbul-lib-report": "^3.0.0" } }, + "iterall": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/iterall/-/iterall-1.3.0.tgz", + "integrity": "sha512-QZ9qOMdF+QLHxy1QIpUHUU1D5pS2CG2P69LF6L6CPjPYA/XMOmKV3PZpawHoAjHNyB0swdVTRxdYT4tbBbxqwg==", + "dev": true + }, "jest": { "version": "29.6.1", "resolved": "https://registry.npmjs.org/jest/-/jest-29.6.1.tgz", diff --git a/package.json b/package.json index 96589b9bbb8..5967e61bb57 100644 --- a/package.json +++ b/package.json @@ -83,6 +83,7 @@ "eslint-plugin-import": "2.27.5", "express": "4.18.2", "graphql": "16.7.1", + "graphql-subscriptions": "2.0.0", "graphql-tag": "2.12.6", "jest": "29.6.1", "jest-config": "29.6.1", diff --git a/packages/server/package.json b/packages/server/package.json index e0ca565d36b..0d3bb76882f 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -87,6 +87,14 @@ "import": "./dist/esm/plugin/schemaReporting/index.js", "require": "./dist/cjs/plugin/schemaReporting/index.js" }, + "./plugin/subscriptionCallback": { + "types": { + "require": "./dist/cjs/plugin/subscriptionCallback/index.d.ts", + "default": "./dist/esm/plugin/subscriptionCallback/index.d.ts" + }, + "import": "./dist/esm/plugin/subscriptionCallback/index.js", + "require": "./dist/cjs/plugin/subscriptionCallback/index.js" + }, "./plugin/usageReporting": { "types": { "require": "./dist/cjs/plugin/usageReporting/index.d.ts", diff --git a/packages/server/plugin/subscriptionCallback/package.json b/packages/server/plugin/subscriptionCallback/package.json new file mode 100644 index 00000000000..7892a9c38b0 --- /dev/null +++ b/packages/server/plugin/subscriptionCallback/package.json @@ -0,0 +1,8 @@ +{ + "name": "@apollo/server/plugin/subscriptionCallback", + "type": "module", + "main": "../../dist/cjs/plugin/subscriptionCallback/index.js", + "module": "../../dist/esm/plugin/subscriptionCallback/index.js", + "types": "../../dist/esm/plugin/subscriptionCallback/index.d.ts", + "sideEffects": false +} diff --git a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts new file mode 100644 index 00000000000..8d1d00e6f6f --- /dev/null +++ b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts @@ -0,0 +1,1808 @@ +import { + ApolloServer, + ApolloServerOptionsWithTypeDefs, + BaseContext, + HeaderMap, +} from '@apollo/server'; +import { ApolloServerPluginSubscriptionCallback } from '@apollo/server/plugin/subscriptionCallback'; +import { Logger } from '@apollo/utils.logger'; +import { + afterAll, + afterEach, + beforeAll, + beforeEach, + describe, + expect, + it, + jest, +} from '@jest/globals'; +import assert from 'assert'; +import { PubSub } from 'graphql-subscriptions'; +import nock from 'nock'; +import { nockAfterEach, nockBeforeEach } from '../../nockAssertions'; + +describe('SubscriptionCallbackPlugin', () => { + let logger: Logger & { orderOfOperations: string[] }; + beforeEach(() => { + logger = orderOfOperationsLogger(); + nockBeforeEach(); + }); + + afterEach(nockAfterEach); + + beforeAll(() => { + // This explicitly mocks only `setInterval` and `clearInterval` for the + // heartbeat. + jest.useFakeTimers({ + doNotFake: [ + 'Date', + 'hrtime', + 'nextTick', + 'performance', + 'queueMicrotask', + 'requestAnimationFrame', + 'cancelAnimationFrame', + 'requestIdleCallback', + 'cancelIdleCallback', + 'setImmediate', + 'clearImmediate', + 'setTimeout', + 'clearTimeout', + ], + }); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + it('simple happy path', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterHeartbeatResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Next we'll trigger some subscription events. In advance, we'll mock the 2 + // router responses. + const updates = Promise.all([ + mockRouterNextResponse({ payload: { count: 1 } }), + mockRouterNextResponse({ + payload: { count: 2 }, + }), + ]); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + logger.debug('TESTING: Triggering second update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + await updates; + + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription. + const completeRequest = mockRouterCompleteResponse(); + + await server.stop(); + await completeRequest; + + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "TESTING: Triggering second update", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('handles multiple callback urls', async () => { + const server = await startSubscriptionServer({ logger }); + const router2 = { + url: 'http://mock-router-url-2.com', + id: '5678-dogs', + verifier: 'another-verifier-token', + }; + // Mock the initial check response from the 2 callback urls. + mockRouterCheckResponse(); + mockRouterCheckResponse({ requestOpts: router2 }); + + // Mock the heartbeat response from the routers. We'll trigger it once below + // after the subscriptions are initialized to make sure it works. + const heartbeats = Promise.all([ + mockRouterHeartbeatResponse(), + mockRouterHeartbeatResponse({ requestOptions: router2 }), + ]); + + // Start the subscriptions. + const router1Result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + const router2Result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: router2.url, + subscription_id: router2.id, + verifier: router2.verifier, + }, + }, + }); + expect(router1Result.http.status).toEqual(200); + expect(router2Result.http.status).toEqual(200); + + // Advance timers to trigger the heartbeat once. This consumes the + // heartbeat mocks from above (one per router). + jest.advanceTimersByTime(5000); + await heartbeats; + + // Next we'll trigger some subscription events. In advance, we'll mock the 2 + // router responses. + const firstUpdate = Promise.all([ + mockRouterNextResponse({ payload: { count: 1 } }), + mockRouterNextResponse({ payload: { count: 1 }, ...router2 }), + ]); + const secondUpdate = Promise.all([ + mockRouterNextResponse({ payload: { count: 2 } }), + mockRouterNextResponse({ payload: { count: 2 }, ...router2 }), + ]); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + await firstUpdate; + + logger.debug('TESTING: Triggering second update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + await secondUpdate; + + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription. + const completeRequests = Promise.all([ + mockRouterCompleteResponse(), + mockRouterCompleteResponse(router2), + ]); + + await server.stop(); + await completeRequests; + + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionCallback[5678-dogs]: Received new subscription request", + "SubscriptionManager[5678-dogs]: Sending \`check\` request to router", + "SubscriptionManager[5678-dogs]: \`check\` request successful", + "SubscriptionCallback[5678-dogs]: Starting graphql-js subscription", + "SubscriptionCallback[5678-dogs]: graphql-js subscription successful", + "SubscriptionManager[5678-dogs]: Starting new heartbeat interval for http://mock-router-url-2.com", + "SubscriptionManager[5678-dogs]: Listening to graphql-js subscription", + "SubscriptionCallback[5678-dogs]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url-2.com for IDs: [5678-dogs]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [5678-dogs]", + "SubscriptionManager: Heartbeat request successful, IDs: [5678-dogs]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[5678-dogs]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[5678-dogs]: \`next\` request successful", + "TESTING: Triggering second update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[5678-dogs]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[5678-dogs]: \`next\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "SubscriptionManager[5678-dogs]: Sending \`complete\` request to router", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionManager[5678-dogs]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [5678-dogs]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url-2.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('updates id/verifier on heartbeat response with 400 + invalid IDs', async () => { + const server = await startSubscriptionServer({ logger }); + const secondSubscription = { + id: '5678-dogs', + verifier: 'another-verifier-token', + }; + + mockRouterCheckResponse(); + mockRouterCheckResponse({ requestOpts: secondSubscription }); + + // Start the subscriptions; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const firstResult = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(firstResult.http.status).toEqual(200); + + const secondResult = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: secondSubscription.id, + verifier: secondSubscription.verifier, + }, + }, + }); + + expect(secondResult.http.status).toEqual(200); + + // Trigger a heartbeat once to make sure it's working + const firstHeartbeat = mockRouterHeartbeatResponse({ + requestOptions: { + ids: ['1234-cats', secondSubscription.id], + }, + }); + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Next we'll trigger some subscription events. In advance, we'll mock the 2 + // router responses. + const firstUpdate = Promise.all([ + mockRouterNextResponse({ payload: { count: 1 } }), + mockRouterNextResponse({ payload: { count: 1 }, ...secondSubscription }), + ]); + + // This promise is a testing detail - we need to wait for the second update + // to fully resolve or else the server will stop and clean everything up + // before the mock is consumed. It resolves when the mock replies. + const secondUpdate = Promise.all([ + mockRouterNextResponse({ payload: { count: 2 } }), + mockRouterNextResponse({ payload: { count: 2 }, ...secondSubscription }), + ]); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + await firstUpdate; + + logger.debug('TESTING: Triggering second update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + await secondUpdate; + + // We've established two subscriptions are functional at this point. Now + // let's have the router invalidate one with a 400 heartbeat. + const heartbeatWithInvalidIds = mockRouterHeartbeatResponse({ + requestOptions: { + id: '1234-cats', + ids: ['1234-cats', secondSubscription.id], + verifier: 'my-verifier-token', + }, + statusCode: 400, + responseBody: { + id: 'updated-subscription-id', + ids: ['1234-cats'], + invalid_ids: [secondSubscription.id], + verifier: 'updated-verifier-token', + }, + }); + + jest.advanceTimersByTime(5000); + await heartbeatWithInvalidIds; + + // A subsequent heartbeat should use the updated id/verifier, and the second + // subscription should be excluded. + const updatedHeartbeat = mockRouterHeartbeatResponse({ + requestOptions: { + id: 'updated-subscription-id', + ids: ['1234-cats'], + verifier: 'updated-verifier-token', + }, + }); + + jest.advanceTimersByTime(5000); + await updatedHeartbeat; + + const thirdUpdate = mockRouterNextResponse({ + payload: { count: 3 }, + }); + // Trigger a 3rd update to make sure the second subscription is + // excluded. + logger.debug('TESTING: Triggering third update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + await thirdUpdate; + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription. + mockRouterCompleteResponse(); + + await server.stop(); + + // The heartbeat should be cleaned up at this point. There is no additional + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionCallback[5678-dogs]: Received new subscription request", + "SubscriptionManager[5678-dogs]: Sending \`check\` request to router", + "SubscriptionManager[5678-dogs]: \`check\` request successful", + "SubscriptionCallback[5678-dogs]: Starting graphql-js subscription", + "SubscriptionCallback[5678-dogs]: graphql-js subscription successful", + "SubscriptionManager[5678-dogs]: Heartbeat interval already exists for http://mock-router-url.com, reusing existing interval", + "SubscriptionManager[5678-dogs]: Listening to graphql-js subscription", + "SubscriptionCallback[5678-dogs]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats,5678-dogs]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[5678-dogs]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[5678-dogs]: \`next\` request successful", + "TESTING: Triggering second update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[5678-dogs]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[5678-dogs]: \`next\` request successful", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat request received invalid IDs: [5678-dogs]", + "SubscriptionManager: Terminating subscriptions for IDs: [5678-dogs]", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "TESTING: Triggering third update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[5678-dogs]: Subscription already cancelled, ignoring current and future payloads", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('cancels heartbeat on 404 (all IDs invalid)', async () => { + const server = await startSubscriptionServer({ logger }); + const secondSubscription = { + id: '5678-dogs', + verifier: 'another-verifier-token', + }; + + mockRouterCheckResponse(); + mockRouterCheckResponse({ requestOpts: secondSubscription }); + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterHeartbeatResponse({ + requestOptions: { + ids: ['1234-cats', secondSubscription.id], + }, + }); + + // Start the subscriptions; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const firstResult = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(firstResult.http.status).toEqual(200); + + const secondResult = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: secondSubscription.id, + verifier: secondSubscription.verifier, + }, + }, + }); + + expect(secondResult.http.status).toEqual(200); + + // Advance timers to trigger the heartbeat. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Next we'll trigger some subscription events. In advance, we'll mock the + // router responses. + const firstUpdate = Promise.all([ + mockRouterNextResponse({ payload: { count: 1 } }), + mockRouterNextResponse({ payload: { count: 1 }, ...secondSubscription }), + ]); + + const secondUpdate = Promise.all([ + mockRouterNextResponse({ payload: { count: 2 } }), + mockRouterNextResponse({ payload: { count: 2 }, ...secondSubscription }), + ]); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + await firstUpdate; + + logger.debug('TESTING: Triggering second update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + await secondUpdate; + + // We've established two subscriptions are functional at this point. Now + // let's have the router invalidate them with a 404 heartbeat. + const secondHeartbeat = mockRouterHeartbeatResponse({ + requestOptions: { + id: '1234-cats', + ids: ['1234-cats', secondSubscription.id], + verifier: 'my-verifier-token', + }, + statusCode: 404, + }); + + jest.advanceTimersByTime(5000); + await secondHeartbeat; + + // Trigger a 3rd update to make sure both subscriptions are cancelled. + logger.debug('TESTING: Triggering third update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + // The heartbeat should be cleaned up at this point. There is no remaining + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + await server.stop(); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionCallback[5678-dogs]: Received new subscription request", + "SubscriptionManager[5678-dogs]: Sending \`check\` request to router", + "SubscriptionManager[5678-dogs]: \`check\` request successful", + "SubscriptionCallback[5678-dogs]: Starting graphql-js subscription", + "SubscriptionCallback[5678-dogs]: graphql-js subscription successful", + "SubscriptionManager[5678-dogs]: Heartbeat interval already exists for http://mock-router-url.com, reusing existing interval", + "SubscriptionManager[5678-dogs]: Listening to graphql-js subscription", + "SubscriptionCallback[5678-dogs]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats,5678-dogs]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[5678-dogs]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[5678-dogs]: \`next\` request successful", + "TESTING: Triggering second update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[5678-dogs]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[5678-dogs]: \`next\` request successful", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Heartbeat request received invalid IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats,5678-dogs]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "TESTING: Triggering third update", + "SubscriptionManager[1234-cats]: Subscription already cancelled, ignoring current and future payloads", + "SubscriptionManager[5678-dogs]: Subscription already cancelled, ignoring current and future payloads", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('sends a `complete` when a subscription terminates successfully', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + // Mock the first response from the router in response to the first + // subscription event / update + mockRouterNextResponse({ + payload: { terminatesSuccessfully: true }, + }); + // The subscription completes after the first update, so it should fire a + // `complete` request to the router. + const completeRequest = mockRouterCompleteResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + terminatesSuccessfully + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + // The response to the router's initial request should be status 200 + expect(result.http.status).toEqual(200); + + await completeRequest; + + // The heartbeat should be cleaned up at this point. There is no remaining + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription (in this case they've already + // completed themselves and cleaned up before this is called). + await server.stop(); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[1234-cats]: Subscription completed without errors", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + describe('error handling', () => { + it('encounters errors on initial `check`', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the failed check response from the router. + mockRouterCheckResponse({ + statusCode: 400, + responseBody: 'Invalid subscription ID provided', + }); + + // This triggers the check request which will fail. + const result = await server.executeHTTPGraphQLRequest({ + httpGraphQLRequest: { + body: { + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }, + headers: new HeaderMap([['content-type', 'application/json']]), + method: 'POST', + search: '', + }, + context: async () => ({}), + }); + + expect(result.status).toEqual(500); + assert(result.body.kind === 'complete'); + expect(JSON.parse(result.body.string)).toMatchInlineSnapshot(` + { + "data": null, + "errors": [ + { + "message": "\`check\` request failed with unexpected status code: 400, terminating subscription", + }, + ], + } + `); + + // Trigger the heartbeat interval just to make sure it doesn't actually + // happen in this case (we haven't mocked it, so if it fires it will + // trigger an error and fail the test). + jest.advanceTimersByTime(5000); + + await server.stop(); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request failed with unexpected status code: 400, terminating subscription", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "ERROR: SubscriptionManager: No subscriptions found for http://mock-router-url.com, skipping termination", + "ERROR: SubscriptionCallback[1234-cats]: \`check\` request failed: \`check\` request failed with unexpected status code: 400, terminating subscription", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('encounters errors on subscription', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response. + mockRouterCheckResponse(); + + const completeRequest = mockRouterCompleteResponse({ + errors: [ + { + message: + 'The subscription field "invalidSubscriptionField" is not defined.', + locations: [{ line: 3, column: 15 }], + }, + ], + }); + + // Trigger an invalid subscription + const response = await server.executeHTTPGraphQLRequest({ + httpGraphQLRequest: { + body: { + query: `#graphql + subscription { + invalidSubscriptionField + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }, + headers: new HeaderMap([['content-type', 'application/json']]), + method: 'POST', + search: '', + }, + context: async () => ({}), + }); + expect(response.status).toEqual(400); + assert(response.body.kind === 'complete'); + expect(JSON.parse(response.body.string)).toEqual({ + errors: [ + { + message: + 'Cannot query field "invalidSubscriptionField" on type "Subscription".', + locations: [{ line: 3, column: 15 }], + extensions: { + code: 'GRAPHQL_VALIDATION_FAILED', + }, + }, + ], + }); + + // Trigger the heartbeat interval just to make sure it doesn't actually + // happen in this case (we haven't mocked it, so it'll throw an error if it + // sends a heartbeat). + jest.advanceTimersByTime(5000); + + await completeRequest; + await server.stop(); + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "ERROR: SubscriptionCallback[1234-cats]: graphql-js subscription unsuccessful: [ + The subscription field "invalidSubscriptionField" is not defined. + ]", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('handles failed heartbeats', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // 5 failures is the limit before the heartbeat is cancelled. We expect to + // see 5 errors and then a final error indicating the heartbeat was + // cancelled in the log snapshot below. + for (let i = 0; i < 5; i++) { + // mock heartbeat response failure + nock('http://mock-router-url.com') + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'heartbeat', + id: '1234-cats', + verifier: 'my-verifier-token', + ids: ['1234-cats'], + }) + .replyWithError('network request error'); + // trigger heartbeat + jest.advanceTimersByTime(5000); + } + + await server.stop(); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): request to http://mock-router-url.com/ failed, reason: network request error", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): request to http://mock-router-url.com/ failed, reason: network request error", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): request to http://mock-router-url.com/ failed, reason: network request error", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): request to http://mock-router-url.com/ failed, reason: network request error", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): request to http://mock-router-url.com/ failed, reason: network request error", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: request to http://mock-router-url.com/ failed, reason: network request error", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('handles failed heartbeats with unexpected status codes', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // 5 failures is the limit before the heartbeat is cancelled. We expect to + // see 5 errors and then a final error indicating the heartbeat was + // cancelled in the log snapshot below. + for (let i = 0; i < 5; i++) { + // mock heartbeat response failure + nock('http://mock-router-url.com') + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'heartbeat', + id: '1234-cats', + verifier: 'my-verifier-token', + ids: ['1234-cats'], + }) + .reply(500); + // trigger heartbeat + jest.advanceTimersByTime(5000); + } + + await server.stop(); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (1 consecutive): Unexpected status code: 500", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (2 consecutive): Unexpected status code: 500", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (3 consecutive): Unexpected status code: 500", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (4 consecutive): Unexpected status code: 500", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed (5 consecutive): Unexpected status code: 500", + "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: Unexpected status code: 500", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + describe('retries', () => { + it('failed `check` requests', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. We'll fail a couple + // first to test the retry logic. + mockRouterCheckResponseWithError(); + mockRouterCheckResponseWithError(); + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterHeartbeatResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Mock the update from the router, we'll trigger it below + const update = mockRouterNextResponse({ payload: { count: 1 } }); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + await update; + + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription. + const completeRequest = mockRouterCompleteResponse(); + + await server.stop(); + await completeRequest; + + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "WARN: SubscriptionManager[1234-cats]: Retrying \`check\` request (attempt 1) due to error: request to http://mock-router-url.com/ failed, reason: network request error", + "WARN: SubscriptionManager[1234-cats]: Retrying \`check\` request (attempt 2) due to error: request to http://mock-router-url.com/ failed, reason: network request error", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('failed `next` requests', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterHeartbeatResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Next we'll trigger some subscription events. In advance, we'll mock the + // router responses. These responses will fail the first 3 times and + // succeed on the 4th. The retry logic is expected to handle this + // gracefully. + const updates = Promise.all([ + mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 1 } }), + mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 2 } }), + ]); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + logger.debug('TESTING: Triggering second update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + await updates; + + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription. + const completeRequest = mockRouterCompleteResponse(); + + await server.stop(); + await completeRequest; + + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "TESTING: Triggering second update", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('failed `complete` requests', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterHeartbeatResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Mock the response to the upcoming subscription update. + const update = mockRouterNextResponse({ payload: { count: 1 } }); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + await update; + + // The server will send a `complete` request when it shuts down. Here we + // test that the retry logic works for sending `complete` requests. + const completeRetries = Promise.all([ + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse(), + ]); + + await server.stop(); + await completeRetries; + + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "WARN: SubscriptionManager[1234-cats]: Retrying \`complete\` request (attempt 1) due to error: \`complete\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`complete\` request (attempt 2) due to error: \`complete\` request failed with unexpected status code: 500", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('`complete` requests to failure', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterHeartbeatResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Mock the response to the upcoming subscription update. + const update = mockRouterNextResponse({ payload: { count: 1 } }); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + await update; + + // The server will send a `complete` request when it shuts down. Here we + // test that the server will retry max 5 times and give up. + const completeRetries = Promise.all([ + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + ]); + + await server.stop(); + await completeRetries; + + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "SubscriptionManager[1234-cats]: \`next\` request successful", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router", + "WARN: SubscriptionManager[1234-cats]: Retrying \`complete\` request (attempt 1) due to error: \`complete\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`complete\` request (attempt 2) due to error: \`complete\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`complete\` request (attempt 3) due to error: \`complete\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`complete\` request (attempt 4) due to error: \`complete\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`complete\` request (attempt 5) due to error: \`complete\` request failed with unexpected status code: 500", + "ERROR: SubscriptionManager[1234-cats]: \`complete\` request failed: \`complete\` request failed with unexpected status code: 500", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + + it('terminates subscription after max retries `next` requests', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeOperation({ + query: `#graphql + subscription { + count + } + `, + extensions: { + subscription: { + callback_url: 'http://mock-router-url.com', + subscription_id: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }); + + expect(result.http.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterHeartbeatResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // 5 failures to hit the retry limit + const updates = Promise.all( + [...new Array(5)].map(() => + mockRouterNextResponse({ + payload: { count: 1 }, + statusCode: 500, + }), + ), + ); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeOperation({ + query: `#graphql + mutation { + addOne + } + `, + }); + + // After 5 failures, the plugin will terminate the subscriptions without + // sending a `complete` request. + await updates; + // Jest needs a little help here to finish handling the retry failures + // and cancel the subscription. + await new Promise((resolve) => setTimeout(resolve, 100)); + await server.stop(); + + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "SubscriptionManager: Sending \`heartbeat\` request to http://mock-router-url.com for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat received response for IDs: [1234-cats]", + "SubscriptionManager: Heartbeat request successful, IDs: [1234-cats]", + "TESTING: Triggering first update", + "SubscriptionManager[1234-cats]: Sending \`next\` request to router", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 1) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 2) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 3) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 4) due to error: \`next\` request failed with unexpected status code: 500", + "WARN: SubscriptionManager[1234-cats]: Retrying \`next\` request (attempt 5) due to error: \`next\` request failed with unexpected status code: 500", + "ERROR: SubscriptionManager[1234-cats]: \`next\` request failed, terminating subscription: \`next\` request failed with unexpected status code: 500", + "SubscriptionManager: Terminating subscriptions for IDs: [1234-cats]", + "SubscriptionManager: Terminating heartbeat interval, no more subscriptions for http://mock-router-url.com", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + }); + }); +}); + +async function startSubscriptionServer( + opts?: Partial>, +) { + let count = 0; + const pubsub = new PubSub(); + const server = new ApolloServer({ + plugins: [ + ApolloServerPluginSubscriptionCallback({ + // set some reasonable testing defaults + retry: { + maxTimeout: 50, + minTimeout: 10, + }, + ...(opts?.logger ? { logger: opts.logger } : undefined), + }), + ], + typeDefs: `#graphql + type Query { + hello: String! + } + type Mutation { + addOne: Boolean! + } + type Subscription { + count: Int + terminatesSuccessfully: Boolean + } + `, + resolvers: { + Query: { + hello: () => 'world', + }, + Mutation: { + addOne: async () => { + await pubsub.publish('ADD_ONE', { count: ++count }); + return true; + }, + }, + Subscription: { + count: { + subscribe: () => pubsub.asyncIterator(['ADD_ONE']), + }, + terminatesSuccessfully: { + subscribe: () => ({ + count: 0, + [Symbol.asyncIterator]() { + return { + next: () => { + this.count++; + return { + value: { terminatesSuccessfully: true }, + done: this.count > 1, + }; + }, + }; + }, + }), + }, + }, + }, + ...opts, + }); + await server.start(); + return server; +} + +// Other attempts at this which didn't actually solve the problem:: +// * Use `nock`'s reply callback to resolve a promise. At that point the reply +// isn't done yet, so it's insufficient. +// * while (!nock.isDone()) { await new Promise(...) } - isDone is true when the +// mock is consumed and before the reply is done, so also insufficient. +function promisifyNock(nock: nock.Scope) { + return new Promise((resolve) => { + nock.addListener('replied', () => { + resolve(); + }); + }); +} + +function mockRouterCheckResponse(opts?: { + requestOpts?: { + url?: string; + id?: string; + verifier?: string; + }; + statusCode?: number; + responseBody?: any; +}) { + const { + requestOpts: { + url = 'http://mock-router-url.com', + id = '1234-cats', + verifier = 'my-verifier-token', + } = {}, + statusCode = 204, + responseBody = undefined, + } = opts ?? {}; + + return promisifyNock( + nock(url) + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'check', + id, + verifier, + }) + .reply(statusCode, responseBody), + ); +} + +function mockRouterCheckResponseWithError(opts?: { + requestOpts?: { + url?: string; + id?: string; + verifier?: string; + }; +}) { + const { + requestOpts: { + url = 'http://mock-router-url.com', + id = '1234-cats', + verifier = 'my-verifier-token', + } = {}, + } = opts ?? {}; + + return promisifyNock( + nock(url) + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'check', + id, + verifier, + }) + .replyWithError('network request error'), + ); +} + +function mockRouterHeartbeatResponse(opts?: { + requestOptions: { + url?: string; + id?: string; + verifier?: string; + ids?: string[]; + invalidIds?: string[]; + }; + statusCode?: number; + responseBody?: any; +}) { + const { + requestOptions: { + url = 'http://mock-router-url.com', + id = '1234-cats', + verifier = 'my-verifier-token', + ids = undefined, + invalidIds = undefined, + } = {}, + statusCode = 200, + responseBody, + } = opts ?? {}; + + return promisifyNock( + nock(url) + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'heartbeat', + id, + verifier, + ids: ids ?? [id], + ...(invalidIds && { invalid_ids: invalidIds }), + }) + .reply(statusCode, responseBody), + ); +} + +function mockRouterNextResponse(requestOpts: { + payload: Record; + url?: string; + id?: string; + verifier?: string; + statusCode?: number; +}) { + const { + payload, + url = 'http://mock-router-url.com', + id = '1234-cats', + verifier = 'my-verifier-token', + statusCode = 200, + } = requestOpts; + + return promisifyNock( + nock(url) + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'next', + id, + verifier, + payload: { data: payload }, + }) + .reply(statusCode), + ); +} + +function mockRouterCompleteResponse(requestOpts?: { + errors?: any[]; + url?: string; + id?: string; + verifier?: string; + statusCode?: number; +}) { + const { + errors = undefined, + url = 'http://mock-router-url.com', + id = '1234-cats', + verifier = 'my-verifier-token', + statusCode = 200, + } = requestOpts ?? {}; + + return promisifyNock( + nock(url) + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'complete', + id, + verifier, + ...(errors && { errors }), + }) + .reply(statusCode), + ); +} + +/** + * Returns a logger that pushes all logs to an array. This is used validate the + * order in which things happen within the plugin at the end of a test. + */ +function orderOfOperationsLogger() { + const logger: Logger & { orderOfOperations: string[] } = { + debug(msg: string) { + this.orderOfOperations.push(msg); + }, + info() {}, + warn(msg: string) { + this.orderOfOperations.push(`WARN: ${msg}`); + }, + error(msg: string) { + this.orderOfOperations.push(`ERROR: ${msg}`); + }, + orderOfOperations: [], + }; + return logger; +} diff --git a/packages/server/src/plugin/subscriptionCallback/index.ts b/packages/server/src/plugin/subscriptionCallback/index.ts new file mode 100644 index 00000000000..460f6c73ce3 --- /dev/null +++ b/packages/server/src/plugin/subscriptionCallback/index.ts @@ -0,0 +1,720 @@ +import type { Logger } from '@apollo/utils.logger'; +import { type GraphQLError, subscribe, type ExecutionResult } from 'graphql'; +import fetch, { type Response } from 'node-fetch'; +import { ensureError, ensureGraphQLError } from '../../errorNormalize.js'; +import type { ApolloServerPlugin } from '../../externalTypes/index.js'; +import { HeaderMap } from '../../utils/HeaderMap.js'; +import retry from 'async-retry'; + +export interface ApolloServerPluginSubscriptionCallbackOptions { + heartbeatIntervalMs?: number; + maxConsecutiveHeartbeatFailures?: number; + logger?: Logger; + retry?: retry.Options; +} + +export function ApolloServerPluginSubscriptionCallback( + options: ApolloServerPluginSubscriptionCallbackOptions = Object.create(null), +): ApolloServerPlugin { + const subscriptionManager = new SubscriptionManager(options); + const logger = options.logger + ? prefixedLogger(options.logger, 'SubscriptionCallback') + : undefined; + + return { + async requestDidStart({ request }) { + const subscriptionExtension = request?.extensions?.subscription; + // If it's not a callback subscription, ignore the request. + if (!subscriptionExtension) return; + const { + callback_url: callbackUrl, + subscription_id: id, + verifier, + } = subscriptionExtension; + + return { + // Implementing `responseForOperation` is the only hook that allows us + // to bypass normal execution by returning our own response. We don't + // want Apollo Server to actually handle this subscription request, we + // want to handle everything ourselves. The actual subscription handling + // will be done in `willSendResponse`. + async responseForOperation() { + logger?.debug('Received new subscription request', id); + + return { + http: { + status: 200, + headers: new HeaderMap(), + }, + body: { + kind: 'single', + singleResult: { + data: null, + }, + }, + }; + }, + // The majority of the subscription work is implemented in this hook. + // This can _almost_ all be implemented in `responseForOperation`, + // however in the case of GraphQL validation errors, + // `responseForOperation` won't actually be called, but + // `willSendResponse` will. + async willSendResponse({ + request, + schema, + document, + contextValue, + operationName, + response, + }) { + try { + // Before responding to the original request, we need to complete a + // roundtrip `check` request to the router, so we `await` this + // request. + await subscriptionManager.checkRequest({ + callbackUrl, + id, + verifier, + }); + } catch (e) { + const graphqlError = ensureGraphQLError(e); + logger?.error( + `\`check\` request failed: ${graphqlError.message}`, + id, + ); + // In the event of a check failure, we respond to the original + // request with a >=400 status code. + if (response.body.kind === 'single') { + response.body.singleResult.errors = [graphqlError]; + response.http.status = 500; + } + return; + } + + // The `check` request was successful, so we can initialize the actual + // `graphql-js` subscription. We don't expect `subscribe` to throw, + // but rather return an object with `errors` on it (if there are any). + logger?.debug(`Starting graphql-js subscription`, id); + let subscription: Awaited>; + try { + subscription = await subscribe({ + schema, + document: document!, + variableValues: request.variables, + contextValue: contextValue, + operationName: operationName, + }); + } catch (e) { + // While we don't expect this scenario, we should still handle it + // gracefully (in the same way that we do below, when the + // subscription object has `errors` on it). + const graphqlError = ensureGraphQLError(e); + logger?.error( + `Programming error: graphql-js subscribe() threw unexpectedly! Please report this bug to Apollo. The error was: ${e}`, + id, + ); + subscriptionManager.completeRequest({ + errors: [graphqlError], + callbackUrl, + id, + verifier, + }); + return; + } + + // In the case of errors, send a `complete` request to the router with + // the errors. + if ('errors' in subscription) { + logger?.error( + `graphql-js subscription unsuccessful: [\n\t${subscription.errors + ?.map((e) => e.message) + .join(',\n\t')}\n]`, + id, + ); + + try { + subscriptionManager.completeRequest({ + errors: subscription.errors, + callbackUrl, + id, + verifier, + }); + } catch (e) { + // TODO: not sure how to best handle a failed "completion with + // errors" request outside of retrying. + logger?.error(`\`complete\` request failed: ${e}`, id); + } + } else if (isAsyncIterable(subscription)) { + // We have a real subscription - now we can kick off the heartbeat + // interval and consume the AsyncIterable on the `subscription` + // object. + logger?.debug('graphql-js subscription successful', id); + subscriptionManager.initHeartbeat({ + callbackUrl, + id, + verifier, + }); + + subscriptionManager.startConsumingSubscription({ + subscription, + callbackUrl, + id, + verifier, + }); + } + + logger?.debug(`Responding to original subscription request`, id); + }, + }; + }, + async serverWillStart() { + return { + async drainServer() { + logger?.debug( + 'Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals', + ); + await subscriptionManager.cleanup(); + logger?.debug( + 'Successfully cleaned up outstanding subscriptions and heartbeat intervals.', + ); + }, + }; + }, + }; +} + +function isAsyncIterable(value: any): value is AsyncIterable { + return value && typeof value[Symbol.asyncIterator] === 'function'; +} + +interface SubscriptionObject { + cancelled: boolean; + startConsumingSubscription: () => Promise; + completeSubscription: () => Promise; +} +// This class manages the state of subscriptions, heartbeat intervals, and +// router requests. It keeps track of in flight requests so we can await them +// during server cleanup. +class SubscriptionManager { + private heartbeatIntervalMs: number; + private maxConsecutiveHeartbeatFailures: number; + private logger?: ReturnType; + private retryConfig?: retry.Options; + private requestsInFlight: Set> = new Set(); + // A map of information about subscriptions for a given callback url. For each + // url, this tracks its single heartbeat interval (with relevant heartbeat + // request info) and active subscriptions. + private subscriptionInfoByCallbackUrl: Map< + string, + { + heartbeat?: { + id: string; + verifier: string; + interval: NodeJS.Timeout; + queue: Promise[]; + }; + subscriptionsById: Map; + } + > = new Map(); + + constructor(options: ApolloServerPluginSubscriptionCallbackOptions) { + this.heartbeatIntervalMs = options.heartbeatIntervalMs ?? 5000; + this.maxConsecutiveHeartbeatFailures = + options.maxConsecutiveHeartbeatFailures ?? 5; + this.retryConfig = { + retries: 5, + minTimeout: 100, + maxTimeout: 1000, + ...options.retry, + }; + this.logger = options.logger + ? prefixedLogger(options.logger, 'SubscriptionManager') + : undefined; + } + + async retryFetch({ + url, + action, + id, + verifier, + payload, + errors, + }: { + url: string; + action: 'check' | 'next' | 'complete'; + id: string; + verifier: string; + payload?: ExecutionResult; + errors?: readonly GraphQLError[]; + }) { + let response: Promise | undefined; + try { + const maybeWithErrors = errors?.length ? ` with errors` : ''; + this.logger?.debug( + `Sending \`${action}\` request to router` + maybeWithErrors, + id, + ); + return retry( + async (bail) => { + response = fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + kind: 'subscription', + action, + id, + verifier, + ...(payload && { payload }), + ...(errors?.length && { errors }), + }), + }); + this.requestsInFlight.add(response); + const result = await response; + + if (!result.ok) { + if (result.status >= 500) { + // Throwing here triggers a retry, which seems reasonable for 5xx + // (i.e. an internal server error). + throw new Error( + `\`${action}\` request failed with unexpected status code: ${result.status}`, + ); + } else { + // For 4xx, we don't want to retry. These errors carry a semantic + // meaning (terminate), so we bail. This will reject the promise and + // should be handled by the caller. Specifically, 404 from the + // router means terminate, but the protocol says that in other error + // cases the subscription should be terminated due to an unexpected + // error. + if (result.status === 404) { + this.logger?.debug( + `\`${action}\` request received 404, terminating subscription`, + id, + ); + } else { + const errMsg = `\`${action}\` request failed with unexpected status code: ${result.status}, terminating subscription`; + this.logger?.debug(errMsg, id); + bail(new Error(errMsg)); + } + this.terminateSubscriptions([id], url); + return result; + } + } + this.logger?.debug(`\`${action}\` request successful`, id); + return result; + }, + { + ...this.retryConfig, + onRetry: (e, attempt) => { + this.requestsInFlight.delete(response!); + this.logger?.warn( + `Retrying \`${action}\` request (attempt ${attempt}) due to error: ${e.message}`, + id, + ); + this.retryConfig?.onRetry?.(e, attempt); + }, + }, + ); + } finally { + this.requestsInFlight.delete(response!); + } + } + + // Implements sending the `check` request to the router. Fetch errors are + // thrown and expected to be handled by the caller. + async checkRequest({ + callbackUrl, + id, + verifier, + }: { + callbackUrl: string; + id: string; + verifier: string; + }) { + return this.retryFetch({ + url: callbackUrl, + action: 'check', + id, + verifier, + }); + } + + // Kicks off an interval that sends `heartbeat` requests to the router. If an + // interval already exists for the callback url, we just add the new ID to its + // list of IDs that it's sending a heartbeat for. This allows us to send one + // batched heartbeat per interval per callback url. + initHeartbeat({ + callbackUrl, + id, + verifier, + }: { + callbackUrl: string; + id: string; + verifier: string; + }) { + if (!this.subscriptionInfoByCallbackUrl.has(callbackUrl)) { + this.subscriptionInfoByCallbackUrl.set(callbackUrl, { + subscriptionsById: new Map(), + }); + } + + // Skip interval creation if one already exists for this url + if (this.subscriptionInfoByCallbackUrl.get(callbackUrl)?.heartbeat) { + this.logger?.debug( + `Heartbeat interval already exists for ${callbackUrl}, reusing existing interval`, + id, + ); + return; + } + + // Kickoff heartbeat interval since there isn't one already + this.logger?.debug( + `Starting new heartbeat interval for ${callbackUrl}`, + id, + ); + + let consecutiveHeartbeatFailureCount = 0; + const heartbeatInterval = setInterval(async () => { + let heartbeatRequest: Promise | undefined; + + // XXX since we're on an interval, it's possible a heartbeat goes out + // before the previous heartbeat has finished. It seems reasonable to + // queue them and await the previous heartbeat if there is one. It might + // also be reasonable to just bail / skip this heartbeat if there's + // already one in flight. I'm not sure which is better. This Promise and + // all of the queue stuff try to address this. Maybe this would be better + // off chaining timeouts instead of using an interval? + let resolveHeartbeatPromise: () => void; + const heartbeatPromise = new Promise((r) => { + resolveHeartbeatPromise = r; + }); + const existingSubscriptionInfo = + this.subscriptionInfoByCallbackUrl.get(callbackUrl); + + if (!existingSubscriptionInfo?.heartbeat) { + // This is unexpected - if the interval is still running we should have + // an entry in the map for it. But if we do end up here, there's no + // reason to let the interval continue to run. + clearInterval(heartbeatInterval); + this.logger?.error( + `Programming error: Heartbeat interval unexpectedly missing for ${callbackUrl}. This is probably a bug in Apollo Server.`, + ); + return; + } + const existingHeartbeat = existingSubscriptionInfo.heartbeat; + const { queue } = existingHeartbeat; + queue.push(heartbeatPromise); + if (queue.length > 1) { + const requestBeforeMe = queue[existingHeartbeat?.queue.length - 2]; + await requestBeforeMe; + } + + // Send the heartbeat request + try { + const ids = Array.from( + existingSubscriptionInfo.subscriptionsById.keys() ?? [], + ); + this.logger?.debug( + `Sending \`heartbeat\` request to ${callbackUrl} for IDs: [${ids.join( + ',', + )}]`, + ); + + heartbeatRequest = fetch(callbackUrl, { + method: 'POST', + body: JSON.stringify({ + kind: 'subscription', + action: 'heartbeat', + id: existingHeartbeat.id ?? id, + verifier: existingHeartbeat.verifier ?? verifier, + ids, + }), + headers: { 'Content-Type': 'application/json' }, + }); + this.requestsInFlight.add(heartbeatRequest); + + // The heartbeat response might contain updates for us to act upon, so we + // need to await it + const result = await heartbeatRequest; + + this.logger?.debug( + `Heartbeat received response for IDs: [${ids.join(',')}]`, + ); + + if (result.ok) { + this.logger?.debug( + `Heartbeat request successful, IDs: [${ids.join(',')}]`, + ); + } else if (result.status === 400) { + const body = await result.json(); + this.logger?.debug( + `Heartbeat request received invalid IDs: [${body.invalid_ids.join( + ',', + )}]`, + ); + // Some of the IDs are invalid, so we need to update the id and + // verifier for future heartbeat requests (both provided by the router + // in the response body) + existingHeartbeat.id = body.id; + existingHeartbeat.verifier = body.verifier; + + this.terminateSubscriptions(body.invalid_ids, callbackUrl); + } else if (result.status === 404) { + // all ids we sent are invalid + this.logger?.debug( + `Heartbeat request received invalid IDs: [${ids.join(',')}]`, + ); + // This will also handle cleaning up the heartbeat interval + this.terminateSubscriptions(ids, callbackUrl); + } else { + // We'll catch this below and log it with the expectation that we'll + // retry this request some number of times before giving up + throw new Error(`Unexpected status code: ${result.status}`); + } + + // If we make it here, there wasn't some transient error with the + // request and it had an expected status code (2xx, 400, 404). + consecutiveHeartbeatFailureCount = 0; + } catch (e) { + const err = ensureError(e); + // The heartbeat request failed. + this.logger?.error( + `Heartbeat request failed (${++consecutiveHeartbeatFailureCount} consecutive): ${ + err.message + }`, + existingHeartbeat.id, + ); + + if ( + consecutiveHeartbeatFailureCount >= + this.maxConsecutiveHeartbeatFailures + ) { + this.logger?.error( + `Heartbeat request failed ${consecutiveHeartbeatFailureCount} times, terminating subscriptions and heartbeat interval: ${err.message}`, + existingHeartbeat.id, + ); + // If we've failed 5 times in a row, we should terminate all + // subscriptions for this callback url. This will also handle + // cleaning up the heartbeat interval. + this.terminateSubscriptions( + Array.from( + this.subscriptionInfoByCallbackUrl + .get(callbackUrl) + ?.subscriptionsById.keys() ?? [], + ), + callbackUrl, + ); + } + return; + } finally { + if (heartbeatRequest) { + this.requestsInFlight.delete(heartbeatRequest); + } + // remove itself from the queue and resolve the promise + existingHeartbeat?.queue.shift(); + resolveHeartbeatPromise!(); + } + }, this.heartbeatIntervalMs); + + // Add the heartbeat interval to the map of intervals + const subscriptionInfo = + this.subscriptionInfoByCallbackUrl.get(callbackUrl)!; + subscriptionInfo.heartbeat = { + interval: heartbeatInterval, + id, + verifier, + queue: [], + }; + } + + // Cancels and cleans up the subscriptions for given IDs and callback url. If + // there are no active subscriptions after this, we also clean up the + // heartbeat interval. This does not handle sending the `complete` request to + // the router. + private terminateSubscriptions(ids: string[], callbackUrl: string) { + this.logger?.debug(`Terminating subscriptions for IDs: [${ids.join(',')}]`); + const subscriptionInfo = + this.subscriptionInfoByCallbackUrl.get(callbackUrl); + if (!subscriptionInfo) { + this.logger?.error( + `No subscriptions found for ${callbackUrl}, skipping termination`, + ); + return; + } + const { subscriptionsById, heartbeat } = subscriptionInfo; + for (const id of ids) { + const subscription = subscriptionsById.get(id); + if (subscription) { + subscription.cancelled = true; + } + subscriptionsById.delete(id); + // if the list is empty now we can clean up everything for this callback url + if (subscriptionsById?.size === 0) { + this.logger?.debug( + `Terminating heartbeat interval, no more subscriptions for ${callbackUrl}`, + ); + if (heartbeat) clearInterval(heartbeat.interval); + this.subscriptionInfoByCallbackUrl.delete(callbackUrl); + } + } + } + + // Consumes the AsyncIterable returned by `graphql-js`'s `subscribe` function. + // This handles sending the `next` requests to the router as well as the + // `complete` request when the subscription is finished. + startConsumingSubscription({ + subscription, + callbackUrl, + id, + verifier, + }: { + subscription: AsyncGenerator; + callbackUrl: string; + id: string; + verifier: string; + }) { + // For each subscription we need to manage a bit of state. We need to be + // able to cancel the subscription externally. Setting `cancelled` to true + // allows us to break out of the `for await` and ignore future payloads. + const self = this; + const subscriptionObject = { + cancelled: false, + async startConsumingSubscription() { + self.logger?.debug(`Listening to graphql-js subscription`, id); + for await (const payload of subscription) { + if (this.cancelled) { + self.logger?.debug( + `Subscription already cancelled, ignoring current and future payloads`, + id, + ); + // It's already been cancelled - something else has already handled + // sending the `complete` request so we don't want to `break` here + // and send it again after the loop. + return; + } + + try { + await self.retryFetch({ + url: callbackUrl, + action: 'next', + id, + verifier, + payload, + }); + } catch (e) { + const originalError = ensureError(e); + self.logger?.error( + `\`next\` request failed, terminating subscription: ${originalError.message}`, + id, + ); + self.terminateSubscriptions([id], callbackUrl); + } + } + // The subscription ended without errors, send the `complete` request to + // the router + self.logger?.debug(`Subscription completed without errors`, id); + await this.completeSubscription(); + }, + async completeSubscription(errors?: readonly GraphQLError[]) { + if (this.cancelled) return; + this.cancelled = true; + + try { + await self.completeRequest({ + callbackUrl, + id, + verifier, + ...(errors && { errors }), + }); + } catch (e) { + const error = ensureError(e); + // This is just the `complete` request. If we fail to get this message + // to the router, it should just invalidate the subscription after the + // next heartbeat fails. + self.logger?.error( + `\`complete\` request failed: ${error.message}`, + id, + ); + } finally { + self.terminateSubscriptions([id], callbackUrl); + } + }, + }; + + subscriptionObject.startConsumingSubscription(); + const subscriptionInfo = + this.subscriptionInfoByCallbackUrl.get(callbackUrl); + if (!subscriptionInfo) { + this.logger?.error( + `No existing heartbeat found for ${callbackUrl}, skipping subscription`, + ); + } + subscriptionInfo?.subscriptionsById.set(id, subscriptionObject); + } + + // Sends the `complete` request to the router. + async completeRequest({ + errors, + callbackUrl, + id, + verifier, + }: { + errors?: readonly GraphQLError[]; + callbackUrl: string; + id: string; + verifier: string; + }) { + return this.retryFetch({ + url: callbackUrl, + action: 'complete', + id, + verifier, + errors, + }); + } + + collectAllSubscriptions() { + return Array.from(this.subscriptionInfoByCallbackUrl.values()).reduce( + (subscriptions, { subscriptionsById }) => { + subscriptions.push(...Array.from(subscriptionsById.values())); + return subscriptions; + }, + [] as SubscriptionObject[], + ); + } + + async cleanup() { + // Wait for our inflight heartbeats to finish - they might handle cancelling + // some subscriptions + await Promise.allSettled( + Array.from(this.subscriptionInfoByCallbackUrl.values()).map( + async ({ heartbeat }) => { + clearInterval(heartbeat?.interval); + await heartbeat?.queue[heartbeat.queue.length - 1]; + }, + ), + ); + // Cancel / complete any still-active subscriptions + await Promise.allSettled( + this.collectAllSubscriptions() + .filter((s) => !s.cancelled) + .map((s) => s.completeSubscription()), + ); + // Wait for any remaining requests to finish + await Promise.allSettled(this.requestsInFlight.values()); + } +} + +// Simple prefixing logger to curry class name and request IDs into log messages +function prefixedLogger(logger: Logger, prefix: string) { + function log(level: keyof Logger) { + return function (message: string, id?: string) { + logger[level](`${prefix}${id ? `[${id}]` : ''}: ${message}`); + }; + } + return { + debug: log('debug'), + error: log('error'), + info: log('info'), + warn: log('warn'), + }; +}