Skip to content

Commit

Permalink
fix(analytics): add options for configuring pinpoint event buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Di Wu committed Nov 6, 2023
1 parent 7b514bb commit d2b1d90
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
resolveCredentials,
} from '../../../../src/providers/pinpoint/utils';
import { config, credentials, identityId } from './testUtils/data';
import { flushEvents } from '../../../../src/providers/pinpoint';
import { flushEvents } from '../../../../src';
import { flushEvents as pinpointFlushEvents } from '@aws-amplify/core/internals/providers/pinpoint';
import { AnalyticsAction } from '@aws-amplify/core/internals/utils';
import { ConsoleLogger } from '@aws-amplify/core';
Expand Down Expand Up @@ -44,13 +44,12 @@ describe('Pinpoint API: flushEvents', () => {
expect(mockResolveCredentials).toBeCalledTimes(1);

await new Promise(process.nextTick);
expect(mockPinpointFlushEvents).toBeCalledWith(
config.appId,
config.region,
expect(mockPinpointFlushEvents).toBeCalledWith({
...config,
credentials,
identityId,
getAnalyticsUserAgentString(AnalyticsAction.Record)
);
userAgentValue: getAnalyticsUserAgentString(AnalyticsAction.Record),
});
});

it('logs an error when credentials can not be fetched', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ describe('Analytics Pinpoint Provider Util: resolveConfig', () => {
const pinpointConfig = {
appId: 'app-id',
region: 'region',
bufferSize: 100,
flushSize: 10,
flushInterval: 50,
resendLimit: 3,
};
// create spies
const getConfigSpy = jest.spyOn(Amplify, 'getConfig');
Expand Down
13 changes: 9 additions & 4 deletions packages/analytics/src/providers/pinpoint/apis/flushEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ const logger = new ConsoleLogger('Analytics');
* this API may not be included in the flush.
*/
export const flushEvents = () => {
const { appId, region } = resolveConfig();
const { appId, region, bufferSize, flushSize, flushInterval, resendLimit } =
resolveConfig();
resolveCredentials()
.then(({ credentials, identityId }) =>
flushEventsCore(
flushEventsCore({
appId,
region,
credentials,
identityId,
getAnalyticsUserAgentString(AnalyticsAction.Record)
)
bufferSize,
flushSize,
flushInterval,
resendLimit,
userAgentValue: getAnalyticsUserAgentString(AnalyticsAction.Record),
})
)
.catch(e => logger.warn('Failed to flush events', e));
};
7 changes: 6 additions & 1 deletion packages/analytics/src/providers/pinpoint/apis/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ const logger = new ConsoleLogger('Analytics');
* ```
*/
export const record = (input: RecordInput): void => {
const { appId, region } = resolveConfig();
const { appId, region, bufferSize, flushSize, flushInterval, resendLimit } =
resolveConfig();

if (!isAnalyticsEnabled()) {
logger.debug('Analytics is disabled, event will not be recorded.');
Expand All @@ -74,6 +75,10 @@ export const record = (input: RecordInput): void => {
identityId,
region,
userAgentValue: getAnalyticsUserAgentString(AnalyticsAction.Record),
bufferSize,
flushSize,
flushInterval,
resendLimit,
});
})
.catch(e => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import {
* @internal
*/
export const resolveConfig = () => {
const { appId, region } = Amplify.getConfig().Analytics?.Pinpoint ?? {};
const { appId, region, bufferSize, flushSize, flushInterval, resendLimit } =
Amplify.getConfig().Analytics?.Pinpoint ?? {};
assertValidationError(!!appId, AnalyticsValidationErrorCode.NoAppId);
assertValidationError(!!region, AnalyticsValidationErrorCode.NoRegion);
return { appId, region };

return { appId, region, bufferSize, flushSize, flushInterval, resendLimit };
};
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('Pinpoint Provider API: flushEvents', () => {
});

it('invokes flushAll on pinpoint buffer', () => {
flushEvents(appId, region, credentials, identityId);
flushEvents({ appId, region, credentials, identityId });
expect(mockGetEventBuffer).toBeCalledWith(
expect.objectContaining({
appId,
Expand Down
13 changes: 12 additions & 1 deletion packages/core/src/ServiceWorker/ServiceWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,14 @@ export class ServiceWorkerClass {
const currentState = this.serviceWorker.state;
this._logger.debug(`ServiceWorker statechange: ${currentState}`);

const { appId, region } = Amplify.getConfig().Analytics?.Pinpoint ?? {};
const {
appId,
region,
bufferSize,
flushInterval,
flushSize,
resendLimit,
} = Amplify.getConfig().Analytics?.Pinpoint ?? {};
const { credentials } = await fetchAuthSession();

if (appId && region && credentials) {
Expand All @@ -221,6 +228,10 @@ export class ServiceWorkerClass {
region,
category: 'Core',
credentials,
bufferSize,
flushInterval,
flushSize,
resendLimit,
event: {
name: 'ServiceWorker',
attributes: {
Expand Down
39 changes: 26 additions & 13 deletions packages/core/src/providers/pinpoint/apis/flushEvents.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,44 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { AWSCredentials } from '../../../libraryUtils';
import { getEventBuffer } from '../utils/getEventBuffer';
import { EventBufferConfig } from '../types/buffer';
import { AuthSession } from '../../../singleton/Auth/types';
import {
BUFFER_SIZE,
FLUSH_INTERVAL,
FLUSH_SIZE,
RESEND_LIMIT,
} from '../utils/constants';

export const flushEvents = (
appId: string,
region: string,
credentials: AWSCredentials,
identityId?: string,
userAgentValue?: string
) => {
export type PinpointFlushEventsInput = Partial<EventBufferConfig> & {
appId: string;
region: string;
credentials: Required<AuthSession>['credentials'];
identityId?: AuthSession['identityId'];
userAgentValue?: string;
};

export const flushEvents = ({
appId,
region,
credentials,
bufferSize,
flushInterval,
flushSize,
resendLimit,
identityId,
userAgentValue,
}: PinpointFlushEventsInput) => {
getEventBuffer({
appId,
bufferSize: BUFFER_SIZE,
region,
credentials,
flushInterval: FLUSH_INTERVAL,
flushSize: FLUSH_SIZE,
bufferSize: bufferSize ?? BUFFER_SIZE,
flushInterval: flushInterval ?? FLUSH_INTERVAL,
flushSize: flushSize ?? FLUSH_SIZE,
resendLimit: resendLimit ?? RESEND_LIMIT,
identityId,
region,
resendLimit: RESEND_LIMIT,
userAgentValue,
}).flushAll();
};
16 changes: 10 additions & 6 deletions packages/core/src/providers/pinpoint/apis/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ export const record = async ({
identityId,
region,
userAgentValue,
bufferSize,
flushInterval,
flushSize,
resendLimit,
}: PinpointRecordInput): Promise<void> => {
let eventSession = session;
const currentTime = new Date();
Expand All @@ -39,13 +43,13 @@ export const record = async ({
// Prepare event buffer if required
const buffer = getEventBuffer({
appId,
bufferSize: BUFFER_SIZE,
region,
credentials,
flushInterval: FLUSH_INTERVAL,
flushSize: FLUSH_SIZE,
bufferSize: bufferSize ?? BUFFER_SIZE,
flushInterval: flushInterval ?? FLUSH_INTERVAL,
flushSize: flushSize ?? FLUSH_SIZE,
resendLimit: resendLimit ?? RESEND_LIMIT,
identityId,
region,
resendLimit: RESEND_LIMIT,
userAgentValue,
});

Expand Down Expand Up @@ -88,6 +92,6 @@ export const record = async ({
event,
session: eventSession!,
timestamp: timestampISOString,
resendLimit: RESEND_LIMIT,
resendLimit: resendLimit ?? RESEND_LIMIT,
});
};
11 changes: 7 additions & 4 deletions packages/core/src/providers/pinpoint/types/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import { AuthSession } from '../../../singleton/Auth/types';
import { PinpointAnalyticsEvent, PinpointSession } from './pinpoint';

export type EventBufferConfig = {
appId: string;
bufferSize: number;
credentials: Required<AuthSession>['credentials'];
identityId: AuthSession['identityId'];
flushInterval: number;
flushSize: number;
region: string;
resendLimit: number;
};

export type PinpointEventBufferConfig = EventBufferConfig & {
appId: string;
region: string;
credentials: Required<AuthSession>['credentials'];
identityId: AuthSession['identityId'];
userAgentValue?: string;
};

Expand Down
10 changes: 9 additions & 1 deletion packages/core/src/providers/pinpoint/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

export { UpdateEndpointException } from './errors';
export * from './pinpoint';
export {
PinpointUpdateEndpointInput,
PinpointRecordInput,
SupportedCategory,
PinpointProviderConfig,
PinpointSession,
PinpointServiceOptions,
PinpointAnalyticsEvent,
} from './pinpoint';
10 changes: 6 additions & 4 deletions packages/core/src/providers/pinpoint/types/pinpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { AuthSession } from '../../../singleton/Auth/types';
import { UserProfile } from '../../../types';
import { EventBufferConfig } from './buffer';

export type SupportedCategory =
| 'Analytics'
Expand All @@ -13,7 +14,7 @@ export type SupportedCategory =
type SupportedChannelType = 'APNS' | 'APNS_SANDBOX' | 'GCM' | 'IN_APP';

export type PinpointProviderConfig = {
Pinpoint: {
Pinpoint: Partial<EventBufferConfig> & {
appId: string;
region: string;
};
Expand Down Expand Up @@ -55,6 +56,7 @@ export type PinpointUpdateEndpointInput = PinpointCommonParameters &
userProfile?: UserProfile;
};

export type PinpointRecordInput = PinpointCommonParameters & {
event: PinpointAnalyticsEvent;
};
export type PinpointRecordInput = Partial<EventBufferConfig> &
PinpointCommonParameters & {
event: PinpointAnalyticsEvent;
};
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
PutEventsOutput,
} from '../../../awsClients/pinpoint';
import {
EventBufferConfig,
PinpointEventBufferConfig,
BufferedEvent,
BufferedEventMap,
EventBuffer,
Expand All @@ -22,13 +22,13 @@ const RETRYABLE_CODES = [429, 500];
const ACCEPTED_CODES = [202];

export class PinpointEventBuffer {
private _config: EventBufferConfig;
private _config: PinpointEventBufferConfig;
private _interval: ReturnType<typeof setInterval> | undefined = undefined;
private _buffer: EventBuffer;
private _pause = false;
private _flush = false;

constructor(config: EventBufferConfig) {
constructor(config: PinpointEventBufferConfig) {
this._buffer = [];
this._config = config;

Expand Down
30 changes: 19 additions & 11 deletions packages/core/src/providers/pinpoint/utils/getEventBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,40 @@

import { EventBufferConfig } from '../types/buffer';
import { PinpointEventBuffer } from './PinpointEventBuffer';
import { BUFFER_SIZE, FLUSH_INTERVAL, FLUSH_SIZE, RESEND_LIMIT } from './constants';
import { AuthSession } from '../../../singleton/Auth/types';

// Map of buffers by region -> appId
const eventBufferMap: Record<string, Record<string, PinpointEventBuffer>> = {};

export type GetEventBufferInput = EventBufferConfig & {
appId: string;
region: string;
credentials: Required<AuthSession>['credentials'];
identityId?: AuthSession['identityId'];
userAgentValue?: string;
};

/**
* Returns a PinpointEventBuffer instance for the specified region & app ID, creating one if it does not yet exist.
*
*
* @internal
*/
export const getEventBuffer = ({
appId,
bufferSize = BUFFER_SIZE,
region,
credentials,
flushInterval = FLUSH_INTERVAL,
flushSize = FLUSH_SIZE,
bufferSize,
flushInterval,
flushSize,
resendLimit,
identityId,
region,
resendLimit = RESEND_LIMIT,
userAgentValue
}: EventBufferConfig): PinpointEventBuffer => {
userAgentValue,
}: GetEventBufferInput): PinpointEventBuffer => {
if (eventBufferMap[region]?.[appId]) {
const buffer = eventBufferMap[region][appId];

/*
If the identity has changed flush out the buffer and create a new instance. The old instance will be garbage
If the identity has changed flush out the buffer and create a new instance. The old instance will be garbage
collected.
*/
if (buffer.identityHasChanged(identityId)) {
Expand All @@ -47,7 +55,7 @@ export const getEventBuffer = ({
identityId,
region,
resendLimit,
userAgentValue
userAgentValue,
});

if (!eventBufferMap[region]) {
Expand Down

0 comments on commit d2b1d90

Please sign in to comment.