diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 8a945ce2b82..530cc248c40 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -13,24 +13,24 @@ # The following paths involve server-side use cases, token/user session management. # Changes made to these paths requires additional reviews and approvals. -/packages/auth @haverchuck @cshfang @jimblanc @HuiSF -/packages/aws-amplify/src/adapter-core @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/adapterCore @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/singleton @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/utils/convert @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/utils/WordArray.ts @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/storage @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/utils/generateRandomString.ts @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/utils/globalHelpers @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/utils/urlSafeDecode.ts @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/awsClients/cognitoIdentity @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/clients/internal @haverchuck @cshfang @jimblanc @HuiSF -/packages/core/src/Hub @haverchuck @cshfang @jimblanc @HuiSF -/packages/adapter-nextjs @haverchuck @cshfang @jimblanc @HuiSF -/packages/rtn-web-browser @haverchuck @cshfang @jimblanc @HuiSF -/packages/storage/src/providers/s3/apis/internal @haverchuck @cshfang @jimblanc @HuiSF -/packages/storage/src/providers/s3/apis/server @haverchuck @cshfang @jimblanc @HuiSF -/packages/api-rest/src/apis/server.ts @haverchuck @cshfang @jimblanc @HuiSF -/packages/api-rest/src/apis/common/internalPost.ts @haverchuck @cshfang @jimblanc @HuiSF -/packages/api-graphql/src/server @haverchuck @cshfang @jimblanc @HuiSF -/packages/api-graphql/src/internals/server @haverchuck @cshfang @jimblanc @HuiSF +/packages/auth @haverchuck @cshfang @HuiSF @pranavosu +/packages/aws-amplify/src/adapter-core @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/adapterCore @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/singleton @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/utils/convert @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/utils/WordArray.ts @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/storage @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/utils/generateRandomString.ts @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/utils/globalHelpers @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/utils/urlSafeDecode.ts @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/awsClients/cognitoIdentity @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/clients/internal @haverchuck @cshfang @HuiSF @pranavosu +/packages/core/src/Hub @haverchuck @cshfang @HuiSF @pranavosu +/packages/adapter-nextjs @haverchuck @cshfang @HuiSF @pranavosu +/packages/rtn-web-browser @haverchuck @cshfang @HuiSF @pranavosu +/packages/storage/src/providers/s3/apis/internal @haverchuck @cshfang @HuiSF @pranavosu +/packages/storage/src/providers/s3/apis/server @haverchuck @cshfang @HuiSF @pranavosu +/packages/api-rest/src/apis/server.ts @haverchuck @cshfang @HuiSF @pranavosu +/packages/api-rest/src/apis/common/internalPost.ts @haverchuck @cshfang @HuiSF @pranavosu +/packages/api-graphql/src/server @haverchuck @cshfang @HuiSF @pranavosu +/packages/api-graphql/src/internals/server @haverchuck @cshfang @HuiSF @pranavosu diff --git a/packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts b/packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts new file mode 100644 index 00000000000..d84f1daad89 --- /dev/null +++ b/packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts @@ -0,0 +1,177 @@ +import { Observable, Observer } from 'rxjs'; +import { Reachability } from '@aws-amplify/core/internals/utils'; +import { ConsoleLogger } from '@aws-amplify/core'; +import { MESSAGE_TYPES } from '../src/Providers/constants'; +import * as constants from '../src/Providers/constants'; + +import { delay, FakeWebSocketInterface } from './helpers'; +import { ConnectionState as CS } from '../src/types/PubSub'; + +import { AWSAppSyncEventProvider } from '../src/Providers/AWSAppSyncEventsProvider'; + +describe('AppSyncEventProvider', () => { + describe('subscribe()', () => { + describe('returned observer', () => { + describe('connection logic with mocked websocket', () => { + let fakeWebSocketInterface: FakeWebSocketInterface; + const loggerSpy: jest.SpyInstance = jest.spyOn( + ConsoleLogger.prototype, + '_log', + ); + + let provider: AWSAppSyncEventProvider; + let reachabilityObserver: Observer<{ online: boolean }>; + + beforeEach(async () => { + // Set the network to "online" for these tests + jest + .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }) + // Twice because we subscribe to get the initial state then again to monitor reachability + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }); + + fakeWebSocketInterface = new FakeWebSocketInterface(); + provider = new AWSAppSyncEventProvider(); + + // Saving this spy and resetting it by hand causes badness + // Saving it causes new websockets to be reachable across past tests that have not fully closed + // Resetting it proactively causes those same past tests to be dealing with null while they reach a settled state + jest + .spyOn(provider as any, '_getNewWebSocket') + .mockImplementation(() => { + fakeWebSocketInterface.newWebSocket(); + return fakeWebSocketInterface.webSocket as WebSocket; + }); + + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'MAX_DELAY_MS', { + value: 100, + }); + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'RECONNECT_DELAY', { + value: 100, + }); + }); + + afterEach(async () => { + provider?.close(); + await fakeWebSocketInterface?.closeInterface(); + fakeWebSocketInterface?.teardown(); + loggerSpy.mockClear(); + }); + + test('subscription observer error is triggered when a connection is formed and a non-retriable connection_error data message is received', async () => { + expect.assertions(3); + + const socketCloseSpy = jest.spyOn( + fakeWebSocketInterface.webSocket, + 'close', + ); + fakeWebSocketInterface.webSocket.readyState = WebSocket.OPEN; + + const observer = provider.subscribe({ + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + observer.subscribe({ + error: e => { + expect(e.errors[0].message).toEqual( + 'Connection failed: UnauthorizedException', + ); + }, + }); + + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + + // Resolve the message delivery actions + await Promise.resolve( + fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_ERROR, + errors: [ + { + errorType: 'UnauthorizedException', // - non-retriable + errorCode: 401, + }, + ], + }), + ); + + // Watching for raised exception to be caught and logged + expect(loggerSpy).toHaveBeenCalledWith( + 'DEBUG', + expect.stringContaining('error on bound '), + expect.objectContaining({ + message: expect.stringMatching('UnauthorizedException'), + }), + ); + + await delay(1); + + expect(socketCloseSpy).toHaveBeenCalledWith(3001); + }); + + test('subscription observer error is not triggered when a connection is formed and a retriable connection_error data message is received', async () => { + expect.assertions(2); + + const observer = provider.subscribe({ + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + observer.subscribe({ + error: x => {}, + }); + + const openSocketAttempt = async () => { + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + + // Resolve the message delivery actions + await Promise.resolve( + fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_ERROR, + errors: [ + { + errorType: 'Retriable Test', + errorCode: 408, // Request timed out - retriable + }, + ], + }), + ); + await fakeWebSocketInterface?.resetWebsocket(); + }; + + // Go through two connection attempts to excercise backoff and retriable raise + await openSocketAttempt(); + await openSocketAttempt(); + + // Watching for raised exception to be caught and logged + expect(loggerSpy).toHaveBeenCalledWith( + 'DEBUG', + expect.stringContaining('error on bound '), + expect.objectContaining({ + message: expect.stringMatching('Retriable Test'), + }), + ); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + + expect(loggerSpy).toHaveBeenCalledWith( + 'DEBUG', + 'Connection failed: Retriable Test', + ); + }); + }); + }); + }); +}); diff --git a/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts index d8456f5b373..041b9624898 100644 --- a/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -12,6 +12,7 @@ import { import { ConnectionState as CS } from '../src/types/PubSub'; import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; +import { isCustomDomain } from '../src/Providers/AWSWebSocketProvider/appsyncUrl'; // Mock all calls to signRequest jest.mock('@aws-amplify/core/internals/aws-client-utils', () => { @@ -20,7 +21,7 @@ jest.mock('@aws-amplify/core/internals/aws-client-utils', () => { ); return { ...original, - signRequest: (_request, _options) => { + signRequest: (_request: any, _options: any) => { return { method: 'test', headers: { test: 'test' }, @@ -46,7 +47,7 @@ jest.mock('@aws-amplify/core', () => { }; return { ...original, - fetchAuthSession: (_request, _options) => { + fetchAuthSession: (_request: any, _options: any) => { return Promise.resolve(session); }, Amplify: { @@ -66,24 +67,19 @@ jest.mock('@aws-amplify/core', () => { describe('AWSAppSyncRealTimeProvider', () => { describe('isCustomDomain()', () => { test('Custom domain returns `true`', () => { - const provider = new AWSAppSyncRealTimeProvider(); - const result = (provider as any).isCustomDomain( - 'https://unit-test.testurl.com/graphql', - ); + const result = isCustomDomain('https://unit-test.testurl.com/graphql'); expect(result).toBe(true); }); test('Non-custom domain returns `false`', () => { - const provider = new AWSAppSyncRealTimeProvider(); - const result = (provider as any).isCustomDomain( + const result = isCustomDomain( 'https://12345678901234567890123456.appsync-api.us-west-2.amazonaws.com/graphql', ); expect(result).toBe(false); }); test('Non-custom domain in the amazonaws.com.cn subdomain space returns `false`', () => { - const provider = new AWSAppSyncRealTimeProvider(); - const result = (provider as any).isCustomDomain( + const result = isCustomDomain( 'https://12345678901234567890123456.appsync-api.cn-north-1.amazonaws.com.cn/graphql', ); expect(result).toBe(false); @@ -136,10 +132,12 @@ describe('AWSAppSyncRealTimeProvider', () => { // Saving this spy and resetting it by hand causes badness // Saving it causes new websockets to be reachable across past tests that have not fully closed // Resetting it proactively causes those same past tests to be dealing with null while they reach a settled state - jest.spyOn(provider, 'getNewWebSocket').mockImplementation(() => { - fakeWebSocketInterface.newWebSocket(); - return fakeWebSocketInterface.webSocket as WebSocket; - }); + jest + .spyOn(provider as any, '_getNewWebSocket') + .mockImplementation(() => { + fakeWebSocketInterface.newWebSocket(); + return fakeWebSocketInterface.webSocket as WebSocket; + }); // Reduce retry delay for tests to 100ms Object.defineProperty(constants, 'MAX_DELAY_MS', { @@ -228,7 +226,7 @@ describe('AWSAppSyncRealTimeProvider', () => { expect.assertions(1); const newSocketSpy = jest - .spyOn(provider, 'getNewWebSocket') + .spyOn(provider as any, '_getNewWebSocket') .mockImplementation(() => { fakeWebSocketInterface.newWebSocket(); return fakeWebSocketInterface.webSocket as WebSocket; @@ -254,7 +252,7 @@ describe('AWSAppSyncRealTimeProvider', () => { expect.assertions(1); const newSocketSpy = jest - .spyOn(provider, 'getNewWebSocket') + .spyOn(provider as any, '_getNewWebSocket') .mockImplementation(() => { fakeWebSocketInterface.newWebSocket(); return fakeWebSocketInterface.webSocket as WebSocket; @@ -280,7 +278,7 @@ describe('AWSAppSyncRealTimeProvider', () => { expect.assertions(1); const newSocketSpy = jest - .spyOn(provider, 'getNewWebSocket') + .spyOn(provider as any, '_getNewWebSocket') .mockImplementation(() => { fakeWebSocketInterface.newWebSocket(); return fakeWebSocketInterface.webSocket as WebSocket; @@ -307,7 +305,7 @@ describe('AWSAppSyncRealTimeProvider', () => { expect.assertions(1); const newSocketSpy = jest - .spyOn(provider, 'getNewWebSocket') + .spyOn(provider as any, '_getNewWebSocket') .mockImplementation(() => { fakeWebSocketInterface.newWebSocket(); return fakeWebSocketInterface.webSocket; @@ -349,7 +347,7 @@ describe('AWSAppSyncRealTimeProvider', () => { expect.assertions(1); const newSocketSpy = jest - .spyOn(provider, 'getNewWebSocket') + .spyOn(provider as any, '_getNewWebSocket') .mockImplementation(() => { fakeWebSocketInterface.newWebSocket(); return fakeWebSocketInterface.webSocket; @@ -545,7 +543,7 @@ describe('AWSAppSyncRealTimeProvider', () => { await fakeWebSocketInterface?.standardConnectionHandshake(); await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_DATA, + type: MESSAGE_TYPES.DATA, payload: { data: {} }, }); @@ -571,7 +569,7 @@ describe('AWSAppSyncRealTimeProvider', () => { connectionTimeoutMs: 100, }); await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_DATA, + type: MESSAGE_TYPES.DATA, payload: { data: {} }, }); @@ -597,7 +595,7 @@ describe('AWSAppSyncRealTimeProvider', () => { connectionTimeoutMs: 100, }); await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_DATA, + type: MESSAGE_TYPES.DATA, payload: { data: {} }, }); expect(mockNext).toHaveBeenCalled(); @@ -677,7 +675,9 @@ describe('AWSAppSyncRealTimeProvider', () => { }), ); - expect(socketCloseSpy).toHaveBeenNthCalledWith(1, 3001); + await delay(1); + + expect(socketCloseSpy).toHaveBeenCalledWith(3001); }); test('subscription observer error is triggered when a connection is formed', async () => { @@ -931,7 +931,7 @@ describe('AWSAppSyncRealTimeProvider', () => { await fakeWebSocketInterface?.standardConnectionHandshake(); await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_DATA, + type: MESSAGE_TYPES.DATA, payload: { data: {} }, }); await subscription.unsubscribe(); @@ -1181,7 +1181,7 @@ describe('AWSAppSyncRealTimeProvider', () => { }); test('authenticating with AWS_LAMBDA/custom w/ custom header function that accepts request options', async () => { - expect.assertions(2); + expect.assertions(3); provider .subscribe({ diff --git a/packages/api-graphql/__tests__/GraphQLAPI.test.ts b/packages/api-graphql/__tests__/GraphQLAPI.test.ts index 4b40df78f3e..8f2e953d906 100644 --- a/packages/api-graphql/__tests__/GraphQLAPI.test.ts +++ b/packages/api-graphql/__tests__/GraphQLAPI.test.ts @@ -5,9 +5,9 @@ import { Amplify as AmplifyCore } from '@aws-amplify/core'; import * as typedQueries from './fixtures/with-types/queries'; import * as typedSubscriptions from './fixtures/with-types/subscriptions'; import { expectGet } from './utils/expects'; -import { InternalGraphQLAPIClass } from '../src/internals/InternalGraphQLAPI'; import { GraphQLAuthMode } from '@aws-amplify/core/internals/utils'; import { INTERNAL_USER_AGENT_OVERRIDE } from '@aws-amplify/data-schema/runtime'; +import * as graphqlAuth from '../src/internals/graphqlAuth'; import { __amplify, @@ -1489,10 +1489,7 @@ describe('API test', () => { }, }; - const spy = jest.spyOn( - InternalGraphQLAPIClass.prototype as any, - '_headerBasedAuth', - ); + const spy = jest.spyOn(graphqlAuth, 'headerBasedAuth'); const spy2 = jest .spyOn((raw.GraphQLAPI as any)._api, 'post') @@ -1515,6 +1512,7 @@ describe('API test', () => { getConfig: expect.any(Function), }), 'iam', + 'FAKE-KEY', {}, ); }); @@ -1579,7 +1577,7 @@ describe('API test', () => { const spyon_appsync_realtime = jest .spyOn( AWSAppSyncRealTimeProvider.prototype as any, - '_initializeRetryableHandshake', + '_establishRetryableConnection', ) .mockImplementation( jest.fn(() => { diff --git a/packages/api-graphql/__tests__/appsyncUrl.test.ts b/packages/api-graphql/__tests__/appsyncUrl.test.ts new file mode 100644 index 00000000000..1f5ffdc25a9 --- /dev/null +++ b/packages/api-graphql/__tests__/appsyncUrl.test.ts @@ -0,0 +1,14 @@ +import { getRealtimeEndpointUrl } from '../src/Providers/AWSWebSocketProvider/appsyncUrl'; + +describe('getRealtimeEndpointUrl', () => { + test('events', () => { + const httpUrl = + 'https://abcdefghijklmnopqrstuvwxyz.appsync-api.us-east-1.amazonaws.com/event'; + + const res = getRealtimeEndpointUrl(httpUrl).toString(); + + expect(res).toEqual( + 'wss://abcdefghijklmnopqrstuvwxyz.appsync-realtime-api.us-east-1.amazonaws.com/event/realtime', + ); + }); +}); diff --git a/packages/api-graphql/__tests__/events.test.ts b/packages/api-graphql/__tests__/events.test.ts new file mode 100644 index 00000000000..a1f8054e3dd --- /dev/null +++ b/packages/api-graphql/__tests__/events.test.ts @@ -0,0 +1,215 @@ +import { Amplify } from '@aws-amplify/core'; +import { AppSyncEventProvider } from '../src/Providers/AWSAppSyncEventsProvider'; + +import { events } from '../src/'; +import { appsyncRequest } from '../src/internals/events/appsyncRequest'; + +import { GraphQLAuthMode } from '@aws-amplify/core/internals/utils'; + +const abortController = new AbortController(); + +var mockSubscribeObservable: any; + +jest.mock('../src/Providers/AWSAppSyncEventsProvider', () => { + mockSubscribeObservable = jest.fn(() => ({ + subscribe: jest.fn(), + })); + + return { + AppSyncEventProvider: { + connect: jest.fn(), + subscribe: jest.fn(mockSubscribeObservable), + publish: jest.fn(), + close: jest.fn(), + }, + }; +}); + +jest.mock('../src/internals/events/appsyncRequest', () => { + return { + appsyncRequest: jest.fn().mockResolvedValue({}), + }; +}); + +/** + * Note: thorough auth testing, including validating correct auth header generation + * is performed in __tests__/AWSAppSyncRealTimeProvider.test.ts + * + * The auth implementation is shared between AWSAppSyncEventsProvider and AWSAppSyncRealTimeProvider, + * so we're just sanity checking that the expected auth mode is passed to the provider in this test file. + */ + +describe('Events client', () => { + afterAll(() => { + jest.resetAllMocks(); + jest.clearAllMocks(); + }); + + describe('config', () => { + test('no configure()', async () => { + await expect(events.connect('/')).rejects.toThrow( + 'Amplify configuration is missing. Have you called Amplify.configure()?', + ); + }); + + test('manual (resource config)', async () => { + Amplify.configure({ + API: { + Events: { + endpoint: + 'https://not-a-real.ddpg-api.us-west-2.amazonaws.com/event', + region: 'us-west-2', + defaultAuthMode: 'apiKey', + apiKey: 'da2-abcxyz321', + }, + }, + }); + + await expect(events.connect('/')).resolves.not.toThrow(); + }); + + test('outputs (amplify-backend config)', async () => { + Amplify.configure({ + custom: { + events: { + url: 'https://not-a-real.ddpg-api.us-west-2.amazonaws.com/event', + aws_region: 'us-west-2', + default_authorization_type: 'API_KEY', + api_key: 'da2-abcxyz321', + }, + }, + version: '1.2', + }); + + await expect(events.connect('/')).resolves.not.toThrow(); + }); + }); + + describe('client', () => { + beforeEach(() => { + Amplify.configure({ + API: { + Events: { + endpoint: + 'https://not-a-real.ddpg-api.us-west-2.amazonaws.com/event', + region: 'us-west-2', + defaultAuthMode: 'apiKey', + apiKey: 'da2-abcxyz321', + }, + }, + }); + }); + + const authModes: GraphQLAuthMode[] = [ + 'apiKey', + 'userPool', + 'oidc', + 'iam', + 'lambda', + 'none', + ]; + + describe('channel', () => { + test('happy connect', async () => { + const channel = await events.connect('/'); + + expect(channel.subscribe).toBeInstanceOf(Function); + expect(channel.close).toBeInstanceOf(Function); + }); + + describe('auth modes', () => { + let mockProvider: typeof AppSyncEventProvider; + + beforeEach(() => { + mockProvider = AppSyncEventProvider; + }); + + for (const authMode of authModes) { + test(`auth override: ${authMode}`, async () => { + await events.connect('/', { authMode }); + + expect(mockProvider.connect).toHaveBeenCalledWith( + expect.objectContaining({ authenticationType: authMode }), + ); + }); + } + }); + }); + + describe('subscribe', () => { + test('happy subscribe', async () => { + const channel = await events.connect('/'); + + channel.subscribe({ + next: data => void data, + error: error => void error, + }); + }); + + describe('auth modes', () => { + let mockProvider: typeof AppSyncEventProvider; + + beforeEach(() => { + mockProvider = AppSyncEventProvider; + }); + + for (const authMode of authModes) { + test(`auth override: ${authMode}`, async () => { + const channel = await events.connect('/'); + + channel.subscribe( + { + next: data => void data, + error: error => void error, + }, + { authMode }, + ); + + expect(mockSubscribeObservable).toHaveBeenCalledWith( + expect.objectContaining({ authenticationType: authMode }), + ); + }); + } + }); + }); + + describe('post', () => { + let mockReq: typeof appsyncRequest; + + beforeEach(() => { + mockReq = appsyncRequest; + }); + + test('happy post', async () => { + await events.post('/', { test: 'data' }); + + expect(mockReq).toHaveBeenCalledWith( + Amplify, + expect.objectContaining({ + query: '/', + variables: ['{"test":"data"}'], + }), + {}, + abortController, + ); + }); + + for (const authMode of authModes) { + test(`auth override: ${authMode}`, async () => { + await events.post('/', { test: 'data' }, { authMode }); + + expect(mockReq).toHaveBeenCalledWith( + Amplify, + expect.objectContaining({ + query: '/', + variables: ['{"test":"data"}'], + authenticationType: authMode, + }), + {}, + abortController, + ); + }); + } + }); + }); +}); diff --git a/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts b/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts new file mode 100644 index 00000000000..5bfeddd89fa --- /dev/null +++ b/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts @@ -0,0 +1,207 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { + CustomUserAgentDetails, + DocumentType, + GraphQLAuthMode, + USER_AGENT_HEADER, + getAmplifyUserAgent, +} from '@aws-amplify/core/internals/utils'; +import { CustomHeaders } from '@aws-amplify/data-schema/runtime'; + +import { DEFAULT_KEEP_ALIVE_TIMEOUT, MESSAGE_TYPES } from '../constants'; +import { AWSWebSocketProvider } from '../AWSWebSocketProvider'; +import { awsRealTimeHeaderBasedAuth } from '../AWSWebSocketProvider/authHeaders'; + +// resolved/actual AuthMode values. identityPool gets resolves to IAM upstream in InternalGraphQLAPI._graphqlSubscribe +type ResolvedGraphQLAuthModes = Exclude; + +interface AWSAppSyncEventProviderOptions { + appSyncGraphqlEndpoint?: string; + authenticationType?: ResolvedGraphQLAuthModes; + query?: string; + variables?: DocumentType; + apiKey?: string; + region?: string; + libraryConfigHeaders?(): Promise | Headers>; + additionalHeaders?: CustomHeaders; + additionalCustomHeaders?: Record; + authToken?: string; +} + +interface DataPayload { + id: string; + event: string; + type: string; +} + +interface DataResponse { + id: string; + payload: string; + type: string; +} + +const PROVIDER_NAME = 'AWSAppSyncEventsProvider'; +const WS_PROTOCOL_NAME = 'aws-appsync-event-ws'; + +export class AWSAppSyncEventProvider extends AWSWebSocketProvider { + constructor() { + super({ providerName: PROVIDER_NAME, wsProtocolName: WS_PROTOCOL_NAME }); + } + + getProviderName() { + return PROVIDER_NAME; + } + + public async connect(options: AWSAppSyncEventProviderOptions) { + super.connect(options); + } + + public subscribe( + options?: AWSAppSyncEventProviderOptions, + customUserAgentDetails?: CustomUserAgentDetails, + ) { + return super.subscribe(options, customUserAgentDetails).pipe(); + } + + public async publish( + options: AWSAppSyncEventProviderOptions, + customUserAgentDetails?: CustomUserAgentDetails, + ) { + super.publish(options, customUserAgentDetails); + } + + protected async _prepareSubscriptionPayload({ + options, + subscriptionId, + customUserAgentDetails, + additionalCustomHeaders, + libraryConfigHeaders, + publish, + }: { + options: AWSAppSyncEventProviderOptions; + subscriptionId: string; + customUserAgentDetails: CustomUserAgentDetails | undefined; + additionalCustomHeaders: Record; + libraryConfigHeaders: Record; + publish?: boolean; + }): Promise { + const { + appSyncGraphqlEndpoint, + authenticationType, + query, + variables, + apiKey, + region, + } = options; + + // This will be needed for WS publish + // const data = { + // events: [variables], + // }; + + const serializedData = JSON.stringify([variables]); + + const headers = { + ...(await awsRealTimeHeaderBasedAuth({ + apiKey, + appSyncGraphqlEndpoint, + authenticationType, + payload: serializedData, + canonicalUri: '', + region, + additionalCustomHeaders, + })), + ...libraryConfigHeaders, + ...additionalCustomHeaders, + [USER_AGENT_HEADER]: getAmplifyUserAgent(customUserAgentDetails), + }; + + // Commented out code will be needed for WS publish + const subscriptionMessage = { + id: subscriptionId, + channel: query, + // events: [JSON.stringify(variables)], + authorization: { + ...headers, + }, + // payload: { + // events: serializedData, + // extensions: { + // authorization: { + // ...headers, + // }, + // }, + // }, + type: publish + ? MESSAGE_TYPES.EVENT_PUBLISH + : MESSAGE_TYPES.EVENT_SUBSCRIBE, + }; + + const serializedSubscriptionMessage = JSON.stringify(subscriptionMessage); + + return serializedSubscriptionMessage; + } + + protected _handleSubscriptionData( + message: MessageEvent, + ): [boolean, DataResponse] { + this.logger.debug( + `subscription message from AWS AppSync Events: ${message.data}`, + ); + + const { + id = '', + event: payload, + type, + }: DataPayload = JSON.parse(String(message.data)); + + const { + observer = null, + query = '', + variables = {}, + } = this.subscriptionObserverMap.get(id) || {}; + + this.logger.debug({ id, observer, query, variables }); + + if (type === MESSAGE_TYPES.DATA && payload) { + const deserializedEvent = JSON.parse(payload); + if (observer) { + observer.next(deserializedEvent); + } else { + this.logger.debug(`observer not found for id: ${id}`); + } + + return [true, { id, type, payload: deserializedEvent }]; + } + + return [false, { id, type, payload }]; + } + + protected _unsubscribeMessage(subscriptionId: string): { + id: string; + type: string; + } { + return { + id: subscriptionId, + type: MESSAGE_TYPES.EVENT_STOP, + }; + } + + protected _extractConnectionTimeout(data: Record): number { + const { connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT } = data; + + return connectionTimeoutMs; + } + + protected _extractErrorCodeAndType(data: Record): { + errorCode: number; + errorType: string; + } { + const { errors: [{ errorType = '', errorCode = 0 } = {}] = [] } = data; + + return { errorCode, errorType }; + } +} + +export const AppSyncEventProvider = new AWSAppSyncEventProvider(); diff --git a/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 61a1f6149bc..2ba5968a890 100644 --- a/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -1,107 +1,27 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Observable, SubscriptionLike } from 'rxjs'; -import { GraphQLError } from 'graphql'; -import { - ConsoleLogger, - Hub, - HubPayload, - fetchAuthSession, -} from '@aws-amplify/core'; -import { signRequest } from '@aws-amplify/core/internals/aws-client-utils'; + import { - AmplifyUrl, - AmplifyUrlSearchParams, CustomUserAgentDetails, DocumentType, GraphQLAuthMode, - NonRetryableError, USER_AGENT_HEADER, - amplifyUuid, - base64Encoder, getAmplifyUserAgent, - isNonRetryableError, - jitteredExponentialRetry, } from '@aws-amplify/core/internals/utils'; -import { - CustomHeaders, - RequestOptions, -} from '@aws-amplify/data-schema/runtime'; - -import { - CONTROL_MSG, - ConnectionState, - PubSubContentObserver, -} from '../../types/PubSub'; -import { - AMPLIFY_SYMBOL, - AWS_APPSYNC_REALTIME_HEADERS, - CONNECTION_INIT_TIMEOUT, - CONNECTION_STATE_CHANGE, - DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT, - DEFAULT_KEEP_ALIVE_TIMEOUT, - MAX_DELAY_MS, - MESSAGE_TYPES, - NON_RETRYABLE_CODES, - SOCKET_STATUS, - START_ACK_TIMEOUT, - SUBSCRIPTION_STATUS, -} from '../constants'; -import { - CONNECTION_CHANGE, - ConnectionStateMonitor, -} from '../../utils/ConnectionStateMonitor'; -import { - ReconnectEvent, - ReconnectionMonitor, -} from '../../utils/ReconnectionMonitor'; +import { CustomHeaders } from '@aws-amplify/data-schema/runtime'; -const logger = new ConsoleLogger('AWSAppSyncRealTimeProvider'); - -const dispatchApiEvent = (payload: HubPayload) => { - Hub.dispatch('api', payload, 'PubSub', AMPLIFY_SYMBOL); -}; +import { DEFAULT_KEEP_ALIVE_TIMEOUT, MESSAGE_TYPES } from '../constants'; +import { AWSWebSocketProvider } from '../AWSWebSocketProvider'; +import { awsRealTimeHeaderBasedAuth } from '../AWSWebSocketProvider/authHeaders'; // resolved/actual AuthMode values. identityPool gets resolves to IAM upstream in InternalGraphQLAPI._graphqlSubscribe type ResolvedGraphQLAuthModes = Exclude; -export interface ObserverQuery { - observer: PubSubContentObserver; - query: string; - variables: Record; - subscriptionState: SUBSCRIPTION_STATUS; - subscriptionReadyCallback?(): void; - subscriptionFailedCallback?(reason?: any): void; - startAckTimeoutId?: ReturnType; -} - -const standardDomainPattern = - /^https:\/\/\w{26}\.appsync-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/graphql$/i; - -const customDomainPath = '/realtime'; - -interface DataObject extends Record { - data: Record; -} - -interface DataPayload { - id: string; - payload: DataObject; - type: string; -} - -interface ParsedMessagePayload { - type: string; - payload: { - connectionTimeoutMs: number; - errors?: [{ errorType: string; errorCode: number }]; - }; -} export interface AWSAppSyncRealTimeProviderOptions { appSyncGraphqlEndpoint?: string; authenticationType?: ResolvedGraphQLAuthModes; query?: string; - variables?: Record; + variables?: DocumentType; apiKey?: string; region?: string; libraryConfigHeaders?(): Promise | Headers>; @@ -110,209 +30,48 @@ export interface AWSAppSyncRealTimeProviderOptions { authToken?: string; } -type AWSAppSyncRealTimeAuthInput = - Partial & { - canonicalUri: string; - payload: string; - host?: string | undefined; - }; - -export class AWSAppSyncRealTimeProvider { - private awsRealTimeSocket?: WebSocket; - private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED; - private keepAliveTimeoutId?: ReturnType; - private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; - private keepAliveAlertTimeoutId?: ReturnType; - private subscriptionObserverMap = new Map(); - private promiseArray: { res(): void; rej(reason?: any): void }[] = []; - private connectionState: ConnectionState | undefined; - private readonly connectionStateMonitor = new ConnectionStateMonitor(); - private readonly reconnectionMonitor = new ReconnectionMonitor(); - private connectionStateMonitorSubscription: SubscriptionLike; - - constructor() { - // Monitor the connection state and pass changes along to Hub - this.connectionStateMonitorSubscription = - this.connectionStateMonitor.connectionStateObservable.subscribe( - connectionState => { - dispatchApiEvent({ - event: CONNECTION_STATE_CHANGE, - data: { - provider: this, - connectionState, - }, - message: `Connection state is ${connectionState}`, - }); - this.connectionState = connectionState; - - // Trigger START_RECONNECT when the connection is disrupted - if (connectionState === ConnectionState.ConnectionDisrupted) { - this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT); - } - - // Trigger HALT_RECONNECT to halt reconnection attempts when the state is anything other than - // ConnectionDisrupted or Connecting - if ( - [ - ConnectionState.Connected, - ConnectionState.ConnectedPendingDisconnect, - ConnectionState.ConnectedPendingKeepAlive, - ConnectionState.ConnectedPendingNetwork, - ConnectionState.ConnectionDisruptedPendingNetwork, - ConnectionState.Disconnected, - ].includes(connectionState) - ) { - this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT); - } - }, - ); - } +interface DataObject extends Record { + data: Record; +} - /** - * Mark the socket closed and release all active listeners - */ - close() { - // Mark the socket closed both in status and the connection monitor - this.socketStatus = SOCKET_STATUS.CLOSED; - this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_FAILED); +interface DataPayload { + id: string; + payload: DataObject; + type: string; +} - // Turn off the subscription monitor Hub publishing - this.connectionStateMonitorSubscription.unsubscribe(); - // Complete all reconnect observers - this.reconnectionMonitor.close(); - } +const PROVIDER_NAME = 'AWSAppSyncRealTimeProvider'; +const WS_PROTOCOL_NAME = 'graphql-ws'; - getNewWebSocket(url: string, protocol: string[]) { - return new WebSocket(url, protocol); +export class AWSAppSyncRealTimeProvider extends AWSWebSocketProvider { + constructor() { + super({ providerName: PROVIDER_NAME, wsProtocolName: WS_PROTOCOL_NAME }); } getProviderName() { - return 'AWSAppSyncRealTimeProvider'; + return PROVIDER_NAME; } - // Check if url matches standard domain pattern - private isCustomDomain(url: string): boolean { - return url.match(standardDomainPattern) === null; - } - - subscribe( + public subscribe( options?: AWSAppSyncRealTimeProviderOptions, customUserAgentDetails?: CustomUserAgentDetails, - ): Observable> { - const { - appSyncGraphqlEndpoint, - region, - query, - variables, - authenticationType, - additionalHeaders, - apiKey, - authToken, - libraryConfigHeaders, - } = options || {}; - - return new Observable(observer => { - if (!options || !appSyncGraphqlEndpoint) { - observer.error({ - errors: [ - { - ...new GraphQLError( - `Subscribe only available for AWS AppSync endpoint`, - ), - }, - ], - }); - observer.complete(); - } else { - let subscriptionStartActive = false; - const subscriptionId = amplifyUuid(); - const startSubscription = () => { - if (!subscriptionStartActive) { - subscriptionStartActive = true; - - const startSubscriptionPromise = - this._startSubscriptionWithAWSAppSyncRealTime({ - options: { - query, - variables, - region, - authenticationType, - appSyncGraphqlEndpoint, - additionalHeaders, - apiKey, - authToken, - libraryConfigHeaders, - }, - observer, - subscriptionId, - customUserAgentDetails, - }).catch(err => { - logger.debug( - `${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`, - ); - - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - }); - startSubscriptionPromise.finally(() => { - subscriptionStartActive = false; - }); - } - }; - - // Add an observable to the reconnection list to manage reconnection for this subscription - const reconnectSubscription = new Observable( - reconnectSubscriptionObserver => { - this.reconnectionMonitor.addObserver(reconnectSubscriptionObserver); - }, - ).subscribe(() => { - startSubscription(); - }); - - startSubscription(); - - return async () => { - // Cleanup reconnection subscription - reconnectSubscription?.unsubscribe(); - - // Cleanup after unsubscribing or observer.complete was called after _startSubscriptionWithAWSAppSyncRealTime - try { - // Waiting that subscription has been connected before trying to unsubscribe - await this._waitForSubscriptionToBeConnected(subscriptionId); - - const { subscriptionState } = - this.subscriptionObserverMap.get(subscriptionId) || {}; - - if (!subscriptionState) { - // subscription already unsubscribed - return; - } - - if (subscriptionState === SUBSCRIPTION_STATUS.CONNECTED) { - this._sendUnsubscriptionMessage(subscriptionId); - } else { - throw new Error('Subscription never connected'); - } - } catch (err) { - logger.debug(`Error while unsubscribing ${err}`); - } finally { - this._removeSubscriptionObserver(subscriptionId); - } - }; - } - }); + ) { + return super.subscribe(options, customUserAgentDetails); } - private async _startSubscriptionWithAWSAppSyncRealTime({ + protected async _prepareSubscriptionPayload({ options, - observer, subscriptionId, customUserAgentDetails, + additionalCustomHeaders, + libraryConfigHeaders, }: { options: AWSAppSyncRealTimeProviderOptions; - observer: PubSubContentObserver; subscriptionId: string; customUserAgentDetails: CustomUserAgentDetails | undefined; - }) { + additionalCustomHeaders: Record; + libraryConfigHeaders: Record; + }): Promise { const { appSyncGraphqlEndpoint, authenticationType, @@ -320,59 +79,24 @@ export class AWSAppSyncRealTimeProvider { variables, apiKey, region, - libraryConfigHeaders = () => ({}), - additionalHeaders = {}, - authToken, } = options; - - let additionalCustomHeaders: Record = {}; - - if (typeof additionalHeaders === 'function') { - const requestOptions: RequestOptions = { - url: appSyncGraphqlEndpoint || '', - queryString: query || '', - }; - additionalCustomHeaders = await additionalHeaders(requestOptions); - } else { - additionalCustomHeaders = additionalHeaders; - } - - // if an authorization header is set, have the explicit authToken take precedence - if (authToken) { - additionalCustomHeaders = { - ...additionalCustomHeaders, - Authorization: authToken, - }; - } - - const subscriptionState: SUBSCRIPTION_STATUS = SUBSCRIPTION_STATUS.PENDING; const data = { query, variables, }; - // Having a subscription id map will make it simple to forward messages received - this.subscriptionObserverMap.set(subscriptionId, { - observer, - query: query ?? '', - variables: variables ?? {}, - subscriptionState, - startAckTimeoutId: undefined, - }); - - // Preparing payload for subscription message + const serializedData = JSON.stringify(data); - const dataString = JSON.stringify(data); - const headerObj = { - ...(await this._awsRealTimeHeaderBasedAuth({ + const headers = { + ...(await awsRealTimeHeaderBasedAuth({ apiKey, appSyncGraphqlEndpoint, authenticationType, - payload: dataString, + payload: serializedData, canonicalUri: '', region, additionalCustomHeaders, })), - ...(await libraryConfigHeaders()), + ...libraryConfigHeaders, ...additionalCustomHeaders, [USER_AGENT_HEADER]: getAmplifyUserAgent(customUserAgentDetails), }; @@ -380,775 +104,77 @@ export class AWSAppSyncRealTimeProvider { const subscriptionMessage = { id: subscriptionId, payload: { - data: dataString, + data: serializedData, extensions: { authorization: { - ...headerObj, + ...headers, }, }, }, type: MESSAGE_TYPES.GQL_START, }; - const stringToAWSRealTime = JSON.stringify(subscriptionMessage); - - try { - this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); - await this._initializeWebSocketConnection({ - apiKey, - appSyncGraphqlEndpoint, - authenticationType, - region, - additionalCustomHeaders, - }); - } catch (err: any) { - this._logStartSubscriptionError(subscriptionId, observer, err); - - return; - } - - // Potential race condition can occur when unsubscribe is called during _initializeWebSocketConnection. - // E.g.unsubscribe gets invoked prior to finishing WebSocket handshake or START_ACK. - // Both subscriptionFailedCallback and subscriptionReadyCallback are used to synchronized this. - - const { subscriptionFailedCallback, subscriptionReadyCallback } = - this.subscriptionObserverMap.get(subscriptionId) ?? {}; - - // This must be done before sending the message in order to be listening immediately - this.subscriptionObserverMap.set(subscriptionId, { - observer, - subscriptionState, - query: query ?? '', - variables: variables ?? {}, - subscriptionReadyCallback, - subscriptionFailedCallback, - startAckTimeoutId: setTimeout(() => { - this._timeoutStartSubscriptionAck(subscriptionId); - }, START_ACK_TIMEOUT), - }); - if (this.awsRealTimeSocket) { - this.awsRealTimeSocket.send(stringToAWSRealTime); - } - } - - // Log logic for start subscription failures - private _logStartSubscriptionError( - subscriptionId: string, - observer: PubSubContentObserver, - err: { message?: string }, - ) { - logger.debug({ err }); - const message = String(err.message ?? ''); - // Resolving to give the state observer time to propogate the update - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - - // Capture the error only when the network didn't cause disruption - if ( - this.connectionState !== ConnectionState.ConnectionDisruptedPendingNetwork - ) { - // When the error is non-retriable, error out the observable - if (isNonRetryableError(err)) { - observer.error({ - errors: [ - { - ...new GraphQLError( - `${CONTROL_MSG.CONNECTION_FAILED}: ${message}`, - ), - }, - ], - }); - } else { - logger.debug(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`); - } - - const { subscriptionFailedCallback } = - this.subscriptionObserverMap.get(subscriptionId) || {}; - - // Notify concurrent unsubscription - if (typeof subscriptionFailedCallback === 'function') { - subscriptionFailedCallback(); - } - } - } - - // Waiting that subscription has been connected before trying to unsubscribe - private async _waitForSubscriptionToBeConnected(subscriptionId: string) { - const subscriptionObserver = - this.subscriptionObserverMap.get(subscriptionId); - if (subscriptionObserver) { - const { subscriptionState } = subscriptionObserver; - // This in case unsubscribe is invoked before sending start subscription message - if (subscriptionState === SUBSCRIPTION_STATUS.PENDING) { - return new Promise((resolve, reject) => { - const { - observer, - subscriptionState: observedSubscriptionState, - variables, - query, - } = subscriptionObserver; - this.subscriptionObserverMap.set(subscriptionId, { - observer, - subscriptionState: observedSubscriptionState, - variables, - query, - subscriptionReadyCallback: resolve, - subscriptionFailedCallback: reject, - }); - }); - } - } - } - - private _sendUnsubscriptionMessage(subscriptionId: string) { - try { - if ( - this.awsRealTimeSocket && - this.awsRealTimeSocket.readyState === WebSocket.OPEN && - this.socketStatus === SOCKET_STATUS.READY - ) { - // Preparing unsubscribe message to stop receiving messages for that subscription - const unsubscribeMessage = { - id: subscriptionId, - type: MESSAGE_TYPES.GQL_STOP, - }; - const stringToAWSRealTime = JSON.stringify(unsubscribeMessage); - this.awsRealTimeSocket.send(stringToAWSRealTime); - } - } catch (err) { - // If GQL_STOP is not sent because of disconnection issue, then there is nothing the client can do - logger.debug({ err }); - } - } - - private _removeSubscriptionObserver(subscriptionId: string) { - this.subscriptionObserverMap.delete(subscriptionId); - - // Verifying 1000ms after removing subscription in case there are new subscription unmount/mount - setTimeout(this._closeSocketIfRequired.bind(this), 1000); - } - - private _closeSocketIfRequired() { - if (this.subscriptionObserverMap.size > 0) { - // Active subscriptions on the WebSocket - return; - } - - if (!this.awsRealTimeSocket) { - this.socketStatus = SOCKET_STATUS.CLOSED; + const serializedSubscriptionMessage = JSON.stringify(subscriptionMessage); - return; - } - - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); - - if (this.awsRealTimeSocket.bufferedAmount > 0) { - // Still data on the WebSocket - setTimeout(this._closeSocketIfRequired.bind(this), 1000); - } else { - logger.debug('closing WebSocket...'); - if (this.keepAliveTimeoutId) { - clearTimeout(this.keepAliveTimeoutId); - } - if (this.keepAliveAlertTimeoutId) { - clearTimeout(this.keepAliveAlertTimeoutId); - } - const tempSocket = this.awsRealTimeSocket; - // Cleaning callbacks to avoid race condition, socket still exists - tempSocket.onclose = null; - tempSocket.onerror = null; - tempSocket.close(1000); - this.awsRealTimeSocket = undefined; - this.socketStatus = SOCKET_STATUS.CLOSED; - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - } + return serializedSubscriptionMessage; } - private _handleIncomingSubscriptionMessage(message: MessageEvent) { - if (typeof message.data !== 'string') { - return; - } - logger.debug( + protected _handleSubscriptionData( + message: MessageEvent, + ): [boolean, DataPayload] { + this.logger.debug( `subscription message from AWS AppSync RealTime: ${message.data}`, ); - const { - id = '', - payload, - type, - }: DataPayload = JSON.parse(String(message.data)); + + const { id = '', payload, type } = JSON.parse(String(message.data)); + const { observer = null, query = '', variables = {}, - startAckTimeoutId, - subscriptionReadyCallback, - subscriptionFailedCallback, } = this.subscriptionObserverMap.get(id) || {}; - logger.debug({ id, observer, query, variables }); + this.logger.debug({ id, observer, query, variables }); - if (type === MESSAGE_TYPES.GQL_DATA && payload && payload.data) { + if (type === MESSAGE_TYPES.DATA && payload && payload.data) { if (observer) { observer.next(payload); } else { - logger.debug(`observer not found for id: ${id}`); - } - - return; - } - - if (type === MESSAGE_TYPES.GQL_START_ACK) { - logger.debug( - `subscription ready for ${JSON.stringify({ query, variables })}`, - ); - if (typeof subscriptionReadyCallback === 'function') { - subscriptionReadyCallback(); - } - if (startAckTimeoutId) clearTimeout(startAckTimeoutId); - dispatchApiEvent({ - event: CONTROL_MSG.SUBSCRIPTION_ACK, - data: { query, variables }, - message: 'Connection established for subscription', - }); - const subscriptionState = SUBSCRIPTION_STATUS.CONNECTED; - if (observer) { - this.subscriptionObserverMap.set(id, { - observer, - query, - variables, - startAckTimeoutId: undefined, - subscriptionState, - subscriptionReadyCallback, - subscriptionFailedCallback, - }); - } - this.connectionStateMonitor.record( - CONNECTION_CHANGE.CONNECTION_ESTABLISHED, - ); - - return; - } - - if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { - if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); - if (this.keepAliveAlertTimeoutId) - clearTimeout(this.keepAliveAlertTimeoutId); - this.keepAliveTimeoutId = setTimeout(() => { - this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT); - }, this.keepAliveTimeout); - this.keepAliveAlertTimeoutId = setTimeout(() => { - this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); - }, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT); - this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE); - - return; - } - - if (type === MESSAGE_TYPES.GQL_ERROR) { - const subscriptionState = SUBSCRIPTION_STATUS.FAILED; - if (observer) { - this.subscriptionObserverMap.set(id, { - observer, - query, - variables, - startAckTimeoutId, - subscriptionReadyCallback, - subscriptionFailedCallback, - subscriptionState, - }); - - logger.debug( - `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`, - ); - - observer.error({ - errors: [ - { - ...new GraphQLError( - `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`, - ), - }, - ], - }); - - if (startAckTimeoutId) clearTimeout(startAckTimeoutId); - - if (typeof subscriptionFailedCallback === 'function') { - subscriptionFailedCallback(); - } - } - } - } - - private _errorDisconnect(msg: string) { - logger.debug(`Disconnect error: ${msg}`); - - if (this.awsRealTimeSocket) { - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - this.awsRealTimeSocket.close(); - } - - this.socketStatus = SOCKET_STATUS.CLOSED; - } - - private _timeoutStartSubscriptionAck(subscriptionId: string) { - const subscriptionObserver = - this.subscriptionObserverMap.get(subscriptionId); - if (subscriptionObserver) { - const { observer, query, variables } = subscriptionObserver; - if (!observer) { - return; - } - this.subscriptionObserverMap.set(subscriptionId, { - observer, - query, - variables, - subscriptionState: SUBSCRIPTION_STATUS.FAILED, - }); - - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - logger.debug( - 'timeoutStartSubscription', - JSON.stringify({ query, variables }), - ); - } - } - - /** - * Strips out `Authorization` header if present - */ - private _extractNonAuthHeaders( - headers?: AWSAppSyncRealTimeProviderOptions['additionalCustomHeaders'], - ): Record { - if (!headers) { - return {}; - } - - if ('Authorization' in headers) { - const { Authorization: _, ...nonAuthHeaders } = headers; - - return nonAuthHeaders; - } - - return headers; - } - - /** - * - * @param headers - http headers - * @returns uri-encoded query parameters derived from custom headers - */ - private _queryParamsFromCustomHeaders( - headers?: AWSAppSyncRealTimeProviderOptions['additionalCustomHeaders'], - ): URLSearchParams { - const nonAuthHeaders = this._extractNonAuthHeaders(headers); - - const params = new AmplifyUrlSearchParams(); - - Object.entries(nonAuthHeaders).forEach(([k, v]) => { - params.append(k, v); - }); - - return params; - } - - /** - * Normalizes AppSync realtime endpoint URL - * - * @param appSyncGraphqlEndpoint - AppSync endpointUri from config - * @param urlParams - URLSearchParams - * @returns fully resolved string realtime endpoint URL - */ - private _realtimeUrlWithQueryString( - appSyncGraphqlEndpoint: string | undefined, - urlParams: URLSearchParams, - ): string { - const protocol = 'wss://'; - - let realtimeEndpoint = appSyncGraphqlEndpoint ?? ''; - - if (this.isCustomDomain(realtimeEndpoint)) { - realtimeEndpoint = realtimeEndpoint.concat(customDomainPath); - } else { - realtimeEndpoint = realtimeEndpoint - .replace('appsync-api', 'appsync-realtime-api') - .replace('gogi-beta', 'grt-beta'); - } - - realtimeEndpoint = realtimeEndpoint - .replace('https://', protocol) - .replace('http://', protocol); - - const realtimeEndpointUrl = new AmplifyUrl(realtimeEndpoint); - - // preserves any query params a customer might manually set in the configuration - const existingParams = new AmplifyUrlSearchParams( - realtimeEndpointUrl.search, - ); - - for (const [k, v] of urlParams.entries()) { - existingParams.append(k, v); - } - - realtimeEndpointUrl.search = existingParams.toString(); - - return realtimeEndpointUrl.toString(); - } - - private _initializeWebSocketConnection({ - appSyncGraphqlEndpoint, - authenticationType, - apiKey, - region, - additionalCustomHeaders, - }: AWSAppSyncRealTimeProviderOptions) { - if (this.socketStatus === SOCKET_STATUS.READY) { - return; - } - - // TODO(Eslint): refactor to now use async function as the promise executor - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve, reject) => { - this.promiseArray.push({ res: resolve, rej: reject }); - - if (this.socketStatus === SOCKET_STATUS.CLOSED) { - try { - this.socketStatus = SOCKET_STATUS.CONNECTING; - - const payloadString = '{}'; - - const authHeader = await this._awsRealTimeHeaderBasedAuth({ - authenticationType, - payload: payloadString, - canonicalUri: '/connect', - apiKey, - appSyncGraphqlEndpoint, - region, - additionalCustomHeaders, - }); - - const headerString = authHeader ? JSON.stringify(authHeader) : ''; - // base64url-encoded string - const encodedHeader = base64Encoder.convert(headerString, { - urlSafe: true, - skipPadding: true, - }); - - const authTokenSubprotocol = `header-${encodedHeader}`; - - const queryParams = this._queryParamsFromCustomHeaders( - additionalCustomHeaders, - ); - - const awsRealTimeUrl = this._realtimeUrlWithQueryString( - appSyncGraphqlEndpoint, - queryParams, - ); - - await this._initializeRetryableHandshake( - awsRealTimeUrl, - authTokenSubprotocol, - ); - - this.promiseArray.forEach(({ res }) => { - logger.debug('Notifying connection successful'); - res(); - }); - this.socketStatus = SOCKET_STATUS.READY; - this.promiseArray = []; - } catch (err) { - logger.debug('Connection exited with', err); - this.promiseArray.forEach(({ rej }) => { - rej(err); - }); - this.promiseArray = []; - if ( - this.awsRealTimeSocket && - this.awsRealTimeSocket.readyState === WebSocket.OPEN - ) { - this.awsRealTimeSocket.close(3001); - } - this.awsRealTimeSocket = undefined; - this.socketStatus = SOCKET_STATUS.CLOSED; - } - } - }); - } - - private async _initializeRetryableHandshake( - awsRealTimeUrl: string, - subprotocol: string, - ) { - logger.debug(`Initializaling retryable Handshake`); - await jitteredExponentialRetry( - this._initializeHandshake.bind(this), - [awsRealTimeUrl, subprotocol], - MAX_DELAY_MS, - ); - } - - /** - * - * @param subprotocol - - */ - private async _initializeHandshake( - awsRealTimeUrl: string, - subprotocol: string, - ) { - logger.debug(`Initializing handshake ${awsRealTimeUrl}`); - // Because connecting the socket is async, is waiting until connection is open - // Step 1: connect websocket - try { - await (() => { - return new Promise((resolve, reject) => { - const newSocket = this.getNewWebSocket(awsRealTimeUrl, [ - 'graphql-ws', - subprotocol, - ]); - - newSocket.onerror = () => { - logger.debug(`WebSocket connection error`); - }; - newSocket.onclose = () => { - reject(new Error('Connection handshake error')); - }; - newSocket.onopen = () => { - this.awsRealTimeSocket = newSocket; - - resolve(); - }; - }); - })(); - // Step 2: wait for ack from AWS AppSyncReaTime after sending init - await (() => { - return new Promise((resolve, reject) => { - if (this.awsRealTimeSocket) { - let ackOk = false; - this.awsRealTimeSocket.onerror = error => { - logger.debug(`WebSocket error ${JSON.stringify(error)}`); - }; - this.awsRealTimeSocket.onclose = event => { - logger.debug(`WebSocket closed ${event.reason}`); - reject(new Error(JSON.stringify(event))); - }; - - this.awsRealTimeSocket.onmessage = (message: MessageEvent) => { - if (typeof message.data !== 'string') { - return; - } - logger.debug( - `subscription message from AWS AppSyncRealTime: ${message.data} `, - ); - const data = JSON.parse(message.data) as ParsedMessagePayload; - const { - type, - payload: { - connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT, - } = {}, - } = data; - if (type === MESSAGE_TYPES.GQL_CONNECTION_ACK) { - ackOk = true; - if (this.awsRealTimeSocket) { - this.keepAliveTimeout = connectionTimeoutMs; - this.awsRealTimeSocket.onmessage = - this._handleIncomingSubscriptionMessage.bind(this); - this.awsRealTimeSocket.onerror = err => { - logger.debug(err); - this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); - }; - this.awsRealTimeSocket.onclose = event => { - logger.debug(`WebSocket closed ${event.reason}`); - this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); - }; - } - resolve('Cool, connected to AWS AppSyncRealTime'); - - return; - } - - if (type === MESSAGE_TYPES.GQL_CONNECTION_ERROR) { - const { - payload: { - errors: [{ errorType = '', errorCode = 0 } = {}] = [], - } = {}, - } = data; - - // TODO(Eslint): refactor to reject an Error object instead of a plain object - // eslint-disable-next-line prefer-promise-reject-errors - reject({ errorType, errorCode }); - } - }; - - const gqlInit = { - type: MESSAGE_TYPES.GQL_CONNECTION_INIT, - }; - this.awsRealTimeSocket.send(JSON.stringify(gqlInit)); - - const checkAckOk = (targetAckOk: boolean) => { - if (!targetAckOk) { - this.connectionStateMonitor.record( - CONNECTION_CHANGE.CONNECTION_FAILED, - ); - reject( - new Error( - `Connection timeout: ack from AWSAppSyncRealTime was not received after ${CONNECTION_INIT_TIMEOUT} ms`, - ), - ); - } - }; - - setTimeout(() => { - checkAckOk(ackOk); - }, CONNECTION_INIT_TIMEOUT); - } - }); - })(); - } catch (err) { - const { errorType, errorCode } = err as { - errorType: string; - errorCode: number; - }; - - if (NON_RETRYABLE_CODES.includes(errorCode)) { - throw new NonRetryableError(errorType); - } else if (errorType) { - throw new Error(errorType); - } else { - throw err; - } - } - } - - private async _awsRealTimeHeaderBasedAuth({ - apiKey, - authenticationType, - payload, - canonicalUri, - appSyncGraphqlEndpoint, - region, - additionalCustomHeaders, - }: AWSAppSyncRealTimeAuthInput): Promise< - Record | undefined - > { - const headerHandler: { - [key in ResolvedGraphQLAuthModes]: ( - arg0: AWSAppSyncRealTimeAuthInput, - ) => Promise> | Record; - } = { - apiKey: this._awsRealTimeApiKeyHeader.bind(this), - iam: this._awsRealTimeIAMHeader.bind(this), - oidc: this._awsAuthTokenHeader.bind(this), - userPool: this._awsAuthTokenHeader.bind(this), - lambda: this._customAuthHeader, - none: this._customAuthHeader, - }; - - if (!authenticationType || !headerHandler[authenticationType]) { - logger.debug(`Authentication type ${authenticationType} not supported`); - - return undefined; - } else { - const handler = headerHandler[authenticationType]; - - const host = appSyncGraphqlEndpoint - ? new AmplifyUrl(appSyncGraphqlEndpoint).host - : undefined; - - logger.debug(`Authenticating with ${JSON.stringify(authenticationType)}`); - let resolvedApiKey; - if (authenticationType === 'apiKey') { - resolvedApiKey = apiKey; + this.logger.debug(`observer not found for id: ${id}`); } - const result = await handler({ - payload, - canonicalUri, - appSyncGraphqlEndpoint, - apiKey: resolvedApiKey, - region, - host, - additionalCustomHeaders, - }); - return result; + return [true, { id, type, payload }]; } - } - private async _awsAuthTokenHeader({ host }: AWSAppSyncRealTimeAuthInput) { - const session = await fetchAuthSession(); - - return { - Authorization: session?.tokens?.accessToken?.toString(), - host, - }; + return [false, { id, type, payload }]; } - private async _awsRealTimeApiKeyHeader({ - apiKey, - host, - }: AWSAppSyncRealTimeAuthInput) { - const dt = new Date(); - const dtStr = dt.toISOString().replace(/[:-]|\.\d{3}/g, ''); - + protected _unsubscribeMessage(subscriptionId: string): { + id: string; + type: string; + } { return { - host, - 'x-amz-date': dtStr, - 'x-api-key': apiKey, + id: subscriptionId, + type: MESSAGE_TYPES.GQL_STOP, }; } - private async _awsRealTimeIAMHeader({ - payload, - canonicalUri, - appSyncGraphqlEndpoint, - region, - }: AWSAppSyncRealTimeAuthInput) { - const endpointInfo = { - region, - service: 'appsync', - }; - - const creds = (await fetchAuthSession()).credentials; - - const request = { - url: `${appSyncGraphqlEndpoint}${canonicalUri}`, - data: payload, - method: 'POST', - headers: { ...AWS_APPSYNC_REALTIME_HEADERS }, - }; - - const signedParams = signRequest( - { - headers: request.headers, - method: request.method, - url: new AmplifyUrl(request.url), - body: request.data, - }, - { - // TODO: What do we need to do to remove these !'s? - credentials: creds!, - signingRegion: endpointInfo.region!, - signingService: endpointInfo.service, - }, - ); + protected _extractConnectionTimeout(data: Record): number { + const { + payload: { connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT } = {}, + } = data; - return signedParams.headers; + return connectionTimeoutMs; } - private _customAuthHeader({ - host, - additionalCustomHeaders, - }: AWSAppSyncRealTimeAuthInput) { - /** - * If `additionalHeaders` was provided to the subscription as a function, - * the headers that are returned by that function will already have been - * provided before this function is called. - */ - if (!additionalCustomHeaders?.Authorization) { - throw new Error('No auth token specified'); - } + protected _extractErrorCodeAndType(data: any): { + errorCode: number; + errorType: string; + } { + const { + payload: { errors: [{ errorType = '', errorCode = 0 } = {}] = [] } = {}, + } = data; - return { - Authorization: additionalCustomHeaders.Authorization, - host, - }; + return { errorCode, errorType }; } } diff --git a/packages/api-graphql/src/Providers/AWSWebSocketProvider/appsyncUrl.ts b/packages/api-graphql/src/Providers/AWSWebSocketProvider/appsyncUrl.ts new file mode 100644 index 00000000000..b79c9cca1eb --- /dev/null +++ b/packages/api-graphql/src/Providers/AWSWebSocketProvider/appsyncUrl.ts @@ -0,0 +1,152 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { RequestOptions } from '@aws-amplify/data-schema/runtime'; +import { + AmplifyUrl, + AmplifyUrlSearchParams, +} from '@aws-amplify/core/internals/utils'; + +import { AWSAppSyncRealTimeProviderOptions } from '../AWSAppSyncRealTimeProvider'; + +const protocol = 'wss://'; +const standardDomainPattern = + /^https:\/\/\w{26}\.appsync-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/graphql$/i; +const eventDomainPattern = + /^https:\/\/\w{26}\.\w+-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/event$/i; +const customDomainPath = '/realtime'; + +export const isCustomDomain = (url: string): boolean => { + return url.match(standardDomainPattern) === null; +}; + +const isEventDomain = (url: string): boolean => + url.match(eventDomainPattern) !== null; + +export const getRealtimeEndpointUrl = ( + appSyncGraphqlEndpoint: string | undefined, +): URL => { + let realtimeEndpoint = appSyncGraphqlEndpoint ?? ''; + + if (isEventDomain(realtimeEndpoint)) { + realtimeEndpoint = realtimeEndpoint + .concat(customDomainPath) + .replace('ddpg-api', 'grt-gamma') + .replace('appsync-api', 'appsync-realtime-api'); + } else if (isCustomDomain(realtimeEndpoint)) { + realtimeEndpoint = realtimeEndpoint.concat(customDomainPath); + } else { + realtimeEndpoint = realtimeEndpoint + .replace('appsync-api', 'appsync-realtime-api') + .replace('gogi-beta', 'grt-beta') + .replace('ddpg-api', 'grt-gamma'); + } + + realtimeEndpoint = realtimeEndpoint + .replace('https://', protocol) + .replace('http://', protocol); + + return new AmplifyUrl(realtimeEndpoint); +}; + +/** + * Strips out `Authorization` header if present + */ +const extractNonAuthHeaders = ( + headers?: AWSAppSyncRealTimeProviderOptions['additionalCustomHeaders'], +): Record => { + if (!headers) { + return {}; + } + + if ('Authorization' in headers) { + const { Authorization: _, ...nonAuthHeaders } = headers; + + return nonAuthHeaders; + } + + return headers; +}; + +/** + * + * @param headers - http headers + * @returns uri-encoded query parameters derived from custom headers + */ +export const queryParamsFromCustomHeaders = ( + headers?: AWSAppSyncRealTimeProviderOptions['additionalCustomHeaders'], +): URLSearchParams => { + const nonAuthHeaders = extractNonAuthHeaders(headers); + + const params = new AmplifyUrlSearchParams(); + + Object.entries(nonAuthHeaders).forEach(([k, v]) => { + params.append(k, v); + }); + + return params; +}; + +/** + * Normalizes AppSync realtime endpoint URL + * + * @param appSyncGraphqlEndpoint - AppSync endpointUri from config + * @param urlParams - URLSearchParams + * @returns fully resolved string realtime endpoint URL + */ +export const realtimeUrlWithQueryString = ( + appSyncGraphqlEndpoint: string | undefined, + urlParams: URLSearchParams, +): string => { + const realtimeEndpointUrl = getRealtimeEndpointUrl(appSyncGraphqlEndpoint); + + // preserves any query params a customer might manually set in the configuration + const existingParams = new AmplifyUrlSearchParams(realtimeEndpointUrl.search); + + for (const [k, v] of urlParams.entries()) { + existingParams.append(k, v); + } + + realtimeEndpointUrl.search = existingParams.toString(); + + return realtimeEndpointUrl.toString(); +}; + +// TODO: move to separate file? +export const additionalHeadersFromOptions = async ( + options: AWSAppSyncRealTimeProviderOptions, +) => { + const { + appSyncGraphqlEndpoint, + query, + libraryConfigHeaders = () => ({}), + additionalHeaders = {}, + authToken, + } = options; + + let additionalCustomHeaders = {}; + const _libraryConfigHeaders = await libraryConfigHeaders(); + + if (typeof additionalHeaders === 'function') { + const requestOptions: RequestOptions = { + url: appSyncGraphqlEndpoint || '', + queryString: query || '', + }; + additionalCustomHeaders = await additionalHeaders(requestOptions); + } else { + additionalCustomHeaders = additionalHeaders; + } + + // if an authorization header is set, have the explicit, operation-level authToken take precedence + if (authToken) { + additionalCustomHeaders = { + ...additionalCustomHeaders, + Authorization: authToken, + }; + } + + return { + additionalCustomHeaders, + libraryConfigHeaders: _libraryConfigHeaders, + }; +}; diff --git a/packages/api-graphql/src/Providers/AWSWebSocketProvider/authHeaders.ts b/packages/api-graphql/src/Providers/AWSWebSocketProvider/authHeaders.ts new file mode 100644 index 00000000000..faa822fc55c --- /dev/null +++ b/packages/api-graphql/src/Providers/AWSWebSocketProvider/authHeaders.ts @@ -0,0 +1,146 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { ConsoleLogger, fetchAuthSession } from '@aws-amplify/core'; +import { signRequest } from '@aws-amplify/core/internals/aws-client-utils'; +import { AmplifyUrl } from '@aws-amplify/core/internals/utils'; + +import { AWS_APPSYNC_REALTIME_HEADERS } from '../constants'; +import { AWSAppSyncRealTimeProviderOptions } from '../AWSAppSyncRealTimeProvider'; + +const logger = new ConsoleLogger('AWSAppSyncRealTimeProvider Auth'); + +type AWSAppSyncRealTimeAuthInput = + Partial & { + canonicalUri: string; + payload: string; + host?: string | undefined; + }; + +const awsAuthTokenHeader = async ({ host }: AWSAppSyncRealTimeAuthInput) => { + const session = await fetchAuthSession(); + + return { + Authorization: session?.tokens?.accessToken?.toString(), + host, + }; +}; + +const awsRealTimeApiKeyHeader = async ({ + apiKey, + host, +}: AWSAppSyncRealTimeAuthInput) => { + const dt = new Date(); + const dtStr = dt.toISOString().replace(/[:-]|\.\d{3}/g, ''); + + return { + host, + 'x-amz-date': dtStr, + 'x-api-key': apiKey, + }; +}; + +const awsRealTimeIAMHeader = async ({ + payload, + canonicalUri, + appSyncGraphqlEndpoint, + region, +}: AWSAppSyncRealTimeAuthInput) => { + const endpointInfo = { + region, + service: 'appsync', + }; + + const creds = (await fetchAuthSession()).credentials; + + const request = { + url: `${appSyncGraphqlEndpoint}${canonicalUri}`, + data: payload, + method: 'POST', + headers: { ...AWS_APPSYNC_REALTIME_HEADERS }, + }; + + const signedParams = signRequest( + { + headers: request.headers, + method: request.method, + url: new AmplifyUrl(request.url), + body: request.data, + }, + { + credentials: creds!, + signingRegion: endpointInfo.region!, + signingService: endpointInfo.service, + }, + ); + + return signedParams.headers; +}; + +const customAuthHeader = async ({ + host, + additionalCustomHeaders, +}: AWSAppSyncRealTimeAuthInput) => { + /** + * If `additionalHeaders` was provided to the subscription as a function, + * the headers that are returned by that function will already have been + * provided before this function is called. + */ + if (!additionalCustomHeaders?.Authorization) { + throw new Error('No auth token specified'); + } + + return { + Authorization: additionalCustomHeaders.Authorization, + host, + }; +}; + +export const awsRealTimeHeaderBasedAuth = async ({ + apiKey, + authenticationType, + canonicalUri, + appSyncGraphqlEndpoint, + region, + additionalCustomHeaders, + payload, +}: AWSAppSyncRealTimeAuthInput): Promise< + Record | undefined +> => { + const headerHandler = { + apiKey: awsRealTimeApiKeyHeader, + iam: awsRealTimeIAMHeader, + oidc: awsAuthTokenHeader, + userPool: awsAuthTokenHeader, + lambda: customAuthHeader, + none: customAuthHeader, + } as const; + + if (!authenticationType || !headerHandler[authenticationType]) { + logger.debug(`Authentication type ${authenticationType} not supported`); + + return undefined; + } else { + const handler = headerHandler[authenticationType]; + + const host = appSyncGraphqlEndpoint + ? new AmplifyUrl(appSyncGraphqlEndpoint).host + : undefined; + + const resolvedApiKey = authenticationType === 'apiKey' ? apiKey : undefined; + + logger.debug(`Authenticating with ${JSON.stringify(authenticationType)}`); + + const result = await handler({ + payload, + canonicalUri, + appSyncGraphqlEndpoint, + apiKey: resolvedApiKey, + region, + host, + additionalCustomHeaders, + }); + + return result; + } +}; diff --git a/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts new file mode 100644 index 00000000000..3553a008123 --- /dev/null +++ b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts @@ -0,0 +1,943 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { Observable, Subscription, SubscriptionLike } from 'rxjs'; +import { GraphQLError } from 'graphql'; +import { ConsoleLogger, Hub, HubPayload } from '@aws-amplify/core'; +import { + CustomUserAgentDetails, + DocumentType, + NonRetryableError, + amplifyUuid, + base64Encoder, + isNonRetryableError, + jitteredExponentialRetry, +} from '@aws-amplify/core/internals/utils'; + +import { + CONTROL_MSG, + ConnectionState, + PubSubContentObserver, +} from '../../types/PubSub'; +import { + AMPLIFY_SYMBOL, + CONNECTION_INIT_TIMEOUT, + CONNECTION_STATE_CHANGE, + DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT, + DEFAULT_KEEP_ALIVE_TIMEOUT, + MAX_DELAY_MS, + MESSAGE_TYPES, + NON_RETRYABLE_CODES, + NON_RETRYABLE_ERROR_TYPES, + SOCKET_STATUS, + START_ACK_TIMEOUT, + SUBSCRIPTION_STATUS, +} from '../constants'; +import { + CONNECTION_CHANGE, + ConnectionStateMonitor, +} from '../../utils/ConnectionStateMonitor'; +import { + ReconnectEvent, + ReconnectionMonitor, +} from '../../utils/ReconnectionMonitor'; +import type { AWSAppSyncRealTimeProviderOptions } from '../AWSAppSyncRealTimeProvider'; + +import { + additionalHeadersFromOptions, + queryParamsFromCustomHeaders, + realtimeUrlWithQueryString, +} from './appsyncUrl'; +import { awsRealTimeHeaderBasedAuth } from './authHeaders'; + +const dispatchApiEvent = (payload: HubPayload) => { + Hub.dispatch('api', payload, 'PubSub', AMPLIFY_SYMBOL); +}; + +export interface ObserverQuery { + observer: PubSubContentObserver; + query: string; + variables: DocumentType; + subscriptionState: SUBSCRIPTION_STATUS; + subscriptionReadyCallback?(): void; + subscriptionFailedCallback?(reason?: any): void; + startAckTimeoutId?: ReturnType; +} + +interface ParsedMessagePayload { + type: string; + payload: { + connectionTimeoutMs: number; + errors?: [{ errorType: string; errorCode: number }]; + }; +} + +interface AWSWebSocketProviderArgs { + providerName: string; + wsProtocolName: string; +} + +export abstract class AWSWebSocketProvider { + protected logger: ConsoleLogger; + protected subscriptionObserverMap = new Map(); + + private awsRealTimeSocket?: WebSocket; + private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED; + private keepAliveTimeoutId?: ReturnType; + private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; + private keepAliveAlertTimeoutId?: ReturnType; + private promiseArray: { res(): void; rej(reason?: any): void }[] = []; + private connectionState: ConnectionState | undefined; + private readonly connectionStateMonitor = new ConnectionStateMonitor(); + private readonly reconnectionMonitor = new ReconnectionMonitor(); + private connectionStateMonitorSubscription: SubscriptionLike; + private readonly wsProtocolName: string; + + constructor(args: AWSWebSocketProviderArgs) { + this.logger = new ConsoleLogger(args.providerName); + this.wsProtocolName = args.wsProtocolName; + + this.connectionStateMonitorSubscription = + this._startConnectionStateMonitoring(); + } + + /** + * Mark the socket closed and release all active listeners + */ + close() { + // Mark the socket closed both in status and the connection monitor + this.socketStatus = SOCKET_STATUS.CLOSED; + this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_FAILED); + + // Turn off the subscription monitor Hub publishing + this.connectionStateMonitorSubscription.unsubscribe(); + // Complete all reconnect observers + this.reconnectionMonitor.close(); + } + + subscribe( + options?: AWSAppSyncRealTimeProviderOptions, + customUserAgentDetails?: CustomUserAgentDetails, + ): Observable> { + return new Observable(observer => { + if (!options?.appSyncGraphqlEndpoint) { + observer.error({ + errors: [ + { + ...new GraphQLError( + `Subscribe only available for AWS AppSync endpoint`, + ), + }, + ], + }); + observer.complete(); + + return; + } + + let subscriptionStartInProgress = false; + const subscriptionId = amplifyUuid(); + const startSubscription = () => { + if (!subscriptionStartInProgress) { + subscriptionStartInProgress = true; + + this._startSubscriptionWithAWSAppSyncRealTime({ + options, + observer, + subscriptionId, + customUserAgentDetails, + }) + .catch(err => { + this.logger.debug( + `${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`, + ); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + }) + .finally(() => { + subscriptionStartInProgress = false; + }); + } + }; + + // Add an observable to the reconnection list to manage reconnection for this subscription + const reconnectSubscription = new Observable( + reconnectSubscriptionObserver => { + this.reconnectionMonitor.addObserver(reconnectSubscriptionObserver); + }, + ).subscribe(() => { + startSubscription(); + }); + + startSubscription(); + + return async () => { + await this._cleanupSubscription(subscriptionId, reconnectSubscription); + }; + }); + } + + protected async connect( + options: AWSAppSyncRealTimeProviderOptions, + ): Promise { + if (this.socketStatus === SOCKET_STATUS.READY) { + return; + } + + await this._connectWebSocket(options); + } + + protected async publish( + options: AWSAppSyncRealTimeProviderOptions, + customUserAgentDetails?: CustomUserAgentDetails, + ): Promise { + if (this.socketStatus !== SOCKET_STATUS.READY) { + throw new Error('Subscription has not been initialized'); + } + + return this._publishMessage(options, customUserAgentDetails); + } + + private async _connectWebSocket(options: AWSAppSyncRealTimeProviderOptions) { + const { apiKey, appSyncGraphqlEndpoint, authenticationType, region } = + options; + + const { additionalCustomHeaders } = + await additionalHeadersFromOptions(options); + + this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + await this._initializeWebSocketConnection({ + apiKey, + appSyncGraphqlEndpoint, + authenticationType, + region, + additionalCustomHeaders, + }); + } + + private async _publishMessage( + options: AWSAppSyncRealTimeProviderOptions, + customUserAgentDetails?: CustomUserAgentDetails, + ): Promise { + const subscriptionId = amplifyUuid(); + + const { additionalCustomHeaders, libraryConfigHeaders } = + await additionalHeadersFromOptions(options); + + const serializedSubscriptionMessage = + await this._prepareSubscriptionPayload({ + options, + subscriptionId, + customUserAgentDetails, + additionalCustomHeaders, + libraryConfigHeaders, + publish: true, + }); + + return new Promise((resolve, reject) => { + if (this.awsRealTimeSocket) { + const publishListener = (event: MessageEvent) => { + const data = JSON.parse(event.data); + if (data.id === subscriptionId && data.type === 'publish_success') { + this.awsRealTimeSocket && + this.awsRealTimeSocket.removeEventListener( + 'message', + publishListener, + ); + + resolve(); + } + + if (data.erroredEvents && data.erroredEvents.length > 0) { + // TODO: handle errors + } + }; + this.awsRealTimeSocket.addEventListener('message', publishListener); + this.awsRealTimeSocket.addEventListener('close', () => { + reject(new Error('WebSocket is closed')); + }); + // + // this.awsRealTimeSocket.addEventListener('error', publishListener); + + this.awsRealTimeSocket.send(serializedSubscriptionMessage); + } + }); + } + + private async _cleanupSubscription( + subscriptionId: string, + reconnectSubscription: Subscription, + ) { + // Cleanup reconnection subscription + reconnectSubscription?.unsubscribe(); + + // Cleanup after unsubscribing or observer.complete was called after _startSubscriptionWithAWSAppSyncRealTime + try { + // Waiting that subscription has been connected before trying to unsubscribe + await this._waitForSubscriptionToBeConnected(subscriptionId); + + const { subscriptionState } = + this.subscriptionObserverMap.get(subscriptionId) || {}; + + if (!subscriptionState) { + // subscription already unsubscribed + return; + } + + if (subscriptionState === SUBSCRIPTION_STATUS.CONNECTED) { + this._sendUnsubscriptionMessage(subscriptionId); + } else { + throw new Error('Subscription never connected'); + } + } catch (err) { + this.logger.debug(`Error while unsubscribing ${err}`); + } finally { + this._removeSubscriptionObserver(subscriptionId); + } + } + + // Monitor the connection state and pass changes along to Hub + private _startConnectionStateMonitoring() { + return this.connectionStateMonitor.connectionStateObservable.subscribe( + connectionState => { + dispatchApiEvent({ + event: CONNECTION_STATE_CHANGE, + data: { + provider: this, + connectionState, + }, + message: `Connection state is ${connectionState}`, + }); + this.connectionState = connectionState; + + // Trigger START_RECONNECT when the connection is disrupted + if (connectionState === ConnectionState.ConnectionDisrupted) { + this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT); + } + + // Trigger HALT_RECONNECT to halt reconnection attempts when the state is anything other than + // ConnectionDisrupted or Connecting + if ( + [ + ConnectionState.Connected, + ConnectionState.ConnectedPendingDisconnect, + ConnectionState.ConnectedPendingKeepAlive, + ConnectionState.ConnectedPendingNetwork, + ConnectionState.ConnectionDisruptedPendingNetwork, + ConnectionState.Disconnected, + ].includes(connectionState) + ) { + this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT); + } + }, + ); + } + + protected abstract _prepareSubscriptionPayload(param: { + options: AWSAppSyncRealTimeProviderOptions; + subscriptionId: string; + customUserAgentDetails: CustomUserAgentDetails | undefined; + additionalCustomHeaders: Record; + libraryConfigHeaders: Record; + publish?: boolean; + }): Promise; + + private async _startSubscriptionWithAWSAppSyncRealTime({ + options, + observer, + subscriptionId, + customUserAgentDetails, + }: { + options: AWSAppSyncRealTimeProviderOptions; + observer: PubSubContentObserver; + subscriptionId: string; + customUserAgentDetails: CustomUserAgentDetails | undefined; + }) { + const { query, variables } = options; + + const { additionalCustomHeaders, libraryConfigHeaders } = + await additionalHeadersFromOptions(options); + + this.subscriptionObserverMap.set(subscriptionId, { + observer, + query: query ?? '', + variables: variables ?? {}, + subscriptionState: SUBSCRIPTION_STATUS.PENDING, + startAckTimeoutId: undefined, + }); + + const serializedSubscriptionMessage = + await this._prepareSubscriptionPayload({ + options, + subscriptionId, + customUserAgentDetails, + additionalCustomHeaders, + libraryConfigHeaders, + }); + + try { + await this._connectWebSocket(options); + } catch (err: any) { + this._logStartSubscriptionError(subscriptionId, observer, err); + + return; + } + + // Potential race condition can occur when unsubscribe is called during _initializeWebSocketConnection. + // E.g.unsubscribe gets invoked prior to finishing WebSocket handshake or START_ACK. + // Both subscriptionFailedCallback and subscriptionReadyCallback are used to synchronized this. + const { subscriptionFailedCallback, subscriptionReadyCallback } = + this.subscriptionObserverMap.get(subscriptionId) ?? {}; + + // This must be done before sending the message in order to be listening immediately + this.subscriptionObserverMap.set(subscriptionId, { + observer, + subscriptionState: SUBSCRIPTION_STATUS.PENDING, + query: query ?? '', + variables: variables ?? {}, + subscriptionReadyCallback, + subscriptionFailedCallback, + startAckTimeoutId: setTimeout(() => { + this._timeoutStartSubscriptionAck(subscriptionId); + }, START_ACK_TIMEOUT), + }); + + if (this.awsRealTimeSocket) { + this.awsRealTimeSocket.send(serializedSubscriptionMessage); + } + } + + // Log logic for start subscription failures + private _logStartSubscriptionError( + subscriptionId: string, + observer: PubSubContentObserver, + err: { message?: string }, + ) { + this.logger.debug({ err }); + const message = String(err.message ?? ''); + // Resolving to give the state observer time to propogate the update + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + + // Capture the error only when the network didn't cause disruption + if ( + this.connectionState !== ConnectionState.ConnectionDisruptedPendingNetwork + ) { + // When the error is non-retriable, error out the observable + if (isNonRetryableError(err)) { + observer.error({ + errors: [ + { + ...new GraphQLError( + `${CONTROL_MSG.CONNECTION_FAILED}: ${message}`, + ), + }, + ], + }); + } else { + this.logger.debug(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`); + } + + const { subscriptionFailedCallback } = + this.subscriptionObserverMap.get(subscriptionId) || {}; + + // Notify concurrent unsubscription + if (typeof subscriptionFailedCallback === 'function') { + subscriptionFailedCallback(); + } + } + } + + // Waiting that subscription has been connected before trying to unsubscribe + private async _waitForSubscriptionToBeConnected(subscriptionId: string) { + const subscriptionObserver = + this.subscriptionObserverMap.get(subscriptionId); + + if (subscriptionObserver) { + const { subscriptionState } = subscriptionObserver; + // This in case unsubscribe is invoked before sending start subscription message + if (subscriptionState === SUBSCRIPTION_STATUS.PENDING) { + return new Promise((resolve, reject) => { + const { + observer, + subscriptionState: observedSubscriptionState, + variables, + query, + } = subscriptionObserver; + this.subscriptionObserverMap.set(subscriptionId, { + observer, + subscriptionState: observedSubscriptionState, + variables, + query, + subscriptionReadyCallback: resolve, + subscriptionFailedCallback: reject, + }); + }); + } + } + } + + protected abstract _unsubscribeMessage(subscriptionId: string): { + id: string; + type: string; + }; + + private _sendUnsubscriptionMessage(subscriptionId: string) { + try { + if ( + this.awsRealTimeSocket && + this.awsRealTimeSocket.readyState === WebSocket.OPEN && + this.socketStatus === SOCKET_STATUS.READY + ) { + // Preparing unsubscribe message to stop receiving messages for that subscription + const unsubscribeMessage = this._unsubscribeMessage(subscriptionId); + const stringToAWSRealTime = JSON.stringify(unsubscribeMessage); + this.awsRealTimeSocket.send(stringToAWSRealTime); + } + } catch (err) { + // If GQL_STOP is not sent because of disconnection issue, then there is nothing the client can do + this.logger.debug({ err }); + } + } + + private _removeSubscriptionObserver(subscriptionId: string) { + this.subscriptionObserverMap.delete(subscriptionId); + + // Verifying 1000ms after removing subscription in case there are new subscription unmount/mount + setTimeout(this._closeSocketIfRequired.bind(this), 1000); + } + + private _closeSocketIfRequired() { + if (this.subscriptionObserverMap.size > 0) { + // Active subscriptions on the WebSocket + return; + } + + if (!this.awsRealTimeSocket) { + this.socketStatus = SOCKET_STATUS.CLOSED; + + return; + } + + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); + + if (this.awsRealTimeSocket.bufferedAmount > 0) { + // Still data on the WebSocket + setTimeout(this._closeSocketIfRequired.bind(this), 1000); + } else { + this.logger.debug('closing WebSocket...'); + if (this.keepAliveTimeoutId) { + clearTimeout(this.keepAliveTimeoutId); + } + if (this.keepAliveAlertTimeoutId) { + clearTimeout(this.keepAliveAlertTimeoutId); + } + const tempSocket = this.awsRealTimeSocket; + // Cleaning callbacks to avoid race condition, socket still exists + tempSocket.onclose = null; + tempSocket.onerror = null; + tempSocket.close(1000); + this.awsRealTimeSocket = undefined; + this.socketStatus = SOCKET_STATUS.CLOSED; + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + } + } + + protected abstract _handleSubscriptionData( + message: MessageEvent, + ): [ + boolean, + { id: string; payload: string | Record; type: string }, + ]; + + protected abstract _extractConnectionTimeout( + data: Record, + ): number; + + protected abstract _extractErrorCodeAndType(data: Record): { + errorCode: number; + errorType: string; + }; + + private _handleIncomingSubscriptionMessage(message: MessageEvent) { + if (typeof message.data !== 'string') { + return; + } + + const [isData, data] = this._handleSubscriptionData(message); + if (isData) return; + + const { type, id, payload } = data; + + const { + observer = null, + query = '', + variables = {}, + startAckTimeoutId, + subscriptionReadyCallback, + subscriptionFailedCallback, + } = this.subscriptionObserverMap.get(id) || {}; + + if ( + type === MESSAGE_TYPES.GQL_START_ACK || + type === MESSAGE_TYPES.EVENT_SUBSCRIBE_ACK + ) { + this.logger.debug( + `subscription ready for ${JSON.stringify({ query, variables })}`, + ); + if (typeof subscriptionReadyCallback === 'function') { + subscriptionReadyCallback(); + } + if (startAckTimeoutId) clearTimeout(startAckTimeoutId); + dispatchApiEvent({ + event: CONTROL_MSG.SUBSCRIPTION_ACK, + data: { query, variables }, + message: 'Connection established for subscription', + }); + const subscriptionState = SUBSCRIPTION_STATUS.CONNECTED; + if (observer) { + this.subscriptionObserverMap.set(id, { + observer, + query, + variables, + startAckTimeoutId: undefined, + subscriptionState, + subscriptionReadyCallback, + subscriptionFailedCallback, + }); + } + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_ESTABLISHED, + ); + + return; + } + + if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { + if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); + if (this.keepAliveAlertTimeoutId) + clearTimeout(this.keepAliveAlertTimeoutId); + this.keepAliveTimeoutId = setTimeout(() => { + this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT); + }, this.keepAliveTimeout); + this.keepAliveAlertTimeoutId = setTimeout(() => { + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + }, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT); + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + + return; + } + + if (type === MESSAGE_TYPES.GQL_ERROR) { + const subscriptionState = SUBSCRIPTION_STATUS.FAILED; + if (observer) { + this.subscriptionObserverMap.set(id, { + observer, + query, + variables, + startAckTimeoutId, + subscriptionReadyCallback, + subscriptionFailedCallback, + subscriptionState, + }); + + this.logger.debug( + `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload ?? data)}`, + ); + + observer.error({ + errors: [ + { + ...new GraphQLError( + `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload ?? data)}`, + ), + }, + ], + }); + + if (startAckTimeoutId) clearTimeout(startAckTimeoutId); + + if (typeof subscriptionFailedCallback === 'function') { + subscriptionFailedCallback(); + } + } + } + } + + private _errorDisconnect(msg: string) { + this.logger.debug(`Disconnect error: ${msg}`); + + if (this.awsRealTimeSocket) { + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this.awsRealTimeSocket.close(); + } + + this.socketStatus = SOCKET_STATUS.CLOSED; + } + + private _timeoutStartSubscriptionAck(subscriptionId: string) { + const subscriptionObserver = + this.subscriptionObserverMap.get(subscriptionId); + if (subscriptionObserver) { + const { observer, query, variables } = subscriptionObserver; + if (!observer) { + return; + } + this.subscriptionObserverMap.set(subscriptionId, { + observer, + query, + variables, + subscriptionState: SUBSCRIPTION_STATUS.FAILED, + }); + + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this.logger.debug( + 'timeoutStartSubscription', + JSON.stringify({ query, variables }), + ); + } + } + + private _initializeWebSocketConnection({ + appSyncGraphqlEndpoint, + authenticationType, + apiKey, + region, + additionalCustomHeaders, + }: AWSAppSyncRealTimeProviderOptions) { + if (this.socketStatus === SOCKET_STATUS.READY) { + return; + } + + // TODO(Eslint): refactor to now use async function as the promise executor + // eslint-disable-next-line no-async-promise-executor + return new Promise(async (resolve, reject) => { + this.promiseArray.push({ res: resolve, rej: reject }); + + if (this.socketStatus === SOCKET_STATUS.CLOSED) { + try { + this.socketStatus = SOCKET_STATUS.CONNECTING; + + // Empty payload on connect + const payloadString = '{}'; + + const authHeader = await awsRealTimeHeaderBasedAuth({ + authenticationType, + payload: payloadString, + canonicalUri: '/connect', + apiKey, + appSyncGraphqlEndpoint, + region, + additionalCustomHeaders, + }); + + const headerString = authHeader ? JSON.stringify(authHeader) : ''; + // base64url-encoded string + const encodedHeader = base64Encoder.convert(headerString, { + urlSafe: true, + skipPadding: true, + }); + + const authTokenSubprotocol = `header-${encodedHeader}`; + + const queryParams = queryParamsFromCustomHeaders( + additionalCustomHeaders, + ); + + const awsRealTimeUrl = realtimeUrlWithQueryString( + appSyncGraphqlEndpoint, + queryParams, + ); + + await this._establishRetryableConnection( + awsRealTimeUrl, + authTokenSubprotocol, + ); + + this.promiseArray.forEach(({ res }) => { + this.logger.debug('Notifying connection successful'); + res(); + }); + this.socketStatus = SOCKET_STATUS.READY; + this.promiseArray = []; + } catch (err) { + this.logger.debug('Connection exited with', err); + this.promiseArray.forEach(({ rej }) => { + rej(err); + }); + this.promiseArray = []; + if ( + this.awsRealTimeSocket && + this.awsRealTimeSocket.readyState === WebSocket.OPEN + ) { + this.awsRealTimeSocket.close(3001); + } + this.awsRealTimeSocket = undefined; + this.socketStatus = SOCKET_STATUS.CLOSED; + } + } + }); + } + + private async _establishRetryableConnection( + awsRealTimeUrl: string, + subprotocol: string, + ) { + this.logger.debug(`Establishing retryable connection`); + await jitteredExponentialRetry( + this._establishConnection.bind(this), + [awsRealTimeUrl, subprotocol], + MAX_DELAY_MS, + ); + } + + private async _openConnection(awsRealTimeUrl: string, subprotocol: string) { + return new Promise((resolve, reject) => { + const newSocket = this._getNewWebSocket(awsRealTimeUrl, [ + this.wsProtocolName, + subprotocol, + ]); + + newSocket.onerror = () => { + this.logger.debug(`WebSocket connection error`); + }; + newSocket.onclose = () => { + reject(new Error('Connection handshake error')); + }; + newSocket.onopen = () => { + this.awsRealTimeSocket = newSocket; + resolve(); + }; + }); + } + + private _getNewWebSocket(url: string, protocol: string[]) { + return new WebSocket(url, protocol); + } + + private async _initiateHandshake(): Promise { + return new Promise((resolve, reject) => { + if (!this.awsRealTimeSocket) { + reject(new Error('awsRealTimeSocket undefined')); + + return; + } + + let ackOk = false; + + this.awsRealTimeSocket.onerror = error => { + this.logger.debug(`WebSocket error ${JSON.stringify(error)}`); + }; + + this.awsRealTimeSocket.onclose = event => { + this.logger.debug(`WebSocket closed ${event.reason}`); + reject(new Error(JSON.stringify(event))); + }; + + this.awsRealTimeSocket.onmessage = (message: MessageEvent) => { + if (typeof message.data !== 'string') { + return; + } + this.logger.debug( + `subscription message from AWS AppSyncRealTime: ${message.data} `, + ); + + const data = JSON.parse(message.data) as ParsedMessagePayload; + + const { type } = data; + + const connectionTimeoutMs = this._extractConnectionTimeout(data); + + if (type === MESSAGE_TYPES.GQL_CONNECTION_ACK) { + ackOk = true; + this._registerWebsocketHandlers(connectionTimeoutMs); + resolve('Connected to AWS AppSyncRealTime'); + + return; + } + + if (type === MESSAGE_TYPES.GQL_CONNECTION_ERROR) { + const { errorType, errorCode } = this._extractErrorCodeAndType(data); + + // TODO(Eslint): refactor to reject an Error object instead of a plain object + // eslint-disable-next-line prefer-promise-reject-errors + reject({ errorType, errorCode }); + } + }; + + const gqlInit = { + type: MESSAGE_TYPES.GQL_CONNECTION_INIT, + }; + this.awsRealTimeSocket.send(JSON.stringify(gqlInit)); + + const checkAckOk = (targetAckOk: boolean) => { + if (!targetAckOk) { + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_FAILED, + ); + reject( + new Error( + `Connection timeout: ack from AWSAppSyncRealTime was not received after ${CONNECTION_INIT_TIMEOUT} ms`, + ), + ); + } + }; + + setTimeout(() => { + checkAckOk(ackOk); + }, CONNECTION_INIT_TIMEOUT); + }); + } + + private _registerWebsocketHandlers(connectionTimeoutMs: number) { + if (!this.awsRealTimeSocket) { + return; + } + + this.keepAliveTimeout = connectionTimeoutMs; + this.awsRealTimeSocket.onmessage = + this._handleIncomingSubscriptionMessage.bind(this); + + this.awsRealTimeSocket.onerror = err => { + this.logger.debug(err); + this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); + }; + + this.awsRealTimeSocket.onclose = event => { + this.logger.debug(`WebSocket closed ${event.reason}`); + this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); + }; + } + + /** + * Open WebSocket connection & perform handshake + * Ref: https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html#appsynclong-real-time-websocket-client-implementation-guide-for-graphql-subscriptions + * + * @param subprotocol - + */ + private _establishConnection = async ( + awsRealTimeUrl: string, + subprotocol: string, + ) => { + this.logger.debug(`Establishing WebSocket connection to ${awsRealTimeUrl}`); + try { + await this._openConnection(awsRealTimeUrl, subprotocol); + await this._initiateHandshake(); + } catch (err) { + const { errorType, errorCode } = err as { + errorType: string; + errorCode: number; + }; + + if ( + NON_RETRYABLE_CODES.includes(errorCode) || + // Event API does not currently return `errorCode`. This may change in the future. + // For now fall back to also checking known non-retryable error types + NON_RETRYABLE_ERROR_TYPES.includes(errorType) + ) { + throw new NonRetryableError(errorType); + } else if (errorType) { + throw new Error(errorType); + } else { + throw err; + } + } + }; +} diff --git a/packages/api-graphql/src/Providers/constants.ts b/packages/api-graphql/src/Providers/constants.ts index cda958b7f6a..5e82f672081 100644 --- a/packages/api-graphql/src/Providers/constants.ts +++ b/packages/api-graphql/src/Providers/constants.ts @@ -5,6 +5,10 @@ export { AMPLIFY_SYMBOL } from '@aws-amplify/core/internals/utils'; export const MAX_DELAY_MS = 5000; export const NON_RETRYABLE_CODES = [400, 401, 403]; +export const NON_RETRYABLE_ERROR_TYPES = [ + 'BadRequestException', + 'UnauthorizedException', +]; export const CONNECTION_STATE_CHANGE = 'ConnectionStateChange'; @@ -36,9 +40,9 @@ export enum MESSAGE_TYPES { GQL_START_ACK = 'start_ack', /** * Server -> Client message. - * This message type is for subscription message from AWS AppSync RealTime + * This message type is for subscription message from AWS AppSync RealTime or Events */ - GQL_DATA = 'data', + DATA = 'data', /** * Server -> Client message. * This message type helps the client to know is still receiving messages from AWS AppSync RealTime @@ -59,6 +63,36 @@ export enum MESSAGE_TYPES { * This message type is for sending error messages from AWS AppSync RealTime to the client */ GQL_ERROR = 'error', // Server -> Client + /** + * Client -> Server message. + * This message type is for registering subscriptions with Events + */ + EVENT_SUBSCRIBE = 'subscribe', + /** + * Client -> Server message. + * This message type is for publishing a message with Events + */ + EVENT_PUBLISH = 'publish', + /** + * Server -> Client message. + * Server acknowledges successful subscription + */ + EVENT_SUBSCRIBE_ACK = 'subscribe_success', + /** + * Server -> Client message. + * Server acknowledges successful publish + */ + EVENT_PUBLISH_ACK = 'publish_success', + /** + * Client -> Server message. + * This message type is for unregister subscriptions with AWS AppSync RealTime + */ + EVENT_STOP = 'unsubscribe', + /** + * Server -> Client message. + * This is the ack response from AWS AppSync Events to EVENT_STOP message + */ + EVENT_COMPLETE = 'unsubscribe_success', } export enum SUBSCRIPTION_STATUS { diff --git a/packages/api-graphql/src/index.ts b/packages/api-graphql/src/index.ts index a5e3edd991b..bec5dd43416 100644 --- a/packages/api-graphql/src/index.ts +++ b/packages/api-graphql/src/index.ts @@ -1,7 +1,12 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +import * as events from './internals/events'; + +export { events }; + export { GraphQLAPI, GraphQLAPIClass, graphqlOperation } from './GraphQLAPI'; export * from './types'; export { CONNECTION_STATE_CHANGE } from './Providers/constants'; +export * from './internals/events/types'; diff --git a/packages/api-graphql/src/internals/InternalGraphQLAPI.ts b/packages/api-graphql/src/internals/InternalGraphQLAPI.ts index 15b89670314..f7d1a60d556 100644 --- a/packages/api-graphql/src/internals/InternalGraphQLAPI.ts +++ b/packages/api-graphql/src/internals/InternalGraphQLAPI.ts @@ -12,7 +12,6 @@ import { AmplifyClassV6 } from '@aws-amplify/core'; import { AmplifyUrl, CustomUserAgentDetails, - GraphQLAuthMode, getAmplifyUserAgent, } from '@aws-amplify/core/internals/utils'; import { isCancelError as isCancelErrorREST } from '@aws-amplify/api-rest'; @@ -30,17 +29,11 @@ import { AWSAppSyncRealTimeProvider } from '../Providers/AWSAppSyncRealTimeProvi import { GraphQLOperation, GraphQLOptions, GraphQLResult } from '../types'; import { resolveConfig, resolveLibraryOptions } from '../utils'; import { repackageUnauthorizedError } from '../utils/errors/repackageAuthError'; -import { - NO_API_KEY, - NO_AUTH_TOKEN_HEADER, - NO_ENDPOINT, - NO_SIGNED_IN_USER, - NO_VALID_AUTH_TOKEN, - NO_VALID_CREDENTIALS, -} from '../utils/errors/constants'; +import { NO_ENDPOINT } from '../utils/errors/constants'; import { GraphQLApiError, createGraphQLResultWithError } from '../utils/errors'; import { isGraphQLResponseWithErrors } from './utils/runtimeTypeGuards/isGraphQLResponseWithErrors'; +import { headerBasedAuth } from './graphqlAuth'; const USER_AGENT_HEADER = 'x-amz-user-agent'; @@ -72,80 +65,6 @@ export class InternalGraphQLAPIClass { return 'InternalGraphQLAPI'; } - private async _headerBasedAuth( - amplify: AmplifyClassV6, - authMode: GraphQLAuthMode, - additionalHeaders: Record = {}, - ) { - const { apiKey } = resolveConfig(amplify); - - let headers = {}; - - switch (authMode) { - case 'apiKey': - if (!apiKey) { - throw new GraphQLApiError(NO_API_KEY); - } - headers = { - 'X-Api-Key': apiKey, - }; - break; - case 'iam': { - const session = await amplify.Auth.fetchAuthSession(); - if (session.credentials === undefined) { - throw new GraphQLApiError(NO_VALID_CREDENTIALS); - } - break; - } - case 'oidc': - case 'userPool': { - let token: string | undefined; - - try { - token = ( - await amplify.Auth.fetchAuthSession() - ).tokens?.accessToken.toString(); - } catch (e) { - // fetchAuthSession failed - throw new GraphQLApiError({ - ...NO_SIGNED_IN_USER, - underlyingError: e, - }); - } - - // `fetchAuthSession()` succeeded but didn't return `tokens`. - // This may happen when unauthenticated access is enabled and there is - // no user signed in. - if (!token) { - throw new GraphQLApiError(NO_VALID_AUTH_TOKEN); - } - - headers = { - Authorization: token, - }; - break; - } - case 'lambda': - if ( - typeof additionalHeaders === 'object' && - !additionalHeaders.Authorization - ) { - throw new GraphQLApiError(NO_AUTH_TOKEN_HEADER); - } - - headers = { - Authorization: additionalHeaders.Authorization, - }; - break; - case 'none': - break; - default: - break; - } - - return headers; - } - /** * to get the operation type * @param operation @@ -252,6 +171,7 @@ export class InternalGraphQLAPIClass { authToken?: string, ): Promise> { const { + apiKey, region, endpoint: appSyncGraphqlEndpoint, customEndpoint, @@ -303,29 +223,22 @@ export class InternalGraphQLAPIClass { }; } - // TODO: Figure what we need to do to remove `!`'s. + const authHeaders = await headerBasedAuth( + amplify, + authMode, + apiKey, + additionalCustomHeaders, + ); + const headers = { - ...(!customEndpoint && - (await this._headerBasedAuth( - amplify, - authMode!, - additionalCustomHeaders, - ))), + ...(!customEndpoint && authHeaders), /** * Custom endpoint headers. * If there is both a custom endpoint and custom region present, we get the headers. * If there is a custom endpoint but no region, we return an empty object. * If neither are present, we return an empty object. */ - ...((customEndpoint && - (customEndpointRegion - ? await this._headerBasedAuth( - amplify, - authMode!, - additionalCustomHeaders, - ) - : {})) || - {}), + ...((customEndpoint && (customEndpointRegion ? authHeaders : {})) || {}), // Custom headers included in Amplify configuration options: ...(customHeaders && (await customHeaders({ @@ -378,9 +291,9 @@ export class InternalGraphQLAPIClass { let response: any; try { - // See the inline doc of the REST `post()` API for possible errors to be thrown. - // As these errors are catastrophic they should be caught and handled by GraphQL - // API consumers. + // // // See the inline doc of the REST `post()` API for possible errors to be thrown. + // // // As these errors are catastrophic they should be caught and handled by GraphQL + // // // API consumers. const { body: responseBody } = await this._api.post(amplify, { url: new AmplifyUrl(endpoint), options: { diff --git a/packages/api-graphql/src/internals/events/appsyncRequest.ts b/packages/api-graphql/src/internals/events/appsyncRequest.ts new file mode 100644 index 00000000000..5b53d81204d --- /dev/null +++ b/packages/api-graphql/src/internals/events/appsyncRequest.ts @@ -0,0 +1,169 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { AmplifyClassV6 } from '@aws-amplify/core'; +import { + AmplifyUrl, + CustomUserAgentDetails, + GraphQLAuthMode, + getAmplifyUserAgent, +} from '@aws-amplify/core/internals/utils'; +import { post } from '@aws-amplify/api-rest/internals'; +import { + CustomHeaders, + RequestOptions, +} from '@aws-amplify/data-schema/runtime'; + +import { resolveLibraryOptions } from '../../utils'; +import { repackageUnauthorizedError } from '../../utils/errors/repackageAuthError'; +import { headerBasedAuth } from '../graphqlAuth'; +import { isGraphQLResponseWithErrors } from '../utils/runtimeTypeGuards/isGraphQLResponseWithErrors'; + +const USER_AGENT_HEADER = 'x-amz-user-agent'; + +interface GqlRequestOptions { + apiKey?: string; + region?: string; + appSyncGraphqlEndpoint: string; + authenticationType: GraphQLAuthMode; + query: string; + variables: string[]; + authToken?: string; +} + +// This is effectively a copy of InternalGraphQLAPI.ts._graphql(...) +// Our existing unit tests are tightly coupled to the implementation, so i was unable to refactor +// and extend _graphql() without having to change a bunch of tests as well... which in turn reduces confidence +// that this feature will _not affect_ GQL behavior. +export async function appsyncRequest( + amplify: AmplifyClassV6, + options: GqlRequestOptions, + additionalHeaders: CustomHeaders = {}, + abortController: AbortController, + customUserAgentDetails?: CustomUserAgentDetails, +): Promise { + const { + region, + appSyncGraphqlEndpoint: endpoint, + authenticationType: authMode, + query, + variables, + } = options; + + if (!endpoint) { + throw new Error('No endpoint'); + } + + const { withCredentials } = resolveLibraryOptions(amplify); + + const headers = await requestHeaders( + amplify, + options, + additionalHeaders, + customUserAgentDetails, + ); + + const body = { + channel: query as string, + events: variables, + }; + + const signingServiceInfo = ['apiKey', 'none'].includes(authMode) + ? undefined + : { + service: 'appsync', + region, + }; + + const { body: responseBody } = await post(amplify, { + url: new AmplifyUrl(endpoint), + options: { + headers, + body, + signingServiceInfo, + withCredentials, + }, + abortController, + }); + + const response = await responseBody.json(); + + if (isGraphQLResponseWithErrors(response)) { + throw repackageUnauthorizedError(response); + } + + return response as T; +} + +/** + * Computes all the necessary HTTP headers for the request based on: + * 1. Operation-level header options + * 2. Amplify.configure custom headers + * 3. AuthZ headers for explicit auth mode specified for operation ?? default auth mode in config + * + * @returns HTTP request headers key/value + */ +async function requestHeaders( + amplify: AmplifyClassV6, + options: GqlRequestOptions, + additionalHeaders: CustomHeaders, + customUserAgentDetails?: CustomUserAgentDetails, +) { + const { + apiKey, + appSyncGraphqlEndpoint: endpoint, + authenticationType: authMode, + query, + variables, + authToken, + } = options; + + const { headers: customHeadersFn } = resolveLibraryOptions(amplify); + + let additionalCustomHeaders: Record; + + if (typeof additionalHeaders === 'function') { + const requestOptions: RequestOptions = { + method: 'POST', + url: new AmplifyUrl(endpoint).toString(), + queryString: query, + }; + + additionalCustomHeaders = await additionalHeaders(requestOptions); + } else { + additionalCustomHeaders = additionalHeaders; + } + + // if an authorization header is set, have the operation-level authToken take precedence + if (authToken) { + additionalCustomHeaders = { + ...additionalCustomHeaders, + Authorization: authToken, + }; + } + + const authHeaders = await headerBasedAuth( + amplify, + authMode, + apiKey, + additionalCustomHeaders, + ); + + const customHeaders = + customHeadersFn && + (await customHeadersFn({ + query, + variables: variables as any, + })); + + const headers = { + ...authHeaders, + // Custom headers included in Amplify configuration options: + ...customHeaders, + // Custom headers from individual requests or API client configuration: + ...additionalCustomHeaders, + // User agent headers: + [USER_AGENT_HEADER]: getAmplifyUserAgent(customUserAgentDetails), + }; + + return headers; +} diff --git a/packages/api-graphql/src/internals/events/index.ts b/packages/api-graphql/src/internals/events/index.ts new file mode 100644 index 00000000000..49860a92049 --- /dev/null +++ b/packages/api-graphql/src/internals/events/index.ts @@ -0,0 +1,161 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { Subscription } from 'rxjs'; +import { Amplify } from '@aws-amplify/core'; +import { DocumentType } from '@aws-amplify/core/internals/utils'; + +import { AppSyncEventProvider as eventProvider } from '../../Providers/AWSAppSyncEventsProvider'; + +import { appsyncRequest } from './appsyncRequest'; +import { configure, normalizeAuth, serializeEvents } from './utils'; +import type { + EventsChannel, + EventsOptions, + PublishResponse, + PublishedEvent, + SubscriptionObserver, +} from './types'; + +/** + * @experimental API may change in future versions + * + * Establish a WebSocket connection to an Events channel + * + * @example + * const channel = await events.connect("default/channel") + * + * channel.subscribe({ + * next: (data) => { console.log(data) }, + * error: (err) => { console.error(err) }, + * }) + * + * @example // authMode override + * const channel = await events.connect("default/channel", { authMode: "userPool" }) + * + * @param channel - channel path; `/` + * @param options - request overrides: `authMode`, `authToken` + * + */ +async function connect( + channel: string, + options?: EventsOptions, +): Promise { + const providerOptions = configure(); + + providerOptions.authenticationType = normalizeAuth( + options?.authMode, + providerOptions.authenticationType, + ); + + await eventProvider.connect(providerOptions); + + let _subscription: Subscription; + + const sub = ( + observer: SubscriptionObserver, + subOptions?: EventsOptions, + ): Subscription => { + const subscribeOptions = { ...providerOptions, query: channel }; + subscribeOptions.authenticationType = normalizeAuth( + subOptions?.authMode, + subscribeOptions.authenticationType, + ); + + _subscription = eventProvider + .subscribe(subscribeOptions) + .subscribe(observer); + + return _subscription; + }; + + // WS publish is not enabled in the service yet. It will be a follow up feature + const _pub = async ( + event: DocumentType, + pubOptions?: EventsOptions, + ): Promise => { + const publishOptions = { + ...providerOptions, + query: channel, + variables: event, + }; + publishOptions.authenticationType = normalizeAuth( + pubOptions?.authMode, + publishOptions.authenticationType, + ); + + return eventProvider.publish(publishOptions); + }; + + const close = () => { + _subscription && _subscription.unsubscribe(); + }; + + return { + subscribe: sub, + close, + // publish: pub, + }; +} + +/** + * @experimental API may change in future versions + * + * Publish events to a channel via HTTP request + * + * @example + * await events.post("default/channel", { some: "event" }) + * + * @example // event batching + * await events.post("default/channel", [{ some: "event" }, { some: "event2" }]) + * + * @example // authMode override + * await events.post("default/channel", { some: "event" }, { authMode: "userPool" }) + * + * @param channel - channel path; `/` + * @param event - JSON-serializable value or an array of values + * @param options - request overrides: `authMode`, `authToken` + * + * @returns void on success + * @throws on error + */ +async function post( + channel: string, + event: DocumentType | DocumentType[], + options?: EventsOptions, +): Promise { + const providerOptions = configure(); + providerOptions.authenticationType = normalizeAuth( + options?.authMode, + providerOptions.authenticationType, + ); + + // trailing slash required in publish + const normalizedChannelName = channel[0] === '/' ? channel : `/${channel}`; + + const publishOptions = { + ...providerOptions, + query: normalizedChannelName, + variables: serializeEvents(event), + authToken: options?.authToken, + }; + + const abortController = new AbortController(); + + const res = await appsyncRequest( + Amplify, + publishOptions, + {}, + abortController, + ); + + if (res.failed?.length > 0) { + return res.failed; + } +} + +function closeAll(): void { + eventProvider.close(); +} + +export { connect, post, closeAll }; diff --git a/packages/api-graphql/src/internals/events/types.ts b/packages/api-graphql/src/internals/events/types.ts new file mode 100644 index 00000000000..f9f52c538c1 --- /dev/null +++ b/packages/api-graphql/src/internals/events/types.ts @@ -0,0 +1,71 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { Subscription } from 'rxjs'; +import type { GraphQLAuthMode } from '@aws-amplify/core/internals/utils'; + +export interface SubscriptionObserver { + next(value: T): void; + error(errorValue: any): void; +} + +export interface EventsChannel { + /** + * @experimental API may change in future versions + * + * Subscribe to Events + * + * @example + * const channel = await events.connect("default/channel") + * + * channel.subscribe({ + * next: (data) => { console.log(data) }, + * error: (err) => { console.error(err) }, + * }) + * + * @example // authMode override + * channel.subscribe({ + * next: (data) => { console.log(data) }, + * error: (err) => { console.error(err) }, + * }, { authMode: 'userPool' }) + * + * @param observer - observer callback handlers + * `{ next: () => {}, error: () => {}}` + * + * @param options - subscribe overrides: `authMode`, `authToken` + * + */ + subscribe( + observer: SubscriptionObserver, + subOptions?: EventsOptions, + ): Subscription; + /** + * @experimental API may change in future versions + * + * Close channel and any active subscriptions + * + * @example + * const channel = await events.connect("default/channel") + * + * channel.close() + * + */ + close(): void; +} + +export type ResolvedGraphQLAuthModes = Exclude; + +export interface EventsOptions { + authMode?: GraphQLAuthMode; + authToken?: string; +} + +export interface PublishedEvent { + identifier: string; + index: number; +} + +export interface PublishResponse { + failed: PublishedEvent[]; + successful: PublishedEvent[]; +} diff --git a/packages/api-graphql/src/internals/events/utils.ts b/packages/api-graphql/src/internals/events/utils.ts new file mode 100644 index 00000000000..018aed5f96a --- /dev/null +++ b/packages/api-graphql/src/internals/events/utils.ts @@ -0,0 +1,77 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { Amplify } from '@aws-amplify/core'; +import { + DocumentType, + GraphQLAuthMode, +} from '@aws-amplify/core/internals/utils'; + +import type { ResolvedGraphQLAuthModes } from './types'; + +export const normalizeAuth = ( + explicitAuthMode: GraphQLAuthMode | undefined, + defaultAuthMode: ResolvedGraphQLAuthModes, +): ResolvedGraphQLAuthModes => { + if (!explicitAuthMode) { + return defaultAuthMode; + } + + if (explicitAuthMode === 'identityPool') { + return 'iam'; + } + + return explicitAuthMode; +}; + +export const configure = () => { + const config = Amplify.getConfig(); + + const eventsConfig = config.API?.Events; + + if (!eventsConfig) { + throw new Error( + 'Amplify configuration is missing. Have you called Amplify.configure()?', + ); + } + + const configAuthMode = normalizeAuth(eventsConfig.defaultAuthMode, 'apiKey'); + + const options = { + appSyncGraphqlEndpoint: eventsConfig.endpoint, + region: eventsConfig.region, + authenticationType: configAuthMode, + apiKey: eventsConfig.apiKey, + }; + + return options; +}; + +/** + * Event API expects and array of JSON strings + * + * @param events - JSON-serializable value or an array of values + * @returns array of JSON strings + */ +export const serializeEvents = ( + events: DocumentType | DocumentType[], +): string[] => { + if (Array.isArray(events)) { + return events.map((ev, idx) => { + const eventJson = JSON.stringify(ev); + if (eventJson === undefined) { + throw new Error( + `Event must be a valid JSON value. Received ${ev} at index ${idx}`, + ); + } + + return eventJson; + }); + } + + const eventJson = JSON.stringify(events); + if (eventJson === undefined) { + throw new Error(`Event must be a valid JSON value. Received ${events}`); + } + + return [eventJson]; +}; diff --git a/packages/api-graphql/src/internals/graphqlAuth.ts b/packages/api-graphql/src/internals/graphqlAuth.ts new file mode 100644 index 00000000000..2b1ade5c344 --- /dev/null +++ b/packages/api-graphql/src/internals/graphqlAuth.ts @@ -0,0 +1,87 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { GraphQLAuthMode } from '@aws-amplify/core/internals/utils'; +import { AmplifyClassV6 } from '@aws-amplify/core'; + +import { GraphQLApiError } from '../utils/errors'; +import { + NO_API_KEY, + NO_AUTH_TOKEN_HEADER, + NO_SIGNED_IN_USER, + NO_VALID_AUTH_TOKEN, + NO_VALID_CREDENTIALS, +} from '../utils/errors/constants'; + +export async function headerBasedAuth( + amplify: AmplifyClassV6, + authMode: GraphQLAuthMode, + apiKey: string | undefined, + additionalHeaders: Record = {}, +) { + let headers = {}; + + switch (authMode) { + case 'apiKey': + if (!apiKey) { + throw new GraphQLApiError(NO_API_KEY); + } + headers = { + 'X-Api-Key': apiKey, + }; + break; + case 'iam': { + const session = await amplify.Auth.fetchAuthSession(); + if (session.credentials === undefined) { + throw new GraphQLApiError(NO_VALID_CREDENTIALS); + } + break; + } + case 'oidc': + case 'userPool': { + let token: string | undefined; + + try { + token = ( + await amplify.Auth.fetchAuthSession() + ).tokens?.accessToken.toString(); + } catch (e) { + // fetchAuthSession failed + throw new GraphQLApiError({ + ...NO_SIGNED_IN_USER, + underlyingError: e, + }); + } + + // `fetchAuthSession()` succeeded but didn't return `tokens`. + // This may happen when unauthenticated access is enabled and there is + // no user signed in. + if (!token) { + throw new GraphQLApiError(NO_VALID_AUTH_TOKEN); + } + + headers = { + Authorization: token, + }; + break; + } + case 'lambda': + if ( + typeof additionalHeaders === 'object' && + !additionalHeaders.Authorization + ) { + throw new GraphQLApiError(NO_AUTH_TOKEN_HEADER); + } + + headers = { + Authorization: additionalHeaders.Authorization, + }; + break; + case 'none': + break; + default: + break; + } + + return headers; +} diff --git a/packages/api-graphql/src/internals/graphqlRequest.ts b/packages/api-graphql/src/internals/graphqlRequest.ts new file mode 100644 index 00000000000..8566b9d682d --- /dev/null +++ b/packages/api-graphql/src/internals/graphqlRequest.ts @@ -0,0 +1,30 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { AmplifyUrl } from '@aws-amplify/core/internals/utils'; +import { AmplifyClassV6 } from '@aws-amplify/core'; +import { + // cancel as cancelREST, + post, + // updateRequestToBeCancellable, +} from '@aws-amplify/api-rest/internals'; + +export async function graphqlRequest( + amplify: AmplifyClassV6, + url: string, + options: any, + abortController: AbortController, + _post?: typeof post, +) { + const p = _post ?? post; + + const { body: responseBody } = await p(amplify, { + url: new AmplifyUrl(url), + options, + abortController, + }); + + const response = await responseBody.json(); + + return response; +} diff --git a/packages/api-graphql/src/utils/ConnectionStateMonitor.ts b/packages/api-graphql/src/utils/ConnectionStateMonitor.ts index 49f35afe683..92581015f0d 100644 --- a/packages/api-graphql/src/utils/ConnectionStateMonitor.ts +++ b/packages/api-graphql/src/utils/ConnectionStateMonitor.ts @@ -17,18 +17,7 @@ interface LinkedConnectionStates { keepAliveState: LinkedHealthState; } -export const CONNECTION_CHANGE: { - [key in - | 'KEEP_ALIVE_MISSED' - | 'KEEP_ALIVE' - | 'CONNECTION_ESTABLISHED' - | 'CONNECTION_FAILED' - | 'CLOSING_CONNECTION' - | 'OPENING_CONNECTION' - | 'CLOSED' - | 'ONLINE' - | 'OFFLINE']: Partial; -} = { +export const CONNECTION_CHANGE = { KEEP_ALIVE_MISSED: { keepAliveState: 'unhealthy' }, KEEP_ALIVE: { keepAliveState: 'healthy' }, CONNECTION_ESTABLISHED: { connectionState: 'connected' }, @@ -44,7 +33,7 @@ export const CONNECTION_CHANGE: { CLOSED: { connectionState: 'disconnected' }, ONLINE: { networkState: 'connected' }, OFFLINE: { networkState: 'disconnected' }, -}; +} as const; export class ConnectionStateMonitor { /** diff --git a/packages/api-graphql/src/utils/ReconnectionMonitor.ts b/packages/api-graphql/src/utils/ReconnectionMonitor.ts index f521f2a7208..7d781264684 100644 --- a/packages/api-graphql/src/utils/ReconnectionMonitor.ts +++ b/packages/api-graphql/src/utils/ReconnectionMonitor.ts @@ -11,7 +11,7 @@ export enum ReconnectEvent { /** * Captures the reconnect event logic used to determine when to reconnect to PubSub providers. - * Reconnnect attempts are delayed by 5 seconds to let the interface settle. + * Reconnect attempts are delayed by 5 seconds to let the interface settle. * Attempting to reconnect only once creates unrecoverable states when the network state isn't * supported by the browser, so this keeps retrying every minute until halted. */ diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 35062beffce..ab99abc99d6 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -6,14 +6,19 @@ import type { V6Client } from '@aws-amplify/api-graphql'; export { GraphQLQuery, GraphQLSubscription, SelectionSet } from './types'; export { generateClient } from './API'; -export { GraphQLAuthError, ConnectionState } from '@aws-amplify/api-graphql'; +export { + GraphQLAuthError, + ConnectionState, + EventsChannel, + EventsOptions, +} from '@aws-amplify/api-graphql'; export type { GraphQLResult, GraphQLReturnType, } from '@aws-amplify/api-graphql'; -export { CONNECTION_STATE_CHANGE } from '@aws-amplify/api-graphql'; +export { CONNECTION_STATE_CHANGE, events } from '@aws-amplify/api-graphql'; // explicitly defaulting to `never` here resolves // TS2589: Type instantiation is excessively deep and possibly infinite. diff --git a/packages/aws-amplify/__tests__/exports.test.ts b/packages/aws-amplify/__tests__/exports.test.ts index c6425ad7c93..0a354d6cf11 100644 --- a/packages/aws-amplify/__tests__/exports.test.ts +++ b/packages/aws-amplify/__tests__/exports.test.ts @@ -55,6 +55,7 @@ describe('aws-amplify Exports', () => { 'ConnectionState', 'GraphQLAuthError', 'del', + 'events', 'generateClient', 'get', 'head', diff --git a/packages/aws-amplify/package.json b/packages/aws-amplify/package.json index 0be29232695..214bcc2e020 100644 --- a/packages/aws-amplify/package.json +++ b/packages/aws-amplify/package.json @@ -293,7 +293,7 @@ "name": "[Analytics] record (Pinpoint)", "path": "./dist/esm/analytics/index.mjs", "import": "{ record }", - "limit": "17.35 kB" + "limit": "17.41 kB" }, { "name": "[Analytics] record (Kinesis)", @@ -317,7 +317,7 @@ "name": "[Analytics] identifyUser (Pinpoint)", "path": "./dist/esm/analytics/index.mjs", "import": "{ identifyUser }", - "limit": "15.85 kB" + "limit": "15.91 kB" }, { "name": "[Analytics] enable", @@ -335,7 +335,7 @@ "name": "[API] generateClient (AppSync)", "path": "./dist/esm/api/index.mjs", "import": "{ generateClient }", - "limit": "43.40 kB" + "limit": "44.1 kB" }, { "name": "[API] REST API handlers", @@ -425,7 +425,7 @@ "name": "[Auth] getCurrentUser (Cognito)", "path": "./dist/esm/auth/index.mjs", "import": "{ getCurrentUser }", - "limit": "7.97 kB" + "limit": "8.09 kB" }, { "name": "[Auth] confirmUserAttribute (Cognito)", @@ -461,43 +461,43 @@ "name": "[Storage] copy (S3)", "path": "./dist/esm/storage/index.mjs", "import": "{ copy }", - "limit": "14.96 kB" + "limit": "15.03 kB" }, { "name": "[Storage] downloadData (S3)", "path": "./dist/esm/storage/index.mjs", "import": "{ downloadData }", - "limit": "15.55 kB" + "limit": "15.62 kB" }, { "name": "[Storage] getProperties (S3)", "path": "./dist/esm/storage/index.mjs", "import": "{ getProperties }", - "limit": "14.81 kB" + "limit": "14.89 kB" }, { "name": "[Storage] getUrl (S3)", "path": "./dist/esm/storage/index.mjs", "import": "{ getUrl }", - "limit": "16.05 kB" + "limit": "16.11 kB" }, { "name": "[Storage] list (S3)", "path": "./dist/esm/storage/index.mjs", "import": "{ list }", - "limit": "15.50 kB" + "limit": "15.55 kB" }, { "name": "[Storage] remove (S3)", "path": "./dist/esm/storage/index.mjs", "import": "{ remove }", - "limit": "14.67 kB" + "limit": "14.75 kB" }, { "name": "[Storage] uploadData (S3)", "path": "./dist/esm/storage/index.mjs", "import": "{ uploadData }", - "limit": "20.05 kB" + "limit": "20.08 kB" } ] } diff --git a/packages/core/src/parseAmplifyOutputs.ts b/packages/core/src/parseAmplifyOutputs.ts index 3a9265720e1..c7a5d819487 100644 --- a/packages/core/src/parseAmplifyOutputs.ts +++ b/packages/core/src/parseAmplifyOutputs.ts @@ -5,10 +5,10 @@ /* eslint-disable camelcase */ /* Does not like exhaustive checks */ -/* eslint-disable no-case-declarations */ import { APIConfig, + APIEventsConfig, APIGraphQLConfig, GraphQLAuthMode, ModelIntrospectionSchema, @@ -22,6 +22,7 @@ import { AmplifyOutputs, AmplifyOutputsAnalyticsProperties, AmplifyOutputsAuthProperties, + AmplifyOutputsCustomProperties, AmplifyOutputsDataProperties, AmplifyOutputsGeoProperties, AmplifyOutputsNotificationsProperties, @@ -223,6 +224,28 @@ function parseData( }; } +function parseCustom( + amplifyOutputsCustomProperties?: AmplifyOutputsCustomProperties, +) { + if (!amplifyOutputsCustomProperties?.events) { + return undefined; + } + + const { url, aws_region, api_key, default_authorization_type } = + amplifyOutputsCustomProperties.events; + + const Events: APIEventsConfig = { + endpoint: url, + defaultAuthMode: getGraphQLAuthMode(default_authorization_type), + region: aws_region, + apiKey: api_key, + }; + + return { + Events, + }; +} + function parseNotifications( amplifyOutputsNotificationsProperties?: AmplifyOutputsNotificationsProperties, ): NotificationsConfig | undefined { @@ -290,6 +313,14 @@ export function parseAmplifyOutputs( resourcesConfig.API = parseData(amplifyOutputs.data); } + if (amplifyOutputs.custom) { + const customConfig = parseCustom(amplifyOutputs.custom); + + if (customConfig && 'Events' in customConfig) { + resourcesConfig.API = { ...resourcesConfig.API, ...customConfig }; + } + } + if (amplifyOutputs.notifications) { resourcesConfig.Notifications = parseNotifications( amplifyOutputs.notifications, diff --git a/packages/core/src/singleton/API/types.ts b/packages/core/src/singleton/API/types.ts index 3909d264180..dc9435c9b6a 100644 --- a/packages/core/src/singleton/API/types.ts +++ b/packages/core/src/singleton/API/types.ts @@ -47,6 +47,37 @@ export interface APIGraphQLConfig { modelIntrospection?: ModelIntrospectionSchema; } +/** + * @experimental + */ +export interface APIEventsConfig { + /** + * Required GraphQL endpoint, must be a valid URL string. + */ + endpoint: string; + /** + * Optional region string used to sign the request. Required only if the auth mode is 'iam'. + */ + region?: string; + /** + * Optional API key string. Required only if the auth mode is 'apiKey'. + */ + apiKey?: string; + /** + * Custom domain endpoint for GraphQL API. + */ + customEndpoint?: string; + /** + * Optional region string used to sign the request to `customEndpoint`. Effective only if `customEndpoint` is + * specified, and the auth mode is 'iam'. + */ + customEndpointRegion?: string; + /** + * Default auth mode for all the API calls to given service. + */ + defaultAuthMode: GraphQLAuthMode; +} + export interface APIRestConfig { /** * Required REST endpoint, must be a valid URL string. @@ -75,7 +106,13 @@ export interface GraphQLProviderConfig { GraphQL: APIGraphQLConfig; } -export type APIConfig = AtLeastOne; +export interface EventsProviderConfig { + Events: APIEventsConfig; +} + +export type APIConfig = AtLeastOne< + RESTProviderConfig & GraphQLProviderConfig & EventsProviderConfig +>; export type GraphQLAuthMode = | 'apiKey' diff --git a/packages/core/src/singleton/AmplifyOutputs/types.ts b/packages/core/src/singleton/AmplifyOutputs/types.ts index 2968955158f..c3a23fc98ab 100644 --- a/packages/core/src/singleton/AmplifyOutputs/types.ts +++ b/packages/core/src/singleton/AmplifyOutputs/types.ts @@ -94,6 +94,17 @@ export interface AmplifyOutputsDataProperties { conflict_resolution_mode?: string; } +export interface AmplifyOutputsCustomProperties { + // @experimental + events?: { + url: string; + aws_region: string; + default_authorization_type: string; + api_key?: string; + }; + [key: string]: any; +} + export interface AmplifyOutputsNotificationsProperties { aws_region: string; amazon_pinpoint_app_id: string; @@ -107,5 +118,6 @@ export interface AmplifyOutputs { analytics?: AmplifyOutputsAnalyticsProperties; geo?: AmplifyOutputsGeoProperties; data?: AmplifyOutputsDataProperties; + custom?: AmplifyOutputsCustomProperties; notifications?: AmplifyOutputsNotificationsProperties; }