Skip to content

Commit

Permalink
[PECO-729] Improve retry behavior (#230)
Browse files Browse the repository at this point in the history
* [PECO-729] Respect the `Retry-After` header, falling back to the backoff algorithm when it is absent

Signed-off-by: Levko Kravets <[email protected]>

* [PECO-729] Extend list of HTTP status codes that could be retried

Signed-off-by: Levko Kravets <[email protected]>

* Pass `Request` object in addition to `Response` to `HttpRetryPolicy`

Signed-off-by: Levko Kravets <[email protected]>

* [PECO-729] Retry only idempotent requests (HTTP GET + restricted set of Thrift operations)

Signed-off-by: Levko Kravets <[email protected]>

* Update HttpRetryPolicy logic; add/update tests

Signed-off-by: Levko Kravets <[email protected]>

* Reduce max retry attempts to 5

Signed-off-by: Levko Kravets <[email protected]>

* Use `Retry-After` as a base for backoff, not instead of it

Signed-off-by: Levko Kravets <[email protected]>

---------

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Mar 18, 2024
1 parent 08bbdfc commit 6673660
Show file tree
Hide file tree
Showing 13 changed files with 566 additions and 108 deletions.
8 changes: 4 additions & 4 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
useArrowNativeTypes: true,
socketTimeout: 15 * 60 * 1000, // 15 minutes

retryMaxAttempts: 30,
retriesTimeout: 900 * 1000,
retryDelayMin: 1 * 1000,
retryDelayMax: 60 * 1000,
retryMaxAttempts: 5,
retriesTimeout: 15 * 60 * 1000, // 15 minutes
retryDelayMin: 1 * 1000, // 1 second
retryDelayMax: 60 * 1000, // 60 seconds (1 minute)

useCloudFetch: false,
cloudFetchConcurrentDownloads: 10,
Expand Down
6 changes: 3 additions & 3 deletions lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import thrift from 'thrift';
import https from 'https';
import http from 'http';
import { HeadersInit, Response } from 'node-fetch';
import { HeadersInit } from 'node-fetch';
import { ProxyAgent } from 'proxy-agent';

import IConnectionProvider from '../contracts/IConnectionProvider';
import IConnectionProvider, { HttpTransactionDetails } from '../contracts/IConnectionProvider';
import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
import IClientContext from '../../contracts/IClientContext';

Expand Down Expand Up @@ -120,7 +120,7 @@ export default class HttpConnection implements IConnectionProvider {
return this.connection;
}

public async getRetryPolicy(): Promise<IRetryPolicy<Response>> {
public async getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>> {
return new HttpRetryPolicy(this.context);
}
}
105 changes: 64 additions & 41 deletions lib/connection/connections/HttpRetryPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import { Response } from 'node-fetch';
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
import IClientContext from '../../contracts/IClientContext';
import RetryError, { RetryErrorCode } from '../../errors/RetryError';

/**
 * Computes the delay to wait before the given retry attempt.
 *
 * The delay starts at `config.retryDelayMin` and grows by a factor of 1.5
 * per attempt, but never exceeds `config.retryDelayMax`.
 *
 * @param attempt - attempt number; values below 1 behave like the first attempt
 * @param config - client config providing `retryDelayMin` and `retryDelayMax`
 * @returns the delay, in the same units as the config values
 */
function getRetryDelay(attempt: number, config: ClientConfig): number {
  const exponent = attempt - 1;
  // The multiplier is floored at 1 so the delay never drops below `retryDelayMin`
  const multiplier = Math.max(1.5 ** exponent, 1);
  const uncapped = multiplier * config.retryDelayMin;
  return Math.min(config.retryDelayMax, uncapped);
}

/**
 * Returns a promise that resolves (with no value) once the given number
 * of milliseconds has elapsed.
 *
 * @param milliseconds - how long to wait before resolving
 */
function delay(milliseconds: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, milliseconds);
  });
}

