Skip to content

Commit

Permalink
feat: handle rejected logs, new unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Samaritan1011001 committed Jan 30, 2024
1 parent 61622ba commit fee80f1
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 50 deletions.
6 changes: 6 additions & 0 deletions packages/core/src/utils/deviceId/getDeviceId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@
import { Cache } from '../../Cache';
import { amplifyUuid } from '../amplifyUuid';

/**
* Local storage key to store the device id
*/
const _localStorageKey = 'amplify-device-id';

/**
* Utility to generate or return cached deviceId
*/
export async function getDeviceId(): Promise<string | undefined> {
let deviceId = (await Cache.getItem(_localStorageKey)) as string | undefined;
if (!!deviceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import {
PutLogEventsCommand,
} from '@aws-sdk/client-cloudwatch-logs';
import { Reachability } from '@aws-amplify/core/internals/utils';
import { Observable, Observer } from 'rxjs';
import { handleRejectedLogEvents } from '../../../../src/providers/cloudwatch/client/CloudWatchClient';

const mockedQueuedStorage = {
add: jest.fn(),
isFull: jest.fn(),
Expand All @@ -29,7 +32,22 @@ describe('Cloudwatch provider APIs:', () => {
category: 'Auth',
};
const intendedLogMessageFormat = `[${testLog.logLevel}] ${testLog.namespace}/${testLog.category}: ${testLog.message}`;
let reachabilityObserver: Observer<{ online: boolean }>;

beforeAll(() => {
jest
.spyOn(Reachability.prototype, 'networkMonitor')
.mockImplementationOnce(() => {
return new Observable(observer => {
reachabilityObserver = observer;
});
})
// Twice because we subscribe to get the initial state then again to monitor reachability
.mockImplementationOnce(() => {
return new Observable(observer => {
reachabilityObserver = observer;
});
});
cloudWatchProvider.configure({
region: 'us-test-1',
logGroupName: 'test-group-name',
Expand Down Expand Up @@ -90,8 +108,55 @@ describe('Cloudwatch provider APIs:', () => {
await cloudWatchProvider.flushLogs();
expect(mockedQueuedStorage.delete).toHaveBeenCalledTimes(1);
});

it('should not send the logs to cloud watch when the device is offline', async () => {
CloudWatchLogsClient.prototype.send = jest.fn(
(command: PutLogEventsCommand) => {
return Promise.resolve({
rejectedLogEventsInfo: undefined,
});
}
);
expect(mockCreateQueuedStorage).toHaveBeenCalled();
mockedQueuedStorage.peekAll.mockReturnValue([
{
content: intendedLogMessageFormat,
timestamp: new Date().getTime().toString(),
},
]);
jest
.spyOn(Reachability.prototype, 'isOnline')
.mockImplementationOnce(() => {
return false;
});
await cloudWatchProvider.flushLogs();
expect(CloudWatchLogsClient.prototype.send).toHaveBeenCalledTimes(0);
});
});
describe('_sendToCloudWatch', () => {
it('should handle rejected tooOld log events: ', async () => {});
describe('handleRejectedLogEvents', () => {
let batchedLogs: any[];
beforeEach(() => {
batchedLogs = [
{ content: 'content-0', timestamp: '' },
{ content: 'content-1', timestamp: '' },
{ content: 'content-2', timestamp: '' },
{ content: 'content-3', timestamp: '' },
];
});
it('should delete the rejected tooOld and expired logs from storage: ', async () => {
handleRejectedLogEvents(batchedLogs, {
expiredLogEventEndIndex: 1,
tooOldLogEventEndIndex: 2,
});
expect(mockedQueuedStorage.delete).toHaveBeenLastCalledWith(batchedLogs);
});
it('should delete all the logs preceding tooNewLogEventStartIndex from storage: ', async () => {
handleRejectedLogEvents(batchedLogs, {
tooNewLogEventStartIndex: 2,
});
expect(mockedQueuedStorage.delete).toHaveBeenLastCalledWith(
batchedLogs.slice(2)
);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { getDefaultStreamName } from '../../../../src/providers/cloudwatch/utils';
import { fetchAuthSession } from '@aws-amplify/core';

const mockFetchAuthSession = fetchAuthSession as jest.Mock;
const testDeviceId = 'test-device-id-1';

jest.mock('@aws-amplify/core');
jest.mock('@aws-amplify/core/internals/utils', () => ({
...jest.requireActual('@aws-amplify/core/internals/utils'),
getDeviceId: jest.fn(() => testDeviceId),
}));

describe('CloudWatch Utils: ', () => {
describe('getDefaultStreamName', () => {
it('should return a log stream name for a guest', async () => {
mockFetchAuthSession.mockImplementationOnce(() => {
return { userSub: undefined };
});
expect(await getDefaultStreamName()).toContain(`${testDeviceId}.guest`);
});
it('should return a log stream name for a logged in user', async () => {
const loggedInUserSub = 'test-logged-in-user-sub';
mockFetchAuthSession.mockImplementationOnce(() => {
return { userSub: loggedInUserSub };
});
expect(await getDefaultStreamName()).toContain(
`${testDeviceId}.${loggedInUserSub}`
);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import {
InputLogEvent,
PutLogEventsCommand,
PutLogEventsCommandInput,
PutLogEventsCommandOutput,
RejectedLogEventsInfo,
} from '@aws-sdk/client-cloudwatch-logs';
import { LogLevel, LogParams } from '../../../types';
import { CloudWatchConfig, CloudWatchProvider } from '../types';
import { createQueuedStorage, QueuedStorage } from '@aws-amplify/core';
import { QueuedItem } from '@aws-amplify/core/dist/esm/utils/queuedStorage/types';
import { NetworkConnectionMonitor } from '@aws-amplify/core/internals/utils';
import { getDefaultStreamName } from '../utils';
import { resolveCredentials } from '../../../utils/resolveCredentials';
// TODO: Fix this type import
import { QueuedItem } from '@aws-amplify/core/dist/esm/utils/queuedStorage/types';

const DEFAULT_LOG_LEVEL: LogLevel = 'INFO';

Expand All @@ -33,20 +33,17 @@ const defaultConfig = {
defaultLogLevel: DEFAULT_LOG_LEVEL,
},
};
// TODO: Needs a syncing module to chek and flushLogs

export const cloudWatchProvider: CloudWatchProvider = {
/**
* set the initial configuration
* @internal
*/
configure: async (config: CloudWatchConfig) => {
// TODO(ashwinkumar6): rename 'initialize' to 'configure'. Allow configuring multiple times
// TODO(ashwinkumar6): create and use LoggingError
// TODO(ashwinkumar6): fix merge logic, support nested
cloudWatchConfig = { ...defaultConfig, ...config };
const { region } = cloudWatchConfig;

// TODO: update this client when credentials change
// TODO: Test credentials change
cloudWatchSDKClient = new CloudWatchLogsClient({
region,
credentials: resolveCredentials,
Expand All @@ -68,6 +65,8 @@ export const cloudWatchProvider: CloudWatchProvider = {
const { namespace, category, logLevel, message } = input;
const categoryPrefix = category ? `/${category}` : '';
const prefix = `[${logLevel}] ${namespace}${categoryPrefix}`;

// Final log format looks like this: `[${logLevel}] ${namespace}/${category}: ${message}`
const content = `${prefix}: ${message}`;

// Store log with log rotation enabled if it's full
Expand Down Expand Up @@ -117,7 +116,7 @@ export const cloudWatchProvider: CloudWatchProvider = {

async function _sendToCloudWatch(batchedLogs: QueuedItem[]) {
const { logGroupName } = cloudWatchConfig;
// TODO: Decide how cx can give their own logStreamName
// TODO: how can cx give their own logStreamName?
const logStreamName = await getDefaultStreamName();
const logBatch: PutLogEventsCommandInput = {
logEvents: convertBufferLogsToCWLogs(batchedLogs),
Expand All @@ -126,37 +125,38 @@ async function _sendToCloudWatch(batchedLogs: QueuedItem[]) {
};
if (sdkClientConstraintsSatisfied(logBatch)) {
networkMonitor.enableNetworkMonitoringFor(async () => {
await _handleRejectedLogEvents(
batchedLogs,
(await cloudWatchSDKClient.send(new PutLogEventsCommand(logBatch)))
.rejectedLogEventsInfo
);
let rejectedLogEventsInfo;
try {
rejectedLogEventsInfo = (
await cloudWatchSDKClient.send(new PutLogEventsCommand(logBatch))
).rejectedLogEventsInfo;
await handleRejectedLogEvents(batchedLogs, rejectedLogEventsInfo);
} catch (e) {
// TODO: Should we log to console or dispatch a hub event?
}
});
}
}

async function _handleRejectedLogEvents(
// Exporting this function for testing purposes
export async function handleRejectedLogEvents(
batchedLogs: QueuedItem[],
rejectedLogEventsInfo?: RejectedLogEventsInfo
) {
if (!rejectedLogEventsInfo) {
await queuedStorage.delete(batchedLogs);
return;
}
const { oldOrExpiredLogsEndIndex, tooNewLogEventStartIndex } =
parseRejectedLogEvents(rejectedLogEventsInfo);
if (oldOrExpiredLogsEndIndex) {
await queuedStorage.delete(batchedLogs.slice(oldOrExpiredLogsEndIndex));
}
if (oldOrExpiredLogsEndIndex && tooNewLogEventStartIndex) {
// These are the ones that succeeded in going to CW
// If there is tooNewLogEvents delete every log until then
if (rejectedLogEventsInfo?.tooNewLogEventStartIndex) {
await queuedStorage.delete(
batchedLogs.slice(oldOrExpiredLogsEndIndex, tooNewLogEventStartIndex)
batchedLogs.slice(rejectedLogEventsInfo.tooNewLogEventStartIndex)
);
// If there is no tooNewLogEvents then others are either tooOld, expired or successfully logged so delete them from storage
} else {
await queuedStorage.delete(batchedLogs);
return;
}
if (tooNewLogEventStartIndex) {
// TODO: Needs design clarification on how to handle too new logs
}

// TODO:
// 1. Needs design clarification on how to handle tooNewLogEventStartIndex -- For now following the Android impl of keeping them in local memory(cache).
// 2. Retry logic for the same needs to be implemented
}

function _isLoggable(log: LogParams): boolean {
Expand All @@ -183,21 +183,3 @@ function convertBufferLogsToCWLogs(
};
});
}

function parseRejectedLogEvents(rejectedLogEventsInfo: RejectedLogEventsInfo) {
const {
tooOldLogEventEndIndex,
tooNewLogEventStartIndex,
expiredLogEventEndIndex,
} = rejectedLogEventsInfo;
let oldOrExpiredLogsEndIndex;
if (tooOldLogEventEndIndex) {
oldOrExpiredLogsEndIndex = tooOldLogEventEndIndex;
}
if (expiredLogEventEndIndex) {
oldOrExpiredLogsEndIndex = oldOrExpiredLogsEndIndex
? Math.max(oldOrExpiredLogsEndIndex, expiredLogEventEndIndex)
: expiredLogEventEndIndex;
}
return { oldOrExpiredLogsEndIndex, tooNewLogEventStartIndex };
}
2 changes: 1 addition & 1 deletion packages/logging/src/providers/cloudwatch/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

export { getDefaultStreamName } from './utils';
export { getDefaultStreamName, parseRejectedLogEvents } from './utils';
21 changes: 21 additions & 0 deletions packages/logging/src/providers/cloudwatch/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { fetchAuthSession } from '@aws-amplify/core';
import { getDeviceId } from '@aws-amplify/core/internals/utils';
import { RejectedLogEventsInfo } from '@aws-sdk/client-cloudwatch-logs';

const GUEST_USER_ID_FOR_LOG_STREAM_NAME: string = 'guest';

Expand All @@ -13,3 +14,23 @@ export async function getDefaultStreamName() {
const dateNow = new Date().toISOString().split('T')[0];
return `${dateNow}.${deviceId}.${userId}`;
}

export function parseRejectedLogEvents(
rejectedLogEventsInfo: RejectedLogEventsInfo
) {
const {
tooOldLogEventEndIndex,
tooNewLogEventStartIndex,
expiredLogEventEndIndex,
} = rejectedLogEventsInfo;
let oldOrExpiredLogsEndIndex;
if (tooOldLogEventEndIndex) {
oldOrExpiredLogsEndIndex = tooOldLogEventEndIndex;
}
if (expiredLogEventEndIndex) {
oldOrExpiredLogsEndIndex = oldOrExpiredLogsEndIndex
? Math.max(oldOrExpiredLogsEndIndex, expiredLogEventEndIndex)
: expiredLogEventEndIndex;
}
return { oldOrExpiredLogsEndIndex, tooNewLogEventStartIndex };
}

0 comments on commit fee80f1

Please sign in to comment.