Skip to content

Commit

Permalink
fix unsub
Browse files Browse the repository at this point in the history
  • Loading branch information
iartemiev committed Oct 9, 2024
1 parent 81620c6 commit 9ecf744
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,14 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {

return [false, { id, type, payload }];
}

protected _unsubscribeMessage(subscriptionId: string): {
id: string;
type: string;
} {
return {
id: subscriptionId,
type: MESSAGE_TYPES.EVENT_STOP,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,14 @@ export class AWSAppSyncRealTimeProvider extends AWSWebSocketProvider {

return [false, { id, type, payload }];
}

protected _unsubscribeMessage(subscriptionId: string): {
id: string;
type: string;
} {
return {
id: subscriptionId,
type: MESSAGE_TYPES.GQL_STOP,
};
}
}
10 changes: 6 additions & 4 deletions packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@ export abstract class AWSWebSocketProvider {
}
}

protected abstract _unsubscribeMessage(subscriptionId: string): {
id: string;
type: string;
};

private _sendUnsubscriptionMessage(subscriptionId: string) {
try {
if (
Expand All @@ -474,10 +479,7 @@ export abstract class AWSWebSocketProvider {
this.socketStatus === SOCKET_STATUS.READY
) {
// Preparing unsubscribe message to stop receiving messages for that subscription
const unsubscribeMessage = {
id: subscriptionId,
type: MESSAGE_TYPES.GQL_STOP,
};
const unsubscribeMessage = this._unsubscribeMessage(subscriptionId);
const stringToAWSRealTime = JSON.stringify(unsubscribeMessage);
this.awsRealTimeSocket.send(stringToAWSRealTime);
}
Expand Down
10 changes: 10 additions & 0 deletions packages/api-graphql/src/Providers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ export enum MESSAGE_TYPES {
* Server acknowledges successful publish
*/
EVENT_PUBLISH_ACK = 'publish_success',
/**
* Client -> Server message.
* This message type is for unregister subscriptions with AWS AppSync RealTime
*/
EVENT_STOP = 'unsubscribe',
/**
* Server -> Client message.
* This is the ack response from AWS AppSync Events to EVENT_STOP message
*/
EVENT_COMPLETE = 'unsubscribe_success',
}

export enum SUBSCRIPTION_STATUS {
Expand Down
1 change: 1 addition & 0 deletions packages/api-graphql/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from './types';

export { CONNECTION_STATE_CHANGE } from './Providers/constants';
export * as events from './internals/events';
export * from './internals/events/types';
98 changes: 26 additions & 72 deletions packages/api-graphql/src/internals/events/index.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,33 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Observable } from 'rxjs';
import { Subscription } from 'rxjs';
import { Amplify } from '@aws-amplify/core';
import {
DocumentType,
GraphQLAuthMode,
} from '@aws-amplify/core/internals/utils';
import { DocumentType } from '@aws-amplify/core/internals/utils';

import { AWSAppSyncEventProvider } from '../../Providers/AWSAppSyncEventsProvider';

import { appsyncRequest } from './appsyncRequest';

type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;
interface EventsOptions {
authMode?: GraphQLAuthMode;
authToken?: string;
}
import { configure, normalizeAuth } from './utils';
import type {
EventsChannel,
EventsOptions,
PublishResponse,
PublishedEvent,
SubscriptionObserver,
} from './types';

const eventProvider = new AWSAppSyncEventProvider();

const normalizeAuth = (
explicitAuthMode: GraphQLAuthMode | undefined,
defaultAuthMode: ResolvedGraphQLAuthModes,
): ResolvedGraphQLAuthModes => {
if (!explicitAuthMode) {
return defaultAuthMode;
}

if (explicitAuthMode === 'identityPool') {
return 'iam';
}

return explicitAuthMode;
};

const configure = () => {
const config = Amplify.getConfig() as any;

// TODO - get this correct
const eventsConfig = config.API?.GraphQL?.events ?? config.data?.events;

if (!eventsConfig) {
throw new Error(
'Amplify configuration is missing. Have you called Amplify.configure()',
);
}

const configAuthMode = normalizeAuth(
eventsConfig.defaultAuthMode ?? eventsConfig.default_authorization_type,
'apiKey',
);

const options = {
appSyncGraphqlEndpoint: eventsConfig.url,
region: eventsConfig.region ?? eventsConfig.aws_region,
authenticationType: configAuthMode,
apiKey: eventsConfig.apiKey ?? eventsConfig.api_key,
};

return options;
};

interface SubscriptionObserver<T> {
next(value: T): void;
error(errorValue: any): void;
}

/**
*
* @param channelName
* @param options
*/
async function connect(channelName: string, options?: EventsOptions) {
async function connect(
channelName: string,
options?: EventsOptions,
): Promise<EventsChannel> {
const providerOptions = configure();

providerOptions.authenticationType = normalizeAuth(
Expand All @@ -82,20 +37,23 @@ async function connect(channelName: string, options?: EventsOptions) {

await eventProvider.connect(providerOptions);

let _subscription: Subscription;

const sub = (
observer: SubscriptionObserver<any>,
subOptions?: EventsOptions,
): Observable<any> => {
): Subscription => {
const subscribeOptions = { ...providerOptions, query: channelName };
subscribeOptions.authenticationType = normalizeAuth(
subOptions?.authMode,
subscribeOptions.authenticationType,
);

const _sub = eventProvider.subscribe(subscribeOptions);
_sub.subscribe(observer);
_subscription = eventProvider
.subscribe(subscribeOptions)
.subscribe(observer);

return _sub;
return _subscription;
};

// WS publish is not enabled in the service yet. It will be a follow up feature
Expand All @@ -116,22 +74,18 @@ async function connect(channelName: string, options?: EventsOptions) {
return eventProvider.publish(publishOptions);
};

const close = () => {
_subscription && _subscription.unsubscribe();
};

return {
// WS publish is not enabled in the service yet, will be a follow up feature
// publish: pub,
subscribe: sub,
close,
};
}

interface PublishedEvent {
identifier: string;
index: number;
}
interface PublishResponse {
failed: PublishedEvent[];
successful: PublishedEvent[];
}

/**
* Event API expects and array of JSON strings
*
Expand Down
35 changes: 35 additions & 0 deletions packages/api-graphql/src/internals/events/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Subscription } from 'rxjs';
import type { GraphQLAuthMode } from '@aws-amplify/core/internals/utils';

export interface SubscriptionObserver<T> {
next(value: T): void;
error(errorValue: any): void;
}

export interface EventsChannel {
subscribe(
observer: SubscriptionObserver<any>,
subOptions?: EventsOptions,
): Subscription;
close(): void;
}

export type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;

export interface EventsOptions {
authMode?: GraphQLAuthMode;
authToken?: string;
}

export interface PublishedEvent {
identifier: string;
index: number;
}

export interface PublishResponse {
failed: PublishedEvent[];
successful: PublishedEvent[];
}
81 changes: 81 additions & 0 deletions packages/api-graphql/src/internals/events/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Amplify } from '@aws-amplify/core';
import {
DocumentType,
GraphQLAuthMode,
} from '@aws-amplify/core/internals/utils';

import type { ResolvedGraphQLAuthModes } from './types';

export const normalizeAuth = (
explicitAuthMode: GraphQLAuthMode | undefined,
defaultAuthMode: ResolvedGraphQLAuthModes,
): ResolvedGraphQLAuthModes => {
if (!explicitAuthMode) {
return defaultAuthMode;
}

if (explicitAuthMode === 'identityPool') {
return 'iam';
}

return explicitAuthMode;
};

export const configure = () => {
const config = Amplify.getConfig() as any;

// TODO - get this correct
const eventsConfig = config.API?.GraphQL?.events ?? config.data?.events;

if (!eventsConfig) {
throw new Error(
'Amplify configuration is missing. Have you called Amplify.configure()',
);
}

const configAuthMode = normalizeAuth(
eventsConfig.defaultAuthMode ?? eventsConfig.default_authorization_type,
'apiKey',
);

const options = {
appSyncGraphqlEndpoint: eventsConfig.url,
region: eventsConfig.region ?? eventsConfig.aws_region,
authenticationType: configAuthMode,
apiKey: eventsConfig.apiKey ?? eventsConfig.api_key,
};

return options;
};

/**
* Event API expects and array of JSON strings
*
* @param events - JSON-serializable value or an array of values
* @returns array of JSON strings
*/
export const serializeEvents = (
events: DocumentType | DocumentType[],
): string[] => {
if (Array.isArray(events)) {
return events.map((ev, idx) => {
const eventJson = JSON.stringify(ev);
if (eventJson === undefined) {
throw new Error(
`Event must be a valid JSON value. Received ${ev} at index ${idx}`,
);
}

return eventJson;
});
}

const eventJson = JSON.stringify(events);
if (eventJson === undefined) {
throw new Error(`Event must be a valid JSON value. Received ${events}`);
}

return [eventJson];
};
7 changes: 6 additions & 1 deletion packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import type { V6Client } from '@aws-amplify/api-graphql';
export { GraphQLQuery, GraphQLSubscription, SelectionSet } from './types';
export { generateClient } from './API';

export { GraphQLAuthError, ConnectionState } from '@aws-amplify/api-graphql';
export {
GraphQLAuthError,
ConnectionState,
EventsChannel,
EventsOptions,
} from '@aws-amplify/api-graphql';

export type {
GraphQLResult,
Expand Down

0 comments on commit 9ecf744

Please sign in to comment.