Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: rework how http source uses the transformer #537

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 33 additions & 92 deletions packages/source-http/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,47 +142,6 @@ describe('GIVEN an HTTP Source ', () => {
});
});

describe('WHEN no transformResponseToPagesModulePath is undefined', () => {
const server = setupServer();
beforeAll(() => {
server.use(...successHandlers);
server.listen({ onUnhandledRequest: 'warn' });
});
afterAll(() => {
server.close();
});

it('should merge results from all endpoints into 1 array', done => {
const source$: Observable<Page[]> = Source.create(
{ ...options, transformResponseToPagesModulePath: undefined },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result.length).toEqual(3);
},
complete: () => done()
});
});

it('no transformation of responses is carried out', done => {
const source$: Observable<Page[]> = Source.create(
{ ...options, transformResponseToPagesModulePath: undefined },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result[0]).toEqual({ name: 'Alice' });
expect(result[1]).toEqual({ name: 'Bob' });
expect(result[2]).toEqual({ name: 'Eve' });
},
complete: () => done()
});
});
});

describe('WHEN noProxy option is used', () => {
const server = setupServer();
beforeAll(() => {
Expand Down Expand Up @@ -226,7 +185,10 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should merge results from all endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformer: toUpperCaseTransformer },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand All @@ -237,7 +199,10 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should transform the responses using the transform function', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformer: toUpperCaseTransformer },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand Down Expand Up @@ -271,7 +236,10 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should merge results from **successful** endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformer: toUpperCaseTransformer },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand All @@ -283,63 +251,30 @@ describe('GIVEN the createHttpSource function ', () => {
});
});

describe('WHEN the transformer has a request config', () => {
describe('WHEN the transformer is passed params', () => {
const server = setupServer();
const mockTransformer = jest.fn();
beforeAll(() => {
server.use(...successHandlers);
server.listen({ onUnhandledRequest: 'warn' });
});
afterAll(() => {
server.close();
});
it('should merge results from **successful** endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });

source$.pipe(take(1)).subscribe({
next: result => {
expect(result.length).toEqual(3);
expect(result[0]).toEqual('ALICE');
},
complete: () => done()
});
});
});

describe('WHEN no transformResponseToPagesModulePath is undefined', () => {
const server = setupServer();
beforeAll(() => {
server.use(...successHandlers);
server.listen({ onUnhandledRequest: 'warn' });
});
afterAll(() => {
server.close();
});

it('should merge results from all endpoints into 1 array', done => {
it('should pass transformer options to the transformer', done => {
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformResponseToPagesModulePath: undefined },
{ ...options, transformer: mockTransformer, transformerOptions: { option: 'an option' } },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result.length).toEqual(3);
},
complete: () => done()
});
});

