From f21d3f0fcbf9970d17df917440a114ea909dc29a Mon Sep 17 00:00:00 2001 From: Ivan Artemiev <29709626+iartemiev@users.noreply.github.com> Date: Wed, 9 Oct 2024 09:05:53 -0400 Subject: [PATCH] fix unsub --- .../AWSAppSyncEventsProvider/index.ts | 10 ++ .../AWSAppSyncRealTimeProvider/index.ts | 10 ++ .../Providers/AWSWebSocketProvider/index.ts | 10 +- .../api-graphql/src/Providers/constants.ts | 10 ++ packages/api-graphql/src/index.ts | 1 + .../api-graphql/src/internals/events/index.ts | 98 +++++-------------- .../api-graphql/src/internals/events/types.ts | 35 +++++++ .../api-graphql/src/internals/events/utils.ts | 81 +++++++++++++++ packages/api/src/index.ts | 7 +- 9 files changed, 185 insertions(+), 77 deletions(-) create mode 100644 packages/api-graphql/src/internals/events/types.ts create mode 100644 packages/api-graphql/src/internals/events/utils.ts diff --git a/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts b/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts index 3ba87f65d7c..612d5d2faf1 100644 --- a/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts @@ -171,4 +171,14 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider { return [false, { id, type, payload }]; } + + protected _unsubscribeMessage(subscriptionId: string): { + id: string; + type: string; + } { + return { + id: subscriptionId, + type: MESSAGE_TYPES.EVENT_STOP, + }; + } } diff --git a/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 4206623f83d..464a1872ced 100644 --- a/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -148,4 +148,14 @@ export class AWSAppSyncRealTimeProvider extends AWSWebSocketProvider { return [false, { id, type, payload }]; } + + protected _unsubscribeMessage(subscriptionId: string): { + id: string; + type: string; + } { + return { + id: subscriptionId, + type: MESSAGE_TYPES.GQL_STOP, + }; + } } diff --git a/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts index c725d5a0e77..0723cbbe2a3 100644 --- a/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts +++ b/packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts @@ -466,6 +466,11 @@ export abstract class AWSWebSocketProvider { } } + protected abstract _unsubscribeMessage(subscriptionId: string): { + id: string; + type: string; + }; + private _sendUnsubscriptionMessage(subscriptionId: string) { try { if ( @@ -474,10 +479,7 @@ export abstract class AWSWebSocketProvider { this.socketStatus === SOCKET_STATUS.READY ) { // Preparing unsubscribe message to stop receiving messages for that subscription - const unsubscribeMessage = { - id: subscriptionId, - type: MESSAGE_TYPES.GQL_STOP, - }; + const unsubscribeMessage = this._unsubscribeMessage(subscriptionId); const stringToAWSRealTime = JSON.stringify(unsubscribeMessage); this.awsRealTimeSocket.send(stringToAWSRealTime); } diff --git a/packages/api-graphql/src/Providers/constants.ts b/packages/api-graphql/src/Providers/constants.ts index 282dad167da..79641a32e68 100644 --- a/packages/api-graphql/src/Providers/constants.ts +++ b/packages/api-graphql/src/Providers/constants.ts @@ -79,6 +79,16 @@ export enum MESSAGE_TYPES { * Server acknowledges successful publish */ EVENT_PUBLISH_ACK = 'publish_success', + /** + * Client -> Server message. + * This message type is for unregister subscriptions with AWS AppSync RealTime + */ + EVENT_STOP = 'unsubscribe', + /** + * Server -> Client message. + * This is the ack response from AWS AppSync Events to EVENT_STOP message + */ + EVENT_COMPLETE = 'unsubscribe_success', } export enum SUBSCRIPTION_STATUS { diff --git a/packages/api-graphql/src/index.ts b/packages/api-graphql/src/index.ts index 11549654498..f4479b416ec 100644 --- a/packages/api-graphql/src/index.ts +++ b/packages/api-graphql/src/index.ts @@ -6,3 +6,4 @@ export * from './types'; export { CONNECTION_STATE_CHANGE } from './Providers/constants'; export * as events from './internals/events'; +export * from './internals/events/types'; diff --git a/packages/api-graphql/src/internals/events/index.ts b/packages/api-graphql/src/internals/events/index.ts index b0921b3efe7..e49a629d762 100644 --- a/packages/api-graphql/src/internals/events/index.ts +++ b/packages/api-graphql/src/internals/events/index.ts @@ -1,78 +1,33 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Observable } from 'rxjs'; +import { Subscription } from 'rxjs'; import { Amplify } from '@aws-amplify/core'; -import { - DocumentType, - GraphQLAuthMode, -} from '@aws-amplify/core/internals/utils'; +import { DocumentType } from '@aws-amplify/core/internals/utils'; import { AWSAppSyncEventProvider } from '../../Providers/AWSAppSyncEventsProvider'; import { appsyncRequest } from './appsyncRequest'; - -type ResolvedGraphQLAuthModes = Exclude; -interface EventsOptions { - authMode?: GraphQLAuthMode; - authToken?: string; -} +import { configure, normalizeAuth } from './utils'; +import type { + EventsChannel, + EventsOptions, + PublishResponse, + PublishedEvent, + SubscriptionObserver, +} from './types'; const eventProvider = new AWSAppSyncEventProvider(); -const normalizeAuth = ( - explicitAuthMode: GraphQLAuthMode | undefined, - defaultAuthMode: ResolvedGraphQLAuthModes, -): ResolvedGraphQLAuthModes => { - if (!explicitAuthMode) { - return defaultAuthMode; - } - - if (explicitAuthMode === 'identityPool') { - return 'iam'; - } - - return explicitAuthMode; -}; - -const configure = () => { - const config = Amplify.getConfig() as any; - - // TODO - get this correct - const eventsConfig = config.API?.GraphQL?.events ?? config.data?.events; - - if (!eventsConfig) { - throw new Error( - 'Amplify configuration is missing. Have you called Amplify.configure()', - ); - } - - const configAuthMode = normalizeAuth( - eventsConfig.defaultAuthMode ?? eventsConfig.default_authorization_type, - 'apiKey', - ); - - const options = { - appSyncGraphqlEndpoint: eventsConfig.url, - region: eventsConfig.region ?? eventsConfig.aws_region, - authenticationType: configAuthMode, - apiKey: eventsConfig.apiKey ?? eventsConfig.api_key, - }; - - return options; -}; - -interface SubscriptionObserver { - next(value: T): void; - error(errorValue: any): void; -} - /** * * @param channelName * @param options */ -async function connect(channelName: string, options?: EventsOptions) { +async function connect( + channelName: string, + options?: EventsOptions, +): Promise { const providerOptions = configure(); providerOptions.authenticationType = normalizeAuth( @@ -82,20 +37,23 @@ async function connect(channelName: string, options?: EventsOptions) { await eventProvider.connect(providerOptions); + let _subscription: Subscription; + const sub = ( observer: SubscriptionObserver, subOptions?: EventsOptions, - ): Observable => { + ): Subscription => { const subscribeOptions = { ...providerOptions, query: channelName }; subscribeOptions.authenticationType = normalizeAuth( subOptions?.authMode, subscribeOptions.authenticationType, ); - const _sub = eventProvider.subscribe(subscribeOptions); - _sub.subscribe(observer); + _subscription = eventProvider + .subscribe(subscribeOptions) + .subscribe(observer); - return _sub; + return _subscription; }; // WS publish is not enabled in the service yet. It will be a follow up feature @@ -116,22 +74,18 @@ async function connect(channelName: string, options?: EventsOptions) { return eventProvider.publish(publishOptions); }; + const close = () => { + _subscription && _subscription.unsubscribe(); + }; + return { // WS publish is not enabled in the service yet, will be a follow up feature // publish: pub, subscribe: sub, + close, }; } -interface PublishedEvent { - identifier: string; - index: number; -} -interface PublishResponse { - failed: PublishedEvent[]; - successful: PublishedEvent[]; -} - /** * Event API expects and array of JSON strings * diff --git a/packages/api-graphql/src/internals/events/types.ts b/packages/api-graphql/src/internals/events/types.ts new file mode 100644 index 00000000000..c0b7ed50be2 --- /dev/null +++ b/packages/api-graphql/src/internals/events/types.ts @@ -0,0 +1,35 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import { Subscription } from 'rxjs'; +import type { GraphQLAuthMode } from '@aws-amplify/core/internals/utils'; + +export interface SubscriptionObserver { + next(value: T): void; + error(errorValue: any): void; +} + +export interface EventsChannel { + subscribe( + observer: SubscriptionObserver, + subOptions?: EventsOptions, + ): Subscription; + close(): void; +} + +export type ResolvedGraphQLAuthModes = Exclude; + +export interface EventsOptions { + authMode?: GraphQLAuthMode; + authToken?: string; +} + +export interface PublishedEvent { + identifier: string; + index: number; +} + +export interface PublishResponse { + failed: PublishedEvent[]; + successful: PublishedEvent[]; +} diff --git a/packages/api-graphql/src/internals/events/utils.ts b/packages/api-graphql/src/internals/events/utils.ts new file mode 100644 index 00000000000..ab99df9e75c --- /dev/null +++ b/packages/api-graphql/src/internals/events/utils.ts @@ -0,0 +1,81 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { Amplify } from '@aws-amplify/core'; +import { + DocumentType, + GraphQLAuthMode, +} from '@aws-amplify/core/internals/utils'; + +import type { ResolvedGraphQLAuthModes } from './types'; + +export const normalizeAuth = ( + explicitAuthMode: GraphQLAuthMode | undefined, + defaultAuthMode: ResolvedGraphQLAuthModes, +): ResolvedGraphQLAuthModes => { + if (!explicitAuthMode) { + return defaultAuthMode; + } + + if (explicitAuthMode === 'identityPool') { + return 'iam'; + } + + return explicitAuthMode; +}; + +export const configure = () => { + const config = Amplify.getConfig() as any; + + // TODO - get this correct + const eventsConfig = config.API?.GraphQL?.events ?? config.data?.events; + + if (!eventsConfig) { + throw new Error( + 'Amplify configuration is missing. Have you called Amplify.configure()', + ); + } + + const configAuthMode = normalizeAuth( + eventsConfig.defaultAuthMode ?? eventsConfig.default_authorization_type, + 'apiKey', + ); + + const options = { + appSyncGraphqlEndpoint: eventsConfig.url, + region: eventsConfig.region ?? eventsConfig.aws_region, + authenticationType: configAuthMode, + apiKey: eventsConfig.apiKey ?? eventsConfig.api_key, + }; + + return options; +}; + +/** + * Event API expects and array of JSON strings + * + * @param events - JSON-serializable value or an array of values + * @returns array of JSON strings + */ +export const serializeEvents = ( + events: DocumentType | DocumentType[], +): string[] => { + if (Array.isArray(events)) { + return events.map((ev, idx) => { + const eventJson = JSON.stringify(ev); + if (eventJson === undefined) { + throw new Error( + `Event must be a valid JSON value. Received ${ev} at index ${idx}`, + ); + } + + return eventJson; + }); + } + + const eventJson = JSON.stringify(events); + if (eventJson === undefined) { + throw new Error(`Event must be a valid JSON value. Received ${events}`); + } + + return [eventJson]; +}; diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 8b4cb73fe1f..c9ea54aee25 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -6,7 +6,12 @@ import type { V6Client } from '@aws-amplify/api-graphql'; export { GraphQLQuery, GraphQLSubscription, SelectionSet } from './types'; export { generateClient } from './API'; -export { GraphQLAuthError, ConnectionState } from '@aws-amplify/api-graphql'; +export type { + GraphQLAuthError, + ConnectionState, + EventsChannel, + EventsOptions, +} from '@aws-amplify/api-graphql'; export type { GraphQLResult,