diff --git a/packages/sdk/browser/src/BrowserClient.ts b/packages/sdk/browser/src/BrowserClient.ts index 6bb99c8b1..79153a220 100644 --- a/packages/sdk/browser/src/BrowserClient.ts +++ b/packages/sdk/browser/src/BrowserClient.ts @@ -2,6 +2,8 @@ import { AutoEnvAttributes, base64UrlEncode, LDClient as CommonClient, + DataSourcePaths, + Encoding, LDClientImpl, LDContext, LDOptions, @@ -33,11 +35,27 @@ export class BrowserClient extends LDClientImpl { return base64UrlEncode(JSON.stringify(context), this.platform.encoding!); } - override createStreamUriPath(context: LDContext) { - return `/eval/${this.clientSideId}/${this.encodeContext(context)}`; + override getStreamingPaths(): DataSourcePaths { + const parentThis = this; + return { + pathGet(encoding: Encoding, _plainContextString: string): string { + return `/eval/${parentThis.clientSideId}/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return `/eval/${parentThis.clientSideId}`; + }, + }; } - override createPollUriPath(context: LDContext): string { - return `/sdk/evalx/${this.clientSideId}/contexts/${this.encodeContext(context)}`; + override getPollingPaths(): DataSourcePaths { + const parentThis = this; + return { + pathGet(encoding: Encoding, _plainContextString: string): string { + return `/sdk/evalx/${parentThis.clientSideId}/contexts/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return `/sdk/evalx/${parentThis.clientSideId}/context`; + }, + }; } } diff --git a/packages/sdk/react-native/package.json b/packages/sdk/react-native/package.json index d7e91dd5f..9abc8441e 100644 --- a/packages/sdk/react-native/package.json +++ b/packages/sdk/react-native/package.json @@ -34,9 +34,7 @@ "prettier": "prettier --write '**/*.@(js|ts|tsx|json|css)' --ignore-path ../../../.prettierignore", "test": "jest", "coverage": "yarn test --coverage", - "check": "yarn prettier && yarn lint && yarn build && yarn test", - "android": "yarn && yarn ./example && yarn build && (cd example/ && yarn android-release)", - "ios": "yarn && yarn ./example && yarn build && (cd example/ && yarn ios-go)" + "check": "yarn prettier && yarn lint && yarn build && yarn test" }, "peerDependencies": { "react": "*", diff --git a/packages/sdk/react-native/src/ReactNativeLDClient.ts b/packages/sdk/react-native/src/ReactNativeLDClient.ts index 44558a5cb..a0ade06ed 100644 --- a/packages/sdk/react-native/src/ReactNativeLDClient.ts +++ b/packages/sdk/react-native/src/ReactNativeLDClient.ts @@ -4,6 +4,8 @@ import { base64UrlEncode, BasicLogger, ConnectionMode, + DataSourcePaths, + Encoding, internal, LDClientImpl, type LDContext, @@ -103,12 +105,26 @@ export default class ReactNativeLDClient extends LDClientImpl { return base64UrlEncode(JSON.stringify(context), this.platform.encoding!); } - override createStreamUriPath(context: LDContext) { - return `/meval/${this.encodeContext(context)}`; + override getStreamingPaths(): DataSourcePaths { + return { + pathGet(encoding: Encoding, _plainContextString: string): string { + return `/meval/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return `/meval`; + }, + }; } - override createPollUriPath(context: LDContext): string { - return `/msdk/evalx/contexts/${this.encodeContext(context)}`; + override getPollingPaths(): DataSourcePaths { + return { + pathGet(encoding: Encoding, _plainContextString: string): string { + return `/msdk/evalx/contexts/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return `/msdk/evalx/context`; + }, + }; } override async setConnectionMode(mode: ConnectionMode): Promise { diff --git a/packages/sdk/react-native/src/platform/PlatformRequests.ts b/packages/sdk/react-native/src/platform/PlatformRequests.ts index 6add1e7be..0fe8698f7 100644 --- a/packages/sdk/react-native/src/platform/PlatformRequests.ts +++ b/packages/sdk/react-native/src/platform/PlatformRequests.ts @@ -16,7 +16,9 @@ export default class PlatformRequests implements Requests { createEventSource(url: string, eventSourceInitDict: EventSourceInitDict): EventSource { return new RNEventSource(url, { + method: eventSourceInitDict.method ?? 'GET', headers: eventSourceInitDict.headers, + body: eventSourceInitDict.body, retryAndHandleError: eventSourceInitDict.errorFilter, logger: this.logger, }); diff --git a/packages/shared/common/src/api/platform/EventSource.ts b/packages/shared/common/src/api/platform/EventSource.ts index a9214b015..55bef9bfe 100644 --- a/packages/shared/common/src/api/platform/EventSource.ts +++ b/packages/shared/common/src/api/platform/EventSource.ts @@ -18,8 +18,10 @@ export interface EventSource { } export interface EventSourceInitDict { - errorFilter: (err: HttpErrorResponse) => boolean; + method?: string; headers: { [key: string]: string | string[] }; + body?: string; + errorFilter: (err: HttpErrorResponse) => boolean; initialRetryDelayMillis: number; readTimeoutMillis: number; retryResetIntervalMillis: number; diff --git a/packages/shared/common/src/internal/stream/StreamingProcessor.ts b/packages/shared/common/src/internal/stream/StreamingProcessor.ts index a4d115e40..b91e103a3 100644 --- a/packages/shared/common/src/internal/stream/StreamingProcessor.ts +++ b/packages/shared/common/src/internal/stream/StreamingProcessor.ts @@ -25,6 +25,7 @@ const reportJsonError = ( errorHandler?.(new LDStreamingError('Malformed JSON data in event stream')); }; +// TODO: SDK-156 - Move to Server SDK specific location class StreamingProcessor implements LDStreamProcessor { private readonly headers: { [key: string]: string | string[] }; private readonly streamUri: string; diff --git a/packages/shared/sdk-client/__tests__/LDClientImpl.events.test.ts b/packages/shared/sdk-client/__tests__/LDClientImpl.events.test.ts index 09fdac1fe..093fcaef3 100644 --- a/packages/shared/sdk-client/__tests__/LDClientImpl.events.test.ts +++ b/packages/shared/sdk-client/__tests__/LDClientImpl.events.test.ts @@ -2,6 +2,7 @@ import { AutoEnvAttributes, ClientContext, clone, + Encoding, internal, LDContext, subsystem, @@ -10,12 +11,12 @@ import { createBasicPlatform, createLogger, MockEventProcessor, - setupMockStreamingProcessor, } from '@launchdarkly/private-js-mocks'; import LDClientImpl from '../src/LDClientImpl'; import { Flags } from '../src/types'; import * as mockResponseJson from './evaluation/mockResponse.json'; +import { MockEventSource } from './streaming/LDClientImpl.mocks'; type InputCustomEvent = internal.InputCustomEvent; type InputIdentifyEvent = internal.InputIdentifyEvent; @@ -36,7 +37,6 @@ jest.mock('@launchdarkly/js-sdk-common', () => { ...{ internal: { ...actual.internal, - StreamingProcessor: m.MockStreamingProcessor, EventProcessor: m.MockEventProcessor, }, }, @@ -45,6 +45,7 @@ jest.mock('@launchdarkly/js-sdk-common', () => { const testSdkKey = 'test-sdk-key'; let ldc: LDClientImpl; +let mockEventSource: MockEventSource; let defaultPutResponse: Flags; const carContext: LDContext = { kind: 'car', key: 'test-car' }; @@ -66,15 +67,31 @@ describe('sdk-client object', () => { sendEvent: mockedSendEvent, }), ); - setupMockStreamingProcessor(false, defaultPutResponse); + + const simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + mockPlatform.storage.get.mockImplementation(() => undefined); + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); + mockPlatform.crypto.randomUUID.mockReturnValue('random1'); ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Enabled, mockPlatform, { logger, }); - jest - .spyOn(LDClientImpl.prototype as any, 'createStreamUriPath') - .mockReturnValue('/stream/path'); + + jest.spyOn(LDClientImpl.prototype as any, 'getStreamingPaths').mockReturnValue({ + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + }); }); afterEach(() => { diff --git a/packages/shared/sdk-client/__tests__/LDClientImpl.storage.test.ts b/packages/shared/sdk-client/__tests__/LDClientImpl.storage.test.ts index 36b8a39b5..976dd2092 100644 --- a/packages/shared/sdk-client/__tests__/LDClientImpl.storage.test.ts +++ b/packages/shared/sdk-client/__tests__/LDClientImpl.storage.test.ts @@ -1,15 +1,12 @@ -import { AutoEnvAttributes, clone, type LDContext, noop } from '@launchdarkly/js-sdk-common'; -import { - createBasicPlatform, - createLogger, - setupMockStreamingProcessor, -} from '@launchdarkly/private-js-mocks'; +import { AutoEnvAttributes, clone, Encoding, type LDContext } from '@launchdarkly/js-sdk-common'; +import { createBasicPlatform, createLogger } from '@launchdarkly/private-js-mocks'; import { toMulti } from '../src/context/addAutoEnv'; import LDClientImpl from '../src/LDClientImpl'; import LDEmitter from '../src/LDEmitter'; -import { DeleteFlag, Flags, PatchFlag } from '../src/types'; +import { Flags, PatchFlag } from '../src/types'; import * as mockResponseJson from './evaluation/mockResponse.json'; +import { MockEventSource } from './streaming/LDClientImpl.mocks'; let mockPlatform: ReturnType; let logger: ReturnType; @@ -19,25 +16,12 @@ beforeEach(() => { logger = createLogger(); }); -jest.mock('@launchdarkly/js-sdk-common', () => { - const actual = jest.requireActual('@launchdarkly/js-sdk-common'); - const { MockStreamingProcessor } = jest.requireActual('@launchdarkly/private-js-mocks'); - return { - ...actual, - ...{ - internal: { - ...actual.internal, - StreamingProcessor: MockStreamingProcessor, - }, - }, - }; -}); - const testSdkKey = 'test-sdk-key'; const context: LDContext = { kind: 'org', key: 'Testy Pizza' }; const flagStorageKey = 'LaunchDarkly_1234567890123456_1234567890123456'; const indexStorageKey = 'LaunchDarkly_1234567890123456_ContextIndex'; let ldc: LDClientImpl; +let mockEventSource: MockEventSource; let emitter: LDEmitter; let defaultPutResponse: Flags; let defaultFlagKeys: string[]; @@ -50,36 +34,6 @@ const onChangePromise = () => }); }); -// Common setup code for all tests -// 1. Sets up streaming -// 2. Sets up the change listener -// 3. Runs identify -// 4. Get all flags -const identifyGetAllFlags = async ( - shouldError: boolean = false, - putResponse = defaultPutResponse, - patchResponse?: PatchFlag, - deleteResponse?: DeleteFlag, - waitForChange: boolean = true, -) => { - setupMockStreamingProcessor(shouldError, putResponse, patchResponse, deleteResponse); - const changePromise = onChangePromise(); - - try { - await ldc.identify(context); - } catch (e) { - /* empty */ - } - jest.runAllTimers(); - - // if streaming errors, don't wait for 'change' because it will not be sent. - if (waitForChange && !shouldError) { - await changePromise; - } - - return ldc.allFlags(); -}; - describe('sdk-client storage', () => { beforeEach(() => { jest.useFakeTimers(); @@ -97,9 +51,14 @@ describe('sdk-client storage', () => { } }); - jest - .spyOn(LDClientImpl.prototype as any, 'createStreamUriPath') - .mockReturnValue('/stream/path'); + jest.spyOn(LDClientImpl.prototype as any, 'getStreamingPaths').mockReturnValue({ + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + }); ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Disabled, mockPlatform, { logger, @@ -116,8 +75,17 @@ describe('sdk-client storage', () => { }); test('initialize from storage succeeds without streaming', async () => { - // make sure streaming errors - const allFlags = await identifyGetAllFlags(true, defaultPutResponse); + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateError({ status: 404, message: 'test-error' }); + return mockEventSource; + }, + ); + + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; expect(mockPlatform.storage.get).toHaveBeenCalledWith(flagStorageKey); @@ -130,7 +98,8 @@ describe('sdk-client storage', () => { context, expect.objectContaining({ message: 'test-error' }), ); - expect(allFlags).toEqual({ + + expect(ldc.allFlags()).toEqual({ 'dev-test-flag': true, 'easter-i-tunes-special': false, 'easter-specials': 'no specials', @@ -143,6 +112,14 @@ describe('sdk-client storage', () => { }); test('initialize from storage succeeds with auto env', async () => { + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateError({ status: 404, message: 'test-error' }); + return mockEventSource; + }, + ); + ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Enabled, mockPlatform, { logger, sendEvents: false, @@ -151,7 +128,8 @@ describe('sdk-client storage', () => { emitter = ldc.emitter; jest.spyOn(emitter as LDEmitter, 'emit'); - const allFlags = await identifyGetAllFlags(true, defaultPutResponse); + await ldc.identify(context); + await jest.runAllTimersAsync(); expect(mockPlatform.storage.get).toHaveBeenLastCalledWith( expect.stringMatching('LaunchDarkly_1234567890123456_1234567890123456'), @@ -171,7 +149,7 @@ describe('sdk-client storage', () => { expect.objectContaining(toMulti(context)), expect.objectContaining({ message: 'test-error' }), ); - expect(allFlags).toEqual({ + expect(ldc.allFlags()).toEqual({ 'dev-test-flag': true, 'easter-i-tunes-special': false, 'easter-specials': 'no specials', @@ -184,36 +162,38 @@ describe('sdk-client storage', () => { }); test('not emitting change event when changed keys is empty', async () => { - let LDClientImplTestNoChange; - jest.isolateModules(async () => { - LDClientImplTestNoChange = jest.requireActual('../src/LDClientImpl').default; - ldc = new LDClientImplTestNoChange(testSdkKey, AutoEnvAttributes.Enabled, mockPlatform, { - logger, - sendEvents: false, - }); - }); + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + return mockEventSource; + }, + ); // @ts-ignore emitter = ldc.emitter; jest.spyOn(emitter as LDEmitter, 'emit'); // expect emission - await identifyGetAllFlags(true, defaultPutResponse); + await ldc.identify(context); // expit no emission - await identifyGetAllFlags(true, defaultPutResponse); + await ldc.identify(context); expect(emitter.emit).toHaveBeenCalledTimes(1); }); test('no storage, cold start from streaming', async () => { - // fake previously cached flags even though there's no storage for this context - // @ts-ignore - ldc.flags = defaultPutResponse; + const simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; mockPlatform.storage.get.mockImplementation(() => undefined); - setupMockStreamingProcessor(false, defaultPutResponse); + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); - ldc.identify(context).then(noop); + await ldc.identify(context); await jest.runAllTimersAsync(); expect(mockPlatform.storage.set).toHaveBeenNthCalledWith( @@ -244,12 +224,22 @@ describe('sdk-client storage', () => { test('syncing storage when a flag is deleted', async () => { const putResponse = clone(defaultPutResponse); delete putResponse['dev-test-flag']; - const allFlags = await identifyGetAllFlags(false, putResponse); - // wait for async code to resolve promises + const simulatedEvents = [{ data: JSON.stringify(putResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); + + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); - expect(allFlags).not.toHaveProperty('dev-test-flag'); + expect(ldc.allFlags()).not.toHaveProperty('dev-test-flag'); expect(mockPlatform.storage.set).toHaveBeenCalledTimes(2); expect(mockPlatform.storage.set).toHaveBeenNthCalledWith( 1, @@ -275,12 +265,22 @@ describe('sdk-client storage', () => { variation: 1, trackEvents: false, }; - const allFlags = await identifyGetAllFlags(false, putResponse); - // wait for async code to resolve promises + const simulatedEvents = [{ data: JSON.stringify(putResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); + + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); - expect(allFlags).toMatchObject({ 'another-dev-test-flag': false }); + expect(ldc.allFlags()).toMatchObject({ 'another-dev-test-flag': false }); expect(mockPlatform.storage.set).toHaveBeenCalledTimes(2); expect(mockPlatform.storage.set).toHaveBeenNthCalledWith( 1, @@ -299,9 +299,22 @@ describe('sdk-client storage', () => { const putResponse = clone(defaultPutResponse); putResponse['dev-test-flag'].version = 999; putResponse['dev-test-flag'].value = false; - const allFlags = await identifyGetAllFlags(false, putResponse); - expect(allFlags).toMatchObject({ 'dev-test-flag': false }); + const simulatedEvents = [{ data: JSON.stringify(putResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); + + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; + await jest.runAllTimersAsync(); + + expect(ldc.allFlags()).toMatchObject({ 'dev-test-flag': false }); expect(emitter.emit).toHaveBeenNthCalledWith(2, 'change', context, ['dev-test-flag']); }); @@ -313,13 +326,23 @@ describe('sdk-client storage', () => { putResponse['dev-test-flag'].value = false; putResponse['another-dev-test-flag'] = newFlag; delete putResponse['moonshot-demo']; - const allFlags = await identifyGetAllFlags(false, putResponse); - // wait for async code to resolve promises + const simulatedEvents = [{ data: JSON.stringify(putResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); + + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); - expect(allFlags).toMatchObject({ 'dev-test-flag': false, 'another-dev-test-flag': true }); - expect(allFlags).not.toHaveProperty('moonshot-demo'); + expect(ldc.allFlags()).toMatchObject({ 'dev-test-flag': false, 'another-dev-test-flag': true }); + expect(ldc.allFlags()).not.toHaveProperty('moonshot-demo'); expect(emitter.emit).toHaveBeenNthCalledWith(2, 'change', context, [ 'moonshot-demo', 'dev-test-flag', @@ -328,15 +351,16 @@ describe('sdk-client storage', () => { }); test('syncing storage when PUT is consistent so no change', async () => { - const allFlags = await identifyGetAllFlags( - false, - defaultPutResponse, - undefined, - undefined, - false, + const simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, ); - // wait for async code to resolve promises + await ldc.identify(context); await jest.runAllTimersAsync(); expect(mockPlatform.storage.set).toHaveBeenCalledTimes(2); @@ -356,7 +380,7 @@ describe('sdk-client storage', () => { expect(emitter.emit).toHaveBeenNthCalledWith(1, 'change', context, defaultFlagKeys); // this is defaultPutResponse - expect(allFlags).toEqual({ + expect(ldc.allFlags()).toEqual({ 'dev-test-flag': true, 'easter-i-tunes-special': false, 'easter-specials': 'no specials', @@ -372,13 +396,22 @@ describe('sdk-client storage', () => { const putResponse = clone(defaultPutResponse); putResponse['dev-test-flag'].reason = { kind: 'RULE_MATCH', inExperiment: true }; - const allFlags = await identifyGetAllFlags(false, putResponse); + const simulatedEvents = [{ data: JSON.stringify(putResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); - // wait for async code to resolve promises + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); - const flagsInStorage = JSON.parse(mockPlatform.storage.set.mock.lastCall[1]) as Flags; - expect(allFlags).toMatchObject({ 'dev-test-flag': true }); + const flagsInStorage = JSON.parse(mockPlatform.storage.set.mock.lastCall[1]) as Flags; + expect(ldc.allFlags()).toMatchObject({ 'dev-test-flag': true }); expect(flagsInStorage['dev-test-flag'].reason).toEqual({ kind: 'RULE_MATCH', inExperiment: true, @@ -395,13 +428,24 @@ describe('sdk-client storage', () => { patchResponse.value = false; patchResponse.version += 1; - const allFlags = await identifyGetAllFlags(false, defaultPutResponse, patchResponse); + const putEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const patchEvents = [{ data: JSON.stringify(patchResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', putEvents); + mockEventSource.simulateEvents('patch', patchEvents); + return mockEventSource; + }, + ); - // wait for async code to resolve promises + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); const flagsInStorage = JSON.parse(mockPlatform.storage.set.mock.lastCall[1]) as Flags; - expect(allFlags).toMatchObject({ 'dev-test-flag': false }); + expect(ldc.allFlags()).toMatchObject({ 'dev-test-flag': false }); expect(mockPlatform.storage.set).toHaveBeenCalledTimes(4); expect(flagsInStorage['dev-test-flag'].version).toEqual(patchResponse.version); expect(emitter.emit).toHaveBeenCalledTimes(2); @@ -412,13 +456,24 @@ describe('sdk-client storage', () => { const patchResponse = clone(defaultPutResponse['dev-test-flag']); patchResponse.key = 'another-dev-test-flag'; - const allFlags = await identifyGetAllFlags(false, defaultPutResponse, patchResponse); + const putEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const patchEvents = [{ data: JSON.stringify(patchResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', putEvents); + mockEventSource.simulateEvents('patch', patchEvents); + return mockEventSource; + }, + ); - // wait for async code to resolve promises + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); const flagsInStorage = JSON.parse(mockPlatform.storage.set.mock.lastCall[1]) as Flags; - expect(allFlags).toHaveProperty('another-dev-test-flag'); + expect(ldc.allFlags()).toHaveProperty('another-dev-test-flag'); expect(mockPlatform.storage.set).toHaveBeenNthCalledWith( 4, flagStorageKey, @@ -435,19 +490,28 @@ describe('sdk-client storage', () => { patchResponse.value = false; patchResponse.version -= 1; - const allFlags = await identifyGetAllFlags( - false, - defaultPutResponse, - patchResponse, - undefined, - false, + const putEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const patchEvents = [{ data: JSON.stringify(patchResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', putEvents); + mockEventSource.simulateEvents('patch', patchEvents); + return mockEventSource; + }, ); - expect(mockPlatform.storage.set).toHaveBeenCalledTimes(0); + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; + await jest.runAllTimersAsync(); + + // the initial put is resulting in two sets, one for the index and one for the flag data + expect(mockPlatform.storage.set).toHaveBeenCalledTimes(2); expect(emitter.emit).not.toHaveBeenCalledWith('change'); // this is defaultPutResponse - expect(allFlags).toEqual({ + expect(ldc.allFlags()).toEqual({ 'dev-test-flag': true, 'easter-i-tunes-special': false, 'easter-specials': 'no specials', @@ -465,18 +529,24 @@ describe('sdk-client storage', () => { version: defaultPutResponse['dev-test-flag'].version + 1, }; - const allFlags = await identifyGetAllFlags( - false, - defaultPutResponse, - undefined, - deleteResponse, + const putEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const deleteEvents = [{ data: JSON.stringify(deleteResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', putEvents); + mockEventSource.simulateEvents('delete', deleteEvents); + return mockEventSource; + }, ); - // wait for async code to resolve promises + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); const flagsInStorage = JSON.parse(mockPlatform.storage.set.mock.lastCall[1]) as Flags; - expect(allFlags).not.toHaveProperty('dev-test-flag'); + expect(ldc.allFlags()).not.toHaveProperty('dev-test-flag'); expect(mockPlatform.storage.set).toHaveBeenNthCalledWith( 4, flagStorageKey, @@ -493,16 +563,25 @@ describe('sdk-client storage', () => { version: defaultPutResponse['dev-test-flag'].version, }; - const allFlags = await identifyGetAllFlags( - false, - defaultPutResponse, - undefined, - deleteResponse, - false, + const putEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const deleteEvents = [{ data: JSON.stringify(deleteResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', putEvents); + mockEventSource.simulateEvents('delete', deleteEvents); + return mockEventSource; + }, ); - expect(allFlags).toHaveProperty('dev-test-flag'); - expect(mockPlatform.storage.set).toHaveBeenCalledTimes(0); + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; + await jest.runAllTimersAsync(); + + expect(ldc.allFlags()).toHaveProperty('dev-test-flag'); + // the initial put is resulting in two sets, one for the index and one for the flag data + expect(mockPlatform.storage.set).toHaveBeenCalledTimes(2); expect(emitter.emit).not.toHaveBeenCalledWith('change'); }); @@ -512,16 +591,25 @@ describe('sdk-client storage', () => { version: defaultPutResponse['dev-test-flag'].version - 1, }; - const allFlags = await identifyGetAllFlags( - false, - defaultPutResponse, - undefined, - deleteResponse, - false, + const putEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const deleteEvents = [{ data: JSON.stringify(deleteResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', putEvents); + mockEventSource.simulateEvents('delete', deleteEvents); + return mockEventSource; + }, ); - expect(allFlags).toHaveProperty('dev-test-flag'); - expect(mockPlatform.storage.set).toHaveBeenCalledTimes(0); + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; + await jest.runAllTimersAsync(); + + expect(ldc.allFlags()).toHaveProperty('dev-test-flag'); + // the initial put is resulting in two sets, one for the index and one for the flag data + expect(mockPlatform.storage.set).toHaveBeenCalledTimes(2); expect(emitter.emit).not.toHaveBeenCalledWith('change'); }); @@ -531,9 +619,20 @@ describe('sdk-client storage', () => { version: 1, }; - await identifyGetAllFlags(false, defaultPutResponse, undefined, deleteResponse, false); + const putEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const deleteEvents = [{ data: JSON.stringify(deleteResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', putEvents); + mockEventSource.simulateEvents('delete', deleteEvents); + return mockEventSource; + }, + ); - // wait for async code to resolve promises + const changePromise = onChangePromise(); + await ldc.identify(context); + await changePromise; await jest.runAllTimersAsync(); const flagsInStorage = JSON.parse(mockPlatform.storage.set.mock.lastCall[1]) as Flags; diff --git a/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts b/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts index 648be2766..4e732d114 100644 --- a/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts +++ b/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts @@ -1,28 +1,10 @@ -import { AutoEnvAttributes, clone, Hasher, LDContext } from '@launchdarkly/js-sdk-common'; -import { - createBasicPlatform, - createLogger, - MockStreamingProcessor, - setupMockStreamingProcessor, -} from '@launchdarkly/private-js-mocks'; +import { AutoEnvAttributes, clone, Encoding, Hasher, LDContext } from '@launchdarkly/js-sdk-common'; +import { createBasicPlatform, createLogger } from '@launchdarkly/private-js-mocks'; import LDClientImpl from '../src/LDClientImpl'; import { Flags } from '../src/types'; import * as mockResponseJson from './evaluation/mockResponse.json'; - -jest.mock('@launchdarkly/js-sdk-common', () => { - const actual = jest.requireActual('@launchdarkly/js-sdk-common'); - const actualMock = jest.requireActual('@launchdarkly/private-js-mocks'); - return { - ...actual, - ...{ - internal: { - ...actual.internal, - StreamingProcessor: actualMock.MockStreamingProcessor, - }, - }, - }; -}); +import { MockEventSource } from './streaming/LDClientImpl.mocks'; const testSdkKey = 'test-sdk-key'; const context: LDContext = { kind: 'org', key: 'Testy Pizza' }; @@ -41,8 +23,11 @@ const autoEnv = { os: { name: 'An OS', version: '1.0.1', family: 'orange' }, }, }; + describe('sdk-client object', () => { let ldc: LDClientImpl; + let mockEventSource: MockEventSource; + let simulatedEvents: { data?: any }[] = []; let defaultPutResponse: Flags; let mockPlatform: ReturnType; let logger: ReturnType; @@ -51,7 +36,6 @@ describe('sdk-client object', () => { mockPlatform = createBasicPlatform(); logger = createLogger(); defaultPutResponse = clone(mockResponseJson); - setupMockStreamingProcessor(false, defaultPutResponse); mockPlatform.crypto.randomUUID.mockReturnValue('random1'); const hasher = { update: jest.fn((): Hasher => hasher), @@ -59,13 +43,33 @@ describe('sdk-client object', () => { }; mockPlatform.crypto.createHash.mockReturnValue(hasher); + jest.spyOn(LDClientImpl.prototype as any, 'getStreamingPaths').mockReturnValue({ + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path/get'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path/report'; + }, + }); + + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + mockPlatform.requests.getEventSourceCapabilities.mockImplementation(() => ({ + readTimeout: true, + headers: true, + customMethod: true, + })); + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); + ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Enabled, mockPlatform, { logger, sendEvents: false, }); - jest - .spyOn(LDClientImpl.prototype as any, 'createStreamUriPath') - .mockReturnValue('/stream/path'); }); afterEach(async () => { @@ -90,11 +94,18 @@ describe('sdk-client object', () => { }); test('identify success', async () => { - defaultPutResponse['dev-test-flag'].value = false; const carContext: LDContext = { kind: 'car', key: 'test-car' }; mockPlatform.crypto.randomUUID.mockReturnValue('random1'); + // need reference within test to run assertions against + const mockCreateEventSource = jest.fn((streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', [{ data: JSON.stringify(defaultPutResponse) }]); + return mockEventSource; + }); + mockPlatform.requests.createEventSource = mockCreateEventSource; + await ldc.identify(carContext); const c = ldc.getContext(); const all = ldc.allFlags(); @@ -105,21 +116,25 @@ describe('sdk-client object', () => { ...autoEnv, }); expect(all).toMatchObject({ - 'dev-test-flag': false, + 'dev-test-flag': true, }); - expect(MockStreamingProcessor).toHaveBeenCalledWith( - expect.anything(), - '/stream/path', - expect.anything(), - expect.anything(), - expect.anything(), - undefined, + expect(mockCreateEventSource).toHaveBeenCalledWith( + expect.stringContaining('/stream/path'), expect.anything(), ); }); test('identify success withReasons', async () => { const carContext: LDContext = { kind: 'car', key: 'test-car' }; + + // need reference within test to run assertions against + const mockCreateEventSource = jest.fn((streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', [{ data: JSON.stringify(defaultPutResponse) }]); + return mockEventSource; + }); + mockPlatform.requests.createEventSource = mockCreateEventSource; + ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Enabled, mockPlatform, { logger, sendEvents: false, @@ -128,19 +143,41 @@ describe('sdk-client object', () => { await ldc.identify(carContext); - expect(MockStreamingProcessor).toHaveBeenCalledWith( - expect.anything(), - '/stream/path', - [{ key: 'withReasons', value: 'true' }], - expect.anything(), + expect(mockCreateEventSource).toHaveBeenCalledWith( + expect.stringContaining('?withReasons=true'), expect.anything(), - undefined, + ); + }); + + test('identify success useReport', async () => { + const carContext: LDContext = { kind: 'car', key: 'test-car' }; + + // need reference within test to run assertions against + const mockCreateEventSource = jest.fn((streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', [{ data: JSON.stringify(defaultPutResponse) }]); + return mockEventSource; + }); + mockPlatform.requests.createEventSource = mockCreateEventSource; + + ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Enabled, mockPlatform, { + logger, + sendEvents: false, + useReport: true, + }); + + await ldc.identify(carContext); + + expect(mockCreateEventSource).toHaveBeenCalledWith( + expect.stringContaining('/stream/path/report'), expect.anything(), ); }); test('identify success without auto env', async () => { defaultPutResponse['dev-test-flag'].value = false; + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const carContext: LDContext = { kind: 'car', key: 'test-car' }; ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Disabled, mockPlatform, { logger, @@ -159,6 +196,8 @@ describe('sdk-client object', () => { test('identify anonymous', async () => { defaultPutResponse['dev-test-flag'].value = false; + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + const carContext: LDContext = { kind: 'car', anonymous: true, key: '' }; mockPlatform.crypto.randomUUID.mockReturnValue('random1'); @@ -186,7 +225,14 @@ describe('sdk-client object', () => { }); test('identify error stream error', async () => { - setupMockStreamingProcessor(true); + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateError({ status: 404, message: 'test-error' }); + return mockEventSource; + }, + ); + const carContext: LDContext = { kind: 'car', key: 'test-car' }; await expect(ldc.identify(carContext)).rejects.toThrow('test-error'); diff --git a/packages/shared/sdk-client/__tests__/LDClientImpl.timeout.test.ts b/packages/shared/sdk-client/__tests__/LDClientImpl.timeout.test.ts index c37cc0d86..bfe2a5df7 100644 --- a/packages/shared/sdk-client/__tests__/LDClientImpl.timeout.test.ts +++ b/packages/shared/sdk-client/__tests__/LDClientImpl.timeout.test.ts @@ -1,14 +1,11 @@ -import { AutoEnvAttributes, clone, LDContext } from '@launchdarkly/js-sdk-common'; -import { - createBasicPlatform, - createLogger, - setupMockStreamingProcessor, -} from '@launchdarkly/private-js-mocks'; +import { AutoEnvAttributes, clone, Encoding, LDContext } from '@launchdarkly/js-sdk-common'; +import { createBasicPlatform, createLogger } from '@launchdarkly/private-js-mocks'; import { toMulti } from '../src/context/addAutoEnv'; import LDClientImpl from '../src/LDClientImpl'; import { Flags } from '../src/types'; import * as mockResponseJson from './evaluation/mockResponse.json'; +import { MockEventSource } from './streaming/LDClientImpl.mocks'; let mockPlatform: ReturnType; let logger: ReturnType; @@ -18,24 +15,12 @@ beforeEach(() => { logger = createLogger(); }); -jest.mock('@launchdarkly/js-sdk-common', () => { - const actual = jest.requireActual('@launchdarkly/js-sdk-common'); - const m = jest.requireActual('@launchdarkly/private-js-mocks'); - return { - ...actual, - ...{ - internal: { - ...actual.internal, - StreamingProcessor: m.MockStreamingProcessor, - }, - }, - }; -}); - const testSdkKey = 'test-sdk-key'; const carContext: LDContext = { kind: 'car', key: 'test-car' }; let ldc: LDClientImpl; +let mockEventSource: MockEventSource; +let simulatedEvents: { data?: any }[] = []; let defaultPutResponse: Flags; const DEFAULT_IDENTIFY_TIMEOUT = 5; @@ -48,30 +33,38 @@ describe('sdk-client identify timeout', () => { beforeEach(() => { defaultPutResponse = clone(mockResponseJson); - // simulate streaming error after a long timeout - setupMockStreamingProcessor(true, defaultPutResponse, undefined, undefined, 30); + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Enabled, mockPlatform, { logger, sendEvents: false, }); - jest - .spyOn(LDClientImpl.prototype as any, 'createStreamUriPath') - .mockReturnValue('/stream/path'); + jest.spyOn(LDClientImpl.prototype as any, 'getStreamingPaths').mockReturnValue({ + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + }); }); afterEach(() => { jest.resetAllMocks(); }); - // streaming is setup to error in beforeEach to cause a timeout test('rejects with default timeout of 5s', async () => { jest.advanceTimersByTimeAsync(DEFAULT_IDENTIFY_TIMEOUT * 1000).then(); await expect(ldc.identify(carContext)).rejects.toThrow(/identify timed out/); expect(logger.error).toHaveBeenCalledWith(expect.stringMatching(/identify timed out/)); }); - // streaming is setup to error in beforeEach to cause a timeout test('rejects with custom timeout', async () => { const timeout = 15; jest.advanceTimersByTimeAsync(timeout * 1000).then(); @@ -79,7 +72,9 @@ describe('sdk-client identify timeout', () => { }); test('resolves with default timeout', async () => { - setupMockStreamingProcessor(false, defaultPutResponse); + // set simulated events to be default response + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + jest.advanceTimersByTimeAsync(DEFAULT_IDENTIFY_TIMEOUT * 1000).then(); await expect(ldc.identify(carContext)).resolves.toBeUndefined(); @@ -99,7 +94,10 @@ describe('sdk-client identify timeout', () => { test('resolves with custom timeout', async () => { const timeout = 15; - setupMockStreamingProcessor(false, defaultPutResponse); + + // set simulated events to be default response + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + jest.advanceTimersByTimeAsync(timeout).then(); await expect(ldc.identify(carContext, { timeout })).resolves.toBeUndefined(); @@ -119,7 +117,10 @@ describe('sdk-client identify timeout', () => { test('setting high timeout threshold with internalOptions', async () => { const highTimeoutThreshold = 20; - setupMockStreamingProcessor(false, defaultPutResponse); + + // set simulated events to be default response + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + ldc = new LDClientImpl( testSdkKey, AutoEnvAttributes.Enabled, @@ -139,7 +140,10 @@ describe('sdk-client identify timeout', () => { test('warning when timeout is too high', async () => { const highTimeout = 60; - setupMockStreamingProcessor(false, defaultPutResponse); + + // set simulated events to be default response + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + jest.advanceTimersByTimeAsync(highTimeout * 1000).then(); await ldc.identify(carContext, { timeout: highTimeout }); @@ -148,7 +152,9 @@ describe('sdk-client identify timeout', () => { }); test('safe timeout should not warn', async () => { - setupMockStreamingProcessor(false, defaultPutResponse); + // set simulated events to be default response + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + jest.advanceTimersByTimeAsync(DEFAULT_IDENTIFY_TIMEOUT * 1000).then(); await ldc.identify(carContext, { timeout: DEFAULT_IDENTIFY_TIMEOUT }); diff --git a/packages/shared/sdk-client/__tests__/LDClientImpl.variation.test.ts b/packages/shared/sdk-client/__tests__/LDClientImpl.variation.test.ts index 384dcf0ce..664a54ca2 100644 --- a/packages/shared/sdk-client/__tests__/LDClientImpl.variation.test.ts +++ b/packages/shared/sdk-client/__tests__/LDClientImpl.variation.test.ts @@ -1,13 +1,16 @@ -import { AutoEnvAttributes, clone, Context, LDContext } from '@launchdarkly/js-sdk-common'; import { - createBasicPlatform, - createLogger, - setupMockStreamingProcessor, -} from '@launchdarkly/private-js-mocks'; + AutoEnvAttributes, + clone, + Context, + Encoding, + LDContext, +} from '@launchdarkly/js-sdk-common'; +import { createBasicPlatform, createLogger } from '@launchdarkly/private-js-mocks'; import LDClientImpl from '../src/LDClientImpl'; import { Flags } from '../src/types'; import * as mockResponseJson from './evaluation/mockResponse.json'; +import { MockEventSource } from './streaming/LDClientImpl.mocks'; let mockPlatform: ReturnType; let logger: ReturnType; @@ -17,37 +20,39 @@ beforeEach(() => { logger = createLogger(); }); -jest.mock('@launchdarkly/js-sdk-common', () => { - const actual = jest.requireActual('@launchdarkly/js-sdk-common'); - const actualMock = jest.requireActual('@launchdarkly/private-js-mocks'); - return { - ...actual, - ...{ - internal: { - ...actual.internal, - StreamingProcessor: actualMock.MockStreamingProcessor, - }, - }, - }; -}); - const testSdkKey = 'test-sdk-key'; const context: LDContext = { kind: 'org', key: 'Testy Pizza' }; let ldc: LDClientImpl; +let mockEventSource: MockEventSource; +let simulatedEvents: { data?: any }[] = []; let defaultPutResponse: Flags; describe('sdk-client object', () => { beforeEach(() => { defaultPutResponse = clone(mockResponseJson); - setupMockStreamingProcessor(false, defaultPutResponse); + jest.spyOn(LDClientImpl.prototype as any, 'getStreamingPaths').mockReturnValue({ + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path'; + }, + }); + + simulatedEvents = [{ data: JSON.stringify(defaultPutResponse) }]; + mockPlatform.requests.createEventSource.mockImplementation( + (streamUri: string = '', options: any = {}) => { + mockEventSource = new MockEventSource(streamUri, options); + mockEventSource.simulateEvents('put', simulatedEvents); + return mockEventSource; + }, + ); + ldc = new LDClientImpl(testSdkKey, AutoEnvAttributes.Disabled, mockPlatform, { logger, sendEvents: false, }); - jest - .spyOn(LDClientImpl.prototype as any, 'createStreamUriPath') - .mockReturnValue('/stream/path'); }); afterEach(() => { @@ -57,7 +62,6 @@ describe('sdk-client object', () => { test('variation', async () => { await ldc.identify(context); const devTestFlag = ldc.variation('dev-test-flag'); - expect(devTestFlag).toBe(true); }); @@ -67,14 +71,11 @@ describe('sdk-client object', () => { ldc.on('error', errorListener); const p = ldc.identify(context); - setTimeout(() => { - // call variation in the next tick to give ldc a chance to hook up event emitter - ldc.variation('does-not-exist', 'not-found'); - }); + ldc.variation('does-not-exist', 'not-found'); await expect(p).resolves.toBeUndefined(); - const error = errorListener.mock.calls[0][1]; expect(errorListener).toHaveBeenCalledTimes(1); + const error = errorListener.mock.calls[0][1]; expect(error.message).toMatch(/unknown feature/i); }); diff --git a/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts b/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts index 71ca351ed..593372d83 100644 --- a/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts +++ b/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts @@ -1,6 +1,7 @@ import { waitFor } from '@testing-library/dom'; import { + Encoding, EventSource, EventSourceCapabilities, EventSourceInitDict, @@ -8,7 +9,8 @@ import { Response, } from '@launchdarkly/js-sdk-common'; -import PollingProcessor, { PollingConfig } from '../../src/polling/PollingProcessor'; +import PollingProcessor from '../../src/polling/PollingProcessor'; +import { PollingDataSourceConfig } from '../../src/streaming'; function mockResponse(value: string, statusCode: number) { const response: Response = { @@ -53,26 +55,42 @@ function makeRequests(): Requests { }; } -function makeConfig(config?: { pollInterval?: number; useReport?: boolean }): PollingConfig { +function makeEncoding(): Encoding { return { - pollInterval: config?.pollInterval ?? 60 * 5, - // eslint-disable-next-line no-console - logger: { - error: jest.fn(), - warn: jest.fn(), - info: jest.fn(), - debug: jest.fn(), - }, - tags: {}, - useReport: config?.useReport ?? false, - serviceEndpoints: { - streaming: '', - polling: 'http://example.example.example', - events: '', - analyticsEventPath: '', - diagnosticEventPath: '', - includeAuthorizationHeader: false, + btoa: jest.fn(), + }; +} + +const serviceEndpoints = { + events: 'mockEventsEndpoint', + polling: 'mockPollingEndpoint', + streaming: 'mockStreamingEndpoint', + diagnosticEventPath: '/diagnostic', + analyticsEventPath: '/bulk', + includeAuthorizationHeader: true, + payloadFilterKey: 'testPayloadFilterKey', +}; + +function makeConfig( + pollInterval: number, + withReasons: boolean, + useReport: boolean, +): PollingDataSourceConfig { + return { + credential: 'the-sdk-key', + serviceEndpoints, + paths: { + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/poll/path/get'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/poll/path/report'; + }, }, + baseHeaders: {}, + withReasons, + useReport, + pollInterval, }; } @@ -80,11 +98,10 @@ it('makes no requests until it is started', () => { const requests = makeRequests(); // eslint-disable-next-line no-new new PollingProcessor( + 'mockContextString', + makeConfig(1, true, false), requests, - '/polling', - [], - makeConfig(), - {}, + makeEncoding(), (_flags) => {}, (_error) => {}, ); @@ -96,11 +113,10 @@ it('polls immediately when started', () => { const requests = makeRequests(); const polling = new PollingProcessor( + 'mockContextString', + makeConfig(1, true, false), requests, - '/polling', - [], - makeConfig(), - {}, + makeEncoding(), (_flags) => {}, (_error) => {}, ); @@ -116,11 +132,10 @@ it('calls callback on success', async () => { const errorCallback = jest.fn(); const polling = new PollingProcessor( + 'mockContextString', + makeConfig(1000, true, false), requests, - '/polling', - [], - makeConfig(), - {}, + makeEncoding(), dataCallback, errorCallback, ); @@ -137,11 +152,10 @@ it('polls repeatedly', async () => { requests.fetch = mockFetch('{ "flagA": true }', 200); const polling = new PollingProcessor( + 'mockContextString', + makeConfig(0.1, true, false), requests, - '/polling', - [], - makeConfig({ pollInterval: 0.1 }), - {}, + makeEncoding(), dataCallback, errorCallback, ); @@ -175,11 +189,10 @@ it('stops polling when stopped', (done) => { const errorCallback = jest.fn(); const polling = new PollingProcessor( + 'mockContextString', + makeConfig(0.01, true, false), requests, - '/stops', - [], - makeConfig({ pollInterval: 0.01 }), - {}, + makeEncoding(), dataCallback, errorCallback, ); @@ -196,15 +209,17 @@ it('stops polling when stopped', (done) => { it('includes the correct headers on requests', () => { const requests = makeRequests(); + const config = makeConfig(1, true, false); + config.baseHeaders = { + authorization: 'the-sdk-key', + 'user-agent': 'AnSDK/42', + }; + const polling = new PollingProcessor( + 'mockContextString', + config, requests, - '/polling', - [], - makeConfig(), - { - authorization: 'the-sdk-key', - 'user-agent': 'AnSDK/42', - }, + makeEncoding(), (_flags) => {}, (_error) => {}, ); @@ -222,15 +237,14 @@ it('includes the correct headers on requests', () => { polling.stop(); }); -it('defaults to using the "GET" verb', () => { +it('defaults to using the "GET" method', () => { const requests = makeRequests(); const polling = new PollingProcessor( + 'mockContextString', + makeConfig(1000, true, false), requests, - '/polling', - [], - makeConfig(), - {}, + makeEncoding(), (_flags) => {}, (_error) => {}, ); @@ -240,20 +254,20 @@ it('defaults to using the "GET" verb', () => { expect.anything(), expect.objectContaining({ method: 'GET', + body: undefined, }), ); polling.stop(); }); -it('can be configured to use the "REPORT" verb', () => { +it('can be configured to use the "REPORT" method', () => { const requests = makeRequests(); const polling = new PollingProcessor( + 'mockContextString', + makeConfig(1000, true, true), requests, - '/polling', - [], - makeConfig({ useReport: true }), - {}, + makeEncoding(), (_flags) => {}, (_error) => {}, ); @@ -263,6 +277,10 @@ it('can be configured to use the "REPORT" verb', () => { expect.anything(), expect.objectContaining({ method: 'REPORT', + headers: expect.objectContaining({ + 'content-type': 'application/json', + }), + body: 'mockContextString', }), ); polling.stop(); @@ -272,16 +290,21 @@ it('continues polling after receiving bad JSON', async () => { const requests = makeRequests(); const dataCallback = jest.fn(); const errorCallback = jest.fn(); - const config = makeConfig({ pollInterval: 0.1 }); + const logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; const polling = new PollingProcessor( + 'mockContextString', + makeConfig(0.1, true, false), requests, - '/polling', - [], - config, - {}, + makeEncoding(), dataCallback, errorCallback, + logger, ); polling.start(); @@ -292,7 +315,7 @@ it('continues polling after receiving bad JSON', async () => { requests.fetch = mockFetch('{ham', 200); await waitFor(() => expect(requests.fetch).toHaveBeenCalled()); await waitFor(() => expect(errorCallback).toHaveBeenCalled()); - expect(config.logger.error).toHaveBeenCalledWith('Polling received invalid data'); + expect(logger.error).toHaveBeenCalledWith('Polling received invalid data'); polling.stop(); }); @@ -300,16 +323,21 @@ it('continues polling after an exception thrown during a request', async () => { const requests = makeRequests(); const dataCallback = jest.fn(); const errorCallback = jest.fn(); - const config = makeConfig({ pollInterval: 0.1 }); + const logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; const polling = new PollingProcessor( + 'mockContextString', + makeConfig(0.1, true, false), requests, - '/polling', - [], - config, - {}, + makeEncoding(), dataCallback, errorCallback, + logger, ); polling.start(); @@ -322,7 +350,7 @@ it('continues polling after an exception thrown during a request', async () => { }); await waitFor(() => expect(requests.fetch).toHaveBeenCalled()); polling.stop(); - expect(config.logger.error).toHaveBeenCalledWith( + expect(logger.error).toHaveBeenCalledWith( 'Received I/O error (bad) for polling request - will retry', ); }); @@ -331,16 +359,21 @@ it('can handle recoverable http errors', async () => { const requests = makeRequests(); const dataCallback = jest.fn(); const errorCallback = jest.fn(); - const config = makeConfig({ pollInterval: 0.1 }); + const logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; const polling = new PollingProcessor( + 'mockContextString', + makeConfig(0.1, true, false), requests, - '/polling', - [], - config, - {}, + makeEncoding(), dataCallback, errorCallback, + logger, ); polling.start(); @@ -351,25 +384,28 @@ it('can handle recoverable http errors', async () => { requests.fetch = mockFetch('', 408); await waitFor(() => expect(requests.fetch).toHaveBeenCalled()); polling.stop(); - expect(config.logger.error).toHaveBeenCalledWith( - 'Received error 408 for polling request - will retry', - ); + expect(logger.error).toHaveBeenCalledWith('Received error 408 for polling request - will retry'); }); it('stops polling on unrecoverable error codes', (done) => { const requests = makeRequests(); const dataCallback = jest.fn(); const errorCallback = jest.fn(); - const config = makeConfig({ pollInterval: 0.01 }); + const logger = { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; const polling = new PollingProcessor( + 'mockContextString', + makeConfig(0.01, true, false), requests, - '/polling', - [], - config, - {}, + makeEncoding(), dataCallback, errorCallback, + logger, ); polling.start(); @@ -378,7 +414,7 @@ it('stops polling on unrecoverable error codes', (done) => { // Polling should stop on the 401, but we need to give some time for more // polls to be done. setTimeout(() => { - expect(config.logger.error).toHaveBeenCalledWith( + expect(logger.error).toHaveBeenCalledWith( 'Received error 401 (invalid SDK key) for polling request - giving up permanently', ); expect(requests.fetch).toHaveBeenCalledTimes(1); diff --git a/packages/shared/sdk-client/__tests__/streaming/LDClientImpl.mocks.ts b/packages/shared/sdk-client/__tests__/streaming/LDClientImpl.mocks.ts new file mode 100644 index 000000000..c8e04abf6 --- /dev/null +++ b/packages/shared/sdk-client/__tests__/streaming/LDClientImpl.mocks.ts @@ -0,0 +1,50 @@ +import { EventSource, EventSourceInitDict } from '@launchdarkly/js-sdk-common'; + +export class MockEventSource implements EventSource { + eventsByType: Map = new Map(); + + handlers: Record void> = {}; + + closed = false; + + url: string; + + options: EventSourceInitDict; + + constructor(url: string, options: EventSourceInitDict) { + this.url = url; + this.options = options; + } + + onclose: (() => void) | undefined; + + onerror: (() => void) | undefined; + + onopen: (() => void) | undefined; + + onretrying: ((e: { delayMillis: number }) => void) | undefined; + + addEventListener(type: string, listener: (event?: { data?: any }) => void): void { + this.handlers[type] = listener; + + // replay events to listener + (this.eventsByType.get(type) ?? []).forEach((event) => { + listener(event); + }); + } + + close(): void { + this.closed = true; + } + + simulateEvents(type: string, events: { data?: any }[]) { + this.eventsByType.set(type, events); + } + + simulateError(error: { status: number; message: string }) { + const shouldRetry = this.options.errorFilter(error); + if (!shouldRetry) { + this.closed = true; + } + } +} diff --git a/packages/shared/sdk-client/__tests__/streaming/StreamingProcessor.test.ts b/packages/shared/sdk-client/__tests__/streaming/StreamingProcessor.test.ts new file mode 100644 index 000000000..fcd4a96d6 --- /dev/null +++ b/packages/shared/sdk-client/__tests__/streaming/StreamingProcessor.test.ts @@ -0,0 +1,344 @@ +import { + defaultHeaders, + Encoding, + EventName, + Info, + internal, + LDStreamingError, + Platform, + ProcessStreamResponse, +} from '@launchdarkly/js-sdk-common'; +import { createBasicPlatform, createLogger } from '@launchdarkly/private-js-mocks'; + +import { StreamingDataSourceConfig, StreamingProcessor } from '../../src/streaming'; + +let logger: ReturnType; + +const serviceEndpoints = { + events: '', + polling: '', + streaming: 'https://mockstream.ld.com', + diagnosticEventPath: '/diagnostic', + analyticsEventPath: '/bulk', + includeAuthorizationHeader: true, + payloadFilterKey: 'testPayloadFilterKey', +}; + +const dateNowString = '2023-08-10'; +const sdkKey = 'my-sdk-key'; +const event = { + data: { + flags: { + flagkey: { key: 'flagkey', version: 1 }, + }, + segments: { + segkey: { key: 'segkey', version: 2 }, + }, + }, +}; + +let basicPlatform: Platform; + +function getStreamingDataSourceConfig( + withReasons: boolean = false, + useReport: boolean = false, +): StreamingDataSourceConfig { + return { + credential: sdkKey, + // eslint-disable-next-line object-shorthand + serviceEndpoints: serviceEndpoints, + paths: { + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path/get'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path/report'; + }, + }, + baseHeaders: { + authorization: 'my-sdk-key', + 'user-agent': 'TestUserAgent/2.0.2', + 'x-launchdarkly-wrapper': 'Rapper/1.2.3', + }, + initialRetryDelayMillis: 1000, + withReasons, + useReport, + }; +} + +beforeEach(() => { + basicPlatform = createBasicPlatform(); + logger = createLogger(); +}); + +const createMockEventSource = (streamUri: string = '', options: any = {}) => ({ + streamUri, + options, + onclose: jest.fn(), + addEventListener: jest.fn(), + close: jest.fn(), +}); + +describe('given a stream processor', () => { + let info: Info; + let streamingProcessor: StreamingProcessor; + let diagnosticsManager: internal.DiagnosticsManager; + let listeners: Map; + let mockEventSource: any; + let mockListener: ProcessStreamResponse; + let mockErrorHandler: jest.Mock; + let simulatePutEvent: (e?: any) => void; + let simulateError: (e: { status: number; message: string }) => boolean; + + beforeAll(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date(dateNowString)); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + beforeEach(() => { + mockErrorHandler = jest.fn(); + + info = basicPlatform.info; + + basicPlatform.requests = { + createEventSource: jest.fn((streamUri: string, options: any) => { + mockEventSource = createMockEventSource(streamUri, options); + return mockEventSource; + }), + getEventSourceCapabilities: jest.fn(() => ({ + readTimeout: true, + headers: true, + customMethod: true, + })), + } as any; + simulatePutEvent = (e: any = event) => { + mockEventSource.addEventListener.mock.calls[0][1](e); + }; + simulateError = (e: { status: number; message: string }): boolean => + mockEventSource.options.errorFilter(e); + + listeners = new Map(); + mockListener = { + deserializeData: jest.fn((data) => data), + processJson: jest.fn(), + }; + listeners.set('put', mockListener); + listeners.set('patch', mockListener); + + diagnosticsManager = new internal.DiagnosticsManager(sdkKey, basicPlatform, {}); + + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + diagnosticsManager, + mockErrorHandler, + logger, + ); + + jest.spyOn(streamingProcessor, 'stop'); + streamingProcessor.start(); + }); + + afterEach(() => { + streamingProcessor.close(); + jest.resetAllMocks(); + }); + + it('uses expected uri and eventSource init args', () => { + expect(basicPlatform.requests.createEventSource).toBeCalledWith( + `${serviceEndpoints.streaming}/stream/path/get?filter=testPayloadFilterKey`, + { + errorFilter: expect.any(Function), + headers: defaultHeaders(sdkKey, info, undefined), + initialRetryDelayMillis: 1000, + readTimeoutMillis: 300000, + retryResetIntervalMillis: 60000, + }, + ); + }); + + it('sets streamInitialReconnectDelay correctly', () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + diagnosticsManager, + mockErrorHandler, + ); + streamingProcessor.start(); + + expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( + `${serviceEndpoints.streaming}/stream/path/get?filter=testPayloadFilterKey`, + { + errorFilter: expect.any(Function), + headers: defaultHeaders(sdkKey, info, undefined), + initialRetryDelayMillis: 1000, + readTimeoutMillis: 300000, + retryResetIntervalMillis: 60000, + }, + ); + }); + + it('uses the report path and modifies init dict when useReport is true ', () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(true, true), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + diagnosticsManager, + mockErrorHandler, + ); + streamingProcessor.start(); + + expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( + `${serviceEndpoints.streaming}/stream/path/report?withReasons=true&filter=testPayloadFilterKey`, + expect.objectContaining({ + method: 'REPORT', + body: 'mockContextString', + errorFilter: expect.any(Function), + headers: expect.objectContaining({ 'content-type': 'application/json' }), + initialRetryDelayMillis: 1000, + readTimeoutMillis: 300000, + retryResetIntervalMillis: 60000, + }), + ); + }); + + it('withReasons and payload filter coexist', () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(true, false), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + diagnosticsManager, + mockErrorHandler, + ); + streamingProcessor.start(); + + expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( + `${serviceEndpoints.streaming}/stream/path/get?withReasons=true&filter=testPayloadFilterKey`, + { + errorFilter: expect.any(Function), + headers: defaultHeaders(sdkKey, info, undefined), + initialRetryDelayMillis: 1000, + readTimeoutMillis: 300000, + retryResetIntervalMillis: 60000, + }, + ); + }); + + it('adds listeners', () => { + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 1, + 'put', + expect.any(Function), + ); + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( + 2, + 'patch', + expect.any(Function), + ); + }); + + it('executes listeners', () => { + simulatePutEvent(); + const patchHandler = mockEventSource.addEventListener.mock.calls[1][1]; + patchHandler(event); + + expect(mockListener.deserializeData).toBeCalledTimes(2); + expect(mockListener.processJson).toBeCalledTimes(2); + }); + + it('passes error to callback if json data is malformed', async () => { + (mockListener.deserializeData as jest.Mock).mockReturnValue(false); + simulatePutEvent(); + + expect(logger.error).toBeCalledWith(expect.stringMatching(/invalid data in "put"/)); + expect(logger.debug).toBeCalledWith(expect.stringMatching(/invalid json/i)); + expect(mockErrorHandler.mock.lastCall[0].message).toMatch(/malformed json/i); + }); + + it('calls error handler if event.data prop is missing', async () => { + simulatePutEvent({ flags: {} }); + + expect(mockListener.deserializeData).not.toBeCalled(); + expect(mockListener.processJson).not.toBeCalled(); + expect(mockErrorHandler.mock.lastCall[0].message).toMatch(/unexpected payload/i); + }); + + it('closes and stops', async () => { + streamingProcessor.close(); + + expect(streamingProcessor.stop).toBeCalled(); + expect(mockEventSource.close).toBeCalled(); + // @ts-ignore + expect(streamingProcessor.eventSource).toBeUndefined(); + }); + + it('creates a stream init event', async () => { + const startTime = Date.now(); + simulatePutEvent(); + + const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0); + expect(diagnosticEvent.streamInits.length).toEqual(1); + const si = diagnosticEvent.streamInits[0]; + expect(si.timestamp).toEqual(startTime); + expect(si.failed).toBeFalsy(); + expect(si.durationMillis).toBeGreaterThanOrEqual(0); + }); + + describe.each([400, 408, 429, 500, 503])('given recoverable http errors', (status) => { + it(`continues retrying after error: ${status}`, () => { + const startTime = Date.now(); + const testError = { status, message: 'retry. recoverable.' }; + const willRetry = simulateError(testError); + + expect(willRetry).toBeTruthy(); + expect(mockErrorHandler).not.toBeCalled(); + expect(logger.warn).toBeCalledWith( + expect.stringMatching(new RegExp(`${status}.*will retry`)), + ); + + const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0); + expect(diagnosticEvent.streamInits.length).toEqual(1); + const si = diagnosticEvent.streamInits[0]; + expect(si.timestamp).toEqual(startTime); + expect(si.failed).toBeTruthy(); + expect(si.durationMillis).toBeGreaterThanOrEqual(0); + }); + }); + + describe.each([401, 403])('given irrecoverable http errors', (status) => { + it(`stops retrying after error: ${status}`, () => { + const startTime = Date.now(); + const testError = { status, message: 'stopping. irrecoverable.' }; + const willRetry = simulateError(testError); + + expect(willRetry).toBeFalsy(); + expect(mockErrorHandler).toBeCalledWith( + new LDStreamingError(testError.message, testError.status), + ); + expect(logger.error).toBeCalledWith( + expect.stringMatching(new RegExp(`${status}.*permanently`)), + ); + + const diagnosticEvent = diagnosticsManager.createStatsEventAndReset(0, 0, 0); + expect(diagnosticEvent.streamInits.length).toEqual(1); + const si = diagnosticEvent.streamInits[0]; + expect(si.timestamp).toEqual(startTime); + expect(si.failed).toBeTruthy(); + expect(si.durationMillis).toBeGreaterThanOrEqual(0); + }); + }); +}); diff --git a/packages/shared/sdk-client/__tests__/streaming/index.ts b/packages/shared/sdk-client/__tests__/streaming/index.ts new file mode 100644 index 000000000..a29d015e8 --- /dev/null +++ b/packages/shared/sdk-client/__tests__/streaming/index.ts @@ -0,0 +1 @@ +export { MockEventSource } from './LDClientImpl.mocks'; diff --git a/packages/shared/sdk-client/src/LDClientImpl.ts b/packages/shared/sdk-client/src/LDClientImpl.ts index d5baaeb2c..984eda484 100644 --- a/packages/shared/sdk-client/src/LDClientImpl.ts +++ b/packages/shared/sdk-client/src/LDClientImpl.ts @@ -1,9 +1,9 @@ import { AutoEnvAttributes, - ClientContext, clone, Context, defaultHeaders, + Encoding, internal, LDClientError, LDContext, @@ -36,6 +36,8 @@ import FlagManager from './flag-manager/FlagManager'; import { ItemDescriptor } from './flag-manager/ItemDescriptor'; import LDEmitter, { EventName } from './LDEmitter'; import PollingProcessor from './polling/PollingProcessor'; +import { StreamingProcessor } from './streaming'; +import { DataSourcePaths } from './streaming/DataSourceConfig'; import { DeleteFlag, Flags, PatchFlag } from './types'; const { ClientMessages, ErrorKinds } = internal; @@ -57,8 +59,6 @@ export default class LDClientImpl implements LDClient { private emitter: LDEmitter; private flagManager: FlagManager; - private readonly clientContext: ClientContext; - private eventSendingEnabled: boolean = true; private networkAvailable: boolean = true; private connectionMode: ConnectionMode; @@ -84,7 +84,6 @@ export default class LDClientImpl implements LDClient { this.config = new Configuration(options, internalOptions); this.connectionMode = this.config.initialConnectionMode; - this.clientContext = new ClientContext(sdkKey, this.config, platform); this.logger = this.config.logger; this.baseHeaders = defaultHeaders( @@ -271,31 +270,34 @@ export default class LDClientImpl implements LDClient { return listeners; } - /** - * Generates the url path for streaming. - * - * @protected This function must be overridden in subclasses for streaming - * to work. - * @param _context The LDContext object - */ - protected createStreamUriPath(_context: LDContext): string { - throw new Error( - 'createStreamUriPath not implemented. Client sdks must implement createStreamUriPath for streaming to work.', - ); + protected getStreamingPaths(): DataSourcePaths { + return { + pathGet(_encoding: Encoding, _plainContextString: string): string { + throw new Error( + 'getStreamingPaths not implemented. Client sdks must implement getStreamingPaths for streaming with GET to work.', + ); + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + throw new Error( + 'getStreamingPaths not implemented. Client sdks must implement getStreamingPaths for streaming with REPORT to work.', + ); + }, + }; } - /** - * Generates the url path for polling. - * @param _context - * - * @protected This function must be overridden in subclasses for polling - * to work. - * @param _context The LDContext object - */ - protected createPollUriPath(_context: LDContext): string { - throw new Error( - 'createPollUriPath not implemented. Client sdks must implement createPollUriPath for polling to work.', - ); + protected getPollingPaths(): DataSourcePaths { + return { + pathGet(_encoding: Encoding, _plainContextString: string): string { + throw new Error( + 'getPollingPaths not implemented. Client sdks must implement getPollingPaths for polling with GET to work.', + ); + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + throw new Error( + 'getPollingPaths not implemented. Client sdks must implement getPollingPaths for polling with REPORT to work.', + ); + }, + }; } private createIdentifyPromise(timeout: number) { @@ -414,17 +416,19 @@ export default class LDClientImpl implements LDClient { identifyResolve: any, identifyReject: any, ) { - const parameters: { key: string; value: string }[] = []; - if (this.config.withReasons) { - parameters.push({ key: 'withReasons', value: 'true' }); - } - this.updateProcessor = new PollingProcessor( - this.clientContext.platform.requests, - this.createPollUriPath(context), - parameters, - this.config, - this.baseHeaders, + JSON.stringify(context), + { + credential: this.sdkKey, + serviceEndpoints: this.config.serviceEndpoints, + paths: this.getPollingPaths(), + baseHeaders: this.baseHeaders, + pollInterval: this.config.pollInterval, + withReasons: this.config.withReasons, + useReport: this.config.useReport, + }, + this.platform.requests, + this.platform.encoding!, async (flags) => { this.logger.debug(`Handling polling result: ${Object.keys(flags)}`); @@ -452,17 +456,20 @@ export default class LDClientImpl implements LDClient { identifyResolve: any, identifyReject: any, ) { - const parameters: { key: string; value: string }[] = []; - if (this.config.withReasons) { - parameters.push({ key: 'withReasons', value: 'true' }); - } - - this.updateProcessor = new internal.StreamingProcessor( - this.clientContext, - this.createStreamUriPath(context), - parameters, + this.updateProcessor = new StreamingProcessor( + JSON.stringify(context), + { + credential: this.sdkKey, + serviceEndpoints: this.config.serviceEndpoints, + paths: this.getStreamingPaths(), + baseHeaders: this.baseHeaders, + initialRetryDelayMillis: this.config.streamInitialReconnectDelay * 1000, + withReasons: this.config.withReasons, + useReport: this.config.useReport, + }, this.createStreamListeners(checkedContext, identifyResolve), - this.baseHeaders, + this.platform.requests, + this.platform.encoding!, this.diagnosticsManager, (e) => { identifyReject(e); diff --git a/packages/shared/sdk-client/src/api/LDOptions.ts b/packages/shared/sdk-client/src/api/LDOptions.ts index 8d098c881..70e6599bc 100644 --- a/packages/shared/sdk-client/src/api/LDOptions.ts +++ b/packages/shared/sdk-client/src/api/LDOptions.ts @@ -198,6 +198,14 @@ export interface LDOptions { */ pollInterval?: number; + /** + * Directs the SDK to use the REPORT method for HTTP requests instead of GET. (Default: `false`) + * + * This setting applies both to requests to the streaming service, as well as flag requests when the SDK is in polling + * mode. + */ + useReport?: boolean; + /** * Whether LaunchDarkly should provide additional information about how flag values were * calculated. diff --git a/packages/shared/sdk-client/src/configuration/Configuration.ts b/packages/shared/sdk-client/src/configuration/Configuration.ts index cef4678c9..e68dd9f74 100644 --- a/packages/shared/sdk-client/src/configuration/Configuration.ts +++ b/packages/shared/sdk-client/src/configuration/Configuration.ts @@ -31,14 +31,14 @@ export default class Configuration { public readonly flushInterval = 30; public readonly streamInitialReconnectDelay = 1; - public readonly allAttributesPrivate = false; - public readonly debug = false; - public readonly diagnosticOptOut = false; - public readonly sendEvents = true; - public readonly sendLDHeaders = true; + public readonly allAttributesPrivate: boolean = false; + public readonly debug: boolean = false; + public readonly diagnosticOptOut: boolean = false; + public readonly sendEvents: boolean = true; + public readonly sendLDHeaders: boolean = true; - public readonly useReport = false; - public readonly withReasons = false; + public readonly useReport: boolean = false; + public readonly withReasons: boolean = false; public readonly privateAttributes: string[] = []; @@ -82,6 +82,8 @@ export default class Configuration { internalOptions.includeAuthorizationHeader, pristineOptions.payloadFilterKey, ); + this.useReport = pristineOptions.useReport ?? false; + this.tags = new ApplicationTags({ application: this.applicationInfo, logger: this.logger }); this.userAgentHeaderName = internalOptions.userAgentHeaderName ?? 'user-agent'; } diff --git a/packages/shared/sdk-client/src/configuration/validators.ts b/packages/shared/sdk-client/src/configuration/validators.ts index cc22874b4..17851bff1 100644 --- a/packages/shared/sdk-client/src/configuration/validators.ts +++ b/packages/shared/sdk-client/src/configuration/validators.ts @@ -35,6 +35,8 @@ const validators: Record = { pollInterval: TypeValidators.numberWithMin(30), + useReport: TypeValidators.Boolean, + privateAttributes: TypeValidators.StringArray, applicationInfo: TypeValidators.Object, diff --git a/packages/shared/sdk-client/src/index.ts b/packages/shared/sdk-client/src/index.ts index 59821cf54..a4186185d 100644 --- a/packages/shared/sdk-client/src/index.ts +++ b/packages/shared/sdk-client/src/index.ts @@ -16,4 +16,6 @@ export type { ConnectionMode, } from './api'; +export { DataSourcePaths } from './streaming'; + export { LDClientImpl }; diff --git a/packages/shared/sdk-client/src/polling/PollingProcessor.ts b/packages/shared/sdk-client/src/polling/PollingProcessor.ts index 5a11add60..1e4f229dd 100644 --- a/packages/shared/sdk-client/src/polling/PollingProcessor.ts +++ b/packages/shared/sdk-client/src/polling/PollingProcessor.ts @@ -1,43 +1,27 @@ import { - ApplicationTags, + Encoding, getPollingUri, httpErrorMessage, HttpErrorResponse, isHttpRecoverable, - LDHeaders, LDLogger, LDPollingError, Requests, - ServiceEndpoints, subsystem, } from '@launchdarkly/js-sdk-common'; +import { PollingDataSourceConfig } from '../streaming/DataSourceConfig'; import { Flags } from '../types'; import Requestor, { LDRequestError } from './Requestor'; export type PollingErrorHandler = (err: LDPollingError) => void; -/** - * Subset of configuration required for polling. - * - * @internal - */ -export type PollingConfig = { - logger: LDLogger; - pollInterval: number; - tags: ApplicationTags; - useReport: boolean; - serviceEndpoints: ServiceEndpoints; -}; - /** * @internal */ export default class PollingProcessor implements subsystem.LDStreamProcessor { private stopped = false; - private logger?: LDLogger; - private pollInterval: number; private timeoutHandle: any; @@ -45,19 +29,36 @@ export default class PollingProcessor implements subsystem.LDStreamProcessor { private requestor: Requestor; constructor( + private readonly plainContextString: string, + private readonly dataSourceConfig: PollingDataSourceConfig, requests: Requests, - uriPath: string, - parameters: { key: string; value: string }[], - config: PollingConfig, - baseHeaders: LDHeaders, + encoding: Encoding, private readonly dataHandler: (flags: Flags) => void, private readonly errorHandler?: PollingErrorHandler, + private readonly logger?: LDLogger, ) { - const uri = getPollingUri(config.serviceEndpoints, uriPath, parameters); - this.logger = config.logger; - this.pollInterval = config.pollInterval; + const path = dataSourceConfig.useReport + ? dataSourceConfig.paths.pathReport(encoding, plainContextString) + : dataSourceConfig.paths.pathGet(encoding, plainContextString); + + const parameters: { key: string; value: string }[] = []; + if (this.dataSourceConfig.withReasons) { + parameters.push({ key: 'withReasons', value: 'true' }); + } + + const uri = getPollingUri(dataSourceConfig.serviceEndpoints, path, parameters); + this.pollInterval = dataSourceConfig.pollInterval; + + let method = 'GET'; + const headers: { [key: string]: string } = { ...dataSourceConfig.baseHeaders }; + let body; + if (dataSourceConfig.useReport) { + method = 'REPORT'; + headers['content-type'] = 'application/json'; + body = plainContextString; // context is in body for REPORT + } - this.requestor = new Requestor(requests, uri, config.useReport, baseHeaders); + this.requestor = new Requestor(requests, uri, headers, method, body); } private async poll() { diff --git a/packages/shared/sdk-client/src/polling/Requestor.ts b/packages/shared/sdk-client/src/polling/Requestor.ts index ec433b550..48ba95b26 100644 --- a/packages/shared/sdk-client/src/polling/Requestor.ts +++ b/packages/shared/sdk-client/src/polling/Requestor.ts @@ -1,5 +1,5 @@ // eslint-disable-next-line max-classes-per-file -import { HttpErrorResponse, LDHeaders, Requests } from '@launchdarkly/js-sdk-common'; +import { HttpErrorResponse, Requests } from '@launchdarkly/js-sdk-common'; function isOk(status: number) { return status >= 200 && status <= 299; @@ -22,25 +22,21 @@ export class LDRequestError extends Error implements HttpErrorResponse { * @internal */ export default class Requestor { - private readonly headers: { [key: string]: string }; - private verb: string; - constructor( private requests: Requests, private readonly uri: string, - useReport: boolean, - baseHeaders: LDHeaders, - ) { - this.headers = { ...baseHeaders }; - this.verb = useReport ? 'REPORT' : 'GET'; - } + private readonly headers: { [key: string]: string }, + private readonly method: string, + private readonly body?: string, + ) {} async requestPayload(): Promise { let status: number | undefined; try { const res = await this.requests.fetch(this.uri, { - method: this.verb, + method: this.method, headers: this.headers, + body: this.body, }); if (isOk(res.status)) { return await res.text(); diff --git a/packages/shared/sdk-client/src/streaming/DataSourceConfig.ts b/packages/shared/sdk-client/src/streaming/DataSourceConfig.ts new file mode 100644 index 000000000..41ce87b40 --- /dev/null +++ b/packages/shared/sdk-client/src/streaming/DataSourceConfig.ts @@ -0,0 +1,23 @@ +import { Encoding, LDHeaders, ServiceEndpoints } from '@launchdarkly/js-sdk-common'; + +export interface DataSourceConfig { + credential: string; + serviceEndpoints: ServiceEndpoints; + baseHeaders: LDHeaders; + withReasons: boolean; + useReport: boolean; + paths: DataSourcePaths; +} + +export interface PollingDataSourceConfig extends DataSourceConfig { + pollInterval: number; +} + +export interface StreamingDataSourceConfig extends DataSourceConfig { + initialRetryDelayMillis: number; +} + +export interface DataSourcePaths { + pathGet(encoding: Encoding, plainContextString: string): string; + pathReport(encoding: Encoding, plainContextString: string): string; +} diff --git a/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts b/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts new file mode 100644 index 000000000..7c7a083a1 --- /dev/null +++ b/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts @@ -0,0 +1,181 @@ +import { + Encoding, + EventName, + EventSource, + getStreamingUri, + httpErrorMessage, + HttpErrorResponse, + internal, + LDLogger, + LDStreamingError, + ProcessStreamResponse, + Requests, + shouldRetry, + subsystem, +} from '@launchdarkly/js-sdk-common'; + +import { StreamingDataSourceConfig } from './DataSourceConfig'; + +const reportJsonError = ( + type: string, + data: string, + logger?: LDLogger, + errorHandler?: internal.StreamingErrorHandler, +) => { + logger?.error(`Stream received invalid data in "${type}" message`); + logger?.debug(`Invalid JSON follows: ${data}`); + errorHandler?.(new LDStreamingError('Malformed JSON data in event stream')); +}; + +class StreamingProcessor implements subsystem.LDStreamProcessor { + private readonly headers: { [key: string]: string | string[] }; + private readonly streamUri: string; + + private eventSource?: EventSource; + private connectionAttemptStartTime?: number; + + constructor( + private readonly plainContextString: string, + private readonly dataSourceConfig: StreamingDataSourceConfig, + private readonly listeners: Map, + private readonly requests: Requests, + encoding: Encoding, + private readonly diagnosticsManager?: internal.DiagnosticsManager, + private readonly errorHandler?: internal.StreamingErrorHandler, + private readonly logger?: LDLogger, + ) { + // TODO: SC-255969 Implement better REPORT fallback logic + if (dataSourceConfig.useReport && !requests.getEventSourceCapabilities().customMethod) { + logger?.error( + "Configuration option useReport is true, but platform's EventSource does not support custom HTTP methods. Streaming may not work.", + ); + } + + const path = dataSourceConfig.useReport + ? dataSourceConfig.paths.pathReport(encoding, plainContextString) + : dataSourceConfig.paths.pathGet(encoding, plainContextString); + + const parameters: { key: string; value: string }[] = []; + if (this.dataSourceConfig.withReasons) { + parameters.push({ key: 'withReasons', value: 'true' }); + } + + this.requests = requests; + this.headers = { ...dataSourceConfig.baseHeaders }; + this.logger = logger; + this.streamUri = getStreamingUri(dataSourceConfig.serviceEndpoints, path, parameters); + } + + private logConnectionStarted() { + this.connectionAttemptStartTime = Date.now(); + } + + private logConnectionResult(success: boolean) { + if (this.connectionAttemptStartTime && this.diagnosticsManager) { + this.diagnosticsManager.recordStreamInit( + this.connectionAttemptStartTime, + !success, + Date.now() - this.connectionAttemptStartTime, + ); + } + + this.connectionAttemptStartTime = undefined; + } + + /** + * This is a wrapper around the passed errorHandler which adds additional + * diagnostics and logging logic. + * + * @param err The error to be logged and handled. + * @return boolean whether to retry the connection. + * + * @private + */ + private retryAndHandleError(err: HttpErrorResponse) { + if (!shouldRetry(err)) { + this.logConnectionResult(false); + this.errorHandler?.(new LDStreamingError(err.message, err.status)); + this.logger?.error(httpErrorMessage(err, 'streaming request')); + return false; + } + + this.logger?.warn(httpErrorMessage(err, 'streaming request', 'will retry')); + this.logConnectionResult(false); + this.logConnectionStarted(); + return true; + } + + start() { + this.logConnectionStarted(); + + let methodAndBodyOverrides; + if (this.dataSourceConfig.useReport) { + // REPORT will include a body, so content type is required. + this.headers['content-type'] = 'application/json'; + + // orverrides default method with REPORT and adds body. + methodAndBodyOverrides = { method: 'REPORT', body: this.plainContextString }; + } else { + // no method or body override + methodAndBodyOverrides = {}; + } + + // TLS is handled by the platform implementation. + const eventSource = this.requests.createEventSource(this.streamUri, { + headers: this.headers, // adds content-type header required when body will be present + ...methodAndBodyOverrides, + errorFilter: (error: HttpErrorResponse) => this.retryAndHandleError(error), + initialRetryDelayMillis: this.dataSourceConfig.initialRetryDelayMillis, + readTimeoutMillis: 5 * 60 * 1000, + retryResetIntervalMillis: 60 * 1000, + }); + this.eventSource = eventSource; + + eventSource.onclose = () => { + this.logger?.info('Closed LaunchDarkly stream connection'); + }; + + eventSource.onerror = () => { + // The work is done by `errorFilter`. + }; + + eventSource.onopen = () => { + this.logger?.info('Opened LaunchDarkly stream connection'); + }; + + eventSource.onretrying = (e) => { + this.logger?.info(`Will retry stream connection in ${e.delayMillis} milliseconds`); + }; + + this.listeners.forEach(({ deserializeData, processJson }, eventName) => { + eventSource.addEventListener(eventName, (event) => { + this.logger?.debug(`Received ${eventName} event`); + + if (event?.data) { + this.logConnectionResult(true); + const { data } = event; + const dataJson = deserializeData(data); + + if (!dataJson) { + reportJsonError(eventName, data, this.logger, this.errorHandler); + return; + } + processJson(dataJson); + } else { + this.errorHandler?.(new LDStreamingError('Unexpected payload from event stream')); + } + }); + }); + } + + stop() { + this.eventSource?.close(); + this.eventSource = undefined; + } + + close() { + this.stop(); + } +} + +export default StreamingProcessor; diff --git a/packages/shared/sdk-client/src/streaming/index.ts b/packages/shared/sdk-client/src/streaming/index.ts new file mode 100644 index 000000000..172e6c5f7 --- /dev/null +++ b/packages/shared/sdk-client/src/streaming/index.ts @@ -0,0 +1,8 @@ +import { + DataSourcePaths, + PollingDataSourceConfig, + StreamingDataSourceConfig, +} from './DataSourceConfig'; +import StreamingProcessor from './StreamingProcessor'; + +export { DataSourcePaths, PollingDataSourceConfig, StreamingProcessor, StreamingDataSourceConfig };