Skip to content

Commit

Permalink
wip: handle rejected log events
Browse files Browse the repository at this point in the history
  • Loading branch information
Samaritan1011001 committed Jan 27, 2024
1 parent 8aa762b commit 61622ba
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,82 +6,92 @@ import { createQueuedStorage, QueuedStorage } from '@aws-amplify/core';
import { LogParams } from '../../../../src/types';
import {
CloudWatchLogsClient,
PutLogEventsCommand,
} from '@aws-sdk/client-cloudwatch-logs';
import { Reachability } from '@aws-amplify/core/internals/utils';
const mockedQueuedStorage = {
add: jest.fn(),
isFull: jest.fn(),
peekAll: jest.fn()
}
peekAll: jest.fn(),
delete: jest.fn(),
};
jest.mock('@aws-amplify/core', () => ({
...jest.requireActual('@aws-amplify/core'),
createQueuedStorage: jest.fn(()=>mockedQueuedStorage),
createQueuedStorage: jest.fn(() => mockedQueuedStorage),
}));
const mockCreateQueuedStorage = createQueuedStorage as jest.Mock;


// const mockCloudWatchLogsClient = jest.fn().mockImplementation(() => {
// return mockCWClientInstance;
// });
// CloudWatchLogsClient.prototype.send = jest.fn((command) => {
// return Promise.resolve({});
// });
const mockCWClientSend = jest.fn();
jest.mock('@aws-sdk/client-cloudwatch-logs', () => ({
__esModule: true,
// ...jest.requireActual('@aws-sdk/client-cloudwatch-logs'),
CloudWatchLogsClient: jest.fn().mockImplementation((param) => {
return {
send: mockCWClientSend
}
}),
PutLogEventsCommand: jest.fn().mockImplementation((param) => {
return {
}
})

}));

