diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts
index 620143f4..57d7225d 100644
--- a/lib/DBSQLClient.ts
+++ b/lib/DBSQLClient.ts
@@ -77,10 +77,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
       useArrowNativeTypes: true,
       socketTimeout: 15 * 60 * 1000, // 15 minutes
 
-      retryMaxAttempts: 30,
-      retriesTimeout: 900 * 1000,
-      retryDelayMin: 1 * 1000,
-      retryDelayMax: 60 * 1000,
+      retryMaxAttempts: 5,
+      retriesTimeout: 15 * 60 * 1000, // 15 minutes
+      retryDelayMin: 1 * 1000, // 1 second
+      retryDelayMax: 60 * 1000, // 60 seconds (1 minute)
 
       useCloudFetch: false,
       cloudFetchConcurrentDownloads: 10,
diff --git a/lib/connection/connections/HttpConnection.ts b/lib/connection/connections/HttpConnection.ts
index 9a413ade..e113da5f 100644
--- a/lib/connection/connections/HttpConnection.ts
+++ b/lib/connection/connections/HttpConnection.ts
@@ -1,10 +1,10 @@
 import thrift from 'thrift';
 import https from 'https';
 import http from 'http';
-import { HeadersInit, Response } from 'node-fetch';
+import { HeadersInit } from 'node-fetch';
 import { ProxyAgent } from 'proxy-agent';
 
-import IConnectionProvider from '../contracts/IConnectionProvider';
+import IConnectionProvider, { HttpTransactionDetails } from '../contracts/IConnectionProvider';
 import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
 import IClientContext from '../../contracts/IClientContext';
 
@@ -120,7 +120,7 @@ export default class HttpConnection implements IConnectionProvider {
     return this.connection;
   }
 
-  public async getRetryPolicy(): Promise<IRetryPolicy<Response>> {
+  public async getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>> {
     return new HttpRetryPolicy(this.context);
   }
 }
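Note on the contract change above: `getRetryPolicy()` now resolves to an `IRetryPolicy<HttpTransactionDetails>`, so every retried operation has to yield the request/response pair rather than a bare `Response`. A minimal sketch of a caller, assuming you already hold an `IConnectionProvider` instance (the function name, provider variable, URL, and import path below are placeholders, not part of this diff):

```ts
import fetch, { Request, RequestInit } from 'node-fetch';
import IConnectionProvider, { HttpTransactionDetails } from '../contracts/IConnectionProvider';

// Wrap each attempt so it resolves to the { request, response } pair the policy inspects.
async function fetchWithRetry(
  provider: IConnectionProvider,
  url: string,
  requestConfig: RequestInit = {},
): Promise<HttpTransactionDetails> {
  const retryPolicy = await provider.getRetryPolicy();
  return retryPolicy.invokeWithRetry(() => {
    const request = new Request(url, requestConfig);
    return fetch(request).then((response) => ({ request, response }));
  });
}
```

This is the same shape used by `ThriftHttpConnection` and `CloudFetchResultHandler` further down.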
diff --git a/lib/connection/connections/HttpRetryPolicy.ts b/lib/connection/connections/HttpRetryPolicy.ts
index 6d649555..36506aee 100644
--- a/lib/connection/connections/HttpRetryPolicy.ts
+++ b/lib/connection/connections/HttpRetryPolicy.ts
@@ -1,20 +1,15 @@
-import { Response } from 'node-fetch';
 import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
-import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
+import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
+import IClientContext from '../../contracts/IClientContext';
 import RetryError, { RetryErrorCode } from '../../errors/RetryError';
 
-function getRetryDelay(attempt: number, config: ClientConfig): number {
-  const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
-  return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
-}
-
 function delay(milliseconds: number): Promise<void> {
   return new Promise((resolve) => {
     setTimeout(() => resolve(), milliseconds);
   });
 }
 
-export default class HttpRetryPolicy implements IRetryPolicy<Response> {
+export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDetails> {
   private context: IClientContext;
 
   private readonly startTime: number; // in milliseconds
@@ -27,53 +22,81 @@ export default class HttpRetryPolicy implements IRetryPolicy<Response> {
     this.attempt = 0;
   }
 
-  public async shouldRetry(response: Response): Promise<ShouldRetryResult> {
-    if (!response.ok) {
-      switch (response.status) {
-        // On these status codes it's safe to retry the request. However,
-        // both error codes mean that server is overwhelmed or even down.
-        // Therefore, we need to add some delay between attempts so
-        // server can recover and more likely handle next request
-        case 429: // Too Many Requests
-        case 503: // Service Unavailable
-          this.attempt += 1;
-
-          const clientConfig = this.context.getConfig();
+  public async shouldRetry(details: HttpTransactionDetails): Promise<ShouldRetryResult> {
+    if (this.isRetryable(details)) {
+      const clientConfig = this.context.getConfig();
 
-          // Delay interval depends on current attempt - the more attempts we do
-          // the longer the interval will be
-          // TODO: Respect `Retry-After` header (PECO-729)
-          const retryDelay = getRetryDelay(this.attempt, clientConfig);
-
-          const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
-          if (attemptsExceeded) {
-            throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
-          }
+      // Don't retry if overall retry timeout exceeded
+      const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
+      if (timeoutExceeded) {
+        throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
+      }
 
-          const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout;
-          if (timeoutExceeded) {
-            throw new RetryError(RetryErrorCode.TimeoutExceeded, response);
-          }
+      this.attempt += 1;
 
-          return { shouldRetry: true, retryAfter: retryDelay };
+      // Don't retry if max attempts count reached
+      const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
+      if (attemptsExceeded) {
+        throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
+      }
 
-          // TODO: Here we should handle other error types (see PECO-730)
+      // If possible, use `Retry-After` header as a floor for a backoff algorithm
+      const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
+      const retryAfter = this.getBackoffDelay(
+        this.attempt,
+        retryAfterHeader ?? clientConfig.retryDelayMin,
+        clientConfig.retryDelayMax,
+      );
 
-          // no default
-      }
+      return { shouldRetry: true, retryAfter };
     }
 
     return { shouldRetry: false };
   }
 
-  public async invokeWithRetry(operation: RetryableOperation<Response>): Promise<Response> {
+  public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
     for (;;) {
-      const response = await operation(); // eslint-disable-line no-await-in-loop
-      const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop
+      const details = await operation(); // eslint-disable-line no-await-in-loop
+      const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop
       if (!status.shouldRetry) {
-        return response;
+        return details;
       }
       await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
     }
   }
+
+  protected isRetryable({ response }: HttpTransactionDetails): boolean {
+    const statusCode = response.status;
+
+    const result =
+      // Retry on all codes below 100
+      statusCode < 100 ||
+      // ...and on `429 Too Many Requests`
+      statusCode === 429 ||
+      // ...and on all `5xx` codes except for `501 Not Implemented`
+      (statusCode >= 500 && statusCode !== 501);
+
+    return result;
+  }
+
+  protected getRetryAfterHeader({ response }: HttpTransactionDetails, delayMin: number): number | undefined {
+    // `Retry-After` header may contain a date after which to retry, or delay seconds. We support only delay seconds.
+    // Value from `Retry-After` header is used when:
+    // 1. it's available and is non-empty
+    // 2. it could be parsed as a number, and is greater than zero
+    // 3. additionally, we clamp it to not be smaller than minimal retry delay
+    const header = response.headers.get('Retry-After') || '';
+    if (header !== '') {
+      const value = Number(header);
+      if (Number.isFinite(value) && value > 0) {
+        return Math.max(delayMin, value);
+      }
+    }
+    return undefined;
+  }
+
+  protected getBackoffDelay(attempt: number, delayMin: number, delayMax: number): number {
+    const value = 2 ** attempt * delayMin;
+    return Math.min(value, delayMax);
+  }
 }
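A note on the new backoff above: the old `1.5 ** (attempt - 1)` scaling is replaced by a power-of-two schedule capped at `retryDelayMax`, and a valid `Retry-After` header only raises the base delay, it never lowers it below `retryDelayMin`. A standalone sketch of the schedule, using the same numbers as the unit tests further down (`retryDelayMin: 3`, `retryDelayMax: 20`); the helper name is illustrative:

```ts
// Mirrors getBackoffDelay() above so the numbers can be checked by hand.
function backoffDelay(attempt: number, delayMin: number, delayMax: number): number {
  return Math.min(2 ** attempt * delayMin, delayMax);
}

// attempt: 0 -> 3, 1 -> 6, 2 -> 12, 3 -> 20 (capped), 4 -> 20 (capped)
for (let attempt = 0; attempt <= 4; attempt += 1) {
  console.log(attempt, backoffDelay(attempt, 3, 20));
}

// When `Retry-After: 8` is present it replaces delayMin as the base, so the second
// retry in the tests below waits min(2 ** 2 * 8, 100) = 32.
```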
diff --git a/lib/connection/connections/NullRetryPolicy.ts b/lib/connection/connections/NullRetryPolicy.ts
new file mode 100644
index 00000000..3f3fc75d
--- /dev/null
+++ b/lib/connection/connections/NullRetryPolicy.ts
@@ -0,0 +1,13 @@
+import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
+
+export default class NullRetryPolicy<R> implements IRetryPolicy<R> {
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  public async shouldRetry(details: R): Promise<ShouldRetryResult> {
+    return { shouldRetry: false };
+  }
+
+  public async invokeWithRetry(operation: RetryableOperation<R>): Promise<R> {
+    // Just invoke the operation, don't attempt to retry it
+    return operation();
+  }
+}
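`NullRetryPolicy` is the fallback for operations that must not be replayed: it satisfies `IRetryPolicy<R>` for any payload type, runs the wrapped operation exactly once, and lets any rejection propagate unchanged. A minimal usage sketch (the `string` payload type and function name are arbitrary):

```ts
import NullRetryPolicy from './NullRetryPolicy';

async function example(): Promise<string> {
  const policy = new NullRetryPolicy<string>();

  // Succeeds: the operation is simply awaited and its result returned.
  const value = await policy.invokeWithRetry(async () => 'ran once');

  // Fails: the rejection reaches the caller after a single attempt, no retries.
  await policy
    .invokeWithRetry(async () => {
      throw new Error('not retried');
    })
    .catch(() => undefined);

  return value;
}
```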
diff --git a/lib/connection/connections/ThriftHttpConnection.ts b/lib/connection/connections/ThriftHttpConnection.ts
index 27612ce0..2b3b493c 100644
--- a/lib/connection/connections/ThriftHttpConnection.ts
+++ b/lib/connection/connections/ThriftHttpConnection.ts
@@ -6,10 +6,12 @@ import { EventEmitter } from 'events';
 import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstructor, TTransport } from 'thrift';
-import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
+import fetch, { RequestInit, HeadersInit, Request, Response, FetchError } from 'node-fetch';
 // @ts-expect-error TS7016: Could not find a declaration file for module
 import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
 import IRetryPolicy from '../contracts/IRetryPolicy';
+import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
+import NullRetryPolicy from './NullRetryPolicy';
 
 export class THTTPException extends Thrift.TApplicationException {
   public readonly statusCode: unknown;
@@ -32,7 +34,7 @@ interface ThriftHttpConnectionOptions {
   url: string;
   transport?: TTransportType;
   protocol?: TProtocolConstructor;
-  getRetryPolicy(): Promise<IRetryPolicy<Response>>;
+  getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
 }
 
 // This type describes a shape of internals of Thrift client object.
@@ -47,19 +49,37 @@ type ThriftClient = {
   [key: string]: (input: TProtocol, mtype: Thrift.MessageType, seqId: number) => void;
 };
 
+const retryableThriftMethods = new Set([
+  'GetOperationStatus',
+  'CancelOperation',
+  'CloseOperation',
+  'GetResultSetMetadata',
+  'CloseSession',
+  'GetInfo',
+  'GetTypeInfo',
+  'GetCatalogs',
+  'GetSchemas',
+  'GetTables',
+  'GetTableTypes',
+  'GetColumns',
+  'GetFunctions',
+  'GetPrimaryKeys',
+  'GetCrossReference',
+]);
+
 export default class ThriftHttpConnection extends EventEmitter {
   private readonly url: string;
 
   private config: RequestInit;
 
+  private options: ThriftHttpConnectionOptions;
+
   // This field is used by Thrift internally, so name and type are important
   private readonly transport: TTransportType;
 
   // This field is used by Thrift internally, so name and type are important
   private readonly protocol: TProtocolConstructor;
 
-  private readonly getRetryPolicy: () => Promise<IRetryPolicy<Response>>;
-
   // thrift.createClient sets this field internally
   public client?: ThriftClient;
 
@@ -67,9 +87,18 @@ export default class ThriftHttpConnection extends EventEmitter {
     super();
     this.url = options.url;
     this.config = config;
+    this.options = options;
     this.transport = options.transport ?? TBufferedTransport;
     this.protocol = options.protocol ?? TBinaryProtocol;
-    this.getRetryPolicy = options.getRetryPolicy;
+  }
+
+  protected async getRetryPolicy(thriftMethodName?: string): Promise<IRetryPolicy<HttpTransactionDetails>> {
+    // Allow retry behavior only for Thrift operations that are for sure safe to retry
+    if (thriftMethodName && retryableThriftMethods.has(thriftMethodName)) {
+      return this.options.getRetryPolicy();
+    }
+    // Don't retry everything that is not explicitly allowed to retry
+    return new NullRetryPolicy();
   }
 
   public setHeaders(headers: HeadersInit) {
@@ -92,12 +121,16 @@ export default class ThriftHttpConnection extends EventEmitter {
       body: data,
     };
 
-    this.getRetryPolicy()
+    this.getThriftMethodName(data)
+      .then((thriftMethod) => this.getRetryPolicy(thriftMethod))
       .then((retryPolicy) => {
-        const makeRequest = () => fetch(this.url, requestConfig);
+        const makeRequest = () => {
+          const request = new Request(this.url, requestConfig);
+          return fetch(request).then((response) => ({ request, response }));
+        };
         return retryPolicy.invokeWithRetry(makeRequest);
       })
-      .then((response) => {
+      .then(({ response }) => {
         if (response.status !== 200) {
           throw new THTTPException(response);
         }
@@ -131,6 +164,23 @@ export default class ThriftHttpConnection extends EventEmitter {
       });
   }
 
+  private getThriftMethodName(thriftMessage: Buffer): Promise<string | undefined> {
+    return new Promise((resolve) => {
+      try {
+        const receiver = this.transport.receiver((transportWithData) => {
+          const Protocol = this.protocol;
+          const proto = new Protocol(transportWithData);
+          const header = proto.readMessageBegin();
+          resolve(header.fname);
+        }, 0 /* `seqId` could be any because it's ignored */);
+
+        receiver(thriftMessage);
+      } catch {
+        resolve(undefined);
+      }
+    });
+  }
+
   private handleThriftResponse(transportWithData: TTransport) {
     if (!this.client) {
       throw new Thrift.TApplicationException(Thrift.TApplicationExceptionType.INTERNAL_ERROR, 'Client not available');
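The connection decides retry eligibility by peeking at the already-serialized request: `getThriftMethodName` reads the method name from the Thrift message header, and only names in `retryableThriftMethods` (calls that are safe to replay; `ExecuteStatement` and `OpenSession` are deliberately absent) get the real HTTP policy, while everything else falls back to `NullRetryPolicy`. A standalone sketch of the peek, assuming the default `TBufferedTransport`/`TBinaryProtocol` pair and an illustrative helper name:

```ts
import { TBinaryProtocol, TBufferedTransport } from 'thrift';

// Feed the serialized request into a throwaway transport/protocol pair
// and read the message header to recover the method name.
function peekThriftMethodName(thriftMessage: Buffer): Promise<string | undefined> {
  return new Promise((resolve) => {
    try {
      const receiver = TBufferedTransport.receiver((transportWithData: TBufferedTransport) => {
        const protocol = new TBinaryProtocol(transportWithData);
        resolve(protocol.readMessageBegin().fname); // e.g. 'GetOperationStatus'
      }, 0 /* seqId is not used for this peek */);
      receiver(thriftMessage);
    } catch {
      resolve(undefined); // unparseable payload: treat as non-retryable
    }
  });
}
```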
diff --git a/lib/connection/contracts/IConnectionProvider.ts b/lib/connection/contracts/IConnectionProvider.ts
index ffd8e5be..08c21385 100644
--- a/lib/connection/contracts/IConnectionProvider.ts
+++ b/lib/connection/contracts/IConnectionProvider.ts
@@ -1,7 +1,12 @@
 import http from 'http';
-import { HeadersInit, Response } from 'node-fetch';
+import { HeadersInit, Request, Response } from 'node-fetch';
 import IRetryPolicy from './IRetryPolicy';
 
+export interface HttpTransactionDetails {
+  request: Request;
+  response: Response;
+}
+
 export default interface IConnectionProvider {
   getThriftConnection(): Promise<any>;
 
@@ -9,5 +14,5 @@ export default interface IConnectionProvider {
   setHeaders(headers: HeadersInit): void;
 
-  getRetryPolicy(): Promise<IRetryPolicy<Response>>;
+  getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
 }
diff --git a/lib/connection/contracts/IRetryPolicy.ts b/lib/connection/contracts/IRetryPolicy.ts
index d389795f..03862846 100644
--- a/lib/connection/contracts/IRetryPolicy.ts
+++ b/lib/connection/contracts/IRetryPolicy.ts
@@ -10,7 +10,7 @@ export type ShouldRetryResult =
 export type RetryableOperation<R> = () => Promise<R>;
 
 export default interface IRetryPolicy<R> {
-  shouldRetry(response: R): Promise<ShouldRetryResult>;
+  shouldRetry(details: R): Promise<ShouldRetryResult>;
 
   invokeWithRetry(operation: RetryableOperation<R>): Promise<R>;
 }
diff --git a/lib/hive/Commands/BaseCommand.ts b/lib/hive/Commands/BaseCommand.ts
index c211bc38..8a255a3a 100644
--- a/lib/hive/Commands/BaseCommand.ts
+++ b/lib/hive/Commands/BaseCommand.ts
@@ -19,7 +19,15 @@ export default abstract class BaseCommand {
       return await this.invokeCommand(request, command);
     } catch (error) {
       if (error instanceof RetryError) {
-        const statusCode = error.payload instanceof Response ? error.payload.status : undefined;
+        let statusCode: number | undefined;
+        if (
+          error.payload &&
+          typeof error.payload === 'object' &&
+          'response' in error.payload &&
+          error.payload.response instanceof Response
+        ) {
+          statusCode = error.payload.response.status;
+        }
 
         switch (error.errorCode) {
           case RetryErrorCode.AttemptsExceeded:
diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts
index e35cbc6f..4b2b9369 100644
--- a/lib/result/CloudFetchResultHandler.ts
+++ b/lib/result/CloudFetchResultHandler.ts
@@ -1,5 +1,5 @@
 import LZ4 from 'lz4';
-import fetch, { RequestInfo, RequestInit } from 'node-fetch';
+import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
 import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
 import IClientContext from '../contracts/IClientContext';
 import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
@@ -77,6 +77,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
-    return retryPolicy.invokeWithRetry(() => fetch(url, requestConfig));
+    const result = await retryPolicy.invokeWithRetry(() => {
+      const request = new Request(url, requestConfig);
+      return fetch(request).then((response) => ({ request, response }));
+    });
+    return result.response;
   }
 }
diff --git a/tests/unit/connection/connections/HttpRetryPolicy.test.js b/tests/unit/connection/connections/HttpRetryPolicy.test.js
new file mode 100644
index 00000000..8c0e0a31
--- /dev/null
+++ b/tests/unit/connection/connections/HttpRetryPolicy.test.js
@@ -0,0 +1,298 @@
+const { expect, AssertionError } = require('chai');
+const sinon = require('sinon');
+const { Request, Response } = require('node-fetch');
+const HttpRetryPolicy = require('../../../../dist/connection/connections/HttpRetryPolicy').default;
+const { default: RetryError, RetryErrorCode } = require('../../../../dist/errors/RetryError');
+const DBSQLClient = require('../../../../dist/DBSQLClient').default;
+
+class ClientContextMock {
+  constructor(configOverrides) {
+    this.configOverrides = configOverrides;
+  }
+
+  getConfig() {
+    const defaultConfig = DBSQLClient.getDefaultConfig();
+    return {
+      ...defaultConfig,
+      ...this.configOverrides,
+    };
+  }
+}
+
+describe('HttpRetryPolicy', () => { + it('should properly compute backoff delay', async () => { + const context = new ClientContextMock({ retryDelayMin: 3, retryDelayMax: 20 }); + const { retryDelayMin, retryDelayMax } = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + expect(policy.getBackoffDelay(0, retryDelayMin, retryDelayMax)).to.equal(3); + expect(policy.getBackoffDelay(1, retryDelayMin, retryDelayMax)).to.equal(6); + expect(policy.getBackoffDelay(2, retryDelayMin, retryDelayMax)).to.equal(12); + expect(policy.getBackoffDelay(3, retryDelayMin, retryDelayMax)).to.equal(retryDelayMax); + expect(policy.getBackoffDelay(4, retryDelayMin, retryDelayMax)).to.equal(retryDelayMax); + }); + + it('should extract delay from `Retry-After` header', async () => { + const context = new ClientContextMock({ retryDelayMin: 3, retryDelayMax: 20 }); + const { retryDelayMin } = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + function createMock(headers) { + return { + request: new Request('http://localhost'), + response: new Response(undefined, { headers }), + }; + } + + // Missing `Retry-After` header + expect(policy.getRetryAfterHeader(createMock({}), retryDelayMin)).to.be.undefined; + + // Valid `Retry-After`, several header name variants + expect(policy.getRetryAfterHeader(createMock({ 'Retry-After': '10' }), retryDelayMin)).to.equal(10); + expect(policy.getRetryAfterHeader(createMock({ 'retry-after': '10' }), retryDelayMin)).to.equal(10); + expect(policy.getRetryAfterHeader(createMock({ 'RETRY-AFTER': '10' }), retryDelayMin)).to.equal(10); + + // Invalid header values (non-numeric, negative) + expect(policy.getRetryAfterHeader(createMock({ 'Retry-After': 'test' }), retryDelayMin)).to.be.undefined; + expect(policy.getRetryAfterHeader(createMock({ 'Retry-After': '-10' }), retryDelayMin)).to.be.undefined; + + // It should not be smaller than min value, but can be greater than max value + expect(policy.getRetryAfterHeader(createMock({ 'Retry-After': '1' }), retryDelayMin)).to.equal(retryDelayMin); + expect(policy.getRetryAfterHeader(createMock({ 'Retry-After': '200' }), retryDelayMin)).to.equal(200); + }); + + it('should check if HTTP transaction is safe to retry', async () => { + const policy = new HttpRetryPolicy(new ClientContextMock()); + + function createMock(status) { + return { + request: new Request('http://localhost'), + response: new Response(undefined, { status }), + }; + } + + // Status codes below 100 can be retried + for (let status = 1; status < 100; status += 1) { + expect(policy.isRetryable(createMock(status))).to.be.true; + } + + // Status codes between 100 (including) and 500 (excluding) should not be retried + // The only exception is 429 (Too many requests) + for (let status = 100; status < 500; status += 1) { + const expectedResult = status === 429 ? true : false; + expect(policy.isRetryable(createMock(status))).to.equal(expectedResult); + } + + // Status codes above 500 can be retried, except for 501 + for (let status = 500; status < 1000; status += 1) { + const expectedResult = status === 501 ? 
false : true; + expect(policy.isRetryable(createMock(status))).to.equal(expectedResult); + } + }); + + describe('shouldRetry', () => { + it('should not retry if transaction succeeded', async () => { + const context = new ClientContextMock({ retryMaxAttempts: 3 }); + const clientConfig = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + function createMock(status) { + return { + request: new Request('http://localhost'), + response: new Response(undefined, { status }), + }; + } + + // Try several times to make sure it doesn't increment an attempts counter + for (let attempt = 1; attempt <= clientConfig.retryMaxAttempts + 1; attempt += 1) { + const result = await policy.shouldRetry(createMock(200)); + expect(result.shouldRetry).to.be.false; + expect(policy.attempt).to.equal(0); + } + + // Make sure it doesn't trigger timeout when not needed + policy.startTime = Date.now() - clientConfig.retriesTimeout * 2; + const result = await policy.shouldRetry(createMock(200)); + expect(result.shouldRetry).to.be.false; + }); + + it('should use `Retry-After` header as a base for backoff', async () => { + const context = new ClientContextMock({ retryDelayMin: 3, retryDelayMax: 100, retryMaxAttempts: 10 }); + const policy = new HttpRetryPolicy(context); + + function createMock(headers) { + return { + request: new Request('http://localhost'), + response: new Response(undefined, { status: 500, headers }), + }; + } + + const result1 = await policy.shouldRetry(createMock({ 'Retry-After': '5' })); + expect(result1.shouldRetry).to.be.true; + expect(result1.retryAfter).to.equal(10); + + const result2 = await policy.shouldRetry(createMock({ 'Retry-After': '8' })); + expect(result2.shouldRetry).to.be.true; + expect(result2.retryAfter).to.equal(32); + + policy.attempt = 4; + const result3 = await policy.shouldRetry(createMock({ 'Retry-After': '10' })); + expect(result3.shouldRetry).to.be.true; + expect(result3.retryAfter).to.equal(100); + }); + + it('should use backoff when `Retry-After` header is missing', async () => { + const context = new ClientContextMock({ + retryDelayMin: 3, + retryDelayMax: 20, + retryMaxAttempts: Number.POSITIVE_INFINITY, // remove limit on max attempts + }); + const clientConfig = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + function createMock(headers) { + return { + request: new Request('http://localhost'), + response: new Response(undefined, { status: 500, headers }), + }; + } + + const result1 = await policy.shouldRetry(createMock({})); + expect(result1.shouldRetry).to.be.true; + expect(result1.retryAfter).to.equal(6); + + policy.attempt = 4; + const result2 = await policy.shouldRetry(createMock({ 'Retry-After': 'test' })); + expect(result2.shouldRetry).to.be.true; + expect(result2.retryAfter).to.equal(clientConfig.retryDelayMax); + }); + + it('should check if retry timeout reached', async () => { + const context = new ClientContextMock(); + const clientConfig = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + function createMock() { + return { + request: new Request('http://localhost', { method: 'POST' }), + response: new Response(undefined, { status: 500 }), + }; + } + + const result = await policy.shouldRetry(createMock()); + expect(result.shouldRetry).to.be.true; + + // Modify start time to be in the past so the next `shouldRetry` would fail + policy.startTime = Date.now() - clientConfig.retriesTimeout * 2; + try { + await policy.shouldRetry(createMock()); + expect.fail('It should throw an error'); + } catch 
(error) { + if (error instanceof AssertionError) { + throw error; + } + expect(error).to.be.instanceOf(RetryError); + expect(error.errorCode).to.equal(RetryErrorCode.TimeoutExceeded); + } + }); + + it('should check if retry attempts exceeded', async () => { + const context = new ClientContextMock({ retryMaxAttempts: 3 }); + const clientConfig = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + function createMock() { + return { + request: new Request('http://localhost', { method: 'POST' }), + response: new Response(undefined, { status: 500 }), + }; + } + + // First attempts should succeed + for (let attempt = 1; attempt < clientConfig.retryMaxAttempts; attempt += 1) { + const result = await policy.shouldRetry(createMock()); + expect(result.shouldRetry).to.be.true; + } + + // Modify start time to be in the past so the next `shouldRetry` would fail + try { + await policy.shouldRetry(createMock()); + expect.fail('It should throw an error'); + } catch (error) { + if (error instanceof AssertionError) { + throw error; + } + expect(error).to.be.instanceOf(RetryError); + expect(error.errorCode).to.equal(RetryErrorCode.AttemptsExceeded); + } + }); + }); + + describe('invokeWithRetry', () => { + it('should retry an operation until it succeeds', async () => { + const context = new ClientContextMock({ + retryDelayMin: 1, + retryDelayMax: 2, + retryMaxAttempts: 20, + }); + const policy = new HttpRetryPolicy(context); + + sinon.spy(policy, 'shouldRetry'); + + function createMock(status) { + return { + request: new Request('http://localhost'), + response: new Response(undefined, { status }), + }; + } + + const expectedAttempts = 3; + + const operation = sinon + .stub() + .returns(createMock(500)) + .onCall(expectedAttempts - 1) // call numbers are zero-based + .returns(createMock(200)); + + const result = await policy.invokeWithRetry(operation); + expect(policy.shouldRetry.callCount).to.equal(expectedAttempts); + expect(result.response.status).to.equal(200); + expect(operation.callCount).to.equal(expectedAttempts); + }); + + it('should stop retrying if retry limits reached', async () => { + const context = new ClientContextMock({ + retryDelayMin: 1, + retryDelayMax: 2, + retryMaxAttempts: 3, + }); + const clientConfig = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + sinon.spy(policy, 'shouldRetry'); + + function createMock(status) { + return { + request: new Request('http://localhost'), + response: new Response(undefined, { status }), + }; + } + + const expectedAttempts = clientConfig.retryMaxAttempts; + + const operation = sinon.stub().returns(createMock(500)); + + try { + await policy.invokeWithRetry(operation); + expect.fail('It should throw an error'); + } catch (error) { + if (error instanceof AssertionError) { + throw error; + } + expect(error).to.be.instanceOf(RetryError); + expect(policy.shouldRetry.callCount).to.equal(expectedAttempts); + expect(operation.callCount).to.equal(expectedAttempts); + } + }); + }); +}); diff --git a/tests/unit/connection/connections/NullRetryPolicy.test.js b/tests/unit/connection/connections/NullRetryPolicy.test.js new file mode 100644 index 00000000..f804e812 --- /dev/null +++ b/tests/unit/connection/connections/NullRetryPolicy.test.js @@ -0,0 +1,31 @@ +const { expect, AssertionError } = require('chai'); +const sinon = require('sinon'); +const NullRetryPolicy = require('../../../../dist/connection/connections/NullRetryPolicy').default; + +describe('NullRetryPolicy', () => { + it('should never allow retries', async () => 
{ + const policy = new NullRetryPolicy(); + + // make several attempts + for (let i = 0; i < 5; i += 1) { + const { shouldRetry } = await policy.shouldRetry(undefined); + expect(shouldRetry).to.be.false; + } + }); + + it('should not retry the provided callback', async () => { + const policy = new NullRetryPolicy(); + + const operation = sinon.stub().returns(Promise.reject(new Error())); + try { + await policy.invokeWithRetry(operation); + expect.fail('It should throw an error'); + } catch (error) { + if (error instanceof AssertionError) { + throw error; + } + + expect(operation.callCount).to.equal(1); + } + }); +}); diff --git a/tests/unit/hive/commands/BaseCommand.test.js b/tests/unit/hive/commands/BaseCommand.test.js index bfee82de..b7c8a48f 100644 --- a/tests/unit/hive/commands/BaseCommand.test.js +++ b/tests/unit/hive/commands/BaseCommand.test.js @@ -1,5 +1,5 @@ const { expect, AssertionError } = require('chai'); -const { Response } = require('node-fetch'); +const { Request, Response } = require('node-fetch'); const { Thrift } = require('thrift'); const HiveDriverError = require('../../../../dist/errors/HiveDriverError').default; const BaseCommand = require('../../../../dist/hive/Commands/BaseCommand').default; @@ -113,9 +113,11 @@ describe('BaseCommand', () => { const command = new CustomCommand( new ThriftClientMock(context, () => { methodCallCount += 1; - return new Response(undefined, { + const request = new Request('http://localhost/', { method: 'POST' }); + const response = new Response(undefined, { status: statusCode, }); + return { request, response }; }), context, ); @@ -150,9 +152,11 @@ describe('BaseCommand', () => { const command = new CustomCommand( new ThriftClientMock(context, () => { methodCallCount += 1; - return new Response(undefined, { + const request = new Request('http://localhost/', { method: 'POST' }); + const response = new Response(undefined, { status: statusCode, }); + return { request, response }; }), context, ); @@ -189,18 +193,21 @@ describe('BaseCommand', () => { let methodCallCount = 0; const command = new CustomCommand( new ThriftClientMock(context, () => { + const request = new Request('http://localhost/', { method: 'POST' }); + methodCallCount += 1; if (methodCallCount <= 3) { - return new Response(undefined, { + const response = new Response(undefined, { status: statusCode, }); + return { request, response }; } const response = new Response(undefined, { status: 200, }); response.body = ThriftClientMock.defaultResponse; - return response; + return { request, response }; }), context, ); diff --git a/tests/unit/result/CloudFetchResultHandler.test.js b/tests/unit/result/CloudFetchResultHandler.test.js index 96597779..3d9c67ea 100644 --- a/tests/unit/result/CloudFetchResultHandler.test.js +++ b/tests/unit/result/CloudFetchResultHandler.test.js @@ -77,14 +77,39 @@ const sampleExpiredRowSet = { ], }; +class ClientContextMock { + constructor(configOverrides) { + this.configOverrides = configOverrides; + this.fetchHandler = sinon.stub(); + + this.connectionProvider = { + getAgent: () => Promise.resolve(undefined), + getRetryPolicy: sinon.stub().returns( + Promise.resolve({ + shouldRetry: sinon.stub().returns(Promise.resolve({ shouldRetry: false })), + invokeWithRetry: sinon.stub().callsFake(() => this.fetchHandler().then((response) => ({ response }))), + }), + ), + }; + } + + getConfig() { + const defaultConfig = DBSQLClient.getDefaultConfig(); + return { + ...defaultConfig, + ...this.configOverrides, + }; + } + + getConnectionProvider() { + return 
Promise.resolve(this.connectionProvider); + } +} + describe('CloudFetchResultHandler', () => { it('should report pending data if there are any', async () => { + const context = new ClientContextMock({ cloudFetchConcurrentDownloads: 1 }); const rowSetProvider = new ResultsProviderMock(); - const clientConfig = DBSQLClient.getDefaultConfig(); - - const context = { - getConfig: () => clientConfig, - }; const result = new CloudFetchResultHandler(context, rowSetProvider, {}); @@ -108,20 +133,16 @@ describe('CloudFetchResultHandler', () => { }); it('should extract links from row sets', async () => { - const clientConfig = DBSQLClient.getDefaultConfig(); - clientConfig.cloudFetchConcurrentDownloads = 0; // this will prevent it from downloading batches + const context = new ClientContextMock({ cloudFetchConcurrentDownloads: 0 }); const rowSets = [sampleRowSet1, sampleEmptyRowSet, sampleRowSet2]; const expectedLinksCount = rowSets.reduce((prev, item) => prev + (item.resultLinks?.length ?? 0), 0); const rowSetProvider = new ResultsProviderMock(rowSets); - const context = { - getConfig: () => clientConfig, - }; const result = new CloudFetchResultHandler(context, rowSetProvider, {}); - sinon.stub(result, 'fetch').returns( + context.fetchHandler.returns( Promise.resolve({ ok: true, status: 200, @@ -136,12 +157,12 @@ describe('CloudFetchResultHandler', () => { expect(result.pendingLinks.length).to.be.equal(expectedLinksCount); expect(result.downloadTasks.length).to.be.equal(0); - expect(result.fetch.called).to.be.false; + expect(context.fetchHandler.called).to.be.false; }); it('should download batches according to settings', async () => { - const clientConfig = DBSQLClient.getDefaultConfig(); - clientConfig.cloudFetchConcurrentDownloads = 3; + const context = new ClientContextMock({ cloudFetchConcurrentDownloads: 3 }); + const clientConfig = context.getConfig(); const rowSet = { startRowOffset: 0, @@ -149,13 +170,10 @@ describe('CloudFetchResultHandler', () => { }; const expectedLinksCount = rowSet.resultLinks.length; // 5 const rowSetProvider = new ResultsProviderMock([rowSet]); - const context = { - getConfig: () => clientConfig, - }; const result = new CloudFetchResultHandler(context, rowSetProvider, {}); - sinon.stub(result, 'fetch').returns( + context.fetchHandler.returns( Promise.resolve({ ok: true, status: 200, @@ -173,7 +191,9 @@ describe('CloudFetchResultHandler', () => { expect(items.length).to.be.gt(0); expect(await rowSetProvider.hasMore()).to.be.false; - expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads); + // it should use retry policy for all requests + expect(context.connectionProvider.getRetryPolicy.called).to.be.true; + expect(context.fetchHandler.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads); expect(result.pendingLinks.length).to.be.equal(expectedLinksCount - clientConfig.cloudFetchConcurrentDownloads); expect(result.downloadTasks.length).to.be.equal(clientConfig.cloudFetchConcurrentDownloads - 1); } @@ -184,7 +204,9 @@ describe('CloudFetchResultHandler', () => { expect(items.length).to.be.gt(0); expect(await rowSetProvider.hasMore()).to.be.false; - expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads + 1); + // it should use retry policy for all requests + expect(context.connectionProvider.getRetryPolicy.called).to.be.true; + expect(context.fetchHandler.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads + 1); expect(result.pendingLinks.length).to.be.equal( expectedLinksCount - 
clientConfig.cloudFetchConcurrentDownloads - 1, ); @@ -197,7 +219,9 @@ describe('CloudFetchResultHandler', () => { expect(items.length).to.be.gt(0); expect(await rowSetProvider.hasMore()).to.be.false; - expect(result.fetch.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads + 2); + // it should use retry policy for all requests + expect(context.connectionProvider.getRetryPolicy.called).to.be.true; + expect(context.fetchHandler.callCount).to.be.equal(clientConfig.cloudFetchConcurrentDownloads + 2); expect(result.pendingLinks.length).to.be.equal( expectedLinksCount - clientConfig.cloudFetchConcurrentDownloads - 2, ); @@ -206,18 +230,15 @@ describe('CloudFetchResultHandler', () => { }); it('should handle LZ4 compressed data', async () => { - const clientConfig = DBSQLClient.getDefaultConfig(); + const context = new ClientContextMock(); const rowSetProvider = new ResultsProviderMock([sampleRowSet1]); - const context = { - getConfig: () => clientConfig, - }; const result = new CloudFetchResultHandler(context, rowSetProvider, { lz4Compressed: true }); const expectedBatch = Buffer.concat([sampleArrowSchema, sampleArrowBatch]); - sinon.stub(result, 'fetch').returns( + context.fetchHandler.returns( Promise.resolve({ ok: true, status: 200, @@ -231,22 +252,20 @@ describe('CloudFetchResultHandler', () => { const items = await result.fetchNext({ limit: 10000 }); expect(await rowSetProvider.hasMore()).to.be.false; - expect(result.fetch.called).to.be.true; + // it should use retry policy for all requests + expect(context.connectionProvider.getRetryPolicy.called).to.be.true; + expect(context.fetchHandler.called).to.be.true; expect(items).to.deep.eq([expectedBatch]); }); it('should handle HTTP errors', async () => { - const clientConfig = DBSQLClient.getDefaultConfig(); - clientConfig.cloudFetchConcurrentDownloads = 1; + const context = new ClientContextMock({ cloudFetchConcurrentDownloads: 1 }); const rowSetProvider = new ResultsProviderMock([sampleRowSet1]); - const context = { - getConfig: () => clientConfig, - }; const result = new CloudFetchResultHandler(context, rowSetProvider, {}); - sinon.stub(result, 'fetch').returns( + context.fetchHandler.returns( Promise.resolve({ ok: false, status: 500, @@ -263,21 +282,19 @@ describe('CloudFetchResultHandler', () => { throw error; } expect(error.message).to.contain('Internal Server Error'); - expect(result.fetch.callCount).to.be.equal(1); + // it should use retry policy for all requests + expect(context.connectionProvider.getRetryPolicy.called).to.be.true; + expect(context.fetchHandler.callCount).to.be.equal(1); } }); it('should handle expired links', async () => { + const context = new ClientContextMock(); const rowSetProvider = new ResultsProviderMock([sampleExpiredRowSet]); - const clientConfig = DBSQLClient.getDefaultConfig(); - - const context = { - getConfig: () => clientConfig, - }; const result = new CloudFetchResultHandler(context, rowSetProvider, {}); - sinon.stub(result, 'fetch').returns( + context.fetchHandler.returns( Promise.resolve({ ok: true, status: 200, @@ -298,8 +315,10 @@ describe('CloudFetchResultHandler', () => { throw error; } expect(error.message).to.contain('CloudFetch link has expired'); + // it should use retry policy for all requests + expect(context.connectionProvider.getRetryPolicy.called).to.be.true; // Row set contains a one valid and one expired link; only valid link should be requested - expect(result.fetch.callCount).to.be.equal(1); + expect(context.fetchHandler.callCount).to.be.equal(1); } }); });
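One consequence of the payload change that reaches beyond these files: `RetryError.payload` used to be a bare `Response` and is now the `{ request, response }` pair produced by the HTTP policy, which is why `BaseCommand` above gained the structural check before reading the status code. A hedged sketch of the same guard for any other consumer (the helper name and import path are illustrative):

```ts
import { Response } from 'node-fetch';
import RetryError from '../../errors/RetryError';

// Extract the last HTTP status from a RetryError whose payload is HttpTransactionDetails.
function getRetryErrorStatus(error: RetryError): number | undefined {
  let statusCode: number | undefined;
  const { payload } = error;
  if (payload && typeof payload === 'object' && 'response' in payload && payload.response instanceof Response) {
    statusCode = payload.response.status;
  }
  return statusCode;
}
```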