From 6d46e003b2600f5fa9dd64827db1b48724c301ec Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Fri, 10 Mar 2023 10:28:06 -0700 Subject: [PATCH 01/10] updated es-api types --- packages/elasticsearch-api/index.js | 43 +++++++++++++++++++-- packages/elasticsearch-api/types/index.d.ts | 4 +- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/packages/elasticsearch-api/index.js b/packages/elasticsearch-api/index.js index 7c19a1e7569..2e854516673 100644 --- a/packages/elasticsearch-api/index.js +++ b/packages/elasticsearch-api/index.js @@ -278,6 +278,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { */ function _filterRetryRecords(actionRecords, result) { const retry = []; + const deadLetter = []; const { items } = result; let nonRetriableError = false; @@ -309,6 +310,16 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { ) { nonRetriableError = true; reason = `${item.error.type}--${item.error.reason}`; + + // caused by is not always present in error reply, but is useful if it is + if (item.error.caused_by) { + reason += `${item.error.caused_by.type}: ${item.error.caused_by.reason}`; + } + + if (config._dead_letter_action === 'kafka_dead_letter') { + deadLetter.push({ doc: actionRecords[i], reason }); + continue; + } break; } } else if (item.status == null || item.status < 400) { @@ -316,9 +327,26 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { } } + /** + item { + "_index": "generated-data-v1", + "_type": "_doc", + "_id": "Hwwwwwwww", + "status": 400, + "error": { + "type": "mapper_parsing_exception", + "reason": "failed to parse field [bytes] of type [long] in document with id 'Hwwwwwwww'", + "caused_by": { + "type": "illegal_argument_exception", + "reason": "For input string: \"oogahboogah\"" + } + } + } + */ + if (nonRetriableError) { return { - retry: [], successful, error: true, reason + retry: [], successful, error: true, reason, deadLetter }; } @@ -373,15 +401,24 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { } const { - retry, successful, error, reason + retry, successful, error, reason, deadLetter } = _filterRetryRecords(actionRecords, results); if (error) { + if (config._dead_letter_action === 'kafka_dead_letter') { + return { + count: previousCount + successful, + deadLetter + }; + } + throw new Error(`bulk send error: ${reason}`); } if (retry.length === 0) { - return previousCount + successful; + return { + count: previousCount + successful + }; } warning(); diff --git a/packages/elasticsearch-api/types/index.d.ts b/packages/elasticsearch-api/types/index.d.ts index f4222410064..49255b21113 100644 --- a/packages/elasticsearch-api/types/index.d.ts +++ b/packages/elasticsearch-api/types/index.d.ts @@ -32,9 +32,9 @@ declare namespace elasticsearchAPI { /** * The new and improved bulk send with proper retry support * - * @returns the number of affected rows + * @returns the number of affected rows, and deadLetter records if config is set */ - bulkSend: (data: BulkRecord[]) => Promise; + bulkSend: (data: BulkRecord[]) => Promise<{ count: number; deadLetter?: any[]; }>; nodeInfo: (query: any) => Promise; nodeStats: (query: any) => Promise; buildQuery: (opConfig: Config, msg: any) => ClientParams.SearchParams; From c9138ac0a92a932cb379f6d820d7cb6cc7747d3d Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Fri, 10 Mar 2023 15:23:19 -0700 Subject: [PATCH 02/10] updates and tests --- packages/elasticsearch-api/index.js | 45 ++---- packages/elasticsearch-api/test/api-spec.js | 4 
+- .../test/bulk-send-additional-spec.js | 141 ++++++++++++++++++ .../test/bulk-send-limit-spec.js | 63 -------- packages/elasticsearch-api/types/index.d.ts | 2 +- .../storage/backends/elasticsearch_store.js | 2 +- 6 files changed, 159 insertions(+), 98 deletions(-) create mode 100644 packages/elasticsearch-api/test/bulk-send-additional-spec.js delete mode 100644 packages/elasticsearch-api/test/bulk-send-limit-spec.js diff --git a/packages/elasticsearch-api/index.js b/packages/elasticsearch-api/index.js index 2e854516673..4f5181fc19e 100644 --- a/packages/elasticsearch-api/index.js +++ b/packages/elasticsearch-api/index.js @@ -13,7 +13,7 @@ const { get, toNumber, isString, isSimpleObject, castArray, flatten, toBoolean, uniq, random, cloneDeep, DataEntity, - isDeepEqual, getTypeOf, isProd + isDeepEqual, getTypeOf, isProd, has } = require('@terascope/utils'); const { ElasticsearchDistribution } = require('@terascope/types'); @@ -311,13 +311,11 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { nonRetriableError = true; reason = `${item.error.type}--${item.error.reason}`; - // caused by is not always present in error reply, but is useful if it is - if (item.error.caused_by) { - reason += `${item.error.caused_by.type}: ${item.error.caused_by.reason}`; - } + // caused_by is not always present in error msg, but useful if it is there + if (has(item.error, 'caused_by.reason')) reason += `--${item.error.caused_by.reason}`; if (config._dead_letter_action === 'kafka_dead_letter') { - deadLetter.push({ doc: actionRecords[i], reason }); + deadLetter.push({ doc: actionRecords[i].data, reason }); continue; } break; @@ -327,23 +325,6 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { } } - /** - item { - "_index": "generated-data-v1", - "_type": "_doc", - "_id": "Hwwwwwwww", - "status": 400, - "error": { - "type": "mapper_parsing_exception", - "reason": "failed to parse field [bytes] of type [long] in document with id 'Hwwwwwwww'", - "caused_by": { - "type": "illegal_argument_exception", - "reason": "For input string: \"oogahboogah\"" - } - } - } - */ - if (nonRetriableError) { return { retry: [], successful, error: true, reason, deadLetter @@ -392,12 +373,14 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { const results = response.body ? 
response.body : response; if (!results.errors) { - return results.items.reduce((c, item) => { - const [value] = Object.values(item); - // ignore non-successful status codes - if (value.status != null && value.status >= 400) return c; - return c + 1; - }, 0); + return { + recordCount: results.items.reduce((c, item) => { + const [value] = Object.values(item); + // ignore non-successful status codes + if (value.status != null && value.status >= 400) return c; + return c + 1; + }, 0) + }; } const { @@ -407,7 +390,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { if (error) { if (config._dead_letter_action === 'kafka_dead_letter') { return { - count: previousCount + successful, + recordCount: previousCount + successful, deadLetter }; } @@ -417,7 +400,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { if (retry.length === 0) { return { - count: previousCount + successful + recordCount: previousCount + successful }; } diff --git a/packages/elasticsearch-api/test/api-spec.js b/packages/elasticsearch-api/test/api-spec.js index e63b90b7f9d..0d0f0bcd35a 100644 --- a/packages/elasticsearch-api/test/api-spec.js +++ b/packages/elasticsearch-api/test/api-spec.js @@ -768,7 +768,7 @@ describe('elasticsearch-api', () => { { delete: { _index: 'some_index', _type: 'events', _id: 5 } } ] }); - return expect(result).toBe(2); + return expect(result).toEqual({ recordCount: 2 }); }); it('can remove type from bulkSend', async () => { @@ -877,7 +877,7 @@ describe('elasticsearch-api', () => { }); const result = await api.bulkSend(myBulkData); - expect(result).toBe(2); + expect(result).toEqual({ recordCount: 2 }); bulkError = ['some_thing_else', 'some_thing_else']; diff --git a/packages/elasticsearch-api/test/bulk-send-additional-spec.js b/packages/elasticsearch-api/test/bulk-send-additional-spec.js new file mode 100644 index 00000000000..6d0a354c0b6 --- /dev/null +++ b/packages/elasticsearch-api/test/bulk-send-additional-spec.js @@ -0,0 +1,141 @@ +'use strict'; + +const { + debugLogger, + chunk, + pMap, + DataEntity, + cloneDeep +} = require('@terascope/utils'); +const { ElasticsearchTestHelpers } = require('elasticsearch-store'); +const elasticsearchAPI = require('../index'); + +const { + makeClient, cleanupIndex, waitForData, + EvenDateData, TEST_INDEX_PREFIX, + createMappingFromDatatype +} = ElasticsearchTestHelpers; + +const THREE_MINUTES = 3 * 60 * 1000; + +jest.setTimeout(THREE_MINUTES + 60000); + +function formatUploadData( + index, data, isES8ClientTest = false +) { + const results = []; + + data.forEach((record, i) => { + const meta = { _index: index, _id: i + 1 }; + + if (!isES8ClientTest) { + meta._type = '_doc'; + } + + results.push({ action: { index: meta }, data: record }); + }); + + return results; +} + +describe('bulkSend', () => { + let client; + let api; + let isElasticsearch8 = false; + + beforeAll(async () => { + client = await makeClient(); + }); + + describe('can work with congested queues', () => { + const logger = debugLogger('congested_test'); + const index = `${TEST_INDEX_PREFIX}_congested_queues_`; + + beforeAll(async () => { + await cleanupIndex(client, index); + api = elasticsearchAPI(client, logger); + isElasticsearch8 = api.isElasticsearch8(); + }); + + afterAll(async () => { + await cleanupIndex(client, index); + }); + + it('can get correct data even with congested queues', async () => { + const chunkedData = chunk(EvenDateData.data, 50); + + await pMap(chunkedData, async (cData) => { + const formattedData = formatUploadData(index, 
cData, isElasticsearch8); + return api.bulkSend(formattedData); + }, { concurrency: 9 }); + + await waitForData(client, index, EvenDateData.data.length, logger, THREE_MINUTES); + }); + }); + + describe('can return non-retryable records', () => { + const logger = debugLogger('congested_test'); + const index = `${TEST_INDEX_PREFIX}_non-retryable-records`; + + beforeAll(async () => { + await cleanupIndex(client, index); + api = elasticsearchAPI(client, logger, { _dead_letter_action: 'kafka_dead_letter' }); + isElasticsearch8 = api.isElasticsearch8(); + + const overrides = { + settings: { + 'index.number_of_shards': 1, + 'index.number_of_replicas': 0, + }, + }; + + const mapping = await createMappingFromDatatype( + client, EvenDateData.EvenDataType, '_doc', overrides + ); + + mapping.index = index; + + await client.indices.create(mapping); + }); + + afterAll(async () => { + await cleanupIndex(client, index); + }); + + it('returns records that cannot be tried again and the reason', async () => { + const docs = cloneDeep(EvenDateData.data.slice(0, 2)); + + docs[0].bytes = 'this is a bad value'; + + const result = await api.bulkSend(formatUploadData(index, docs, isElasticsearch8)); + + expect(result).toEqual({ + recordCount: 1, + deadLetter: [ + { + doc: DataEntity.make({ + ip: '120.67.248.156', + userAgent: 'Mozilla/5.0 (Windows; U; Windows NT 6.1) AppleWebKit/533.1.2 (KHTML, like Gecko) Chrome/35.0.894.0 Safari/533.1.2', + url: 'http://lucious.biz', + uuid: 'b23a8550-0081-453f-9e80-93a90782a5bd', + created: '2019-04-26T15:00:23.225+00:00', + ipv6: '9e79:7798:585a:b847:f1c4:81eb:0c3d:7eb8', + location: '50.15003, -94.89355', + bytes: 'this is a bad value' + }), + // eslint-disable-next-line no-useless-escape, quotes + reason: `mapper_parsing_exception--failed to parse field [bytes] of type [integer] in document with id '1'--For input string: \"this is a bad value\"` + } + ] + }); + }); + + it('should return a count if not un-retryable records', async () => { + const docs = cloneDeep(EvenDateData.data.slice(0, 2)); + + const result = await api.bulkSend(formatUploadData(index, docs, isElasticsearch8)); + + expect(result).toEqual({ recordCount: 2 }); + }); + }); +}); diff --git a/packages/elasticsearch-api/test/bulk-send-limit-spec.js b/packages/elasticsearch-api/test/bulk-send-limit-spec.js deleted file mode 100644 index d299bbb824e..00000000000 --- a/packages/elasticsearch-api/test/bulk-send-limit-spec.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict'; - -const { debugLogger, chunk, pMap } = require('@terascope/utils'); -const { ElasticsearchTestHelpers } = require('elasticsearch-store'); -const elasticsearchAPI = require('../index'); - -const { - makeClient, cleanupIndex, waitForData, - EvenDateData, TEST_INDEX_PREFIX -} = ElasticsearchTestHelpers; - -const THREE_MINUTES = 3 * 60 * 1000; - -jest.setTimeout(THREE_MINUTES + 30000); - -function formatUploadData( - index, data, isES8ClientTest = false -) { - const results = []; - - data.forEach((record) => { - const meta = { _index: index }; - - if (!isES8ClientTest) { - meta._type = '_doc'; - } - - results.push({ action: { index: meta }, data: record }); - }); - - return results; -} - -describe('bulkSend can work with congested queues', () => { - const logger = debugLogger('congested_test'); - const index = `${TEST_INDEX_PREFIX}_congested_queues_`; - - let client; - let api; - let isElasticsearch8 = false; - - beforeAll(async () => { - client = await makeClient(); - await cleanupIndex(client, index); - api = elasticsearchAPI(client, logger); - 
isElasticsearch8 = api.isElasticsearch8(); - }); - - afterAll(async () => { - await cleanupIndex(client, index); - }); - - it('can get correct data even with congested queues', async () => { - const chunkedData = chunk(EvenDateData.data, 50); - - await pMap(chunkedData, async (cData) => { - const formattedData = formatUploadData(index, cData, isElasticsearch8); - return api.bulkSend(formattedData); - }, { concurrency: 9 }); - - await waitForData(client, index, EvenDateData.data.length, logger, THREE_MINUTES); - }); -}); diff --git a/packages/elasticsearch-api/types/index.d.ts b/packages/elasticsearch-api/types/index.d.ts index 49255b21113..085fe6474d1 100644 --- a/packages/elasticsearch-api/types/index.d.ts +++ b/packages/elasticsearch-api/types/index.d.ts @@ -34,7 +34,7 @@ declare namespace elasticsearchAPI { * * @returns the number of affected rows, and deadLetter records if config is set */ - bulkSend: (data: BulkRecord[]) => Promise<{ count: number; deadLetter?: any[]; }>; + bulkSend: (data: BulkRecord[]) => Promise<{ recordCount: number; deadLetter?: any[]; }>; nodeInfo: (query: any) => Promise; nodeStats: (query: any) => Promise; buildQuery: (opConfig: Config, msg: any) => ClientParams.SearchParams; diff --git a/packages/teraslice/lib/storage/backends/elasticsearch_store.js b/packages/teraslice/lib/storage/backends/elasticsearch_store.js index 362d834a3fe..4b350f030a4 100644 --- a/packages/teraslice/lib/storage/backends/elasticsearch_store.js +++ b/packages/teraslice/lib/storage/backends/elasticsearch_store.js @@ -377,7 +377,7 @@ module.exports = async function elasticsearchStorage(backendConfig) { bulkQueue = []; try { - const recordCount = await bulkSend(bulkRequest); + const { recordCount } = await bulkSend(bulkRequest); const extraMsg = shuttingDown ? ', on shutdown' : ''; logger.debug(`flushed ${recordCount}${extraMsg} records to index ${indexName}`); } finally { From 05fd09dd2a62ac7bfe72622a25d2ab527f58dcd9 Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Fri, 10 Mar 2023 16:19:21 -0700 Subject: [PATCH 03/10] updated return value --- packages/elasticsearch-api/index.js | 53 ++++++++++--------- .../workers/execution-controller/scheduler.js | 4 +- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/packages/elasticsearch-api/index.js b/packages/elasticsearch-api/index.js index 4f5181fc19e..58f62044fd1 100644 --- a/packages/elasticsearch-api/index.js +++ b/packages/elasticsearch-api/index.js @@ -340,7 +340,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { /** * @param data {Array<{ action: data }>} - * @returns {Promise} + * @returns {Promise<{ recordCount: number, deadLetter: record[] }>} */ async function _bulkSend(actionRecords, previousCount = 0, previousRetryDelay = 0) { const body = actionRecords.flatMap((record, index) => { @@ -353,16 +353,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { } if (!isElasticsearch6()) { - const actionKey = getFirstKey(record.action); - const { _type, ...withoutTypeAction } = record.action[actionKey]; - // if data is specified return both - return record.data ? [{ - ...record.action, - [actionKey]: withoutTypeAction - }, record.data] : [{ - ...record.action, - [actionKey]: withoutTypeAction - }]; + return _removeTypeFromAction(record); } // if data is specified return both @@ -372,16 +363,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { const response = await _clientRequest('bulk', { body }); const results = response.body ? 
response.body : response; - if (!results.errors) { - return { - recordCount: results.items.reduce((c, item) => { - const [value] = Object.values(item); - // ignore non-successful status codes - if (value.status != null && value.status >= 400) return c; - return c + 1; - }, 0) - }; - } + if (!results.errors) return { recordCount: _affectedRowsCount(results) }; const { retry, successful, error, reason, deadLetter @@ -399,9 +381,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { } if (retry.length === 0) { - return { - recordCount: previousCount + successful - }; + return { recordCount: previousCount + successful }; } warning(); @@ -410,10 +390,24 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { return _bulkSend(retry, previousCount + successful, nextRetryDelay); } + function _removeTypeFromAction(record) { + const actionKey = getFirstKey(record.action); + + const { _type, ...withoutTypeAction } = record.action[actionKey]; + // if data is specified return both + + if (record.data == null) { + return [{ ...record.action, [actionKey]: withoutTypeAction }]; + } + + return [{ ...record.action, [actionKey]: withoutTypeAction }, record.data]; + } + /** * The new and improved bulk send with proper retry support * - * @returns {Promise} the number of affected rows + * @returns {Promise<{ recordCount: number, deadLetter: record[] }>} + * the number of affected rows and records for kafka dead letter queue */ function bulkSend(data) { if (!Array.isArray(data)) { @@ -423,6 +417,15 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { return Promise.resolve(_bulkSend(data)); } + function _affectedRowsCount(results) { + return results.items.reduce((c, item) => { + const [value] = Object.values(item); + // ignore non-successful status codes + if (value.status != null && value.status >= 400) return c; + return c + 1; + }, 0); + } + function _warn(warnLogger, msg) { let _lastTime = null; return () => { diff --git a/packages/teraslice/lib/workers/execution-controller/scheduler.js b/packages/teraslice/lib/workers/execution-controller/scheduler.js index 44a9788391a..132771c90d1 100644 --- a/packages/teraslice/lib/workers/execution-controller/scheduler.js +++ b/packages/teraslice/lib/workers/execution-controller/scheduler.js @@ -393,9 +393,9 @@ class Scheduler { this._creating += slices.length; try { - const count = await this.stateStore.createSlices(this.exId, slices); + const { recordCount } = await this.stateStore.createSlices(this.exId, slices); this.enqueueSlices(slices); - this._creating -= count; + this._creating -= recordCount; } catch (err) { const { lifecycle } = this.executionContext.config; if (lifecycle === 'once') { From 624a460e63d3fc2524419e993cb9b96d01a64f73 Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Fri, 10 Mar 2023 17:02:12 -0700 Subject: [PATCH 04/10] refactoring --- packages/elasticsearch-api/index.js | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/elasticsearch-api/index.js b/packages/elasticsearch-api/index.js index 58f62044fd1..e90f2dd32f5 100644 --- a/packages/elasticsearch-api/index.js +++ b/packages/elasticsearch-api/index.js @@ -352,9 +352,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { throw new Error(`Bulk send record is missing the action property${dbg}`); } - if (!isElasticsearch6()) { - return _removeTypeFromAction(record); - } + if (!isElasticsearch6()) return _es6Prep(record); // if data is specified return both return record.data 
? [record.action, record.data] : [record.action]; @@ -390,17 +388,17 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { return _bulkSend(retry, previousCount + successful, nextRetryDelay); } - function _removeTypeFromAction(record) { + function _es6Prep(record) { const actionKey = getFirstKey(record.action); const { _type, ...withoutTypeAction } = record.action[actionKey]; // if data is specified return both - if (record.data == null) { - return [{ ...record.action, [actionKey]: withoutTypeAction }]; - } + const body = [{ ...record.action, [actionKey]: withoutTypeAction }]; - return [{ ...record.action, [actionKey]: withoutTypeAction }, record.data]; + if (record.data != null) body.push(record.data); + + return body; } /** From 0b527ed1c79f9db8e3e62a12f3bb88a19f248999 Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Fri, 10 Mar 2023 17:08:00 -0700 Subject: [PATCH 05/10] refactoring --- packages/elasticsearch-api/index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/elasticsearch-api/index.js b/packages/elasticsearch-api/index.js index e90f2dd32f5..7000ad66c35 100644 --- a/packages/elasticsearch-api/index.js +++ b/packages/elasticsearch-api/index.js @@ -352,7 +352,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { throw new Error(`Bulk send record is missing the action property${dbg}`); } - if (!isElasticsearch6()) return _es6Prep(record); + if (!isElasticsearch6()) return _nonEs6Prep(record); // if data is specified return both return record.data ? [record.action, record.data] : [record.action]; @@ -388,7 +388,7 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { return _bulkSend(retry, previousCount + successful, nextRetryDelay); } - function _es6Prep(record) { + function _nonEs6Prep(record) { const actionKey = getFirstKey(record.action); const { _type, ...withoutTypeAction } = record.action[actionKey]; From 8a127b38a6f1d7e7bb49ef61fba3900296f961a9 Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Mon, 13 Mar 2023 11:44:26 -0700 Subject: [PATCH 06/10] updated tests --- packages/elasticsearch-api/index.js | 5 +-- .../test/bulk-send-additional-spec.js | 37 ++++++++----------- .../execution-controller/scheduler-spec.js | 2 +- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/packages/elasticsearch-api/index.js b/packages/elasticsearch-api/index.js index 7000ad66c35..32c77c9c68d 100644 --- a/packages/elasticsearch-api/index.js +++ b/packages/elasticsearch-api/index.js @@ -13,7 +13,7 @@ const { get, toNumber, isString, isSimpleObject, castArray, flatten, toBoolean, uniq, random, cloneDeep, DataEntity, - isDeepEqual, getTypeOf, isProd, has + isDeepEqual, getTypeOf, isProd } = require('@terascope/utils'); const { ElasticsearchDistribution } = require('@terascope/types'); @@ -311,9 +311,6 @@ module.exports = function elasticsearchApi(client, logger, _opConfig) { nonRetriableError = true; reason = `${item.error.type}--${item.error.reason}`; - // caused_by is not always present in error msg, but useful if it is there - if (has(item.error, 'caused_by.reason')) reason += `--${item.error.caused_by.reason}`; - if (config._dead_letter_action === 'kafka_dead_letter') { deadLetter.push({ doc: actionRecords[i].data, reason }); continue; diff --git a/packages/elasticsearch-api/test/bulk-send-additional-spec.js b/packages/elasticsearch-api/test/bulk-send-additional-spec.js index 6d0a354c0b6..3a88f9557bc 100644 --- a/packages/elasticsearch-api/test/bulk-send-additional-spec.js +++ 
b/packages/elasticsearch-api/test/bulk-send-additional-spec.js @@ -4,8 +4,8 @@ const { debugLogger, chunk, pMap, - DataEntity, - cloneDeep + cloneDeep, + DataEntity } = require('@terascope/utils'); const { ElasticsearchTestHelpers } = require('elasticsearch-store'); const elasticsearchAPI = require('../index'); @@ -109,25 +109,20 @@ describe('bulkSend', () => { const result = await api.bulkSend(formatUploadData(index, docs, isElasticsearch8)); - expect(result).toEqual({ - recordCount: 1, - deadLetter: [ - { - doc: DataEntity.make({ - ip: '120.67.248.156', - userAgent: 'Mozilla/5.0 (Windows; U; Windows NT 6.1) AppleWebKit/533.1.2 (KHTML, like Gecko) Chrome/35.0.894.0 Safari/533.1.2', - url: 'http://lucious.biz', - uuid: 'b23a8550-0081-453f-9e80-93a90782a5bd', - created: '2019-04-26T15:00:23.225+00:00', - ipv6: '9e79:7798:585a:b847:f1c4:81eb:0c3d:7eb8', - location: '50.15003, -94.89355', - bytes: 'this is a bad value' - }), - // eslint-disable-next-line no-useless-escape, quotes - reason: `mapper_parsing_exception--failed to parse field [bytes] of type [integer] in document with id '1'--For input string: \"this is a bad value\"` - } - ] - }); + expect(result.recordCount).toBe(1); + + expect(result.deadLetter[0].doc).toEqual(DataEntity.make({ + ip: '120.67.248.156', + userAgent: 'Mozilla/5.0 (Windows; U; Windows NT 6.1) AppleWebKit/533.1.2 (KHTML, like Gecko) Chrome/35.0.894.0 Safari/533.1.2', + url: 'http://lucious.biz', + uuid: 'b23a8550-0081-453f-9e80-93a90782a5bd', + created: '2019-04-26T15:00:23.225+00:00', + ipv6: '9e79:7798:585a:b847:f1c4:81eb:0c3d:7eb8', + location: '50.15003, -94.89355', + bytes: 'this is a bad value' + })); + + expect(result.deadLetter[0].reason).toBeDefined(); }); it('should return a count if not un-retryable records', async () => { diff --git a/packages/teraslice/test/workers/execution-controller/scheduler-spec.js b/packages/teraslice/test/workers/execution-controller/scheduler-spec.js index 026a9c609f7..f30fa4e9ce4 100644 --- a/packages/teraslice/test/workers/execution-controller/scheduler-spec.js +++ b/packages/teraslice/test/workers/execution-controller/scheduler-spec.js @@ -60,7 +60,7 @@ describe('Scheduler', () => { throw new Error(`Got invalid ex_id ${exId}`); } await pDelay(0); - return slices.length; + return { recordCount: slices.length }; } }; From 4a0b1abb1a5f97e29b2207d35a39d245a4572e88 Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Tue, 14 Mar 2023 09:21:12 -0700 Subject: [PATCH 07/10] split tests up --- ...nd-additional-spec.js => bulk-send-dlq.js} | 34 +-------- .../test/bulk-send-limit-spec.js | 72 +++++++++++++++++++ packages/elasticsearch-api/types/index.d.ts | 2 +- 3 files changed, 75 insertions(+), 33 deletions(-) rename packages/elasticsearch-api/test/{bulk-send-additional-spec.js => bulk-send-dlq.js} (74%) create mode 100644 packages/elasticsearch-api/test/bulk-send-limit-spec.js diff --git a/packages/elasticsearch-api/test/bulk-send-additional-spec.js b/packages/elasticsearch-api/test/bulk-send-dlq.js similarity index 74% rename from packages/elasticsearch-api/test/bulk-send-additional-spec.js rename to packages/elasticsearch-api/test/bulk-send-dlq.js index 3a88f9557bc..11b984db93a 100644 --- a/packages/elasticsearch-api/test/bulk-send-additional-spec.js +++ b/packages/elasticsearch-api/test/bulk-send-dlq.js @@ -2,8 +2,6 @@ const { debugLogger, - chunk, - pMap, cloneDeep, DataEntity } = require('@terascope/utils'); @@ -11,14 +9,12 @@ const { ElasticsearchTestHelpers } = require('elasticsearch-store'); const elasticsearchAPI = 
require('../index'); const { - makeClient, cleanupIndex, waitForData, + makeClient, cleanupIndex, EvenDateData, TEST_INDEX_PREFIX, createMappingFromDatatype } = ElasticsearchTestHelpers; -const THREE_MINUTES = 3 * 60 * 1000; - -jest.setTimeout(THREE_MINUTES + 60000); +jest.setTimeout(10000); function formatUploadData( index, data, isES8ClientTest = false @@ -47,32 +43,6 @@ describe('bulkSend', () => { client = await makeClient(); }); - describe('can work with congested queues', () => { - const logger = debugLogger('congested_test'); - const index = `${TEST_INDEX_PREFIX}_congested_queues_`; - - beforeAll(async () => { - await cleanupIndex(client, index); - api = elasticsearchAPI(client, logger); - isElasticsearch8 = api.isElasticsearch8(); - }); - - afterAll(async () => { - await cleanupIndex(client, index); - }); - - it('can get correct data even with congested queues', async () => { - const chunkedData = chunk(EvenDateData.data, 50); - - await pMap(chunkedData, async (cData) => { - const formattedData = formatUploadData(index, cData, isElasticsearch8); - return api.bulkSend(formattedData); - }, { concurrency: 9 }); - - await waitForData(client, index, EvenDateData.data.length, logger, THREE_MINUTES); - }); - }); - describe('can return non-retryable records', () => { const logger = debugLogger('congested_test'); const index = `${TEST_INDEX_PREFIX}_non-retryable-records`; diff --git a/packages/elasticsearch-api/test/bulk-send-limit-spec.js b/packages/elasticsearch-api/test/bulk-send-limit-spec.js new file mode 100644 index 00000000000..724ada1f85d --- /dev/null +++ b/packages/elasticsearch-api/test/bulk-send-limit-spec.js @@ -0,0 +1,72 @@ +'use strict'; + +const { + debugLogger, + chunk, + pMap +} = require('@terascope/utils'); +const { ElasticsearchTestHelpers } = require('elasticsearch-store'); +const elasticsearchAPI = require('../index'); + +const { + makeClient, cleanupIndex, waitForData, + EvenDateData, TEST_INDEX_PREFIX +} = ElasticsearchTestHelpers; + +const THREE_MINUTES = 3 * 60 * 1000; + +jest.setTimeout(THREE_MINUTES + 60000); + +function formatUploadData( + index, data, isES8ClientTest = false +) { + const results = []; + + data.forEach((record) => { + const meta = { _index: index }; + + if (!isES8ClientTest) { + meta._type = '_doc'; + } + + results.push({ action: { index: meta }, data: record }); + }); + + return results; +} + +describe('bulkSend', () => { + let client; + let api; + let isElasticsearch8 = false; + + beforeAll(async () => { + client = await makeClient(); + }); + + describe('can work with congested queues', () => { + const logger = debugLogger('congested_test'); + const index = `${TEST_INDEX_PREFIX}_congested_queues_`; + + beforeAll(async () => { + await cleanupIndex(client, index); + api = elasticsearchAPI(client, logger); + isElasticsearch8 = api.isElasticsearch8(); + }); + + afterAll(async () => { + await cleanupIndex(client, index); + }); + + it('can get correct data even with congested queues', async () => { + const chunkedData = chunk(EvenDateData.data, 50); + + await pMap(chunkedData, async (cData) => { + const formattedData = formatUploadData(index, cData, isElasticsearch8); + return api.bulkSend(formattedData); + }, { concurrency: 9 }); + + await waitForData(client, index, EvenDateData.data.length, logger, THREE_MINUTES); + }); + }); +}); diff --git a/packages/elasticsearch-api/types/index.d.ts b/packages/elasticsearch-api/types/index.d.ts index 085fe6474d1..e0a6dcf7bda 100644 --- a/packages/elasticsearch-api/types/index.d.ts +++ 
b/packages/elasticsearch-api/types/index.d.ts @@ -62,7 +62,7 @@ declare namespace elasticsearchAPI { /** * This is used for improved bulk sending function */ - export interface AnyBulkAction { + export interface AnyBulkAction { update?: Partial; index?: Partial; create?: Partial; From 1e1466ca66a804595f2275603d5abd2cd76331a3 Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Tue, 14 Mar 2023 09:49:41 -0700 Subject: [PATCH 08/10] updated test wording --- packages/elasticsearch-api/test/bulk-send-dlq.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/elasticsearch-api/test/bulk-send-dlq.js b/packages/elasticsearch-api/test/bulk-send-dlq.js index 11b984db93a..b3ed9772128 100644 --- a/packages/elasticsearch-api/test/bulk-send-dlq.js +++ b/packages/elasticsearch-api/test/bulk-send-dlq.js @@ -72,7 +72,7 @@ describe('bulkSend', () => { await cleanupIndex(client, index); }); - it('returns records that cannot be tried again and the reason', async () => { + it('returns records that cannot be tried again if dlq config is set', async () => { const docs = cloneDeep(EvenDateData.data.slice(0, 2)); docs[0].bytes = 'this is a bad value'; From 7e38d6cf9beffbe1f423ad3b4da05a06c1d95f0f Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Tue, 14 Mar 2023 10:23:13 -0700 Subject: [PATCH 09/10] bumped version --- packages/elasticsearch-api/package.json | 2 +- packages/teraslice-state-storage/package.json | 4 ++-- packages/teraslice/package.json | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/elasticsearch-api/package.json b/packages/elasticsearch-api/package.json index feebb8df1b2..139600a362d 100644 --- a/packages/elasticsearch-api/package.json +++ b/packages/elasticsearch-api/package.json @@ -1,7 +1,7 @@ { "name": "@terascope/elasticsearch-api", "displayName": "Elasticsearch API", - "version": "3.3.7", + "version": "3.3.8", "description": "Elasticsearch client api used across multiple services, handles retries and exponential backoff", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/elasticsearch-api#readme", "bugs": { diff --git a/packages/teraslice-state-storage/package.json b/packages/teraslice-state-storage/package.json index 416d6207f04..5a4814297e6 100644 --- a/packages/teraslice-state-storage/package.json +++ b/packages/teraslice-state-storage/package.json @@ -1,7 +1,7 @@ { "name": "@terascope/teraslice-state-storage", "displayName": "Teraslice State Storage", - "version": "0.36.7", + "version": "0.36.8", "description": "State storage operation api for teraslice", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-state-storage#readme", "bugs": { @@ -23,7 +23,7 @@ "test:watch": "ts-scripts test --watch . 
--" }, "dependencies": { - "@terascope/elasticsearch-api": "^3.3.7", + "@terascope/elasticsearch-api": "^3.3.8", "@terascope/utils": "^0.45.5" }, "engines": { diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index dd5f5df13f6..2613b85a9e0 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -37,7 +37,7 @@ "ms": "^2.1.3" }, "dependencies": { - "@terascope/elasticsearch-api": "^3.3.7", + "@terascope/elasticsearch-api": "^3.3.8", "@terascope/job-components": "^0.58.5", "@terascope/teraslice-messaging": "^0.28.5", "@terascope/utils": "^0.45.5", From fa3fd47176670e57120c8a3af5534ff6cdb5ab34 Mon Sep 17 00:00:00 2001 From: Charlie Iorg Date: Wed, 15 Mar 2023 09:57:12 -0700 Subject: [PATCH 10/10] bumped packages as a minor update for release --- packages/elasticsearch-api/package.json | 2 +- packages/teraslice-state-storage/package.json | 4 ++-- packages/teraslice/package.json | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/elasticsearch-api/package.json b/packages/elasticsearch-api/package.json index 139600a362d..e2ce9331aca 100644 --- a/packages/elasticsearch-api/package.json +++ b/packages/elasticsearch-api/package.json @@ -1,7 +1,7 @@ { "name": "@terascope/elasticsearch-api", "displayName": "Elasticsearch API", - "version": "3.3.8", + "version": "3.4.0", "description": "Elasticsearch client api used across multiple services, handles retries and exponential backoff", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/elasticsearch-api#readme", "bugs": { diff --git a/packages/teraslice-state-storage/package.json b/packages/teraslice-state-storage/package.json index 5a4814297e6..6c9848f5283 100644 --- a/packages/teraslice-state-storage/package.json +++ b/packages/teraslice-state-storage/package.json @@ -1,7 +1,7 @@ { "name": "@terascope/teraslice-state-storage", "displayName": "Teraslice State Storage", - "version": "0.36.8", + "version": "0.37.0", "description": "State storage operation api for teraslice", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-state-storage#readme", "bugs": { @@ -23,7 +23,7 @@ "test:watch": "ts-scripts test --watch . --" }, "dependencies": { - "@terascope/elasticsearch-api": "^3.3.8", + "@terascope/elasticsearch-api": "^3.4.0", "@terascope/utils": "^0.45.5" }, "engines": { diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 2613b85a9e0..78ff4149ab9 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -37,7 +37,7 @@ "ms": "^2.1.3" }, "dependencies": { - "@terascope/elasticsearch-api": "^3.3.8", + "@terascope/elasticsearch-api": "^3.4.0", "@terascope/job-components": "^0.58.5", "@terascope/teraslice-messaging": "^0.28.5", "@terascope/utils": "^0.45.5",