export default class HttpRetryPolicy implements IRetryPolicy<Response> {
export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDetails> {
private context: IClientContext;

private readonly startTime: number; // in milliseconds
Expand All @@ -27,53 +22,81 @@ export default class HttpRetryPolicy implements IRetryPolicy<Response> {
this.attempt = 0;
}

public async shouldRetry(response: Response): Promise<ShouldRetryResult> {
if (!response.ok) {
switch (response.status) {
// On these status codes it's safe to retry the request. However,
// both error codes mean that server is overwhelmed or even down.
// Therefore, we need to add some delay between attempts so
// server can recover and more likely handle next request
case 429: // Too Many Requests
case 503: // Service Unavailable
this.attempt += 1;

const clientConfig = this.context.getConfig();
public async shouldRetry(details: HttpTransactionDetails): Promise<ShouldRetryResult> {
if (this.isRetryable(details)) {
const clientConfig = this.context.getConfig();

// Delay interval depends on current attempt - the more attempts we do
// the longer the interval will be
// TODO: Respect `Retry-After` header (PECO-729)
const retryDelay = getRetryDelay(this.attempt, clientConfig);

const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
}
// Don't retry if overall retry timeout exceeded
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
}

const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, response);
}
this.attempt += 1;

return { shouldRetry: true, retryAfter: retryDelay };
// Don't retry if max attempts count reached
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
}

// TODO: Here we should handle other error types (see PECO-730)
// If possible, use `Retry-After` header as a floor for a backoff algorithm
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
const retryAfter = this.getBackoffDelay(
this.attempt,
retryAfterHeader ?? clientConfig.retryDelayMin,
clientConfig.retryDelayMax,
);

// no default
}
return { shouldRetry: true, retryAfter };
}

return { shouldRetry: false };
}

public async invokeWithRetry(operation: RetryableOperation<Response>): Promise<Response> {
public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
for (;;) {
const response = await operation(); // eslint-disable-line no-await-in-loop
const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop
const details = await operation(); // eslint-disable-line no-await-in-loop
const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop
if (!status.shouldRetry) {
return response;
return details;
}
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
}
}

/**
 * Tells whether the HTTP status code of the response allows the request to be retried.
 *
 * @param details - HTTP transaction whose response status is inspected
 * @returns `true` when the status code is considered retryable
 */
protected isRetryable({ response }: HttpTransactionDetails): boolean {
  const { status } = response;

  // Any code below 100 is retried
  if (status < 100) {
    return true;
  }

  // `429 Too Many Requests` is retried as well
  if (status === 429) {
    return true;
  }

  // Codes starting from 500 are retried, except for `501 Not Implemented`
  return status >= 500 && status !== 501;
}

/**
 * Extracts a retry delay from the `Retry-After` response header.
 *
 * `Retry-After` may contain either an HTTP date or a number of seconds to wait;
 * only the numeric (delay seconds) form is supported here. The header value is
 * converted from seconds to milliseconds so that it uses the same units as
 * `delayMin` and the other retry delays.
 *
 * The header is used only when:
 * 1. it is present and non-empty;
 * 2. it parses as a finite number greater than zero.
 * The result is additionally clamped so it is never smaller than `delayMin`.
 *
 * @param details - HTTP transaction whose response headers are inspected
 * @param delayMin - minimal allowed retry delay, in milliseconds
 * @returns retry delay in milliseconds, or `undefined` if the header is missing or unusable
 */
protected getRetryAfterHeader({ response }: HttpTransactionDetails, delayMin: number): number | undefined {
  const header = response.headers.get('Retry-After') ?? '';
  if (header === '') {
    return undefined;
  }

  const seconds = Number(header);
  if (!Number.isFinite(seconds) || seconds <= 0) {
    return undefined;
  }

  // The header is expressed in seconds, while delays are handled in milliseconds.
  // Without this conversion a value like `30` would be treated as 30ms and
  // effectively discarded by the clamp below.
  const milliseconds = seconds * 1000;
  return Math.max(delayMin, milliseconds);
}

/**
 * Computes an exponential backoff delay: `delayMin` doubled once per attempt,
 * capped at `delayMax`.
 *
 * @param attempt - current attempt number, used as the power of two
 * @param delayMin - base delay that gets multiplied
 * @param delayMax - upper bound for the returned delay
 * @returns the smaller of `2^attempt * delayMin` and `delayMax`
 */