it('no transformation of responses is carried out', done => {
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformResponseToPagesModulePath: undefined },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result[0]).toEqual({ name: 'Alice' });
expect(result[1]).toEqual({ name: 'Bob' });
expect(result[2]).toEqual({ name: 'Eve' });
expect(mockTransformer).toBeCalledTimes(3);
expect(mockTransformer.mock.calls[0][0]).toEqual({ name: 'Alice' });
expect(mockTransformer.mock.calls[0][1]).toEqual(options.prefixDir);
expect(mockTransformer.mock.calls[0][2]).toEqual(0);
expect(mockTransformer.mock.calls[0][3]).toEqual({ option: 'an option' });
},
complete: () => done()
});
Expand All @@ -361,9 +296,12 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should merge results from all endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(configuredRequestsOptions, {
schedule
});
const source$: Observable<Page[]> = createHttpSource(
{ ...configuredRequestsOptions, transformer: toUpperCaseTransformer },
{
schedule
}
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand All @@ -374,9 +312,12 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should transform the responses using the transform function', done => {
const source$: Observable<Page[]> = createHttpSource(configuredRequestsOptions, {
schedule
});
const source$: Observable<Page[]> = createHttpSource(
{ ...configuredRequestsOptions, transformer: toUpperCaseTransformer },
{
schedule
}
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand Down
99 changes: 99 additions & 0 deletions packages/source-http/src/createHttpSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { forkJoin, timer } from 'rxjs';
import { switchMap, map } from 'rxjs/operators';
import { z } from 'zod';
import type { Page, SourceConfig } from '@jpmorganchase/mosaic-types';
import { fromHttpRequest, isErrorResponse } from '@jpmorganchase/mosaic-from-http-request';
import { sourceScheduleSchema, validateMosaicSchema } from '@jpmorganchase/mosaic-schemas';

import { createProxyAgent } from './proxyAgent.js';
import { ResponseTransformer } from './fromDynamicImport.js';

export { createProxyAgent };

export type HttpSourceResponseTransformerType<TResponse, TPage> = ResponseTransformer<
TResponse,
TPage
>;

export const httpSourceCreatorSchema = z.object({
schedule: sourceScheduleSchema.optional(),
endpoints: z.array(z.string().url()).optional().default([]),
prefixDir: z.string({ required_error: 'Please provide a prefixDir' }),
requestTimeout: z.number().optional().default(5000),
proxyEndpoint: z.string().url().optional(),
noProxy: z
.any()
.transform(val => new RegExp(val))
.optional(),
requestHeaders: z.object({}).passthrough().optional(),
transformerOptions: z.unknown().optional()
});

export interface CreateHttpSourceParams<TResponse, TPage>
extends z.input<typeof httpSourceCreatorSchema> {
configuredRequests?: Request[];
transformer: HttpSourceResponseTransformerType<TResponse, TPage>;
}

/**
* For use inside *other* sources.
* Allows a transformer function to be passed directly without the need for dynamic imports.
*
*/
export function createHttpSource<TResponse, TPage extends Page>(
{ configuredRequests, transformer, ...restOptions }: CreateHttpSourceParams<TResponse, TPage>,
{ schedule }: SourceConfig
) {
const {
endpoints,
prefixDir,
proxyEndpoint,
noProxy,
requestHeaders,
requestTimeout,
transformerOptions
} = validateMosaicSchema(httpSourceCreatorSchema, restOptions);

const delayMs = schedule.checkIntervalMins * 60000;
let requests = configuredRequests || [];

if (endpoints.length > 0) {
requests = endpoints.map(endpoint => {
let agent;
const headers = requestHeaders
? (requestHeaders as HeadersInit)
: {
'Content-Type': 'application/json'
};

if (!noProxy?.test(endpoint) && proxyEndpoint) {
agent = createProxyAgent(proxyEndpoint);
}

return new Request(new URL(endpoint), {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
agent,
headers,
timeout: requestTimeout
});
});
}

return timer(schedule.initialDelayMs, delayMs).pipe(
switchMap(() => {
const fetches = requests.map((request, index) =>
fromHttpRequest<TResponse>(request).pipe(
map(response => {
if (isErrorResponse<TResponse>(response)) {
return [];
}

return transformer(response, prefixDir, index, transformerOptions);
})
)
);
return forkJoin(fetches).pipe(map(result => result.flat()));
})
);
}
25 changes: 11 additions & 14 deletions packages/source-http/src/fromDynamicImport.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import { distinctUntilChanged, from, iif, of, switchMap } from 'rxjs';
import type { Page } from '@jpmorganchase/mosaic-types';
import { distinctUntilChanged, from, switchMap } from 'rxjs';

export type ResponseTransformer<TResponse, TOptions> = (
export type ResponseTransformer<TResponse, TPage> = (
response: TResponse,
prefixDir: string,
index: number,
options?: TOptions
) => Array<TResponse>;
...rest: any[]
) => Array<TPage>;

async function importTransformer<T, O>(
async function importTransformer<TResponse, TPage>(
modulePath: string
): Promise<{
transformer: ResponseTransformer<T, O>;
transformer: ResponseTransformer<TResponse, TPage>;
}> {
const { default: transformResponseToPages } = await import(modulePath);
if (!transformResponseToPages) {
Expand All @@ -20,12 +21,8 @@ async function importTransformer<T, O>(
return { transformer: transformResponseToPages };
}

export const fromDynamicImport = <TResponse = unknown, TOptions = unknown>(modulePath?: string) =>
iif(
() => modulePath === undefined,
of({ transformer: null }),
from(String(modulePath)).pipe(
distinctUntilChanged(),
switchMap(() => importTransformer<TResponse, TOptions>(String(modulePath)))
)
export const fromDynamicImport = <TResponse = unknown, TPage = Page>(modulePath: string) =>
from(modulePath).pipe(
distinctUntilChanged(),
switchMap(() => importTransformer<TResponse, TPage>(modulePath))
);
Loading
Loading