Merge pull request #3346 from terascope/es-api-surface-non-retry-errors
es-api returns non-retryable records for dlq
ciorg authored Mar 15, 2023
2 parents 2dfeb03 + fa3fd47 commit 9b3328f
Showing 11 changed files with 190 additions and 57 deletions.

packages/elasticsearch-api/index.js (43 additions, 25 deletions)

@@ -278,6 +278,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) {
      */
     function _filterRetryRecords(actionRecords, result) {
         const retry = [];
+        const deadLetter = [];
         const { items } = result;
 
         let nonRetriableError = false;
@@ -309,6 +310,11 @@
                 ) {
                     nonRetriableError = true;
                     reason = `${item.error.type}--${item.error.reason}`;
+
+                    if (config._dead_letter_action === 'kafka_dead_letter') {
+                        deadLetter.push({ doc: actionRecords[i].data, reason });
+                        continue;
+                    }
                     break;
                 }
             } else if (item.status == null || item.status < 400) {
@@ -318,7 +324,7 @@
 
         if (nonRetriableError) {
             return {
-                retry: [], successful, error: true, reason
+                retry: [], successful, error: true, reason, deadLetter
             };
         }
 
@@ -331,7 +337,7 @@
 
     /**
      * @param data {Array<{ action: data }>}
-     * @returns {Promise<number>}
+     * @returns {Promise<{ recordCount: number, deadLetter: record[] }>}
      */
     async function _bulkSend(actionRecords, previousCount = 0, previousRetryDelay = 0) {
         const body = actionRecords.flatMap((record, index) => {
@@ -343,18 +349,7 @@
                 throw new Error(`Bulk send record is missing the action property${dbg}`);
             }
 
-            if (!isElasticsearch6()) {
-                const actionKey = getFirstKey(record.action);
-                const { _type, ...withoutTypeAction } = record.action[actionKey];
-                // if data is specified return both
-                return record.data ? [{
-                    ...record.action,
-                    [actionKey]: withoutTypeAction
-                }, record.data] : [{
-                    ...record.action,
-                    [actionKey]: withoutTypeAction
-                }];
-            }
+            if (!isElasticsearch6()) return _nonEs6Prep(record);
 
             // if data is specified return both
             return record.data ? [record.action, record.data] : [record.action];
@@ -363,25 +358,25 @@
         const response = await _clientRequest('bulk', { body });
         const results = response.body ? response.body : response;
 
-        if (!results.errors) {
-            return results.items.reduce((c, item) => {
-                const [value] = Object.values(item);
-                // ignore non-successful status codes
-                if (value.status != null && value.status >= 400) return c;
-                return c + 1;
-            }, 0);
-        }
+        if (!results.errors) return { recordCount: _affectedRowsCount(results) };
 
         const {
-            retry, successful, error, reason
+            retry, successful, error, reason, deadLetter
         } = _filterRetryRecords(actionRecords, results);
 
         if (error) {
+            if (config._dead_letter_action === 'kafka_dead_letter') {
+                return {
+                    recordCount: previousCount + successful,
+                    deadLetter
+                };
+            }
+
             throw new Error(`bulk send error: ${reason}`);
         }
 
         if (retry.length === 0) {
-            return previousCount + successful;
+            return { recordCount: previousCount + successful };
         }
 
         warning();
@@ -390,10 +385,24 @@
         return _bulkSend(retry, previousCount + successful, nextRetryDelay);
     }
 
+    function _nonEs6Prep(record) {
+        const actionKey = getFirstKey(record.action);
+
+        const { _type, ...withoutTypeAction } = record.action[actionKey];
+        // if data is specified return both
+
+        const body = [{ ...record.action, [actionKey]: withoutTypeAction }];
+
+        if (record.data != null) body.push(record.data);
+
+        return body;
+    }
+
     /**
      * The new and improved bulk send with proper retry support
      *
-     * @returns {Promise<number>} the number of affected rows
+     * @returns {Promise<{ recordCount: number, deadLetter: record[] }>}
+     * the number of affected rows and records for kafka dead letter queue
      */
     function bulkSend(data) {
         if (!Array.isArray(data)) {
@@ -403,6 +412,15 @@
         return Promise.resolve(_bulkSend(data));
     }
 
+    function _affectedRowsCount(results) {
+        return results.items.reduce((c, item) => {
+            const [value] = Object.values(item);
+            // ignore non-successful status codes
+            if (value.status != null && value.status >= 400) return c;
+            return c + 1;
+        }, 0);
+    }
+
     function _warn(warnLogger, msg) {
         let _lastTime = null;
         return () => {
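
These index.js changes alter bulkSend's contract: it now resolves to an object rather than a bare count, and when the api is constructed with _dead_letter_action set to 'kafka_dead_letter', non-retryable bulk items are collected and returned instead of aborting the whole send. A minimal caller sketch under that config; client and logger are assumed to be supplied by the caller, and sendToKafkaDeadLetter is a hypothetical producer, not part of this diff:

    const elasticsearchAPI = require('@terascope/elasticsearch-api');

    // opt in to collecting non-retryable records instead of throwing
    const api = elasticsearchAPI(client, logger, { _dead_letter_action: 'kafka_dead_letter' });

    async function send(records) {
        // records use the { action, data } shape bulkSend expects
        const { recordCount, deadLetter } = await api.bulkSend(records);

        // deadLetter is only populated when non-retryable errors occurred;
        // each entry is { doc, reason } per _filterRetryRecords above
        if (deadLetter && deadLetter.length > 0) {
            await Promise.all(deadLetter.map(
                ({ doc, reason }) => sendToKafkaDeadLetter(doc, reason) // hypothetical
            ));
        }

        return recordCount;
    }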

packages/elasticsearch-api/package.json (1 addition, 1 deletion)

@@ -1,7 +1,7 @@
 {
   "name": "@terascope/elasticsearch-api",
   "displayName": "Elasticsearch API",
-  "version": "3.3.7",
+  "version": "3.4.0",
   "description": "Elasticsearch client api used across multiple services, handles retries and exponential backoff",
   "homepage": "https://github.com/terascope/teraslice/tree/master/packages/elasticsearch-api#readme",
   "bugs": {

packages/elasticsearch-api/test/api-spec.js (2 additions, 2 deletions)

@@ -768,7 +768,7 @@ describe('elasticsearch-api', () => {
             { delete: { _index: 'some_index', _type: 'events', _id: 5 } }
         ]
     });
-        return expect(result).toBe(2);
+        return expect(result).toEqual({ recordCount: 2 });
     });
 
     it('can remove type from bulkSend', async () => {
@@ -877,7 +877,7 @@ describe('elasticsearch-api', () => {
         });
         const result = await api.bulkSend(myBulkData);
 
-        expect(result).toBe(2);
+        expect(result).toEqual({ recordCount: 2 });
 
         bulkError = ['some_thing_else', 'some_thing_else'];
 

packages/elasticsearch-api/test/bulk-send-dlq.js (new file, 106 additions)

@@ -0,0 +1,106 @@
+'use strict';
+
+const {
+    debugLogger,
+    cloneDeep,
+    DataEntity
+} = require('@terascope/utils');
+const { ElasticsearchTestHelpers } = require('elasticsearch-store');
+const elasticsearchAPI = require('../index');
+
+const {
+    makeClient, cleanupIndex,
+    EvenDateData, TEST_INDEX_PREFIX,
+    createMappingFromDatatype
+} = ElasticsearchTestHelpers;
+
+jest.setTimeout(10000);
+
+function formatUploadData(
+    index, data, isES8ClientTest = false
+) {
+    const results = [];
+
+    data.forEach((record, i) => {
+        const meta = { _index: index, _id: i + 1 };
+
+        if (!isES8ClientTest) {
+            meta._type = '_doc';
+        }
+
+        results.push({ action: { index: meta }, data: record });
+    });
+
+    return results;
+}
+
+describe('bulkSend', () => {
+    let client;
+    let api;
+    let isElasticsearch8 = false;
+
+    beforeAll(async () => {
+        client = await makeClient();
+    });
+
+    describe('can return non-retryable records', () => {
+        const logger = debugLogger('congested_test');
+        const index = `${TEST_INDEX_PREFIX}_non-retryable-records`;
+
+        beforeAll(async () => {
+            await cleanupIndex(client, index);
+            api = elasticsearchAPI(client, logger, { _dead_letter_action: 'kafka_dead_letter' });
+            isElasticsearch8 = api.isElasticsearch8();
+
+            const overrides = {
+                settings: {
+                    'index.number_of_shards': 1,
+                    'index.number_of_replicas': 0,
+                },
+            };
+
+            const mapping = await createMappingFromDatatype(
+                client, EvenDateData.EvenDataType, '_doc', overrides
+            );
+
+            mapping.index = index;
+
+            await client.indices.create(mapping);
+        });
+
+        afterAll(async () => {
+            await cleanupIndex(client, index);
+        });
+
+        it('returns records that cannot be tried again if dlq config is set', async () => {
+            const docs = cloneDeep(EvenDateData.data.slice(0, 2));
+
+            docs[0].bytes = 'this is a bad value';
+
+            const result = await api.bulkSend(formatUploadData(index, docs, isElasticsearch8));
+
+            expect(result.recordCount).toBe(1);
+
+            expect(result.deadLetter[0].doc).toEqual(DataEntity.make({
+                ip: '120.67.248.156',
+                userAgent: 'Mozilla/5.0 (Windows; U; Windows NT 6.1) AppleWebKit/533.1.2 (KHTML, like Gecko) Chrome/35.0.894.0 Safari/533.1.2',
+                url: 'http://lucious.biz',
+                uuid: 'b23a8550-0081-453f-9e80-93a90782a5bd',
+                created: '2019-04-26T15:00:23.225+00:00',
+                ipv6: '9e79:7798:585a:b847:f1c4:81eb:0c3d:7eb8',
+                location: '50.15003, -94.89355',
+                bytes: 'this is a bad value'
+            }));
+
+            expect(result.deadLetter[0].reason).toBeDefined();
+        });
+
+        it('should return a count if not un-retryable records', async () => {
+            const docs = cloneDeep(EvenDateData.data.slice(0, 2));
+
+            const result = await api.bulkSend(formatUploadData(index, docs, isElasticsearch8));
+
+            expect(result).toEqual({ recordCount: 2 });
+        });
+    });
+});
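
For context on the test above: the bad bytes value fails the index's field mapping, and Elasticsearch reports that failure per item inside the bulk response body rather than as a request-level error. A representative item shape (illustrative values, not captured from a real run) that drives the non-retryable branch in _filterRetryRecords:

    // one entry of results.items in the bulk response
    const failedItem = {
        index: {
            _index: 'teratest_non-retryable-records', // illustrative
            _id: '1',
            status: 400,
            error: {
                type: 'mapper_parsing_exception',
                reason: 'failed to parse field [bytes] of type [long]' // representative text
            }
        }
    };

Because the status is 400 and the error type is not one the api retries, the record lands in deadLetter with reason set to `${error.type}--${error.reason}`, which is why the test asserts only that reason is defined rather than matching exact wording.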

packages/elasticsearch-api/test/bulk-send-limit-spec.js (28 additions, 19 deletions)

@@ -1,6 +1,10 @@
 'use strict';
 
-const { debugLogger, chunk, pMap } = require('@terascope/utils');
+const {
+    debugLogger,
+    chunk,
+    pMap
+} = require('@terascope/utils');
 const { ElasticsearchTestHelpers } = require('elasticsearch-store');
 const elasticsearchAPI = require('../index');
 
@@ -11,7 +15,7 @@ const {
 
 const THREE_MINUTES = 3 * 60 * 1000;
 
-jest.setTimeout(THREE_MINUTES + 30000);
+jest.setTimeout(THREE_MINUTES + 60000);
 
 function formatUploadData(
     index, data, isES8ClientTest = false
@@ -31,33 +35,38 @@ function formatUploadData(
     return results;
 }
 
-describe('bulkSend can work with congested queues', () => {
-    const logger = debugLogger('congested_test');
-    const index = `${TEST_INDEX_PREFIX}_congested_queues_`;
-
+describe('bulkSend', () => {
     let client;
     let api;
     let isElasticsearch8 = false;
 
     beforeAll(async () => {
         client = await makeClient();
-        await cleanupIndex(client, index);
-        api = elasticsearchAPI(client, logger);
-        isElasticsearch8 = api.isElasticsearch8();
     });
 
-    afterAll(async () => {
-        await cleanupIndex(client, index);
-    });
+    describe('can work with congested queues', () => {
+        const logger = debugLogger('congested_test');
+        const index = `${TEST_INDEX_PREFIX}_congested_queues_`;
+
+        beforeAll(async () => {
+            await cleanupIndex(client, index);
+            api = elasticsearchAPI(client, logger);
+            isElasticsearch8 = api.isElasticsearch8();
+        });
+
+        afterAll(async () => {
+            await cleanupIndex(client, index);
+        });
 
-    it('can get correct data even with congested queues', async () => {
-        const chunkedData = chunk(EvenDateData.data, 50);
+        it('can get correct data even with congested queues', async () => {
+            const chunkedData = chunk(EvenDateData.data, 50);
 
-        await pMap(chunkedData, async (cData) => {
-            const formattedData = formatUploadData(index, cData, isElasticsearch8);
-            return api.bulkSend(formattedData);
-        }, { concurrency: 9 });
+            await pMap(chunkedData, async (cData) => {
+                const formattedData = formatUploadData(index, cData, isElasticsearch8);
+                return api.bulkSend(formattedData);
+            }, { concurrency: 9 });
 
-        await waitForData(client, index, EvenDateData.data.length, logger, THREE_MINUTES);
+            await waitForData(client, index, EvenDateData.data.length, logger, THREE_MINUTES);
+        });
     });
 });

packages/elasticsearch-api/types/index.d.ts (3 additions, 3 deletions)

@@ -32,9 +32,9 @@ declare namespace elasticsearchAPI {
     /**
      * The new and improved bulk send with proper retry support
      *
-     * @returns the number of affected rows
+     * @returns the number of affected rows, and deadLetter records if config is set
      */
-    bulkSend: (data: BulkRecord[]) => Promise<number>;
+    bulkSend: (data: BulkRecord[]) => Promise<{ recordCount: number; deadLetter?: any[]; }>;
     nodeInfo: (query: any) => Promise<any>;
     nodeStats: (query: any) => Promise<any>;
     buildQuery: (opConfig: Config, msg: any) => ClientParams.SearchParams;
@@ -62,7 +62,7 @@ declare namespace elasticsearchAPI {
     /**
      * This is used for improved bulk sending function
      */
-     export interface AnyBulkAction {
+    export interface AnyBulkAction {
         update?: Partial<BulkActionMetadata>;
         index?: Partial<BulkActionMetadata>;
         create?: Partial<BulkActionMetadata>;

packages/teraslice-state-storage/package.json (2 additions, 2 deletions)

@@ -1,7 +1,7 @@
 {
   "name": "@terascope/teraslice-state-storage",
   "displayName": "Teraslice State Storage",
-  "version": "0.36.7",
+  "version": "0.37.0",
   "description": "State storage operation api for teraslice",
   "homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-state-storage#readme",
   "bugs": {
@@ -23,7 +23,7 @@
     "test:watch": "ts-scripts test --watch . --"
   },
   "dependencies": {
-    "@terascope/elasticsearch-api": "^3.3.7",
+    "@terascope/elasticsearch-api": "^3.4.0",
     "@terascope/utils": "^0.45.5"
   },
   "engines": {

(file name truncated in source)

@@ -377,7 +377,7 @@ module.exports = async function elasticsearchStorage(backendConfig) {
         bulkQueue = [];
 
         try {
-            const recordCount = await bulkSend(bulkRequest);
+            const { recordCount } = await bulkSend(bulkRequest);
             const extraMsg = shuttingDown ? ', on shutdown' : '';
             logger.debug(`flushed ${recordCount}${extraMsg} records to index ${indexName}`);
         } finally {
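
The callsite update above is the migration every consumer of bulkSend needs: code that used the resolved number directly must now destructure recordCount. A short before/after sketch (names follow the diff; defaulting deadLetter to an empty array is an assumption for callers that never set _dead_letter_action):

    // before: bulkSend resolved to a number
    // const recordCount = await bulkSend(bulkRequest);

    // after: bulkSend resolves to an object; deadLetter may be absent
    const { recordCount, deadLetter = [] } = await bulkSend(bulkRequest);
    logger.debug(`flushed ${recordCount} records to index ${indexName}`);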
(remaining changed files not shown in source)