protected getBackoffDelay(attempt: number, delayMin: number, delayMax: number): number {
  const multiplier = 2 ** attempt;
  const uncapped = multiplier * delayMin;
  return Math.min(delayMax, uncapped);
}
}
13 changes: 13 additions & 0 deletions lib/connection/connections/NullRetryPolicy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';

/**
 * Retry policy that never retries: the operation is invoked exactly once
 * and whatever it produces is handed back to the caller.
 */
export default class NullRetryPolicy<R> implements IRetryPolicy<R> {
  /**
   * Always reports that no retry should happen; the argument is not inspected.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  public async shouldRetry(details: R): Promise<ShouldRetryResult> {
    const verdict: ShouldRetryResult = { shouldRetry: false };
    return verdict;
  }

  /**
   * Runs the operation a single time, without any retry attempts.
   */
  public async invokeWithRetry(operation: RetryableOperation<R>): Promise<R> {
    return operation();
  }
}
66 changes: 58 additions & 8 deletions lib/connection/connections/ThriftHttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import { EventEmitter } from 'events';
import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstructor, TTransport } from 'thrift';
import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
import fetch, { RequestInit, HeadersInit, Request, Response, FetchError } from 'node-fetch';
// @ts-expect-error TS7016: Could not find a declaration file for module
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
import IRetryPolicy from '../contracts/IRetryPolicy';
import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
import NullRetryPolicy from './NullRetryPolicy';

