diff --git a/e2e/config/teraslice-master.yaml b/e2e/config/teraslice-master.yaml
index b2166396398..96a186eb50d 100644
--- a/e2e/config/teraslice-master.yaml
+++ b/e2e/config/teraslice-master.yaml
@@ -9,6 +9,12 @@ terafoundation:
             default:
                 host:
                     - 'elasticsearch:49200'
+                requestTimeout: 60000
+                deadTimeout: 45000
+                sniffOnStart: false
+                sniffOnConnectionFault: false
+                suggestCompression: false
+
 # ***********************
 # Kafka Configuration
 # ***********************
diff --git a/e2e/config/teraslice-worker.yaml b/e2e/config/teraslice-worker.yaml
index 081993f154f..4905cbca6a5 100644
--- a/e2e/config/teraslice-worker.yaml
+++ b/e2e/config/teraslice-worker.yaml
@@ -9,6 +9,12 @@ terafoundation:
             default:
                 host:
                     - 'elasticsearch:49200'
+                requestTimeout: 60000
+                deadTimeout: 45000
+                sniffOnStart: false
+                sniffOnConnectionFault: false
+                suggestCompression: false
+
 # ***********************
 # Kafka Configuration
 # ***********************
diff --git a/packages/chunked-file-reader/package.json b/packages/chunked-file-reader/package.json
index 2ff5c6d221e..5eb91c66b97 100644
--- a/packages/chunked-file-reader/package.json
+++ b/packages/chunked-file-reader/package.json
@@ -22,7 +22,7 @@
         "test:watch": "jest --coverage=false --notify --watch --onlyChanged"
     },
     "dependencies": {
-        "@terascope/utils": "^0.12.3",
+        "@terascope/utils": "^0.12.4",
         "bluebird": "^3.5.5",
         "csvtojson": "^2.0.8",
         "lodash": "^4.17.11"
diff --git a/packages/data-access-plugin/package.json b/packages/data-access-plugin/package.json
index 2fd1df37fe0..af43387346a 100644
--- a/packages/data-access-plugin/package.json
+++ b/packages/data-access-plugin/package.json
@@ -30,18 +30,18 @@
     "dependencies": {
         "@terascope/data-access": "^0.12.0",
         "@terascope/data-types": "^0.3.1",
-        "@terascope/elasticsearch-api": "^2.0.5",
-        "@terascope/utils": "^0.12.3",
+        "@terascope/elasticsearch-api": "^2.1.0",
+        "@terascope/utils": "^0.12.4",
         "apollo-server-express": "^2.6.5",
         "graphql": "^14.3.1",
         "graphql-iso-date": "^3.6.1",
         "graphql-tag": "^2.10.1",
         "graphql-type-json": "^0.3.0",
-        "terafoundation": "^0.9.1",
+        "terafoundation": "^0.10.0",
         "xlucene-evaluator": "^0.9.4"
     },
     "devDependencies": {
-        "@terascope/job-components": "^0.20.4",
+        "@terascope/job-components": "^0.20.5",
         "@types/express": "^4.17.0",
         "@types/got": "^9.6.0",
         "@types/graphql-iso-date": "^3.3.1",
diff --git a/packages/data-access/package.json b/packages/data-access/package.json
index f943d54a887..7ee02b5eb85 100644
--- a/packages/data-access/package.json
+++ b/packages/data-access/package.json
@@ -27,7 +27,7 @@
     },
     "dependencies": {
         "@terascope/data-types": "^0.3.1",
-        "@terascope/utils": "^0.12.3",
+        "@terascope/utils": "^0.12.4",
         "elasticsearch-store": "^0.9.1",
         "xlucene-evaluator": "^0.9.4"
     },
diff --git a/packages/data-types/package.json b/packages/data-types/package.json
index d2cb60d7445..1a0bfe6894c 100644
--- a/packages/data-types/package.json
+++ b/packages/data-types/package.json
@@ -28,7 +28,7 @@
         "test:watch": "jest --coverage=false --notify --watch --onlyChanged"
     },
     "dependencies": {
-        "@terascope/utils": "^0.12.3",
+        "@terascope/utils": "^0.12.4",
         "graphql": "^14.3.1",
         "yargs": "^13.2.4"
     },
diff --git a/packages/elasticsearch-api/index.js b/packages/elasticsearch-api/index.js
index 1fd9fbe39b5..5b99c64e6ca 100644
--- a/packages/elasticsearch-api/index.js
+++ b/packages/elasticsearch-api/index.js
@@ -2,7 +2,13 @@
 
 const _ = require('lodash');
 const Promise = require('bluebird');
-const { TSError, isFatalError } = require('@terascope/utils');
+const {
+    isTest,
+    TSError,
+    isFatalError,
+    getBackoffDelay,
+    isRetryableError
+} = require('@terascope/utils');
 
 const DOCUMENT_EXISTS = 409;
 
@@ -52,7 +58,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
                 .catch(errHandler);
         }
 
-        _runRequest();
+        waitForClient(() => _runRequest(), reject);
     });
 }
 
@@ -175,7 +181,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
         if (resultIndex.found) {
             resultIndex.indexWindowSize.forEach((ind) => {
                 logger.warn(
-                    `max_result_window for index: ${ind.name} is set at ${ind.windowSize} . On very large indices it is possible that a slice can not be divided to stay below this limit. If that occurs an error will be thrown by Elasticsearch and the slice can not be processed. Increasing max_result_window in the Elasticsearch index settings will resolve the problem.`
+                    `max_result_window for index: ${ind.name} is set at ${ind.windowSize}. On very large indices it is possible that a slice can not be divided to stay below this limit. If that occurs an error will be thrown by Elasticsearch and the slice can not be processed. Increasing max_result_window in the Elasticsearch index settings will resolve the problem.`
                 );
             });
         } else {
@@ -230,7 +236,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
         }
 
         if (nonRetriableError) {
-            return { data: [], error: nonRetriableError, reason };
+            return { data: [], error: true, reason };
         }
 
         return { data: retry, error: false };
@@ -238,7 +244,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
 
     function bulkSend(data) {
         return new Promise((resolve, reject) => {
-            const retry = _retryFn(_sendData, data);
+            const retry = _retryFn(_sendData, data, reject);
 
             function _sendData(formattedData) {
                 return _clientRequest('bulk', { body: formattedData })
@@ -247,7 +253,9 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
                         const response = _filterResponse(formattedData, results);
 
                         if (response.error) {
-                            reject(new TSError(response.reason));
+                            reject(new TSError(response.reason, {
+                                retryable: false
+                            }));
                         } else if (response.data.length === 0) {
                             // may get doc already created error, if so just return
                             resolve(results);
@@ -484,7 +492,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
     function _searchES(query) {
         return new Promise((resolve, reject) => {
             const errHandler = _errorHandler(_performSearch, query, reject, '->search()');
-            const retry = _retryFn(_performSearch, query);
+            const retry = _retryFn(_performSearch, query, reject);
 
             function _performSearch(queryParam) {
                 client
@@ -517,38 +525,83 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
                     .catch(errHandler);
             }
 
-            _performSearch(query);
+            waitForClient(() => _performSearch(query), reject);
         });
     }
 
-    function _retryFn(fn, data) {
-        const retryTimer = { start: retryStart, limit: retryLimit };
+    /**
+     * Wait for the client to be available before resolving,
+     * this will also naturally stagger many in-flight requests
+     *
+     * - reject if the connection is closed
+     * - resolve after timeout to let the underlying client deal with any problems
+     */
+    function waitForClient(resolve, reject) {
+        let intervalId = null;
+        const startTime = Date.now();
+
+        // set different values for when process.env.NODE_ENV === test
+        const timeoutMs = isTest ? 1000 : _.random(5000, 15000);
+        const intervalMs = isTest ? 50 : 100;
+
+        // avoiding setting the interval if we don't need to
+        if (_checkClient()) {
+            return;
+        }
+
+        intervalId = setInterval(_checkClient, intervalMs);
+
+        function _checkClient() {
+            const elapsed = Date.now() - startTime;
+            try {
+                const valid = verifyClient();
+                if (!valid && elapsed <= timeoutMs) return false;
+
+                clearInterval(intervalId);
+                resolve(elapsed);
+                return true;
+            } catch (err) {
+                clearInterval(intervalId);
+                reject(err);
+                return true;
+            }
+        }
+    }
+
+    function _retryFn(fn, data, reject) {
+        let delay = 0;
 
         return (_data) => {
             const args = _data || data;
 
-            const randomMs = Math.random() * (retryTimer.limit - retryTimer.start);
-            const timer = Math.floor(randomMs + retryTimer.start);
-
-            if (retryTimer.limit < 60000) {
-                retryTimer.limit += retryLimit;
-            }
-            if (retryTimer.start < 30000) {
-                retryTimer.start += retryStart;
-            }
-
-            setTimeout(() => {
-                fn(args);
-            }, timer);
+            waitForClient((elapsed) => {
+                delay = getBackoffDelay(delay, 2, retryLimit, retryStart);
+
+                let timeoutMs = delay - elapsed;
+                if (timeoutMs < 1) timeoutMs = 1;
+
+                setTimeout(() => {
+                    fn(args);
+                }, timeoutMs);
+            }, reject);
         };
     }
 
     function _errorHandler(fn, data, reject, fnName = '->unknown()') {
-        const retry = _retryFn(fn, data);
+        const retry = _retryFn(fn, data, reject);
         return function _errorHandlerFn(err) {
-            const isRejectedError = _.get(err, 'body.error.type') === 'es_rejected_execution_exception';
-            // const isConnectionError = _.get(err, 'message') === 'No Living connections';
+            let retryable = false;
+            if (isRetryableError(err)) {
+                retryable = true;
+            } else {
+                const isRejectedError = _.get(err, 'body.error.type') === 'es_rejected_execution_exception';
+                const isConnectionError = _.get(err, 'message', '').includes('No Living connections');
+                if (isRejectedError || isConnectionError) {
+                    retryable = true;
+                }
+            }
 
-            if (isRejectedError) {
-                // this iteration we will not handle with no living connections issue
+            if (retryable) {
                 retry();
             } else {
                 reject(
@@ -568,30 +621,84 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
         return num >= 210;
     }
 
-    function _isAvailable(index) {
-        const query = { index, q: '*' };
+    function isAvailable(index, recordType) {
+        const query = {
+            index,
+            q: '',
+            size: 0,
+            terminate_after: '1',
+        };
+
+        const label = recordType ? `for ${recordType}` : index;
 
-        return new Promise((resolve) => {
-            search(query)
+        return new Promise((resolve, reject) => {
+            client
+                .search(query)
                 .then((results) => {
-                    logger.trace(`index ${index} is now available`);
+                    logger.trace(`index ${label} is now available`);
                     resolve(results);
                 })
                 .catch(() => {
-                    const isReady = setInterval(() => {
-                        search(query)
+                    let running = false;
+                    const checkInterval = setInterval(() => {
+                        if (running) return;
+                        running = true;
+
+                        try {
+                            const valid = verifyClient();
+                            if (!valid) {
+                                logger.debug(`index ${label} is in an invalid state`);
+                                return;
+                            }
+                        } catch (err) {
+                            running = false;
+                            clearInterval(checkInterval);
+                            reject(err);
+                            return;
+                        }
+
+                        client
+                            .search(query)
                             .then((results) => {
-                                clearInterval(isReady);
+                                running = false;
+
+                                clearInterval(checkInterval);
                                 resolve(results);
                             })
                             .catch(() => {
-                                logger.warn('verifying job index is open');
+                                running = false;
+
+                                logger.warn(`verifying index ${label} is open`);
                             });
                     }, 200);
                 });
         });
     }
 
+    /**
+     * Verify the state of the underlying es client.
+     * Will throw error if in fatal state, like the connection is closed.
+     *
+     * @returns {boolean} if client is working state it will return true
+     */
+    function verifyClient() {
+        const closed = _.get(client, 'transport.closed', false);
+        if (closed) {
+            throw new TSError('Elasticsearch Client is closed', {
+                fatalError: true
+            });
+        }
+
+        const alive = _.get(client, 'transport.connectionPool._conns.alive');
+        // so we don't break existing tests with mocked clients, we will default to 1
+        const aliveCount = alive && Array.isArray(alive) ? alive.length : 1;
+        if (!aliveCount) {
+            return false;
+        }
+
+        return true;
+    }
+
     function _migrate(index, migrantIndexName, mapping, recordType, clusterName) {
         const reindexQuery = {
             slices: 4,
@@ -755,7 +862,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
         return new Promise((resolve, reject) => {
             const attemptToCreateIndex = () => {
                 _createIndex(newIndex, migrantIndexName, mapping, recordType, clusterName)
-                    .then(() => _isAvailable(newIndex))
+                    .then(() => isAvailable(newIndex))
                     .catch((err) => {
                         if (isFatalError(err)) return Promise.reject(err);
@@ -793,7 +900,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
                     }
                     if (bool) {
                         logger.info('connection to elasticsearch has been established');
-                        return _isAvailable(newIndex);
+                        return isAvailable(newIndex);
                     }
                     return Promise.resolve();
                 })
@@ -831,6 +938,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
         mget,
         index: indexFn,
         indexWithId,
+        isAvailable,
         create,
         update,
         remove,
@@ -845,6 +953,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
         indexRefresh,
         indexRecovery,
         indexSetup,
+        verifyClient,
         validateGeoParameters,
         // The APIs below are deprecated and should be removed.
         index_exists: indexExists,
diff --git a/packages/elasticsearch-api/package.json b/packages/elasticsearch-api/package.json
index c04a23bd789..ba482ee50cb 100644
--- a/packages/elasticsearch-api/package.json
+++ b/packages/elasticsearch-api/package.json
@@ -1,6 +1,6 @@
 {
     "name": "@terascope/elasticsearch-api",
-    "version": "2.0.5",
+    "version": "2.1.0",
     "description": "Elasticsearch client api used across multiple services, handles retries and exponential backoff",
     "homepage": "https://github.com/terascope/teraslice/tree/master/packages/elasticsearch-api#readme",
     "bugs": {
@@ -19,7 +19,7 @@
         "test:watch": "jest --coverage=false --notify --watch --onlyChanged"
     },
     "dependencies": {
-        "@terascope/utils": "^0.12.3",
+        "@terascope/utils": "^0.12.4",
         "bluebird": "^3.5.5",
         "lodash": "^4.17.11"
     },
diff --git a/packages/elasticsearch-api/test/api-spec.js b/packages/elasticsearch-api/test/api-spec.js
index abb6c7a4428..40607bdce8e 100644
--- a/packages/elasticsearch-api/test/api-spec.js
+++ b/packages/elasticsearch-api/test/api-spec.js
@@ -170,7 +170,9 @@ describe('elasticsearch-api', () => {
     function getRecoveryData(index) {
         const obj = {};
-        obj[index] = { shards: [{ shard: 1, primary: true, stage: recoverError ? 'notdone' : 'DONE' }] };
+        obj[index] = {
+            shards: [{ shard: 1, primary: true, stage: recoverError ? 'notdone' : 'DONE' }]
+        };
 
         return obj;
     }
 
@@ -180,7 +182,9 @@ describe('elasticsearch-api', () => {
             response.errors = true;
             response.items = results.body.map((obj, index) => {
                 _.forOwn(obj, (value, key) => {
-                    obj[key] = _.assign(value, { error: { type: bulkError[index] || 'someType', reason: 'someReason' } });
+                    obj[key] = _.assign(value, {
+                        error: { type: bulkError[index] || 'someType', reason: 'someReason' }
+                    });
                 });
                 return obj;
             });
@@ -200,6 +204,17 @@ describe('elasticsearch-api', () => {
     }
 
     const client = {
+        // set this so we can verify the index
+        transport: {
+            closed: false,
+            requestTimeout: 50,
+            connectionPool: {
+                _conns: {
+                    alive: [{}],
+                    dead: [{}]
+                }
+            }
+        },
         mget: () => Promise.resolve(getData()),
         get: () => Promise.resolve(recordsReturned[0]),
         index: () => Promise.resolve(postedData('created')),
@@ -271,7 +286,9 @@ describe('elasticsearch-api', () => {
 
     it('can instantiate', () => {
         let api;
-        expect(() => { api = esApi(client, logger); }).not.toThrow();
+        expect(() => {
+            api = esApi(client, logger);
+        }).not.toThrow();
         expect(api).toBeDefined();
         expect(typeof api).toEqual('object');
         expect(api.search).toBeDefined();
@@ -314,57 +331,55 @@ describe('elasticsearch-api', () => {
         expect(typeof api.indexSetup).toEqual('function');
     });
 
-    it('count returns total amount for query', (done) => {
+    it('count returns total amount for query', async () => {
         const query = { body: 'someQuery' };
         const api = esApi(client, logger);
-        Promise.resolve(api.count(query))
-            .then((results) => {
-                expect(query).toEqual({ body: 'someQuery', size: 0 });
-                expect(results).toEqual(0);
-                total = 500;
-                return api.count(query);
-            })
-            .then(results => expect(results).toEqual(500))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.count(query);
+        expect(query).toEqual({ body: 'someQuery', size: 0 });
+        expect(results).toEqual(0);
+        total = 500;
+        return expect(api.count(query)).resolves.toEqual(500);
     });
 
-    it('can search', (done) => {
+    it('can search', async () => {
         const query = { body: 'someQuery' };
         const api = esApi(client, logger);
         const apiFullResponse = esApi(client, logger, { full_response: true });
         recordsReturned = [{ _source: { some: 'data' } }];
 
-        Promise.all([api.search(query), apiFullResponse.search(query)])
-            .spread((results1, results2) => {
-                expect(results1).toEqual([recordsReturned[0]._source]);
-                expect(results2).toEqual(getData());
-            })
-            .catch(fail)
-            .finally(() => { done(); });
+        const [results1, results2] = await Promise.all([
+            api.search(query),
+            apiFullResponse.search(query)
+        ]);
+        expect(results1).toEqual([recordsReturned[0]._source]);
+        expect(results2).toEqual(getData());
     });
 
-    it('search can handle rejection errors', (done) => {
+    it('search can handle rejection errors', async () => {
         const query = { body: 'someQuery' };
         const api = esApi(client, logger);
         let queryFailed = false;
         searchError = { body: { error: { type: 'es_rejected_execution_exception' } } };
         recordsReturned = [{ _source: { some: 'data' } }];
 
-        Promise.all([api.search(query), waitFor(50, () => { searchError = false; })])
-            .spread((results) => {
-                expect(results).toEqual([recordsReturned[0]._source]);
-                searchError = { body: { error: { type: 'some_thing_else' } } };
-                return api.search(query)
-                    .catch(() => { queryFailed = true; });
-            })
-            .then(() => expect(queryFailed).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        const [results] = await Promise.all([
+            api.search(query),
+            waitFor(50, () => {
+                searchError = false;
+            })
+        ]);
+        expect(results).toEqual([recordsReturned[0]._source]);
+        searchError = { body: { error: { type: 'some_thing_else' } } };
+        try {
+            await api.search(query);
+        } catch (e) {
+            queryFailed = true;
+        }
+        return expect(queryFailed).toEqual(true);
     });
 
-    it('search can handle shard errors', (done) => {
+    it('search can handle shard errors', async () => {
         const query = { body: 'someQuery' };
         const api = esApi(client, logger);
         let queryFailed = false;
@@ -372,190 +387,161 @@ describe('elasticsearch-api', () => {
         failures = [{ reason: { type: 'es_rejected_execution_exception' } }];
         recordsReturned = [{ _source: { some: 'data' } }];
 
-        Promise.all([
+        const [results] = await Promise.all([
             api.search(query),
             waitFor(20, () => {
                 failed = 0;
                 failures = [];
             })
-        ])
-            .spread((results) => {
-                expect(results).toEqual([recordsReturned[0]._source]);
-                failed = 4;
-                failures = [{ reason: { type: 'some other error' } }];
-                return Promise.all([
-                    api.search(query),
-                    waitFor(50, () => {
-                        failed = 0;
-                        failures = [];
-                    })
-                ]).catch(() => { queryFailed = true; });
-            })
-            .then(() => expect(queryFailed).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        ]);
+        expect(results).toEqual([recordsReturned[0]._source]);
+        failed = 4;
+        failures = [{ reason: { type: 'some other error' } }];
+        try {
+            await Promise.all([
+                api.search(query),
+                waitFor(50, () => {
+                    failed = 0;
+                    failures = [];
+                })
+            ]);
+        } catch (e) {
+            queryFailed = true;
+        }
+        return expect(queryFailed).toEqual(true);
     });
 
-    it('can call mget', (done) => {
+    it('can call mget', async () => {
         const query = { body: 'someQuery' };
         const api = esApi(client, logger);
         recordsReturned = [{ _source: { some: 'data' } }];
 
-        Promise.resolve(api.mget(query))
-            .then(results => expect(results).toEqual(getData()))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.mget(query);
+        return expect(results).toEqual(getData());
     });
 
-    it('can call get', (done) => {
+    it('can call get', async () => {
         const query = { body: 'someQuery' };
         const api = esApi(client, logger);
         recordsReturned = [{ _source: { some: 'data' } }];
 
-        Promise.resolve(api.get(query))
-            .then(results => expect(results).toEqual(recordsReturned[0]._source))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.get(query);
+        return expect(results).toEqual(recordsReturned[0]._source);
     });
 
-    it('can call index', (done) => {
+    it('can call index', async () => {
         const query = { index: 'someIndex', type: 'sometype', body: 'someQuery' };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.index(query))
-            .then(results => expect(results.created).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.index(query);
+        return expect(results.created).toEqual(true);
     });
 
-    it('can call indexWithId', (done) => {
+    it('can call indexWithId', async () => {
         const query = {
-            index: 'someIndex', id: 'someId', type: 'sometype', body: 'someQuery'
+            index: 'someIndex',
+            id: 'someId',
+            type: 'sometype',
+            body: 'someQuery'
         };
         const api = esApi(client, logger);
         recordsReturned = [{ _source: { some: 'data' } }];
 
-        Promise.resolve(api.indexWithId(query))
-            .then(results => expect(results).toEqual(query.body))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.indexWithId(query);
+        return expect(results).toEqual(query.body);
     });
 
-    it('can call create', (done) => {
+    it('can call create', async () => {
         const query = { index: 'someIndex', type: 'sometype', body: 'someQuery' };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.create(query))
-            .then(results => expect(results).toEqual(query.body))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.create(query);
+        return expect(results).toEqual(query.body);
     });
 
-    it('can call update', (done) => {
+    it('can call update', async () => {
         const query = { index: 'someIndex', type: 'sometype', body: { doc: { some: 'data' } } };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.update(query))
-            .then(results => expect(results).toEqual(query.body.doc))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.update(query);
+        return expect(results).toEqual(query.body.doc);
     });
 
-    it('can call remove', (done) => {
+    it('can call remove', async () => {
         const query = { index: 'someIndex', type: 'sometype', id: 'someId' };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.remove(query))
-            .then(results => expect(results).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.remove(query);
+        return expect(results).toEqual(true);
     });
 
-    it('can call indexExists', (done) => {
+    it('can call indexExists', async () => {
         const query = { index: 'someIndex' };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.indexExists(query))
-            .then(results => expect(results).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.indexExists(query);
+        return expect(results).toEqual(true);
     });
 
-    it('can call indexCreate', (done) => {
+    it('can call indexCreate', async () => {
         const query = { index: 'someIndex' };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.indexCreate(query))
-            .then(results => expect(results.acknowledged).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.indexCreate(query);
+        return expect(results.acknowledged).toEqual(true);
     });
 
-    it('can call indexRefresh', (done) => {
+    it('can call indexRefresh', async () => {
         const query = { index: 'someIndex' };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.indexRefresh(query))
-            .then(results => expect(results).toBeTruthy())
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.indexRefresh(query);
+        return expect(results).toBeTruthy();
     });
 
-    it('can call indexRecovery', (done) => {
+    it('can call indexRecovery', async () => {
         const query = { index: 'someIndex' };
         const api = esApi(client, logger);
 
-        Promise.resolve(api.indexRecovery(query))
-            .then(results => expect(results[query.index]).toBeTruthy())
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.indexRecovery(query);
+        return expect(results[query.index]).toBeTruthy();
    });
 
-    it('can call nodeInfo', (done) => {
+    it('can call nodeInfo', async () => {
         const api = esApi(client, logger);
 
-        Promise.resolve(api.nodeInfo())
-            .then(results => expect(results).toBeTruthy())
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.nodeInfo();
+        return expect(results).toBeTruthy();
     });
 
-    it('can call nodeStats', (done) => {
+    it('can call nodeStats', async () => {
         const api = esApi(client, logger);
 
-        Promise.resolve(api.nodeStats())
-            .then(results => expect(results).toBeTruthy())
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.nodeStats();
+        return expect(results).toBeTruthy();
     });
 
-    it('can call nodeStats', (done) => {
+    it('can call nodeStats', async () => {
         const api = esApi(client, logger);
 
-        Promise.resolve(api.nodeStats())
-            .then(results => expect(results).toBeTruthy())
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.nodeStats();
+        return expect(results).toBeTruthy();
     });
 
-    it('can warn window size with version', (done) => {
+    it('can warn window size with version', () => {
         const api = esApi(client, logger, { index: 'some_index' });
 
-        Promise.resolve(api.version())
-            .catch(fail)
-            .finally(() => { done(); });
+        return api.version();
     });
 
-    it('can call putTemplate', (done) => {
+    it('can call putTemplate', async () => {
         const api = esApi(client, logger);
 
-        Promise.resolve(api.putTemplate(template, 'somename'))
-            .then(results => expect(results.acknowledged).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.putTemplate(template, 'somename');
+        return expect(results.acknowledged).toEqual(true);
     });
 
-    it('can call bulkSend', (done) => {
+    it('can call bulkSend', async () => {
         const api = esApi(client, logger);
         const myBulkData = [
             { index: { _index: 'some_index', _type: 'events', _id: 1 } },
@@ -563,37 +549,42 @@ describe('elasticsearch-api', () => {
             { delete: { _index: 'some_index', _type: 'events', _id: 5 } }
         ];
 
-        Promise.resolve(api.bulkSend(myBulkData))
-            .then(results => expect(results).toBeTruthy())
-            .catch(fail)
-            .finally(() => { done(); });
+        const results = await api.bulkSend(myBulkData);
+        return expect(results).toBeTruthy();
     });
 
-    it('can call bulkSend with errors', (done) => {
+    it('can call bulkSend with errors', async () => {
         const api = esApi(client, logger);
         const myBulkData = [
             { index: { _index: 'some_index', _type: 'events', _id: 1 } },
             { title: 'foo' },
             { delete: { _index: 'some_index', _type: 'events', _id: 5 } }
         ];
-        bulkError = ['es_rejected_execution_exception', 'es_rejected_execution_exception', 'es_rejected_execution_exception'];
-
-        let queryFailed = false;
-        Promise.all([api.bulkSend(myBulkData), waitFor(20, () => { bulkError = false; })])
-            .spread((results) => {
-                expect(results).toBeTruthy();
-                bulkError = ['some_thing_else', 'some_thing_else', 'some_thing_else'];
-                return Promise.all([
-                    api.bulkSend(myBulkData),
-                    waitFor(20, () => { bulkError = false; })
-                ]).catch((err) => {
-                    queryFailed = true;
-                    expect(err.message).toInclude('some_thing_else--someReason');
-                });
-            })
-            .then(() => expect(queryFailed).toEqual(true))
-            .catch(fail)
-            .finally(() => { done(); });
+        bulkError = [
+            'es_rejected_execution_exception',
+            'es_rejected_execution_exception',
+            'es_rejected_execution_exception'
+        ];
+
+        const [results] = await Promise.all([
+            api.bulkSend(myBulkData),
+            waitFor(20, () => {
+                bulkError = false;
+            })
+        ]);
+
+        expect(results).toBeTruthy();
+        bulkError = ['some_thing_else', 'some_thing_else', 'some_thing_else'];
+
+        return expect(
+            Promise.all([
+                api.bulkSend(myBulkData),
+                waitFor(20, () => {
+                    bulkError = false;
+                })
+            ])
+        ).rejects.toThrow(/some_thing_else--someReason/);
     });
 
     it('can call buildQuery for geo queries', () => {
@@ -644,7 +635,6 @@ describe('elasticsearch-api', () => {
             geo_sort_order: 'asc'
         };
 
-
         const goodConfig1 = {
             index: 'some_index',
             geo_field: 'some_field',
@@ -773,7 +763,8 @@ describe('elasticsearch-api', () => {
                     }
                 }
             }
-        }];
+        }
+        ];
 
         const finalResponse1 = makeResponse(goodConfig1, msg1, response1);
         const finalResponse2 = makeResponse(goodConfig2, msg1, response1, sort1);
@@ -781,14 +772,30 @@ describe('elasticsearch-api', () => {
         const finalResponse4 = makeResponse(goodConfig4, msg1, response2, sort3);
         const finalResponse5 = makeResponse(goodConfig2, msg2, response3, sort1);
 
-        expect(() => api.buildQuery(badOpConfig1, msg1)).toThrowError('if geo_field is specified then the appropriate geo_box or geo_distance query parameters need to be provided as well');
-        expect(() => api.buildQuery(badOpConfig2, msg1)).toThrowError('Both geo_box_top_left and geo_box_bottom_right must be provided for a geo bounding box query.');
-        expect(() => api.buildQuery(badOpConfig3, msg1)).toThrowError('Both geo_box_top_left and geo_box_bottom_right must be provided for a geo bounding box query.');
-        expect(() => api.buildQuery(badOpConfig4, msg1)).toThrowError('Both geo_point and geo_distance must be provided for a geo_point query.');
-        expect(() => api.buildQuery(badOpConfig5, msg1)).toThrowError('Both geo_point and geo_distance must be provided for a geo_point query.');
-        expect(() => api.buildQuery(badOpConfig6, msg1)).toThrowError('geo_box and geo_distance queries can not be combined.');
-        expect(() => api.buildQuery(badOpConfig7, msg1)).toThrowError('bounding box search requires geo_sort_point to be set if any other geo_sort_* parameter is provided');
-        expect(() => api.buildQuery(badOpConfig8, msg1)).toThrowError('bounding box search requires geo_sort_point to be set if any other geo_sort_* parameter is provided');
+        expect(() => api.buildQuery(badOpConfig1, msg1)).toThrowError(
+            'if geo_field is specified then the appropriate geo_box or geo_distance query parameters need to be provided as well'
+        );
+        expect(() => api.buildQuery(badOpConfig2, msg1)).toThrowError(
+            'Both geo_box_top_left and geo_box_bottom_right must be provided for a geo bounding box query.'
+        );
+        expect(() => api.buildQuery(badOpConfig3, msg1)).toThrowError(
+            'Both geo_box_top_left and geo_box_bottom_right must be provided for a geo bounding box query.'
+        );
+        expect(() => api.buildQuery(badOpConfig4, msg1)).toThrowError(
+            'Both geo_point and geo_distance must be provided for a geo_point query.'
+        );
+        expect(() => api.buildQuery(badOpConfig5, msg1)).toThrowError(
+            'Both geo_point and geo_distance must be provided for a geo_point query.'
+        );
+        expect(() => api.buildQuery(badOpConfig6, msg1)).toThrowError(
+            'geo_box and geo_distance queries can not be combined.'
+        );
+        expect(() => api.buildQuery(badOpConfig7, msg1)).toThrowError(
+            'bounding box search requires geo_sort_point to be set if any other geo_sort_* parameter is provided'
+        );
+        expect(() => api.buildQuery(badOpConfig8, msg1)).toThrowError(
+            'bounding box search requires geo_sort_point to be set if any other geo_sort_* parameter is provided'
+        );
 
         expect(api.buildQuery(goodConfig1, msg1)).toEqual(finalResponse1);
         expect(api.buildQuery(goodConfig2, msg1)).toEqual(finalResponse2);
@@ -803,7 +810,11 @@ describe('elasticsearch-api', () => {
         const opConfig1 = { index: 'some_index' };
         const opConfig2 = { index: 'some_index', date_field_name: 'created' };
         const opConfig3 = { index: 'some_index', query: 'someLucene:query' };
-        const opConfig4 = { index: 'some_index', query: 'someLucene:query', fields: ['field1', 'field2'] };
+        const opConfig4 = {
+            index: 'some_index',
+            query: 'someLucene:query',
+            fields: ['field1', 'field2']
+        };
 
         const msg1 = { count: 100, key: 'someKey' };
         const msg2 = { count: 100, start: new Date(), end: new Date() };
@@ -830,7 +841,10 @@ describe('elasticsearch-api', () => {
         const response1 = { wildcard: { _uid: 'someKey' } };
         const response2 = { range: { created: { gte: msg2.start, lt: msg2.end } } };
         const response3 = { query_string: { query: opConfig3.query } };
-        const response4 = [{ wildcard: { _uid: 'someKey' } }, { query_string: { query: opConfig3.query } }];
+        const response4 = [
+            { wildcard: { _uid: 'someKey' } },
+            { query_string: { query: opConfig3.query } }
+        ];
 
         expect(api.buildQuery(opConfig1, msg1)).toEqual(makeResponse(opConfig1, msg1, response1));
         expect(api.buildQuery(opConfig2, msg2)).toEqual(makeResponse(opConfig2, msg2, response2));
@@ -839,7 +853,7 @@ describe('elasticsearch-api', () => {
         expect(api.buildQuery(opConfig3, msg1)).toEqual(makeResponse(opConfig3, msg1, response4));
     });
 
-    it('can set up an index', (done) => {
+    it('can set up an index', () => {
         const api = esApi(client, logger);
         const clusterName = 'teracluster';
         const newIndex = 'teracluster__state';
@@ -847,12 +861,17 @@ describe('elasticsearch-api', () => {
         const recordType = 'state';
         const clientName = 'default';
 
-        api.indexSetup(clusterName, newIndex, migrantIndexName, template, recordType, clientName)
-            .catch(fail)
-            .finally(() => { done(); });
+        return api.indexSetup(
+            clusterName,
+            newIndex,
+            migrantIndexName,
+            template,
+            recordType,
+            clientName
+        );
     });
 
-    it('can set up an index and wait for availability', (done) => {
+    it('can set up an index and wait for availability', () => {
         const api = esApi(client, logger);
         const clusterName = 'teracluster';
         const newIndex = 'teracluster__state';
@@ -862,8 +881,10 @@ describe('elasticsearch-api', () => {
 
         searchError = true;
 
-        Promise.all([
-            waitFor(300, () => { searchError = false; }),
+        return Promise.all([
+            waitFor(300, () => {
+                searchError = false;
+            }),
             api.indexSetup(
                 clusterName,
                 newIndex,
@@ -872,12 +893,10 @@ describe('elasticsearch-api', () => {
                 recordType,
                 clientName
             )
-        ])
-            .catch(fail)
-            .finally(() => { done(); });
+        ]);
     });
 
-    it('can wait for elasticsearch availability', (done) => {
+    it('can wait for elasticsearch availability', () => {
         const api = esApi(client, logger);
         const clusterName = 'teracluster';
         const newIndex = 'teracluster__state';
@@ -888,7 +907,7 @@ describe('elasticsearch-api', () => {
 
         elasticDown = true;
         recoverError = true;
 
-        Promise.all([
+        return Promise.all([
             api.indexSetup(
                 clusterName,
                 newIndex,
@@ -898,14 +917,16 @@ describe('elasticsearch-api', () => {
                 clientName,
                 1000
             ),
-            waitFor(1, () => { elasticDown = false; }),
-            waitFor(1200, () => { recoverError = false; }),
-        ])
-            .catch(fail)
-            .finally(() => { done(); });
+            waitFor(1, () => {
+                elasticDown = false;
+            }),
+            waitFor(1200, () => {
+                recoverError = false;
+            })
+        ]);
     });
 
-    it('can send template on state mapping changes, does not migrate', (done) => {
+    it('can send template on state mapping changes, does not migrate', async () => {
         const api = esApi(client, logger);
         const clusterName = 'teracluster';
         const newIndex = 'teracluster__state';
@@ -915,16 +936,19 @@ describe('elasticsearch-api', () => {
 
         changeMappings = true;
 
-        api.indexSetup(clusterName, newIndex, migrantIndexName, template, recordType, clientName)
-            .then(() => {
-                expect(putTemplateCalled).toEqual(true);
-                expect(reindexCalled).toEqual(false);
-            })
-            .catch(fail)
-            .finally(() => { done(); });
+        await api.indexSetup(
+            clusterName,
+            newIndex,
+            migrantIndexName,
+            template,
+            recordType,
+            clientName
+        );
+        expect(putTemplateCalled).toEqual(true);
+        expect(reindexCalled).toEqual(false);
     });
 
-    it('can migrate on mapping changes', (done) => {
+    it('can migrate on mapping changes', async () => {
         const api = esApi(client, logger);
         const clusterName = 'teracluster';
         const newIndex = 'teracluster__ex';
@@ -935,13 +959,16 @@ describe('elasticsearch-api', () => {
 
         changeMappings = true;
         isExecutionTemplate = true;
 
-        api.indexSetup(clusterName, newIndex, migrantIndexName, template, recordType, clientName)
-            .then(() => {
-                expect(reindexCalled).toEqual(true);
-                expect(indicesDeleteCalled).toEqual(true);
-                expect(indicesPutAliasCalled).toEqual(true);
-            })
-            .catch(fail)
-            .finally(() => { done(); });
+        await api.indexSetup(
+            clusterName,
+            newIndex,
+            migrantIndexName,
+            template,
+            recordType,
+            clientName
+        );
+        expect(reindexCalled).toEqual(true);
+        expect(indicesDeleteCalled).toEqual(true);
+        expect(indicesPutAliasCalled).toEqual(true);
     });
 });
diff --git a/packages/elasticsearch-api/types/index.d.ts b/packages/elasticsearch-api/types/index.d.ts
index 74938c7002e..5a85aa0fa6d 100644
--- a/packages/elasticsearch-api/types/index.d.ts
+++ b/packages/elasticsearch-api/types/index.d.ts
@@ -6,7 +6,7 @@ import { Logger } from '@terascope/utils';
 
 export = elasticsearchAPI;
 
-declare function elasticsearchAPI(client: Client, logger: Logger, config?: elasticsearchAPI.Config): elasticsearchAPI.Client
+declare function elasticsearchAPI(client: Client, logger: Logger, config?: elasticsearchAPI.Config): elasticsearchAPI.Client;
 
 declare namespace elasticsearchAPI {
     export interface Config {
@@ -16,12 +16,13 @@ declare namespace elasticsearchAPI {
     }
 
     export interface Client {
-        search: (query: es.SearchParams) => Promise;
+        search: (query: es.SearchParams) => Promise;
         count: (query: es.CountParams) => Promise;
         get: (query: es.GetParams) => Promise;
         mget: (query: es.MGetParams) => Promise;
         index: (query: es.IndexDocumentParams) => Promise;
         indexWithId: (query: es.IndexDocumentParams) => Promise;
+        isAvailable: (index: string, recordType?: string) => Promise;
         create: (query: es.CreateDocumentParams) => Promise;
         update: (query: es.UpdateDocumentParams) => Promise;
         remove: (query: es.DeleteDocumentParams) => Promise;
@@ -36,6 +37,7 @@ declare namespace elasticsearchAPI {
         indexRefresh: (query: es.IndicesRefreshParams) => Promise;
         indexRecovery: (query: es.IndicesRecoveryParams) => Promise;
         indexSetup: (clusterName, newIndex, migrantIndexName, mapping, recordType, clientName) => Promise;
+        verifyClient: () => boolean;
         validateGeoParameters: (opConfig: any) => any;
     }
 }
diff --git a/packages/elasticsearch-store/package.json b/packages/elasticsearch-store/package.json
index 3ee4a76fa37..d4ce5254970 100644
--- a/packages/elasticsearch-store/package.json
+++ b/packages/elasticsearch-store/package.json
@@ -25,7 +25,7 @@
         "test:watch": "jest --coverage=false --notify --watch --onlyChanged"
     },
     "dependencies": {
-        "@terascope/utils": "^0.12.3",
+        "@terascope/utils": "^0.12.4",
         "ajv": "^6.10.0",
         "nanoid": "^2.0.3",
         "rambda": "^2.11.1",
diff --git a/packages/elasticsearch-store/src/index-store.ts b/packages/elasticsearch-store/src/index-store.ts
index e2c09dc146f..503908d8b17 100644
--- a/packages/elasticsearch-store/src/index-store.ts
+++ b/packages/elasticsearch-store/src/index-store.ts
@@ -468,7 +468,7 @@ type BulkRequestMetadata = {
         _index: string;
         _type: string;
         _id?: string;
-    };
+    }
 };
 
 interface RecordResponse {
diff --git a/packages/job-components/package.json b/packages/job-components/package.json
index 5c164e74f0d..3b7dcfacb4f 100644
--- a/packages/job-components/package.json
+++ b/packages/job-components/package.json
@@ -1,6 +1,6 @@
 {
     "name": "@terascope/job-components",
-    "version": "0.20.4",
+    "version": "0.20.5",
     "description": "A teraslice library for validating jobs schemas, registering apis, and defining and running new Job APIs",
     "homepage": "https://github.com/terascope/teraslice/tree/master/packages/job-components#readme",
     "bugs": {
@@ -34,7 +34,7 @@
     },
     "dependencies": {
         "@terascope/queue": "^1.1.6",
-        "@terascope/utils": "^0.12.3",
+        "@terascope/utils": "^0.12.4",
         "convict": "^4.4.1",
         "datemath-parser": "^1.0.6",
         "uuid": "^3.3.2"
diff --git a/packages/job-components/src/execution-context/utils.ts b/packages/job-components/src/execution-context/utils.ts
index c564e1e020f..2bd171ff0b4 100644
--- a/packages/job-components/src/execution-context/utils.ts
+++ b/packages/job-components/src/execution-context/utils.ts
@@ -1,4 +1,4 @@
-import { isFunction } from '@terascope/utils';
+import { isFunction, get } from '@terascope/utils';
 import { OperationAPI, OperationAPIType } from '../operations';
 import { Context } from '../interfaces';
 
@@ -17,14 +17,12 @@ export function getOperationAPIType(api: any): OperationAPIType {
 }
 
 export function makeContextLogger(context: Context, moduleName: string, extra = {}) {
-    const { assignment, cluster } = context;
-
     return context.apis.foundation.makeLogger(
         Object.assign(
             {
                 module: moduleName,
-                worker_id: cluster.worker.id,
-                assignment,
+                worker_id: get(context, 'cluster.worker.id'),
+                assignment: get(context, 'assignment'),
             },
             extra
         )
diff --git a/packages/job-components/src/execution-context/worker.ts b/packages/job-components/src/execution-context/worker.ts
index 38bac264164..9024a288bc8 100644
--- a/packages/job-components/src/execution-context/worker.ts
+++ b/packages/job-components/src/execution-context/worker.ts
@@ -363,7 +363,7 @@ export class WorkerExecutionContext extends BaseExecutionContext
diff --git a/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js b/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js
--- a/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js
+++ b/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js
         if (bulkQueue.length >= bulkSize) {
@@ -239,30 +242,36 @@ module.exports = function module(backendConfig) {
         });
     }
 
-    function _flush(shuttingDown = false) {
-        if (!bulkQueue.length) return Promise.resolve();
-        if (!shuttingDown && savingBulk) return Promise.resolve();
+    async function bulkSend(bulkRequest) {
+        const recordCount = (bulkRequest.length / 2);
+
+        await pRetry(async () => elasticsearch.bulkSend(bulkRequest), {
+            reason: `Failure to bulk create "${recordType}"`,
+            logError: logger.warn,
+            delay: isTest ? 100 : 1000,
+            backoff: 5,
+            retries: 100,
+        });
+
+        return recordCount;
+    }
+
+    async function _flush(shuttingDown = false) {
+        if (!bulkQueue.length) return;
+        if (!shuttingDown && savingBulk) return;
+
         savingBulk = true;
 
         const bulkRequest = bulkQueue.slice();
         bulkQueue = [];
 
-        return elasticsearch
-            .bulkSend(bulkRequest)
-            .then((results) => {
-                const records = results.items.length;
-                const extraMsg = shuttingDown ? ', on shutdown' : '';
-                logger.debug(`flushed ${records}${extraMsg} records to index ${indexName}`);
-            })
-            .catch((err) => {
-                const error = new TSError(err, {
-                    reason: `Failure to flush "${recordType}"`,
-                });
-                return Promise.reject(error);
-            })
-            .finally(() => {
-                savingBulk = false;
-            });
+        try {
+            const recordCount = await bulkSend(bulkRequest);
+            const extraMsg = shuttingDown ? ', on shutdown' : '';
+            logger.debug(`flushed ${recordCount}${extraMsg} records to index ${indexName}`);
+        } finally {
+            savingBulk = false;
+        }
     }
 
     function getMapFile() {
@@ -276,50 +285,6 @@ module.exports = function module(backendConfig) {
         return mapping;
     }
 
-    function isAvailable(indexArg = indexName) {
-        const query = {
-            index: indexArg,
-            q: '*',
-            size: 0,
-            terminate_after: '1',
-        };
-
-        return new Promise((resolve) => {
-            elasticsearch
-                .search(query)
-                .then((results) => {
-                    logger.trace(`index ${indexName} is now available`);
-                    resolve(results);
-                })
-                .catch(() => {
-                    let running = false;
-                    const isReady = setInterval(() => {
-                        if (isShutdown) {
-                            clearInterval(isReady);
-                            return;
-                        }
-
-                        if (running) return;
-                        running = true;
-
-                        elasticsearch
-                            .search(query)
-                            .then((results) => {
-                                running = false;
-
-                                clearInterval(isReady);
-                                resolve(results);
-                            })
-                            .catch(() => {
-                                running = false;
-
-                                logger.warn(`verifying ${recordType} index is open`);
-                            });
-                    }, 200);
-                });
-        });
-    }
-
     function sendTemplate(mapping) {
         if (mapping.template) {
             const clusterName = context.sysconfig.teraslice.name;
@@ -357,7 +322,7 @@ module.exports = function module(backendConfig) {
             .then(results => results)
             .catch((err) => {
                 // It's not really an error if it's just that the index is already there
-                if (parseError(err).match(/index_already_exists_exception/)) {
+                if (parseError(err).match(/already_exists_exception/)) {
                     return true;
                 }
 
@@ -382,6 +347,22 @@ module.exports = function module(backendConfig) {
         return elasticsearch.putTemplate(template, name);
     }
 
+    function verifyClient() {
+        if (isShutdown) return false;
+        return elasticsearch.verifyClient();
+    }
+
+    async function waitForClient() {
+        let valid = elasticsearch.verifyClient();
+        if (valid) return;
+
+        await pWhilst(() => valid, async () => {
+            if (isShutdown) throw new Error('Elasticsearch store is shutdown');
+            valid = elasticsearch.verifyClient();
+            await pDelay(100);
+        });
+    }
+
     // Periodically flush the bulkQueue so we don't end up with cached data lingering.
     flushInterval = setInterval(() => {
         _flush().catch((err) => {
@@ -401,10 +382,13 @@ module.exports = function module(backendConfig) {
         create,
         update,
         bulk,
+        bulkSend,
         remove,
         shutdown,
         count,
         putTemplate,
+        waitForClient,
+        verifyClient,
     };
 
     const isMultiIndex = indexName[indexName.length - 1] === '*';
@@ -417,12 +401,21 @@ module.exports = function module(backendConfig) {
         newIndex = timeseriesIndex(timeseriesFormat, indexName.slice(0, nameSize)).index;
     }
 
-    return new Promise((resolve) => {
+    return new Promise((resolve, reject) => {
         const clientName = JSON.stringify({
             connection: config.state.connection,
             index: indexName,
         });
-        client = getClient(context, config.state, 'elasticsearch');
+
+        const connectionConfig = Object.assign({}, config.state);
+        if (connectionConfig.connection_cache == null) {
+            connectionConfig.connection_cache = true;
+        }
+        client = getClient(context, connectionConfig, 'elasticsearch');
+        if (!client) {
+            reject(new Error(`Unable to get client for connection: ${config.state.connection}`));
+            return;
+        }
 
         let { connection } = config.state;
         if (config.state.endpoint) {
@@ -435,8 +428,8 @@ module.exports = function module(backendConfig) {
         };
 
         elasticsearch = elasticsearchApi(client, logger, options);
-        return _createIndex(newIndex)
-            .then(() => isAvailable(newIndex))
+        _createIndex(newIndex)
+            .then(() => elasticsearch.isAvailable(newIndex, recordType))
            .then(() => resolve(api))
            .catch((err) => {
                const error = new TSError(err, {
@@ -473,7 +466,7 @@ module.exports = function module(backendConfig) {
                     if (bool) {
                         clearInterval(checking);
                         logger.info('connection to elasticsearch has been established');
-                        return isAvailable(newIndex).then(() => {
+                        return elasticsearch.isAvailable(newIndex, recordType).then(() => {
                             resolve(api);
                         });
                     }
diff --git a/packages/teraslice/lib/cluster/storage/execution.js b/packages/teraslice/lib/cluster/storage/execution.js
index 6aa3654b6b2..2ccfe29b302 100644
--- a/packages/teraslice/lib/cluster/storage/execution.js
+++ b/packages/teraslice/lib/cluster/storage/execution.js
@@ -2,9 +2,10 @@
 
 const _ = require('lodash');
 
-const { TSError } = require('@terascope/utils');
+const { TSError, pRetry } = require('@terascope/utils');
 const uuid = require('uuid');
 const Promise = require('bluebird');
+const { makeLogger } = require('../../workers/helpers/terafoundation');
 const elasticsearchBackend = require('./backends/elasticsearch_store');
 
 const INIT_STATUS = ['pending', 'scheduling', 'initializing'];
@@ -16,16 +17,16 @@ const VALID_STATUS = INIT_STATUS.concat(RUNNING_STATUS).concat(TERMINAL_STATUS);
 
 // Module to manager job states in Elasticsearch.
 // All functions in this module return promises that must be resolved to
 // get the final result.
-module.exports = function module(context) {
-    const logger = context.apis.foundation.makeLogger({ module: 'ex_storage' });
+module.exports = function executionStorage(context) {
+    const logger = makeLogger(context, 'ex_storage');
     const config = context.sysconfig.teraslice;
 
     const jobType = 'ex';
     const indexName = `${config.name}__ex`;
 
     let backend;
 
-    function getExecution(exId) {
-        if (!exId) return Promise.reject(new Error('Execution.get() requires a exId'));
+    async function getExecution(exId) {
+        if (!exId) throw new Error('Execution.get() requires a exId');
         return backend.get(exId);
     }
 
@@ -58,15 +59,15 @@
         return metaData;
     }
 
-    function getStatus(exId) {
-        return getExecution(exId)
-            .then(result => result._status)
-            .catch((err) => {
-                const error = new TSError(err, {
-                    reason: `Cannot get execution status ${exId}`
-                });
-                return Promise.reject(error);
-            });
+    async function getStatus(exId) {
+        try {
+            const result = await getExecution(exId);
+            return result._status;
+        } catch (err) {
+            throw new TSError(err, {
+                reason: `Cannot get execution status ${exId}`
+            });
+        }
     }
 
     // verify the current status to make sure it can be updated to the desired status
@@ -86,13 +87,17 @@
 
         // when the current status is running it cannot be set to an init status
         if (_isRunningStatus(status) && _isInitStatus(desiredStatus)) {
-            const error = new Error(`Cannot update running job status of "${status}" to init status of "${desiredStatus}"`);
+            const error = new TSError(`Cannot update running job status of "${status}" to init status of "${desiredStatus}"`, {
+                statusCode: 422
+            });
             return Promise.reject(error);
         }
 
         // when the status is a terminal status, it cannot be set to again
         if (_isTerminalStatus(status)) {
-            const error = new Error(`Cannot update terminal job status of "${status}" to "${desiredStatus}"`);
+            const error = new TSError(`Cannot update terminal job status of "${status}" to "${desiredStatus}"`, {
+                statusCode: 422
+            });
             return Promise.reject(error);
         }
 
@@ -101,23 +106,29 @@
         });
     }
 
-    function setStatus(exId, status, metaData) {
-        return verifyStatusUpdate(exId, status)
-            .then(() => {
-                const statusObj = { _status: status };
-                if (metaData) {
-                    _.assign(statusObj, metaData);
-                }
-                return update(exId, statusObj);
-            })
-            .then(() => exId)
-            .catch((err) => {
-                const error = new TSError(err, {
-                    statusCode: 422,
-                    reason: `Unable to set execution ${exId} status code to ${status}`
-                });
-                return Promise.reject(error);
-            });
+    async function setStatus(exId, status, metaData) {
+        await waitForClient();
+
+        await pRetry(() => verifyStatusUpdate(exId, status), {
+            matches: ['no_shard_available_action_exception'],
+            delay: 1000,
+            retries: 10,
+            backoff: 5
+        });
+
+        try {
+            const statusObj = { _status: status };
+            if (metaData) {
+                Object.assign(statusObj, metaData);
+            }
+            await update(exId, statusObj);
+        } catch (err) {
+            throw new TSError(err, {
+                statusCode: 422,
+                reason: `Unable to set execution ${exId} status code to ${status}`
+            });
+        }
+
+        return exId;
     }
 
     function remove(exId) {
@@ -129,6 +140,14 @@
         return backend.shutdown(forceShutdown);
     }
 
+    function verifyClient() {
+        return backend.verifyClient();
+    }
+
+    function waitForClient() {
+        return backend.waitForClient();
+    }
+
     function getTerminalStatuses() {
         return TERMINAL_STATUS.slice();
     }
 
@@ -171,6 +190,8 @@
         getStatus,
         executionMetaData,
         verifyStatusUpdate,
+        waitForClient,
+        verifyClient,
     };
 
     const backendConfig = {
diff --git a/packages/teraslice/lib/cluster/storage/jobs.js b/packages/teraslice/lib/cluster/storage/jobs.js
index a219f9aa673..578beca919b 100644
--- a/packages/teraslice/lib/cluster/storage/jobs.js
+++ b/packages/teraslice/lib/cluster/storage/jobs.js
@@ -1,15 +1,14 @@
 'use strict';
 
 const uuid = require('uuid');
+const { makeLogger } = require('../../workers/helpers/terafoundation');
 const elasticsearchBackend = require('./backends/elasticsearch_store');
 
 // Module to manager job states in Elasticsearch.
 // All functions in this module return promises that must be resolved to
 // get the final result.
-module.exports = function module(context) {
-    const logger = context.apis.foundation.makeLogger({
-        module: 'job_storage'
-    });
+module.exports = function jobsStorage(context) {
+    const logger = makeLogger(context, 'job_storage');
 
     const config = context.sysconfig.teraslice;
     const jobType = 'job';
@@ -52,12 +51,22 @@ module.exports = function module(context) {
         return backend.shutdown(forceShutdown);
     }
 
+    function verifyClient() {
+        return backend.verifyClient();
+    }
+
+    function waitForClient() {
+        return backend.waitForClient();
+    }
+
     const api = {
         get: getJob,
         search,
         create,
         update,
         remove,
+        verifyClient,
+        waitForClient,
         shutdown
     };
diff --git a/packages/teraslice/lib/cluster/storage/logs.js b/packages/teraslice/lib/cluster/storage/logs.js
index c3de8738384..ac7f95f0ed0 100644
--- a/packages/teraslice/lib/cluster/storage/logs.js
+++ b/packages/teraslice/lib/cluster/storage/logs.js
@@ -4,7 +4,7 @@ const _ = require('lodash');
 const Promise = require('bluebird');
 const { getClient } = require('@terascope/job-components');
 
-module.exports = function module(context) {
+module.exports = function logsStorage(context) {
     if (_.includes(context.sysconfig.terafoundation.logging, 'elasticsearch')) {
         const client = getClient(context, context.sysconfig.teraslice.state, 'elasticsearch');
         const template = require('./backends/mappings/logs.json');
diff --git a/packages/teraslice/lib/cluster/storage/state.js b/packages/teraslice/lib/cluster/storage/state.js
index a3af9bd943d..6c30923bd05 100644
--- a/packages/teraslice/lib/cluster/storage/state.js
+++ b/packages/teraslice/lib/cluster/storage/state.js
@@ -6,16 +6,20 @@ const {
     pRetry,
     toString,
     isRetryableError,
-    parseErrorInfo
+    parseErrorInfo,
+    isTest,
 } = require('@terascope/utils');
 const { timeseriesIndex } = require('../../utils/date_utils');
+const { makeLogger } = require('../../workers/helpers/terafoundation');
 const elasticsearchBackend = require('./backends/elasticsearch_store');
 
 // Module to manager job states in Elasticsearch.
 // All functions in this module return promises that must be resolved to
 // get the final result.
-module.exports = function module(context) {
-    const logger = context.apis.foundation.makeLogger({ module: 'state_storage' });
+module.exports = async function stateStorage(context) {
+    const recordType = 'state';
+
+    const logger = makeLogger(context, 'state_storage');
     const config = context.sysconfig.teraslice;
     const _index = `${config.name}__state`;
     // making this to pass down to backend for dynamic index searches
@@ -24,7 +28,31 @@
 
     let backend;
 
-    function createState(exId, slice, state, error) {
+    async function createState(exId, slice, state, error) {
+        await waitForClient();
+
+        const { record, index } = _createSliceRecord(exId, slice, state, error);
+        return backend.indexWithId(slice.slice_id, record, index);
+    }
+
+    async function createSlices(exId, slices) {
+        await waitForClient();
+
+        const bulkRequest = [];
+        for (const slice of slices) {
+            const { record, index } = _createSliceRecord(exId, slice, 'start');
+            bulkRequest.push({
+                index: {
+                    _index: index,
+                    _type: recordType,
+                    _id: record.slice_id,
+                },
+            }, record);
+        }
+        return backend.bulkSend(bulkRequest);
+    }
+
+    function _createSliceRecord(exId, slice, state, error) {
         const { index } = timeseriesIndex(timeseriesFormat, _index, slice._created);
         const record = {
             slice_id: slice.slice_id,
@@ -40,8 +68,7 @@
         if (error) {
             record.error = toString(error);
         }
-
-        return backend.indexWithId(slice.slice_id, record, index);
+        return { record, index };
     }
 
     function updateState(slice, state, error) {
@@ -58,6 +85,8 @@
         let notFoundErrCount = 0;
 
         async function update() {
+            await waitForClient();
+
             try {
                 return await backend.update(slice.slice_id, record, indexData.index);
             } catch (_err) {
@@ -79,34 +108,34 @@
 
         return pRetry(update, {
             retries: 10000,
-            delay: 1000,
+            delay: isTest ? 100 : 1000,
             backoff: 5,
             endWithFatal: true,
         });
     }
 
-    function executionStartingSlice(exId, slicerId) {
+    async function executionStartingSlice(exId, slicerId) {
         const startQuery = `ex_id:${exId} AND slicer_id:${slicerId}`;
         const recoveryData = {};
 
-        return backend.search(startQuery, 0, 1, 'slicer_order:desc')
-            .then((startingData) => {
-                if (startingData.length > 0) {
-                    recoveryData.lastSlice = JSON.parse(startingData[0].request);
-                    logger.info(`last slice process for slicer_id ${slicerId}, ex_id: ${exId} is`, recoveryData.lastSlice);
-                }
-
-                return recoveryData;
-            })
-            .catch((err) => {
-                const error = new TSError(err, {
-                    reason: 'Failure getting the newest record'
-                });
-                return Promise.reject(error);
-            });
+        await waitForClient();
+
+        try {
+            const startingData = await backend.search(startQuery, 0, 1, 'slicer_order:desc');
+            if (startingData.length > 0) {
+                recoveryData.lastSlice = JSON.parse(startingData[0].request);
+                logger.info(`last slice process for slicer_id ${slicerId}, ex_id: ${exId} is`, recoveryData.lastSlice);
+            }
+
+            return recoveryData;
+        } catch (err) {
+            throw new TSError(err, {
+                reason: 'Failure getting the newest record'
+            });
+        }
     }
 
-    function recoverSlices(exId, slicerId, cleanupType) {
+    async function recoverSlices(exId, slicerId, cleanupType) {
         let retryQuery = `ex_id:${exId} AND slicer_id:${slicerId}`;
 
         if (cleanupType && cleanupType === 'errors') {
@@ -116,20 +145,23 @@
         }
 
         // Look for all slices that haven't been completed so they can be retried.
-        return backend.refresh(indexName)
-            .then(() => backend.search(retryQuery, 0, 5000))
-            .then(results => results.map(doc => ({
+        try {
+            await waitForClient();
+            await backend.refresh(indexName);
+
+            const results = await backend.search(retryQuery, 0, 5000);
+            return results.map(doc => ({
                 slice_id: doc.slice_id,
                 slicer_id: doc.slicer_id,
                 request: JSON.parse(doc.request),
                 _created: doc._created
-            })))
-            .catch((err) => {
-                const error = new TSError(err, {
-                    reason: 'An error has occurred accessing the state log for retry'
-                });
-                return Promise.reject(error);
+            }));
+        } catch (err) {
+            const error = new TSError(err, {
+                reason: 'An error has occurred accessing the state log for retry'
+            });
+            return Promise.reject(error);
+        }
     }
 
     function search(query, from, size, sort) {
@@ -150,13 +182,24 @@
         return backend.refresh(index);
     }
 
+    function verifyClient() {
+        return backend.verifyClient();
+    }
+
+    function waitForClient() {
+        return backend.waitForClient();
+    }
+
     const api = {
         search,
         createState,
+        createSlices,
         updateState,
         recoverSlices,
         executionStartingSlice,
         count,
+        waitForClient,
+        verifyClient,
         shutdown,
         refresh,
     };
 
     const backendConfig = {
         context,
         indexName,
-        recordType: 'state',
+        recordType,
         idField: 'slice_id',
         fullResponse: false,
         logRecord: false,
@@ -172,10 +215,8 @@
         storageName: 'state'
     };
 
-    return elasticsearchBackend(backendConfig)
-        .then((elasticsearch) => {
-            backend = elasticsearch;
-            logger.info('state storage initialized');
-            return api;
-        });
+    const elasticsearch = await elasticsearchBackend(backendConfig);
+    backend = elasticsearch;
+    logger.info('state storage initialized');
+    return api;
 };
diff --git a/packages/teraslice/lib/config/schemas/system.js b/packages/teraslice/lib/config/schemas/system.js
index c0d7f353096..cd227df4955 100644
--- a/packages/teraslice/lib/config/schemas/system.js
+++ b/packages/teraslice/lib/config/schemas/system.js
@@ -6,7 +6,7 @@ const path = require('path');
 const ip = _.chain(require('os').networkInterfaces())
     .values()
     .flatten()
-    .filter(val => (val.family === 'IPv4' && val.internal === false))
+    .filter(val => val.family === 'IPv4' && val.internal === false)
     .map('address')
     .head()
     .value();
@@ -30,13 +30,16 @@ const schema = {
         format: 'optional_String'
     },
     shutdown_timeout: {
-        doc: 'time in milliseconds, to allow workers and slicers to finish operations before forcefully shutting down',
+        doc:
+            'time in milliseconds, to allow workers and slicers to finish operations before forcefully shutting down',
         default: 60000,
         format(val) {
             if (isNaN(val)) {
                 throw new Error('shutdown_timeout parameter for teraslice must be a number');
             } else if (val < 0) {
-                throw new Error('shutdown_timeout parameter for teraslice must be a positive number');
+                throw new Error(
+                    'shutdown_timeout parameter for teraslice must be a positive number'
+                );
             }
         }
    },
@@ -66,7 +69,8 @@ const schema = {
         format: Boolean
     },
     master_hostname: {
-        doc: 'hostname where the cluster_master resides, used to notify all node_masters where to connect',
+        doc:
+            'hostname where the cluster_master resides, used to notify all node_masters where to connect',
         default: 'localhost',
         format: 'required_String'
     },
@@ -88,7 +92,9 @@ const schema = {
                 throw new Error('state parameter must be an object with a key named "connection"');
             }
             if (typeof val.connection !== 'string') {
must be of type String as the value'); + throw new Error( + 'state parameter object with a key "connection" must be of type String as the value' + ); } } }, @@ -96,56 +102,57 @@ analytics: { number_of_shards: { doc: 'The number of shards for the analytics index', - default: 5, + default: 5 }, number_of_replicas: { doc: 'The number of replicas for the analytics index', - default: 1, - }, + default: 1 + } }, assets: { number_of_shards: { doc: 'The number of shards for the assets index', - default: 5, + default: 5 }, number_of_replicas: { doc: 'The number of replicas for the assets index', - default: 1, - }, + default: 1 + } }, jobs: { number_of_shards: { doc: 'The number of shards for the jobs index', - default: 5, + default: 5 }, number_of_replicas: { doc: 'The number of replicas for the jobs index', - default: 1, - }, + default: 1 + } }, execution: { number_of_shards: { doc: 'The number of shards for the execution index', - default: 5, + default: 5 }, number_of_replicas: { doc: 'The number of replicas for the execution index', - default: 1, - }, + default: 1 + } }, state: { number_of_shards: { doc: 'The number of shards for the state index', - default: 5, + default: 5 }, number_of_replicas: { doc: 'The number of replicas for the state index', - default: 1, - }, - }, + default: 1 + } + } }, action_timeout: { - doc: 'time in milliseconds for waiting for a action ( pause/stop job, etc) to complete before throwing an error', + doc: + 'time in milliseconds for waiting for an action (pause/stop job, etc.) to complete before throwing an error', default: 300000, format(val) { if (isNaN(val)) { @@ -156,18 +163,22 @@ const schema = { } }, network_latency_buffer: { - doc: 'time in milliseconds buffer which is combined with action_timeout to determine how long the cluster master will wait till it throws an error', + doc: + 'time in milliseconds buffer which is combined with action_timeout to determine how long the cluster master will wait until it throws an error', default: 15000, format(val) { if (isNaN(val)) { throw new Error('network_latency_buffer parameter for teraslice must be a number'); } else if (val <= 0) { - throw new Error('network_latency_buffer parameter for teraslice must be greater than zero'); + throw new Error( + 'network_latency_buffer parameter for teraslice must be greater than zero' + ); } } }, slicer_timeout: { - doc: 'time in milliseconds that the slicer will wait for worker connection before terminating the job', + doc: + 'time in milliseconds that the slicer will wait for worker connection before terminating the job', default: 180000, format(val) { if (isNaN(val)) {
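The reformatted validators above all repeat the same numeric guard. A condensed plain-JS sketch of that pattern (the `positiveNumberValidator` helper is hypothetical; the schema inlines each check):

```js
'use strict';

// Builds a convict-style format(val) guard like the schema entries above:
// reject non-numbers, then reject anything not greater than zero.
function positiveNumberValidator(name) {
    return (val) => {
        if (isNaN(val)) {
            throw new Error(`${name} parameter for teraslice must be a number`);
        } else if (val <= 0) {
            throw new Error(`${name} parameter for teraslice must be greater than zero`);
        }
    };
}

const validate = positiveNumberValidator('node_state_interval');
validate(5000); // passes silently
try {
    validate(-1);
} catch (err) {
    console.log(err.message); // node_state_interval parameter for teraslice must be greater than zero
}
```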
@@ -182,42 +193,57 @@ const schema = { default: 3, format(val) { if (isNaN(val)) { - throw new Error('slicer_allocation_attempts parameter for teraslice must be a number'); + throw new Error( + 'slicer_allocation_attempts parameter for teraslice must be a number' + ); } else if (val <= 0) { - throw new Error('slicer_allocation_attempts parameter for teraslice must be greater than zero'); + throw new Error( + 'slicer_allocation_attempts parameter for teraslice must be greater than zero' + ); } } }, node_state_interval: { - doc: 'time in milliseconds that indicates when the cluster master will ping nodes for their state', + doc: + 'time in milliseconds that indicates when the cluster master will ping nodes for their state', default: 5000, format(val) { if (isNaN(val)) { throw new Error('node_state_interval parameter for teraslice must be a number'); } else if (val <= 0) { - throw new Error('node_state_interval parameter for teraslice must be greater than zero'); + throw new Error( + 'node_state_interval parameter for teraslice must be greater than zero' + ); } } }, node_disconnect_timeout: { - doc: 'time in milliseconds that the cluster will wait untill it drops that node from state and attempts to provision the lost workers', + doc: + 'time in milliseconds that the cluster will wait until it drops that node from state and attempts to provision the lost workers', default: 300000, format(val) { if (isNaN(val)) { throw new Error('node_disconnect_timeout parameter for teraslice must be a number'); } else if (val <= 0) { - throw new Error('node_disconnect_timeout parameter for teraslice must be greater than zero'); + throw new Error( + 'node_disconnect_timeout parameter for teraslice must be greater than zero' + ); } } }, worker_disconnect_timeout: { - doc: 'time in milliseconds that the slicer will wait after all workers have disconnected before terminating the job', + doc: + 'time in milliseconds that the slicer will wait after all workers have disconnected before terminating the job', default: 300000, format(val) { if (isNaN(val)) { - throw new Error('worker_disconnect_timeout parameter for teraslice must be a number'); + throw new Error( + 'worker_disconnect_timeout parameter for teraslice must be a number' + ); } else if (val <= 0) { - throw new Error('worker_disconnect_timeout parameter for teraslice must be greater than zero'); + throw new Error( + 'worker_disconnect_timeout parameter for teraslice must be greater than zero' + ); } } }, @@ -231,7 +257,9 @@ } arr.forEach((value) => { if (isNaN(value)) { - throw new Error('values specified in slicer_port_range must be a number specified as a string'); + throw new Error( + 'values specified in slicer_port_range must be a number specified as a string' + ); } }); } @@ -290,7 +318,6 @@ } }; - function configSchema() { return { teraslice: schema }; } diff --git a/packages/teraslice/lib/workers/execution-controller/execution-analytics.js b/packages/teraslice/lib/workers/execution-controller/execution-analytics.js index 646c6174903..3b9f3108be2 100644 --- a/packages/teraslice/lib/workers/execution-controller/execution-analytics.js +++ b/packages/teraslice/lib/workers/execution-controller/execution-analytics.js @@ -6,7 +6,7 @@ const { makeLogger } = require('../helpers/terafoundation'); class ExecutionAnalytics { constructor(context, executionContext, client) { - this.logger = makeLogger(context, executionContext, 'execution_analytics'); + this.logger = makeLogger(context, 'execution_analytics'); this.events = context.apis.foundation.getSystemEvents(); this.executionContext = executionContext; this.client = client; @@ -29,7 +29,7 @@ class ExecutionAnalytics { slicers: 0, subslice_by_key: 0, started: '', - queuing_complete: '', + queuing_complete: '' }; this.pushedAnalytics = { @@ -39,7 +39,7 @@ class ExecutionAnalytics { job_duration: 0, workers_joined: 0, workers_disconnected: 0, - workers_reconnected: 0, + workers_reconnected: 0 }; this._registerHandlers(); @@ -58,7 +58,7 @@ class ExecutionAnalytics { name, ex_id: exId, job_id: jobId, - stats: this.getAnalytics(), + stats: this.getAnalytics() })); this.isRunning = true; diff --git a/packages/teraslice/lib/workers/execution-controller/index.js b/packages/teraslice/lib/workers/execution-controller/index.js index 58abac6ecd5..4d9c4ba0900 100644 --- a/packages/teraslice/lib/workers/execution-controller/index.js +++ 
b/packages/teraslice/lib/workers/execution-controller/index.js @@ -21,7 +21,7 @@ const { formatURL } = Messaging; class ExecutionController { constructor(context, executionContext) { const workerId = generateWorkerId(context); - const logger = makeLogger(context, executionContext, 'execution_controller'); + const logger = makeLogger(context, 'execution_controller'); const events = context.apis.foundation.getSystemEvents(); const slicerPort = executionContext.config.slicer_port; const config = context.sysconfig.teraslice; @@ -218,7 +218,9 @@ class ExecutionController { // this is updating the opConfig for elasticsearch start and/or end dates for ex, // this assumes elasticsearch is first - this.stores.exStore.update(this.exId, { operations: update }); + this.stores.exStore.update(this.exId, { operations: update }).catch((err) => { + this.logger.error(err, 'slicer event execution update failure'); + }); }; this._handlers['slicers:finished'] = (err) => { @@ -379,6 +381,7 @@ class ExecutionController { clearInterval(this.sliceFailureInterval); clearTimeout(this.workerConnectTimeoutId); clearTimeout(this.workerDisconnectTimeoutId); + clearInterval(this._verifyStoresInterval); await this._waitForExecutionFinished(); @@ -426,6 +429,7 @@ class ExecutionController { this.startTime = Date.now(); this.isStarted = true; + this._verifyStores(); // wait for paused await pWhilst(() => this.isPaused && !this.isShuttdown, () => pDelay(100)); @@ -499,7 +503,7 @@ class ExecutionController { }); dispatch.length = 0; - Promise.all(promises).catch(err => this.logger.error('failure to dispatch slices', err)); + Promise.all(promises).catch(err => this.logger.error(err, 'failure to dispatch slices')); }); } @@ -610,7 +614,6 @@ class ExecutionController { // if this.slicerFailed is true, slicer has already been marked as failed if (this.slicerFailed) return; - const { logger } = this; const { exStore } = this.stores; const executionStats = this.executionAnalytics.getAnalytics(); @@ -621,7 +624,7 @@ class ExecutionController { const isStopping = status === 'stopping' || status === 'stopped'; if (isStopping) { const metaData = exStore.executionMetaData(executionStats); - logger.debug(`execution is set to ${status}, status will not be updated`); + this.logger.debug(`execution is set to ${status}, status will not be updated`); await exStore.update(this.exId, metaData); return; } @@ -630,7 +633,7 @@ class ExecutionController { this.exId } received shutdown before the slicer could complete, setting status to "terminated"`; const metaData = exStore.executionMetaData(executionStats, errMsg); - logger.error(errMsg); + this.logger.error(errMsg); await exStore.setStatus(this.exId, 'terminated', metaData); return; } @@ -643,13 +646,13 @@ class ExecutionController { if (errors > 0 || started > 0) { const errMsg = this._formartExecutionFailure({ errors, started }); const errorMeta = exStore.executionMetaData(executionStats, errMsg); - logger.error(errMsg); + this.logger.error(errMsg); await exStore.setStatus(this.exId, 'failed', errorMeta); return; } const metaData = exStore.executionMetaData(executionStats); - logger.info(`execution ${this.exId} has completed`); + this.logger.info(`execution ${this.exId} has completed`); await exStore.setStatus(this.exId, 'completed', metaData); } @@ -660,7 +663,7 @@ class ExecutionController { this.executionAnalytics.set('job_duration', time); - if (this.collectAnalytics) { + if (this.collectAnalytics && this.slicerAnalytics) { this.slicerAnalytics.analyzeStats(); } @@ -826,6 +829,60 @@ class 
ExecutionController { return false; } + _verifyStores() { + let paused = false; + + const prettyStoreNames = { + exStore: 'execution', + stateStore: 'state' + }; + + const logPaused = _.throttle((storesStr) => { + this.logger.warn(`${storesStr} are in an invalid state, scheduler is paused`); + }, 10 * 1000); + + clearInterval(this._verifyStoresInterval); + this._verifyStoresInterval = setInterval(() => { + if (!this.stores) return; + if (this.isShuttingDown || this.isShutdown) return; + + const invalid = []; + for (const [name, store] of Object.entries(this.stores)) { + try { + const valid = store.verifyClient(); + if (!valid) { + invalid.push(prettyStoreNames[name] || name); + } + } catch (err) { + clearInterval(this._verifyStoresInterval); + this._terminalError(err); + return; + } + } + + if (invalid.length) { + const storesStr = `elasticsearch stores ${invalid.join(', ')}`; + if (paused) { + logPaused(storesStr); + return; + } + + this.logger.warn(`${storesStr} are in an invalid state, pausing scheduler...`); + paused = true; + this.scheduler.pause(); + return; + } + + if (paused) { + this.logger.info( + 'elasticsearch stores are now in a valid state, resuming scheduler...' + ); + paused = false; + this.scheduler.start(); + } + }, 100); + } + _initSliceFailureWatchDog() { const probationWindow = this.executionContext.config.probation_window; let watchDogSet = false; diff --git a/packages/teraslice/lib/workers/execution-controller/recovery.js b/packages/teraslice/lib/workers/execution-controller/recovery.js index 286af86532c..41cd82bebdd 100644 --- a/packages/teraslice/lib/workers/execution-controller/recovery.js +++ b/packages/teraslice/lib/workers/execution-controller/recovery.js @@ -3,6 +3,7 @@ const _ = require('lodash'); const Promise = require('bluebird'); const Queue = require('@terascope/queue'); +const { makeLogger } = require('../helpers/terafoundation'); function recovery(context, stateStore, executionContext) { const events = context.apis.foundation.getSystemEvents(); @@ -11,16 +12,12 @@ function recovery(context, stateStore, executionContext) { const cleanupType = executionContext.config.recovered_slice_type; const recoverExecution = executionContext.config.recovered_execution; - const { exId, jobId } = executionContext; + const { exId } = executionContext; let recoverComplete = true; let isShutdown = false; - const logger = context.apis.foundation.makeLogger({ - module: 'execution_recovery', - ex_id: exId, - job_id: jobId, - }); + const logger = makeLogger(context, 'execution_recovery'); const retryState = {}; @@ -169,7 +166,7 @@ function recovery(context, stateStore, executionContext) { _recoveryBatchCompleted, _setId, _waitForRecoveryBatchCompletion, - _sliceComplete, + _sliceComplete }; } @@ -183,7 +180,7 @@ function recovery(context, stateStore, executionContext) { recoveryComplete, handle, shutdown, - __test_context: testContext, + __test_context: testContext }; }
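A condensed sketch (with hypothetical stand-ins, not part of the diff) of the `_verifyStores()` polling loop added to the execution controller above: every 100ms each store's `verifyClient()` is checked, the scheduler pauses while any store is invalid, and it resumes once all stores recover.

```js
'use strict';

// stores: map of name -> { verifyClient(): boolean }
// scheduler: { pause(), start() } -- both stand-ins for the real objects.
function watchStores(stores, scheduler, log = console) {
    let paused = false;
    return setInterval(() => {
        const invalid = Object.entries(stores)
            .filter(([, store]) => !store.verifyClient())
            .map(([name]) => name);

        if (invalid.length && !paused) {
            log.warn(`elasticsearch stores ${invalid.join(', ')} are in an invalid state, pausing scheduler...`);
            paused = true;
            scheduler.pause();
        } else if (!invalid.length && paused) {
            log.info('elasticsearch stores are now in a valid state, resuming scheduler...');
            paused = false;
            scheduler.start();
        }
    }, 100);
}

// Clearing the returned interval on shutdown mirrors
// clearInterval(this._verifyStoresInterval) in the hunk above.
```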
diff --git a/packages/teraslice/lib/workers/execution-controller/scheduler.js b/packages/teraslice/lib/workers/execution-controller/scheduler.js index 93381250b4b..d8d108a35bf 100644 --- a/packages/teraslice/lib/workers/execution-controller/scheduler.js +++ b/packages/teraslice/lib/workers/execution-controller/scheduler.js @@ -1,7 +1,7 @@ 'use strict'; const { - noop, pDelay, get, toString + noop, pDelay, get, toString, makeISODate } = require('@terascope/utils'); const pWhilst = require('p-whilst'); const Queue = require('@terascope/queue'); @@ -11,7 +11,7 @@ const { makeLogger } = require('../helpers/terafoundation'); class Scheduler { constructor(context, executionContext) { this.context = context; - this.logger = makeLogger(context, executionContext, 'execution_scheduler'); + this.logger = makeLogger(context, 'execution_scheduler'); this.events = context.apis.foundation.getSystemEvents(); this.executionContext = executionContext; this.exId = executionContext.exId; @@ -319,7 +319,7 @@ class Scheduler { createInterval = setInterval(() => { if (!this.pendingSlicerCount) return; - this._drainPendingSlices().catch(err => this.logger.error(err, 'failure creating slices')); + this._drainPendingSlices().catch(onSlicerFailure); }, 5); this._processCleanup = cleanup; @@ -346,27 +346,37 @@ class Scheduler { return this.executionContext.slicer().getSlices(this.queueRemainder); } - // In the case of recovery slices have already been - created, so its important to skip this step - _ensureSliceState(slice) { - if (slice._created) return Promise.resolve(slice); + async _createSlices(allSlices) { + // filter out anything that doesn't need to be created + const slices = []; - slice._created = new Date().toISOString(); + for (const slice of allSlices) { + // In the case of recovery slices have already been + // created, so it's important to skip this step + if (slice._created) { + this.enqueueSlice(slice); + } else { + slice._created = makeISODate(); + slices.push(slice); + } + } - // this.stateStore is attached from the execution_controller - return this.stateStore.createState(this.exId, slice, 'start').then(() => slice); - } + if (!slices.length) return; - _createSlices(slices) { this._creating += slices.length; - const promises = slices.map(slice => this._ensureSliceState(slice) - .then(_slice => this.enqueueSlice(_slice)) - .finally(() => { - this._creating -= 1; - })); - - return Promise.all(promises); + try { + const count = await this.stateStore.createSlices(this.exId, slices); + this.enqueueSlices(slices); + this._creating -= count; + } catch (err) { + const { lifecycle } = this.executionContext.config; + if (lifecycle === 'once') { + throw err; + } else { + this.logger.error(err, 'failure creating slices'); + } + } } async _recoverSlices() {
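The `_createSlices` rewrite above swaps N per-slice writes for one bulk write. A rough standalone sketch of that flow (`enqueue` and `persist` are hypothetical stand-ins for `enqueueSlice` and `stateStore.createSlices`):

```js
'use strict';

// Recovered slices already carry _created (they were persisted earlier),
// so they go straight to the queue; fresh slices are stamped and written
// in a single bulk call instead of one request per slice.
async function createSlices(allSlices, enqueue, persist) {
    const fresh = [];
    for (const slice of allSlices) {
        if (slice._created) {
            enqueue(slice);
        } else {
            slice._created = new Date().toISOString();
            fresh.push(slice);
        }
    }
    if (!fresh.length) return 0;

    const count = await persist(fresh); // one bulk request
    fresh.forEach(enqueue);
    return count;
}
```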
diff --git a/packages/teraslice/lib/workers/execution-controller/slice-analytics.js b/packages/teraslice/lib/workers/execution-controller/slice-analytics.js index 486791eea1c..7177427ca29 100644 --- a/packages/teraslice/lib/workers/execution-controller/slice-analytics.js +++ b/packages/teraslice/lib/workers/execution-controller/slice-analytics.js @@ -1,15 +1,10 @@ 'use strict'; const _ = require('lodash'); +const { makeLogger } = require('../helpers/terafoundation'); module.exports = function _sliceAnalytics(context, executionContext) { - const { exId, jobId } = executionContext; - - const logger = context.apis.foundation.makeLogger({ - module: 'slice_analytics', - ex_id: exId, - job_id: jobId, - }); + const logger = makeLogger(context, 'slice_analytics'); const events = context.apis.foundation.getSystemEvents(); @@ -19,7 +14,7 @@ module.exports = function _sliceAnalytics(context, executionContext) { const sliceAnalytics = { time: [], size: [], - memory: [], + memory: [] }; for (let i = 0; i < operations.length; i += 1) { @@ -28,21 +23,21 @@ module.exports = function _sliceAnalytics(context, executionContext) { max: 0, sum: 0, total: 0, - average: 0, + average: 0 }); sliceAnalytics.size.push({ min: 0, max: 0, sum: 0, total: 0, - average: 0, + average: 0 }); sliceAnalytics.memory.push({ min: 0, max: 0, sum: 0, total: 0, - average: 0, + average: 0 }); } @@ -115,6 +110,6 @@ average memory: ${memory.average}, min: ${memory.min}, and max: ${memory.max} addStats, analyzeStats, getStats, - shutdown, + shutdown }; }; diff --git a/packages/teraslice/lib/workers/helpers/terafoundation.js b/packages/teraslice/lib/workers/helpers/terafoundation.js index f80742dea53..820c8949946 100644 --- a/packages/teraslice/lib/workers/helpers/terafoundation.js +++ b/packages/teraslice/lib/workers/helpers/terafoundation.js @@ -1,7 +1,8 @@ 'use strict'; -const { get } = require('@terascope/utils'); +const { get, isFunction, isString } = require('@terascope/utils'); const { makeContextLogger } = require('@terascope/job-components'); +const { safeDecode } = require('../../utils/encoding_utils'); function generateWorkerId(context) { const { hostname } = context.sysconfig.teraslice; @@ -9,14 +10,34 @@ function generateWorkerId(context) { return `${hostname}__${clusterId}`; } -function makeLogger(context, executionContext, moduleName, extra = {}) { - const { exId, jobId } = executionContext; +function makeLogger(context, moduleName, extra = {}) { + if (!context || !context.apis) { + throw new Error('makeLogger expected terafoundation context as first arg'); + } - return makeContextLogger( - context, - moduleName, - Object.assign({}, extra, { ex_id: exId, job_id: jobId }) - ); + if (!moduleName || !isString(moduleName)) { + throw new Error('makeLogger expected module name as second arg'); + } + + const exAPI = context.apis.executionContext; + if (exAPI && isFunction(exAPI.makeLogger)) { + return exAPI.makeLogger(moduleName, extra); + } + + const defaultContext = {}; + if (process.env.EX) { + const ex = safeDecode(process.env.EX); + const exId = get(ex, 'ex_id'); + const jobId = get(ex, 'job_id'); + if (exId) { + defaultContext.ex_id = exId; + } + if (jobId) { + defaultContext.job_id = jobId; + } + } + + return makeContextLogger(context, moduleName, Object.assign(defaultContext, extra)); } module.exports = { generateWorkerId, makeLogger }; diff --git a/packages/teraslice/lib/workers/helpers/worker-shutdown.js b/packages/teraslice/lib/workers/helpers/worker-shutdown.js index 546370231a3..30d4f1e5b5b 100644 --- a/packages/teraslice/lib/workers/helpers/worker-shutdown.js +++ b/packages/teraslice/lib/workers/helpers/worker-shutdown.js @@ -2,6 +2,7 @@ const Promise = require('bluebird'); const { get, pDelay, isError } = require('@terascope/utils'); +const { makeLogger } = require('./terafoundation'); function waitForWorkerShutdown(context, eventName) { const shutdownTimeout = get(context, 'sysconfig.teraslice.shutdown_timeout', 30000); @@ -32,8 +33,9 @@ function shutdownHandler(context, shutdownFn) { || process.env.assignment || 'unknown-assignment'; + const isK8s = get(context, 'sysconfig.teraslice.cluster_manager_type') === 'kubernetes'; const isProcessRestart = process.env.process_restart; - const restartOnFailure = assignment !== 'exectution_controller'; + const allowNonZeroExitCode = isK8s || assignment !== 'execution_controller'; const api = { exiting: false, exit }; @@ -42,7 +44,7 @@ const shutdownTimeout = get(context, 'sysconfig.teraslice.shutdown_timeout', 20 * 1000); const events = context.apis.foundation.getSystemEvents(); - const logger = context.apis.foundation.makeLogger({ module: `${assignment}:shutdown_handler` }); + const logger = makeLogger(context, `${assignment}:shutdown_handler`); if (assignment === 'execution_controller' && isProcessRestart) { logger.fatal( @@ -95,7 +97,11 @@ function 
shutdownHandler(context, shutdownFn) { logger.error(error, `${assignment} while shutting down`); } finally { await flushLogs(); - process.exit(); + if (allowNonZeroExitCode) { + process.exit(); + } else { + process.exit(0); + } } } @@ -118,7 +124,7 @@ function shutdownHandler(context, shutdownFn) { process.on('uncaughtException', (err) => { logger.fatal(err, `${assignment} received an uncaughtException, ${exitingIn()}`); if (!api.exiting) { - process.exitCode = restartOnFailure ? 1 : 0; + process.exitCode = 1; } exit('uncaughtException', err); }); @@ -126,7 +132,7 @@ function shutdownHandler(context, shutdownFn) { process.once('unhandledRejection', (err) => { logger.fatal(err, `${assignment} received an unhandledRejection, ${exitingIn()}`); if (!api.exiting) { - process.exitCode = restartOnFailure ? 1 : 0; + process.exitCode = 1; } exit('unhandledRejection', err); }); @@ -147,7 +153,7 @@ function shutdownHandler(context, shutdownFn) { events.once('client:initialization:error', (err) => { logger.fatal(`${assignment} received a client initialization error, ${exitingIn()}`, err); if (!api.exiting) { - process.exitCode = restartOnFailure ? 1 : 0; + process.exitCode = 1; } exit('client:initialization:error', err); }); diff --git a/packages/teraslice/lib/workers/worker/index.js b/packages/teraslice/lib/workers/worker/index.js index fcd2503d9eb..f251ac759f3 100644 --- a/packages/teraslice/lib/workers/worker/index.js +++ b/packages/teraslice/lib/workers/worker/index.js @@ -1,7 +1,6 @@ 'use strict'; -const { get } = require('@terascope/utils'); -const { isFatalError } = require('@terascope/job-components'); +const { get, getFullErrorStack, isFatalError } = require('@terascope/utils'); const { ExecutionController, formatURL } = require('@terascope/teraslice-messaging'); const { makeStateStore, makeAnalyticsStore } = require('../../cluster/storage'); const { waitForWorkerShutdown } = require('../helpers/worker-shutdown'); @@ -11,12 +10,12 @@ const Slice = require('./slice'); class Worker { constructor(context, executionContext) { const workerId = generateWorkerId(context); - const logger = makeLogger(context, executionContext, 'worker'); + const logger = makeLogger(context, 'worker'); const events = context.apis.foundation.getSystemEvents(); const { slicer_port: slicerPort, - slicer_hostname: slicerHostname, + slicer_hostname: slicerHostname } = executionContext.config; const config = context.sysconfig.teraslice; @@ -31,7 +30,7 @@ class Worker { networkLatencyBuffer, connectTimeout: workerDisconnectTimeout, actionTimeout, - logger, + logger }); this.slice = new Slice(context, executionContext); @@ -80,7 +79,8 @@ class Worker { try { await this.runOnce(); } catch (err) { - this.logger.fatal(err, 'Worker must shutdown to Fatal Error'); + process.exitCode = 1; + this.logger.fatal(err, 'Worker must shutdown due to fatal error'); this.shutdown(false); } finally { running = false; @@ -136,7 +136,7 @@ class Worker { await this.client.sendSliceComplete({ slice: this.slice.slice, - analytics: this.slice.analyticsData, + analytics: this.slice.analyticsData }); await this.executionContext.onSliceFinished(sliceId); @@ -150,7 +150,7 @@ class Worker { await this.client.sendSliceComplete({ slice: this.slice.slice, analytics: this.slice.analyticsData, - error: err.toString(), + error: getFullErrorStack(err) }); } @@ -161,12 +161,14 @@ class Worker { async shutdown(block = true) { if (this.isShutdown) return; if (!this.isInitialized) return; + const { exId } = this.executionContext; + if (this.isShuttingDown) { const 
msgs = [ 'worker', - `shutdown was called for ${this.exId}`, + `shutdown was called for ${exId}`, 'but it was already shutting down', - block ? ', will block until done' : '', + block ? ', will block until done' : '' ]; this.logger.debug(msgs.join(' ')); @@ -176,8 +178,6 @@ class Worker { return; } - const { exId } = this.executionContext; - this.client.available = false; this.isShuttingDown = true; @@ -192,7 +192,7 @@ class Worker { // and wait for the slice to finish await Promise.all([ this.slice.flush().catch(pushError), - this._waitForSliceToFinish().catch(pushError), + this._waitForSliceToFinish().catch(pushError) ]); this.events.emit('worker:shutdown'); @@ -211,7 +211,7 @@ class Worker { })(), (async () => { await this.client.shutdown().catch(pushError); - })(), + })() ]); const n = this.slicesProcessed; diff --git a/packages/teraslice/lib/workers/worker/slice.js b/packages/teraslice/lib/workers/worker/slice.js index 5208045cbd9..5589bded589 100644 --- a/packages/teraslice/lib/workers/worker/slice.js +++ b/packages/teraslice/lib/workers/worker/slice.js @@ -17,8 +17,8 @@ class Slice { this.stateStore = stores.stateStore; this.analyticsStore = stores.analyticsStore; this.slice = slice; - this.logger = makeLogger(this.context, this.executionContext, 'slice', { - slice_id: sliceId, + this.logger = makeLogger(this.context, 'slice', { + slice_id: sliceId }); await this.executionContext.initializeSlice(slice); @@ -72,7 +72,7 @@ class Slice { } catch (err) { this.logger.error( new TSError(err, { - reason: `Slice: ${slice.slice_id} failure on lifecycle event onSliceFinalizing`, + reason: `Slice: ${slice.slice_id} failure on lifecycle event onSliceFinalizing` }) ); } @@ -84,7 +84,7 @@ class Slice { } catch (err) { this.logger.error( new TSError(err, { - reason: `Slice: ${slice.slice_id} failure on lifecycle event onSliceFailed`, + reason: `Slice: ${slice.slice_id} failure on lifecycle event onSliceFailed` }) ); } @@ -106,7 +106,7 @@ class Slice { } catch (_err) { this.logger.error( new TSError(_err, { - reason: 'Failure to update analytics', + reason: 'Failure to update analytics' }) ); } @@ -133,7 +133,7 @@ class Slice { await this._onSliceFailure(slice); throw new TSError(err, { - reason: 'Slice failed processing', + reason: 'Slice failed processing' }); } } diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 853f8e15539..a5f9bf7dd1b 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -1,6 +1,6 @@ { "name": "teraslice", - "version": "0.53.3", + "version": "0.54.0", "description": "Distributed computing platform for processing JSON data", "homepage": "https://github.com/terascope/teraslice#readme", "bugs": { @@ -34,12 +34,12 @@ "test:watch": "jest --coverage=false --notify --watch --onlyChanged" }, "dependencies": { - "@terascope/elasticsearch-api": "^2.0.5", + "@terascope/elasticsearch-api": "^2.1.0", "@terascope/error-parser": "^1.0.2", - "@terascope/job-components": "^0.20.4", + "@terascope/job-components": "^0.20.5", "@terascope/queue": "^1.1.6", "@terascope/teraslice-messaging": "^0.3.3", - "@terascope/utils": "^0.12.3", + "@terascope/utils": "^0.12.4", "async-mutex": "^0.1.3", "barbe": "^3.0.15", "bluebird": "^3.5.5", @@ -60,7 +60,7 @@ "shortid": "^2.2.14", "socket.io": "^1.7.4", "socket.io-client": "^1.7.4", - "terafoundation": "^0.9.1", + "terafoundation": "^0.10.0", "uuid": "^3.3.2" }, "devDependencies": { diff --git a/packages/teraslice/test/config/schemas/system_schema-spec.js 
b/packages/teraslice/test/config/schemas/system_schema-spec.js index b396001d782..af85d5edab0 100644 --- a/packages/teraslice/test/config/schemas/system_schema-spec.js +++ b/packages/teraslice/test/config/schemas/system_schema-spec.js @@ -1,9 +1,11 @@ 'use strict'; const convict = require('convict'); -const sysSchema = require('../../../lib/config/schemas/system'); +// load any convict schema require('@terascope/job-components'); +const sysSchema = require('../../../lib/config/schemas/system'); + describe('system_schema', () => { const schema = sysSchema.config_schema({}).teraslice; @@ -29,6 +31,8 @@ describe('system_schema', () => { }); it('assets_directory is optional but requires a string', () => { - expect(checkValidation({ assets_directory: 234 })).toEqual('assets_directory: This field is optional but if specified it must be of type string: value was 234'); + expect(checkValidation({ assets_directory: 234 })).toEqual( + 'assets_directory: This field is optional but if specified it must be of type string: value was 234' + ); }); }); diff --git a/packages/teraslice/test/workers/execution-controller/scheduler-spec.js b/packages/teraslice/test/workers/execution-controller/scheduler-spec.js index 989c55e01c2..b9836c2e703 100644 --- a/packages/teraslice/test/workers/execution-controller/scheduler-spec.js +++ b/packages/teraslice/test/workers/execution-controller/scheduler-spec.js @@ -2,7 +2,7 @@ const _ = require('lodash'); const uuidv4 = require('uuid/v4'); -const Promise = require('bluebird'); +const { pDelay } = require('@terascope/utils'); const TestContext = require('../helpers/test-context'); const Scheduler = require('../../../lib/workers/execution-controller/scheduler'); @@ -38,7 +38,7 @@ describe('Scheduler', () => { assignment: 'execution_controller', slicers, newOps: true, - countPerSlicer, + countPerSlicer }); await testContext.initialize(); @@ -46,7 +46,11 @@ describe('Scheduler', () => { scheduler = new Scheduler(testContext.context, testContext.executionContext); scheduler.stateStore = { - createState: () => Promise.delay(), + createState: () => pDelay(0), + createSlices: async (exId, slices) => { + await pDelay(0); + return slices.length; + } }; testContext.attachCleanup(() => scheduler.shutdown()); @@ -66,18 +70,18 @@ describe('Scheduler', () => { it('should be able to reenqueue a slice', () => { scheduler.enqueueSlices([ { - slice_id: 1, + slice_id: 1 }, { - slice_id: 2, - }, + slice_id: 2 + } ]); scheduler.enqueueSlice({ slice_id: 1 }); scheduler.enqueueSlice( { - slice_id: 3, + slice_id: 3 }, true ); @@ -85,14 +89,14 @@ describe('Scheduler', () => { const slices = scheduler.getSlices(100); expect(slices).toEqual([ { - slice_id: 3, + slice_id: 3 }, { - slice_id: 1, + slice_id: 1 }, { - slice_id: 2, - }, + slice_id: 2 + } ]); }); @@ -103,13 +107,13 @@ describe('Scheduler', () => { scheduler.run(), getSlices().then((_slices) => { slices = _slices; - }), + }) ]); expect(scheduler.paused).toBeFalse(); expect(scheduler.slicersDone).toBeTrue(); expect(scheduler.queueLength).toEqual(0); - expect(slices).toBeArrayOfSize(expectedCount); + expect(slices).toHaveLength(expectedCount); expect(scheduler.isFinished).toBeTrue(); }); @@ -127,10 +131,10 @@ describe('Scheduler', () => { scheduler.run(), getSlices().then((_slices) => { slices = _slices; - }), + }) ]); - expect(slices).toBeArrayOfSize(expectedCount); + expect(slices).toHaveLength(expectedCount); expect(scheduler.isFinished).toBeTrue(); expect(scheduler.slicersDone).toBeTrue(); }); @@ -144,7 +148,7 @@ describe('Scheduler', () 
=> { scheduler.run(), getSlices().then((_slices) => { slices = _slices; - }), + }) ]); // be more flexible @@ -163,9 +167,9 @@ describe('Scheduler', () => { slicer_id: 1, slicer_order: 0, request: { - id: _.uniqueId('recover-'), + id: _.uniqueId('recover-') }, - _created: new Date().toISOString(), + _created: new Date().toISOString() })); const emitDone = _.once(() => { @@ -208,7 +212,7 @@ describe('Scheduler', () => { }, exitAfterComplete() { return false; - }, + } }; expectedCount += recoveryRecords.length; @@ -217,10 +221,10 @@ describe('Scheduler', () => { scheduler.run(), getSlices().then((_slices) => { slices = _slices; - }), + }) ]); - expect(slices).toBeArrayOfSize(expectedCount); + expect(slices).toHaveLength(expectedCount); expect(scheduler.ready).toBeTrue(); expect(scheduler.isFinished).toBeTrue(); expect(scheduler.stopped).toBeFalse(); @@ -234,9 +238,9 @@ describe('Scheduler', () => { slicer_id: 1, slicer_order: 0, request: { - id: _.uniqueId('recover-'), + id: _.uniqueId('recover-') }, - _created: new Date().toISOString(), + _created: new Date().toISOString() })); let slices = []; @@ -278,7 +282,7 @@ describe('Scheduler', () => { }, exitAfterComplete() { return true; - }, + } }; expectedCount = recoveryRecords.length; @@ -287,10 +291,10 @@ describe('Scheduler', () => { scheduler.run(), getSlices().then((_slices) => { slices = _slices; - }), + }) ]); - expect(slices).toBeArrayOfSize(expectedCount); + expect(slices).toHaveLength(expectedCount); expect(scheduler.ready).toBeFalse(); expect(scheduler.isFinished).toBeTrue(); expect(scheduler.stopped).toBeFalse(); diff --git a/packages/teraslice/test/workers/helpers/configs.js b/packages/teraslice/test/workers/helpers/configs.js index 00ac84474d1..22845bb2046 100644 --- a/packages/teraslice/test/workers/helpers/configs.js +++ b/packages/teraslice/test/workers/helpers/configs.js @@ -2,7 +2,6 @@ const path = require('path'); const Chance = require('chance'); -const random = require('lodash/random'); const pickBy = require('lodash/pickBy'); const { newId } = require('../../../lib/utils/id_utils'); @@ -12,14 +11,6 @@ const { ELASTICSEARCH_HOST } = process.env; const chance = new Chance(); -const newSliceConfig = (request = { example: 'slice-data' }) => ({ - slice_id: newId('slice-id', true), - slicer_id: newId('slicer-id', true), - slicer_order: random(0, 1000), - request, - _created: new Date().toISOString() -}); - const newConfig = (options = {}) => { const { newOps } = options; let { operations } = options; @@ -28,11 +19,11 @@ const newConfig = (options = {}) => { operations = [ pickBy({ _op: path.join(opsPath, 'new-reader'), - countPerSlicer: options.countPerSlicer, + countPerSlicer: options.countPerSlicer }), pickBy({ _op: path.join(opsPath, 'new-op'), - failOnSliceRetry: options.failOnSliceRetry || false, + failOnSliceRetry: options.failOnSliceRetry || false }), { _op: 'noop' @@ -47,13 +38,13 @@ const newConfig = (options = {}) => { results: options.readerResults, slicerResults: options.slicerResults, slicerErrorAt: options.slicerErrorAt, - slicerQueueLength: options.slicerQueueLength, + slicerQueueLength: options.slicerQueueLength }), pickBy({ _op: path.join(opsPath, 'example-op'), exampleProp: 123, errorAt: options.opErrorAt, - results: options.opResults, + results: options.opResults }) ]; } @@ -69,7 +60,7 @@ const newConfig = (options = {}) => { slicers = 1, recoveredExecution, recoveredSliceType, - probationWindow = 5000, + probationWindow = 5000 } = options; return { @@ -88,11 +79,10 @@ const newConfig = (options = {}) => { 
node_id: newId('node-id', true), slicer_port: slicerPort, slicer_hostname: 'localhost', - probation_window: probationWindow, + probation_window: probationWindow }; }; - const newSysConfig = (options = {}) => { const { clusterName = 'test-teraslice-cluster', @@ -100,7 +90,7 @@ const newSysConfig = (options = {}) => { actionTimeout = 2000, shutdownTimeout = 4000, assetDir, - clusterMasterPort, + clusterMasterPort } = options; return { @@ -111,7 +101,7 @@ const newSysConfig = (options = {}) => { default: { host: [ELASTICSEARCH_HOST], requestTimeout: timeout, - deadTimeout: timeout, + deadTimeout: timeout } } } @@ -150,7 +140,7 @@ const newSysConfig = (options = {}) => { state: { number_of_shards: 1, number_of_replicas: 0 - }, + } } } }; @@ -159,6 +149,5 @@ const newSysConfig = (options = {}) => { module.exports = { opsPath, newConfig, - newSliceConfig, newSysConfig }; diff --git a/packages/teraslice/test/workers/helpers/index.js b/packages/teraslice/test/workers/helpers/index.js index 5d957eebe21..15258727826 100644 --- a/packages/teraslice/test/workers/helpers/index.js +++ b/packages/teraslice/test/workers/helpers/index.js @@ -2,19 +2,13 @@ const TestContext = require('./test-context'); const { - newSliceConfig, - newConfig, - newSysConfig, - opsPath, - newId, + newConfig, newSysConfig, opsPath, newId } = require('./configs'); - module.exports = { newConfig, - newSliceConfig, opsPath, newId, newSysConfig, - TestContext, + TestContext }; diff --git a/packages/teraslice/test/workers/helpers/test-context.js b/packages/teraslice/test/workers/helpers/test-context.js index 2b7fcd8826f..e7c72ed822d 100644 --- a/packages/teraslice/test/workers/helpers/test-context.js +++ b/packages/teraslice/test/workers/helpers/test-context.js @@ -2,11 +2,10 @@ /* eslint-disable no-console */ -const Promise = require('bluebird'); -const _ = require('lodash'); -const { createTempDirSync, cleanupTempDirs } = require('jest-fixtures'); -const path = require('path'); const fs = require('fs-extra'); +const path = require('path'); +const { createTempDirSync, cleanupTempDirs } = require('jest-fixtures'); +const { newTestSlice, get } = require('@terascope/job-components'); const { ClusterMaster } = require('@terascope/teraslice-messaging'); const { @@ -14,7 +13,7 @@ const { makeStateStore, makeAnalyticsStore, makeExStore, - makeJobStore, + makeJobStore } = require('../../../lib/cluster/storage'); const { initializeJob } = require('../../../lib/workers/helpers/job'); @@ -22,7 +21,7 @@ const makeTerafoundationContext = require('../../../lib/workers/context/terafoun const makeExecutionContext = require('../../../lib/workers/context/execution-context'); const { newId } = require('../../../lib/utils/id_utils'); const { findPort } = require('../../../lib/utils/port_utils'); -const { newConfig, newSysConfig, newSliceConfig } = require('./configs'); +const { newConfig, newSysConfig } = require('./configs'); const zipDirectory = require('./zip-directory'); const { TERASLICE_CLUSTER_NAME } = process.env; @@ -36,10 +35,7 @@ const stores = {}; class TestContext { constructor(options = {}) { const { - clusterMasterPort, - shutdownTimeout, - actionTimeout, - timeout, + clusterMasterPort, shutdownTimeout, actionTimeout, timeout } = options; this.setupId = newId('setup', true); @@ -51,7 +47,7 @@ class TestContext { clusterMasterPort, actionTimeout, timeout, - shutdownTimeout, + shutdownTimeout }); this.config = newConfig(options); @@ -83,7 +79,8 @@ class TestContext { this.jobId = this.executionContext.config.job_id; } - get stores() { // 
eslint-disable-line + get stores() { + // eslint-disable-line return stores; } @@ -91,15 +88,21 @@ class TestContext { if (this.clusterMaster) return this.clusterMaster; const port = await findPort(); - const networkLatencyBuffer = _.get(this.context, 'sysconfig.teraslice.network_latency_buffer'); - const actionTimeout = _.get(this.context, 'sysconfig.teraslice.action_timeout'); - const nodeDisconnectTimeout = _.get(this.context, 'sysconfig.teraslice.node_disconnect_timeout'); + const networkLatencyBuffer = get( + this.context, + 'sysconfig.teraslice.network_latency_buffer' + ); + const actionTimeout = get(this.context, 'sysconfig.teraslice.action_timeout'); + const nodeDisconnectTimeout = get( + this.context, + 'sysconfig.teraslice.node_disconnect_timeout' + ); this.clusterMaster = new ClusterMaster.Server({ port, networkLatencyBuffer, actionTimeout, - nodeDisconnectTimeout, + nodeDisconnectTimeout }); await this.clusterMaster.start(); @@ -128,7 +131,7 @@ class TestContext { } async newSlice() { - const sliceConfig = newSliceConfig(); + const sliceConfig = newTestSlice({ request: { example: 'slice-data' } }); await this.addStateStore(); await stores.stateStore.createState(this.exId, sliceConfig, 'start'); return sliceConfig; @@ -173,7 +176,7 @@ class TestContext { async cleanup() { if (this.clean) return; - await Promise.map(this._cleanupFns, fn => fn()); + await Promise.all(this._cleanupFns.map(fn => fn())); this._cleanupFns.length = 0; this.events.removeAllListeners(); @@ -208,7 +211,7 @@ async function cleanupAll(withEs) { if (withEs && Object.keys(stores).length) { try { - await Promise.map(stores, store => store.shutdown(true)); + await Promise.all(Object.values(stores).map(store => store.shutdown(true))); } catch (err) { console.error(err); } diff --git a/packages/ts-transforms/package.json b/packages/ts-transforms/package.json index af6fb2f7dc4..c9e9afc82fd 100644 --- a/packages/ts-transforms/package.json +++ b/packages/ts-transforms/package.json @@ -37,7 +37,7 @@ "test:watch": "jest --coverage=false --notify --watch --onlyChanged" }, "dependencies": { - "@terascope/utils": "^0.12.3", + "@terascope/utils": "^0.12.4", "@types/graphlib": "^2.1.5", "@types/shortid": "^0.0.29", "awesome-phonenumber": "^2.12.0", diff --git a/packages/ui-components/package.json b/packages/ui-components/package.json index 116fd66494c..c3537941dbd 100644 --- a/packages/ui-components/package.json +++ b/packages/ui-components/package.json @@ -24,7 +24,7 @@ }, "dependencies": { "@terascope/data-access": "^0.12.0", - "@terascope/utils": "^0.12.3", + "@terascope/utils": "^0.12.4", "apollo-boost": "^0.4.3", "apollo-client": "^2.6.3", "date-fns": "^1.30.1", diff --git a/packages/ui-core/package.json b/packages/ui-core/package.json index 436e4b91d7d..39913a37deb 100644 --- a/packages/ui-core/package.json +++ b/packages/ui-core/package.json @@ -44,7 +44,7 @@ "devDependencies": { "@craco/craco": "^5.2.3", "@terascope/ui-components": "^0.3.6", - "@terascope/utils": "^0.12.3", + "@terascope/utils": "^0.12.4", "@types/jest": "24.0.15", "@types/node": "12.6.6", "@types/react": "16.8.23", diff --git a/packages/ui-data-access/package.json b/packages/ui-data-access/package.json index 0b65d4708ba..9226dae76b9 100644 --- a/packages/ui-data-access/package.json +++ b/packages/ui-data-access/package.json @@ -26,7 +26,7 @@ "@terascope/data-access": "^0.12.0", "@terascope/data-types": "^0.3.1", "@terascope/ui-components": "^0.3.6", - "@terascope/utils": "^0.12.3", + "@terascope/utils": "^0.12.4", "@types/jest": "24.0.15", 
"@types/node": "12.6.6", "@types/react": "16.8.23", diff --git a/packages/utils/package.json b/packages/utils/package.json index 9dc758d02ec..19f9a6e8b2f 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -1,6 +1,6 @@ { "name": "@terascope/utils", - "version": "0.12.3", + "version": "0.12.4", "description": "A collection of Teraslice Utilities", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/utils#readme", "bugs": { diff --git a/packages/utils/src/promises.ts b/packages/utils/src/promises.ts index 8344a849ca0..34e8d546afb 100644 --- a/packages/utils/src/promises.ts +++ b/packages/utils/src/promises.ts @@ -122,11 +122,11 @@ export async function pRetry(fn: PromiseFn, options?: Partial 1) { - await pDelay(config._currentDelay); - config.retries--; config._currentDelay = getBackoffDelay(config._currentDelay, config.backoff, config.maxDelay, config.delay); + await pDelay(config._currentDelay); + config.logError(err, 'retry error, retrying...', { ...config, _context: null, diff --git a/packages/utils/src/strings.ts b/packages/utils/src/strings.ts index b46821388d8..4f1a2d1086e 100644 --- a/packages/utils/src/strings.ts +++ b/packages/utils/src/strings.ts @@ -8,7 +8,7 @@ export function toString(val: any): string { if (val == null) return ''; if (isString(val)) return val; if (typeof val === 'number' && !Number.isNaN(val)) return `${val}`; - if (val && typeof val === 'object' && val.message && val.stack) { + if (val.message && val.stack) { return val.toString(); } diff --git a/packages/xlucene-evaluator/package.json b/packages/xlucene-evaluator/package.json index 93d57d58819..118b47e12b2 100644 --- a/packages/xlucene-evaluator/package.json +++ b/packages/xlucene-evaluator/package.json @@ -32,7 +32,7 @@ "test:watch": "jest --coverage=false --notify --watch --onlyChanged" }, "dependencies": { - "@terascope/utils": "^0.12.3", + "@terascope/utils": "^0.12.4", "@turf/bbox": "^6.0.1", "@turf/bbox-polygon": "^6.0.1", "@turf/boolean-point-in-polygon": "^6.0.1", diff --git a/packages/xlucene-evaluator/test/parser-spec.ts b/packages/xlucene-evaluator/test/parser-spec.ts index 6e1f0fed604..8dfc307d226 100644 --- a/packages/xlucene-evaluator/test/parser-spec.ts +++ b/packages/xlucene-evaluator/test/parser-spec.ts @@ -1,7 +1,7 @@ import 'jest-extended'; -import { TSError } from '@terascope/utils'; +import { TSError, times } from '@terascope/utils'; import allTestCases from './cases/parser'; -import { Parser } from '../src/parser'; +import { Parser, ASTType } from '../src/parser'; describe('Parser', () => { for (const [key, testCases] of Object.entries(allTestCases)) { @@ -15,6 +15,20 @@ describe('Parser', () => { }); } + describe('when testing edge cases', () => { + describe('given a gigantic query', () => { + it('should be able to parse it', () => { + const partOne = times(300, n => `a:${n}`).join(' OR '); + const partTwo = times(200, n => `b:${n}`).join(' OR '); + const partThree = times(500, n => `c:${n}`).join(') OR ('); + const parser = new Parser(`(${partOne}) AND ${partTwo} OR (${partThree})`); + expect(parser.ast).toMatchObject({ + type: ASTType.LogicalGroup, + }); + }); + }); + }); + describe('when given a invalid query "(ba"', () => { it('should throw an error', () => { // tslint:disable-next-line: max-line-length diff --git a/packages/xlucene-evaluator/test/translator-spec.ts b/packages/xlucene-evaluator/test/translator-spec.ts index 6e748d6a934..f1bc1e2ea05 100644 --- a/packages/xlucene-evaluator/test/translator-spec.ts +++ 
b/packages/xlucene-evaluator/test/translator-spec.ts @@ -1,5 +1,5 @@ import 'jest-extended'; -import { debugLogger, get } from '@terascope/utils'; +import { debugLogger, get, times } from '@terascope/utils'; import { buildAnyQuery } from '../src/translator/utils'; import { Translator, TypeConfig } from '../src'; import { AST, Parser } from '../src/parser'; @@ -63,6 +63,80 @@ describe('Translator', () => { }); } + describe('when testing edge cases', () => { + describe('given a gigantic query', () => { + it('should be able to translate it', () => { + const randomFloat = (n: number) => { + return Math.random() * n; + }; + + const randomInt = (n: number) => { + return Math.round(randomFloat(n)); + }; + + const randomVal = (n: number): string => { + if (Math.random() < Math.random()) { + return `(${randomInt(n)} ${randomInt(n)} ${randomInt(n)})`; + } + if (Math.random() < Math.random()) { + return `[* TO ${randomInt(n)}}`; + } + if (Math.random() < Math.random()) { + return '/[a-z]+/'; + } + if (Math.random() < Math.random()) { + return 'hi:the?e'; + } + if (Math.random() < Math.random()) { + return `>=${randomInt(n)}`; + } + if (Math.random() < Math.random()) { + return `<${randomFloat(n)}`; + } + if (Math.random() < Math.random()) { + return '[2012-01-01 TO 2012-12-31]'; + } + if (Math.random() < Math.random()) { + return `[* TO ${randomInt(n)}}`; + } + if (Math.random() < Math.random()) { + return `(_geo_point_:"${randomFloat(n)},${randomFloat(n)}" _geo_distance_:${randomInt(n)}m)`; + } + return '"some-random-string"'; + }; + + const joinParts = (parts: string[]) => { + return parts + .map((part, i, arr) => { + if (i + 1 === arr.length) return `${part}`; + if (i % 2 === 0) return `(${part}) OR`; + if (i % 5 === 0) return `${part} OR`; + if (i % 7 === 0) return `${part} AND NOT`; + return `(${part}) AND`; + }) + .join(' '); + }; + + const partsA = times(20, n => times(20, i => `example_a_${n}_${i}:${randomVal(n)}`).join(n % 10 === 0 ? ') OR (' : ' OR ')); + const partsB = times(20, n => times(20, i => `example_b_${n}_${i}:${randomVal(n)}`).join(n % 10 === 0 ? ') OR (' : ' OR ')); + const partsC = times(20, n => times(20, i => `example_c_${n}_${i}:${randomVal(n)}`).join(n % 10 === 0 ? ') OR (' : ' OR ')); + const query = joinParts([partsA, partsB, partsC].map(joinParts)); + + const translator = new Translator(query); + const result = translator.toElasticsearchDSL(); + expect(result).toMatchObject({ + query: { + constant_score: { + filter: { + bool: {}, + }, + }, + }, + }); + }); + }); + }); + describe('when given an empty string', () => { it('should translate it to an empty query', () => { const translator = new Translator('');
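For reference, a plain-JS illustration of how these edge-case tests synthesize their gigantic queries (`times` here is a local stand-in for the `times` helper the tests import from `@terascope/utils`):

```js
'use strict';

// Local equivalent of times(n, fn): [fn(0), fn(1), ... fn(n - 1)]
const times = (n, fn) => Array.from({ length: n }, (_, i) => fn(i));

// Mirrors the parser edge case above: roughly a thousand OR'd clauses.
const partOne = times(300, n => `a:${n}`).join(' OR ');
const partTwo = times(200, n => `b:${n}`).join(' OR ');
const partThree = times(500, n => `c:${n}`).join(') OR (');
const query = `(${partOne}) AND ${partTwo} OR (${partThree})`;

console.log(query.startsWith('(a:0 OR a:1')); // true
console.log(query.length > 5000); // true
```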