describe('Cloudwatch provider APIs:', () => {
const testLog: LogParams = {logLevel:'INFO', message: 'test-message-1',namespace:'test-namespace', category:'Auth'}
const intendedLogMessageFormat = `[${testLog.logLevel}] ${testLog.namespace}/${testLog.category}: ${testLog.message}`
beforeAll(()=>{
cloudWatchProvider.configure({region: 'us-test-1', logGroupName:'test-group-name'})
let testLog: LogParams = {
logLevel: 'INFO',
message: 'test-message-1',
namespace: 'test-namespace',
category: 'Auth',
};
const intendedLogMessageFormat = `[${testLog.logLevel}] ${testLog.namespace}/${testLog.category}: ${testLog.message}`;
beforeAll(() => {
cloudWatchProvider.configure({
region: 'us-test-1',
logGroupName: 'test-group-name',
});
});
afterEach(()=>{
mockedQueuedStorage.add.mockReset()
})
describe('log', () => {
it('should add given log to local storage', () => {
expect(mockCreateQueuedStorage).toHaveBeenCalled();
mockedQueuedStorage.isFull.mockReturnValue(false)
cloudWatchProvider.log(testLog)
expect(mockedQueuedStorage.add).toHaveBeenCalledWith(expect.objectContaining({content: expect.stringContaining(testLog.message)}), {dequeueBeforeEnqueue: false});
afterEach(() => {
mockedQueuedStorage.add.mockReset();
});
it('should store the intented log message format', () => {
expect(mockCreateQueuedStorage).toHaveBeenCalled();
mockedQueuedStorage.isFull.mockReturnValue(false)
cloudWatchProvider.log(testLog)
expect(mockedQueuedStorage.add).toHaveBeenCalledWith(expect.objectContaining({content: intendedLogMessageFormat}), {dequeueBeforeEnqueue: false});
describe('log', () => {
it('should add given log to local storage', () => {
expect(mockCreateQueuedStorage).toHaveBeenCalled();
mockedQueuedStorage.isFull.mockReturnValue(false);
cloudWatchProvider.log(testLog);
expect(mockedQueuedStorage.add).toHaveBeenCalledWith(
expect.objectContaining({
content: expect.stringContaining(testLog.message),
}),
{ dequeueBeforeEnqueue: false }
);
});
it('should store the intented log message format', () => {
expect(mockCreateQueuedStorage).toHaveBeenCalled();
mockedQueuedStorage.isFull.mockReturnValue(false);
cloudWatchProvider.log(testLog);
expect(mockedQueuedStorage.add).toHaveBeenCalledWith(
expect.objectContaining({ content: intendedLogMessageFormat }),
{ dequeueBeforeEnqueue: false }
);
});
});
});
describe('flushLogs', () => {
it('should flush the logs to cloud watch given the device is online', () => {
// const cwClientSpy = jest.spyOn(CloudWatchLogsClient.prototype, 'send');
it('should flush the logs to cloud watch given the device is online', async () => {
expect.assertions(4);

CloudWatchLogsClient.prototype.send = jest.fn(
(command: PutLogEventsCommand) => {
expect(command.input.logEvents).toBeDefined();
expect(command.input.logEvents![0].message).toEqual(
intendedLogMessageFormat
);
return Promise.resolve({
rejectedLogEventsInfo: undefined,
});
}
);
expect(mockCreateQueuedStorage).toHaveBeenCalled();
mockedQueuedStorage.peekAll.mockReturnValue([{content: intendedLogMessageFormat, timestamp: new Date().getTime().toString()}])
mockedQueuedStorage.peekAll.mockReturnValue([
{
content: intendedLogMessageFormat,
timestamp: new Date().getTime().toString(),
},
]);
jest
.spyOn(Reachability.prototype, 'isOnline')
.mockImplementationOnce(() => {
return true;
});
cloudWatchProvider.flushLogs()
// TODO: Unable to mock send and test if it was sent.
// expect(mockCWClientSend).toHaveBeenCalled();

.spyOn(Reachability.prototype, 'isOnline')
.mockImplementationOnce(() => {
return true;
});
await cloudWatchProvider.flushLogs();
expect(mockedQueuedStorage.delete).toHaveBeenCalledTimes(1);
});

// TODO: add test to check the right format is stored
});
describe('_sendToCloudWatch', () => {
it('should handle rejected tooOld log events: ', async () => {});
});
});
4 changes: 2 additions & 2 deletions packages/logging/src/errors/assertValidationError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
// SPDX-License-Identifier: Apache-2.0

import { LoggingError } from './LoggingError';
import { AnalyticsValidationErrorCode, validationErrorMap } from './validation';
import { LoggingValidationErrorCode, validationErrorMap } from './validation';

/**
* @internal
*/
export function assertValidationError(
assertion: boolean,
name: AnalyticsValidationErrorCode,
name: LoggingValidationErrorCode,
message?: string
): asserts assertion {
const { message: defaultMessage, recoverySuggestion } =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
InputLogEvent,
PutLogEventsCommand,
PutLogEventsCommandInput,
PutLogEventsCommandOutput,
RejectedLogEventsInfo,
} from '@aws-sdk/client-cloudwatch-logs';
import { LogLevel, LogParams } from '../../../types';
import { CloudWatchConfig, CloudWatchProvider } from '../types';
Expand Down Expand Up @@ -82,7 +84,6 @@ export const cloudWatchProvider: CloudWatchProvider = {
}
);
// TODO: call startSyncIfNotInProgress
console.log('Done logging the event');
},

// TODO: Need a module to tie log and flushLogs together. log -> storage -> buffer -> flushLogs
Expand All @@ -94,8 +95,8 @@ export const cloudWatchProvider: CloudWatchProvider = {
*/
flushLogs: async (): Promise<void> => {
// TODO: Get these messages from buffer and not storage
const messages = await queuedStorage.peekAll();
await _sendToCloudWatch(convertBufferLogsToCWLogs(messages));
const batchedLogs = await queuedStorage.peekAll();
await _sendToCloudWatch(batchedLogs);
return Promise.resolve();
},
/**
Expand All @@ -114,20 +115,47 @@ export const cloudWatchProvider: CloudWatchProvider = {
},
};

async function _sendToCloudWatch(messages: InputLogEvent[]) {
async function _sendToCloudWatch(batchedLogs: QueuedItem[]) {
const { logGroupName } = cloudWatchConfig;
// TODO: Decide how cx can give their own logStreamName
const logStreamName = await getDefaultStreamName();
const logBatch: PutLogEventsCommandInput = {
logEvents: messages,
logEvents: convertBufferLogsToCWLogs(batchedLogs),
logGroupName,
logStreamName,
};
if (sdkClientConstraintsSatisfied(logBatch)) {
networkMonitor.enableNetworkMonitoringFor(async () => {
await cloudWatchSDKClient.send(new PutLogEventsCommand(logBatch));
await _handleRejectedLogEvents(
batchedLogs,
(await cloudWatchSDKClient.send(new PutLogEventsCommand(logBatch)))
.rejectedLogEventsInfo
);
});
// TODO: retry with failed logs
}
}

async function _handleRejectedLogEvents(
batchedLogs: QueuedItem[],
rejectedLogEventsInfo?: RejectedLogEventsInfo
) {
if (!rejectedLogEventsInfo) {
await queuedStorage.delete(batchedLogs);
return;
}
const { oldOrExpiredLogsEndIndex, tooNewLogEventStartIndex } =
parseRejectedLogEvents(rejectedLogEventsInfo);
if (oldOrExpiredLogsEndIndex) {
await queuedStorage.delete(batchedLogs.slice(oldOrExpiredLogsEndIndex));
}
if (oldOrExpiredLogsEndIndex && tooNewLogEventStartIndex) {
// These are the ones that succeeded in going to CW
await queuedStorage.delete(
batchedLogs.slice(oldOrExpiredLogsEndIndex, tooNewLogEventStartIndex)
);
}
if (tooNewLogEventStartIndex) {
// TODO: Needs design clarification on how to handle too new logs
}
}

Expand Down Expand Up @@ -155,3 +183,21 @@ function convertBufferLogsToCWLogs(
};
});
}

function parseRejectedLogEvents(rejectedLogEventsInfo: RejectedLogEventsInfo) {
const {
tooOldLogEventEndIndex,
tooNewLogEventStartIndex,
expiredLogEventEndIndex,
} = rejectedLogEventsInfo;
let oldOrExpiredLogsEndIndex;
if (tooOldLogEventEndIndex) {
oldOrExpiredLogsEndIndex = tooOldLogEventEndIndex;
}
if (expiredLogEventEndIndex) {
oldOrExpiredLogsEndIndex = oldOrExpiredLogsEndIndex
? Math.max(oldOrExpiredLogsEndIndex, expiredLogEventEndIndex)
: expiredLogEventEndIndex;
}
return { oldOrExpiredLogsEndIndex, tooNewLogEventStartIndex };
}

0 comments on commit 61622ba

Please sign in to comment.