export class THTTPException extends Thrift.TApplicationException {
public readonly statusCode: unknown;
Expand All @@ -32,7 +34,7 @@ interface ThriftHttpConnectionOptions {
url: string;
transport?: TTransportType;
protocol?: TProtocolConstructor;
getRetryPolicy(): Promise<IRetryPolicy<Response>>;
getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
}

// This type describes a shape of internals of Thrift client object.
Expand All @@ -47,29 +49,56 @@ type ThriftClient = {
[key: string]: (input: TProtocol, mtype: Thrift.MessageType, seqId: number) => void;
};

// Names of Thrift methods that are explicitly allowed to be retried.
// Requests for any method not listed here are not retried.
const retryableThriftMethods = new Set<string>([
  // Operation-level calls
  'GetOperationStatus',
  'CancelOperation',
  'CloseOperation',
  'GetResultSetMetadata',
  // Session-level calls
  'CloseSession',
  'GetInfo',
  // Metadata calls
  'GetTypeInfo',
  'GetCatalogs',
  'GetSchemas',
  'GetTables',
  'GetTableTypes',
  'GetColumns',
  'GetFunctions',
  'GetPrimaryKeys',
  'GetCrossReference',
]);

export default class ThriftHttpConnection extends EventEmitter {
private readonly url: string;

private config: RequestInit;

private options: ThriftHttpConnectionOptions;

// This field is used by Thrift internally, so name and type are important
private readonly transport: TTransportType;

// This field is used by Thrift internally, so name and type are important
private readonly protocol: TProtocolConstructor;

private readonly getRetryPolicy: () => Promise<IRetryPolicy<Response>>;

// thrift.createClient sets this field internally
public client?: ThriftClient;

constructor(options: ThriftHttpConnectionOptions, config: RequestInit = {}) {
super();
this.url = options.url;
this.config = config;
this.options = options;
this.transport = options.transport ?? TBufferedTransport;
this.protocol = options.protocol ?? TBinaryProtocol;
this.getRetryPolicy = options.getRetryPolicy;
}

/**
 * Picks the retry policy to use for a request carrying the given Thrift method.
 *
 * @param thriftMethodName - name of the Thrift method being invoked, if known
 * @returns the policy supplied via connection options when the method is in the
 *   allow-list of retryable methods; otherwise a policy that never retries
 */
protected async getRetryPolicy(thriftMethodName?: string): Promise<IRetryPolicy<HttpTransactionDetails>> {
  // Anything unknown, or not explicitly allowed to be retried, must not be retried
  if (!thriftMethodName || !retryableThriftMethods.has(thriftMethodName)) {
    return new NullRetryPolicy();
  }
  // The method is known to be safe to retry, so use the configured policy
  return this.options.getRetryPolicy();
}

public setHeaders(headers: HeadersInit) {
Expand All @@ -92,12 +121,16 @@ export default class ThriftHttpConnection extends EventEmitter {
body: data,
};

this.getRetryPolicy()
this.getThriftMethodName(data)
.then((thriftMethod) => this.getRetryPolicy(thriftMethod))
.then((retryPolicy) => {
const makeRequest = () => fetch(this.url, requestConfig);
const makeRequest = () => {
const request = new Request(this.url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
};
return retryPolicy.invokeWithRetry(makeRequest);
})
.then((response) => {
.then(({ response }) => {
if (response.status !== 200) {
throw new THTTPException(response);
}
Expand Down Expand Up @@ -131,6 +164,23 @@ export default class ThriftHttpConnection extends EventEmitter {
});
}

/**
 * Reads the Thrift method name from the header of a serialized Thrift message.
 *
 * The message is fed through this connection's transport and decoded with its
 * protocol; only the message header is read.
 *
 * @param thriftMessage - serialized Thrift message
 * @returns promise resolved with the method name, or with `undefined` if
 *   an error is thrown while decoding the message
 */
private getThriftMethodName(thriftMessage: Buffer): Promise<string | undefined> {
  return new Promise((resolve) => {
    try {
      // `seqId` (second argument) could be any value because it's ignored
      const feedTransport = this.transport.receiver((transportWithData) => {
        const ProtocolClass = this.protocol;
        const { fname } = new ProtocolClass(transportWithData).readMessageBegin();
        resolve(fname);
      }, 0);

      feedTransport(thriftMessage);
    } catch {
      // The message could not be decoded, so the method name is unknown
      resolve(undefined);
    }
  });
}

private handleThriftResponse(transportWithData: TTransport) {
if (!this.client) {
throw new Thrift.TApplicationException(Thrift.TApplicationExceptionType.INTERNAL_ERROR, 'Client not available');
Expand Down
9 changes: 7 additions & 2 deletions lib/connection/contracts/IConnectionProvider.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import http from 'http';
import { HeadersInit, Response } from 'node-fetch';
import { HeadersInit, Request, Response } from 'node-fetch';
import IRetryPolicy from './IRetryPolicy';

/**
 * Pairs an HTTP request with the response received for it.
 * Used as the payload type handled by HTTP retry policies.
 */
export interface HttpTransactionDetails {
// The request that was sent
request: Request;
// The response received for that request
response: Response;
}

export default interface IConnectionProvider {
getThriftConnection(): Promise<any>;

getAgent(): Promise<http.Agent>;

setHeaders(headers: HeadersInit): void;

getRetryPolicy(): Promise<IRetryPolicy<Response>>;
getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
}
2 changes: 1 addition & 1 deletion lib/connection/contracts/IRetryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export type ShouldRetryResult =
export type RetryableOperation<R> = () => Promise<R>;

export default interface IRetryPolicy<R> {
shouldRetry(response: R): Promise<ShouldRetryResult>;
shouldRetry(details: R): Promise<ShouldRetryResult>;

invokeWithRetry(operation: RetryableOperation<R>): Promise<R>;
}
10 changes: 9 additions & 1 deletion lib/hive/Commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@ export default abstract class BaseCommand {
return await this.invokeCommand<Response>(request, command);
} catch (error) {
if (error instanceof RetryError) {
const statusCode = error.payload instanceof Response ? error.payload.status : undefined;
let statusCode: number | undefined;
if (
error.payload &&
typeof error.payload === 'object' &&
'response' in error.payload &&
error.payload.response instanceof Response
) {
statusCode = error.payload.response.status;
}

switch (error.errorCode) {
case RetryErrorCode.AttemptsExceeded:
Expand Down
8 changes: 6 additions & 2 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import LZ4 from 'lz4';
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
Expand Down Expand Up @@ -77,6 +77,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
const retryPolicy = await connectionProvider.getRetryPolicy();

const requestConfig: RequestInit = { agent, ...init };
return retryPolicy.invokeWithRetry(() => fetch(url, requestConfig));
const result = await retryPolicy.invokeWithRetry(() => {
const request = new Request(url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
});
return result.response;
}
}
Loading

0 comments on commit 6673660

Please sign in to comment.