Skip to content

Commit

Permalink
Merge pull request #1237 from terascope/scheduling-fixes
Browse files Browse the repository at this point in the history
v0.54.0 Improved resiliency against elasticsearch distruptions
  • Loading branch information
jsnoble authored Jul 18, 2019
2 parents 1c02e2d + 0ad021b commit 384f25c
Show file tree
Hide file tree
Showing 64 changed files with 1,096 additions and 659 deletions.
6 changes: 6 additions & 0 deletions e2e/config/teraslice-master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ terafoundation:
default:
host:
- 'elasticsearch:49200'
requestTimeout: 60000
deadTimeout: 45000
sniffOnStart: false
sniffOnConnectionFault: false
suggestCompression: false

# ***********************
# Kafka Configuration
# ***********************
Expand Down
6 changes: 6 additions & 0 deletions e2e/config/teraslice-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ terafoundation:
default:
host:
- 'elasticsearch:49200'
requestTimeout: 60000
deadTimeout: 45000
sniffOnStart: false
sniffOnConnectionFault: false
suggestCompression: false

# ***********************
# Kafka Configuration
# ***********************
Expand Down
2 changes: 1 addition & 1 deletion packages/chunked-file-reader/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"test:watch": "jest --coverage=false --notify --watch --onlyChanged"
},
"dependencies": {
"@terascope/utils": "^0.12.3",
"@terascope/utils": "^0.12.4",
"bluebird": "^3.5.5",
"csvtojson": "^2.0.8",
"lodash": "^4.17.11"
Expand Down
8 changes: 4 additions & 4 deletions packages/data-access-plugin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@
"dependencies": {
"@terascope/data-access": "^0.12.0",
"@terascope/data-types": "^0.3.1",
"@terascope/elasticsearch-api": "^2.0.5",
"@terascope/utils": "^0.12.3",
"@terascope/elasticsearch-api": "^2.1.0",
"@terascope/utils": "^0.12.4",
"apollo-server-express": "^2.6.5",
"graphql": "^14.3.1",
"graphql-iso-date": "^3.6.1",
"graphql-tag": "^2.10.1",
"graphql-type-json": "^0.3.0",
"terafoundation": "^0.9.1",
"terafoundation": "^0.10.0",
"xlucene-evaluator": "^0.9.4"
},
"devDependencies": {
"@terascope/job-components": "^0.20.4",
"@terascope/job-components": "^0.20.5",
"@types/express": "^4.17.0",
"@types/got": "^9.6.0",
"@types/graphql-iso-date": "^3.3.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/data-access/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
},
"dependencies": {
"@terascope/data-types": "^0.3.1",
"@terascope/utils": "^0.12.3",
"@terascope/utils": "^0.12.4",
"elasticsearch-store": "^0.9.1",
"xlucene-evaluator": "^0.9.4"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/data-types/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"test:watch": "jest --coverage=false --notify --watch --onlyChanged"
},
"dependencies": {
"@terascope/utils": "^0.12.3",
"@terascope/utils": "^0.12.4",
"graphql": "^14.3.1",
"yargs": "^13.2.4"
},
Expand Down
183 changes: 146 additions & 37 deletions packages/elasticsearch-api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

const _ = require('lodash');
const Promise = require('bluebird');
const { TSError, isFatalError } = require('@terascope/utils');
const {
isTest,
TSError,
isFatalError,
getBackoffDelay,
isRetryableError
} = require('@terascope/utils');

const DOCUMENT_EXISTS = 409;

Expand Down Expand Up @@ -52,7 +58,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
.catch(errHandler);
}

_runRequest();
waitForClient(() => _runRequest(), reject);
});
}

Expand Down Expand Up @@ -175,7 +181,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
if (resultIndex.found) {
resultIndex.indexWindowSize.forEach((ind) => {
logger.warn(
`max_result_window for index: ${ind.name} is set at ${ind.windowSize} . On very large indices it is possible that a slice can not be divided to stay below this limit. If that occurs an error will be thrown by Elasticsearch and the slice can not be processed. Increasing max_result_window in the Elasticsearch index settings will resolve the problem.`
`max_result_window for index: ${ind.name} is set at ${ind.windowSize}. On very large indices it is possible that a slice can not be divided to stay below this limit. If that occurs an error will be thrown by Elasticsearch and the slice can not be processed. Increasing max_result_window in the Elasticsearch index settings will resolve the problem.`
);
});
} else {
Expand Down Expand Up @@ -230,15 +236,15 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
}

if (nonRetriableError) {
return { data: [], error: nonRetriableError, reason };
return { data: [], error: true, reason };
}

return { data: retry, error: false };
}

function bulkSend(data) {
return new Promise((resolve, reject) => {
const retry = _retryFn(_sendData, data);
const retry = _retryFn(_sendData, data, reject);

function _sendData(formattedData) {
return _clientRequest('bulk', { body: formattedData })
Expand All @@ -247,7 +253,9 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
const response = _filterResponse(formattedData, results);

if (response.error) {
reject(new TSError(response.reason));
reject(new TSError(response.reason, {
retryable: false
}));
} else if (response.data.length === 0) {
// may get doc already created error, if so just return
resolve(results);
Expand Down Expand Up @@ -484,7 +492,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
function _searchES(query) {
return new Promise((resolve, reject) => {
const errHandler = _errorHandler(_performSearch, query, reject, '->search()');
const retry = _retryFn(_performSearch, query);
const retry = _retryFn(_performSearch, query, reject);

function _performSearch(queryParam) {
client
Expand Down Expand Up @@ -517,38 +525,83 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
.catch(errHandler);
}

_performSearch(query);
waitForClient(() => _performSearch(query), reject);
});
}

function _retryFn(fn, data) {
const retryTimer = { start: retryStart, limit: retryLimit };
/**
* Wait for the client to be available before resolving,
* this will also naturally stagger many in-flight requests
*
* - reject if the connection is closed
* - resolve after timeout to let the underlying client deal with any problems
*/
function waitForClient(resolve, reject) {
let intervalId = null;
const startTime = Date.now();

// set different values for when process.env.NODE_ENV === test
const timeoutMs = isTest ? 1000 : _.random(5000, 15000);
const intervalMs = isTest ? 50 : 100;

// avoiding setting the interval if we don't need to
if (_checkClient()) {
return;
}

intervalId = setInterval(_checkClient, intervalMs);

function _checkClient() {
const elapsed = Date.now() - startTime;
try {
const valid = verifyClient();
if (!valid && elapsed <= timeoutMs) return false;

clearInterval(intervalId);
resolve(elapsed);
return true;
} catch (err) {
clearInterval(intervalId);
reject(err);
return true;
}
}
}

function _retryFn(fn, data, reject) {
let delay = 0;

return (_data) => {
const args = _data || data;
const randomMs = Math.random() * (retryTimer.limit - retryTimer.start);
const timer = Math.floor(randomMs + retryTimer.start);

if (retryTimer.limit < 60000) {
retryTimer.limit += retryLimit;
}
if (retryTimer.start < 30000) {
retryTimer.start += retryStart;
}
setTimeout(() => {
fn(args);
}, timer);
waitForClient((elapsed) => {
delay = getBackoffDelay(delay, 2, retryLimit, retryStart);

let timeoutMs = delay - elapsed;
if (timeoutMs < 1) timeoutMs = 1;

setTimeout(() => {
fn(args);
}, timeoutMs);
}, reject);
};
}

function _errorHandler(fn, data, reject, fnName = '->unknown()') {
const retry = _retryFn(fn, data);
const retry = _retryFn(fn, data, reject);
return function _errorHandlerFn(err) {
const isRejectedError = _.get(err, 'body.error.type') === 'es_rejected_execution_exception';
// const isConnectionError = _.get(err, 'message') === 'No Living connections';
let retryable = false;
if (isRetryableError(err)) {
retryable = true;
} else {
const isRejectedError = _.get(err, 'body.error.type') === 'es_rejected_execution_exception';
const isConnectionError = _.get(err, 'message', '').includes('No Living connections');
if (isRejectedError || isConnectionError) {
retryable = true;
}
}

if (isRejectedError) {
// this iteration we will not handle with no living connections issue
if (retryable) {
retry();
} else {
reject(
Expand All @@ -568,30 +621,84 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
return num >= 210;
}

function _isAvailable(index) {
const query = { index, q: '*' };
function isAvailable(index, recordType) {
const query = {
index,
q: '',
size: 0,
terminate_after: '1',
};

const label = recordType ? `for ${recordType}` : index;

return new Promise((resolve) => {
search(query)
return new Promise((resolve, reject) => {
client
.search(query)
.then((results) => {
logger.trace(`index ${index} is now available`);
logger.trace(`index ${label} is now available`);
resolve(results);
})
.catch(() => {
const isReady = setInterval(() => {
search(query)
let running = false;
const checkInterval = setInterval(() => {
if (running) return;
running = true;

try {
const valid = verifyClient();
if (!valid) {
logger.debug(`index ${label} is in an invalid state`);
return;
}
} catch (err) {
running = false;
clearInterval(checkInterval);
reject(err);
return;
}

client
.search(query)
.then((results) => {
clearInterval(isReady);
running = false;

clearInterval(checkInterval);
resolve(results);
})
.catch(() => {
logger.warn('verifying job index is open');
running = false;

logger.warn(`verifying index ${label} is open`);
});
}, 200);
});
});
}

/**
* Verify the state of the underlying es client.
* Will throw error if in fatal state, like the connection is closed.
*
* @returns {boolean} if client is working state it will return true
*/
function verifyClient() {
const closed = _.get(client, 'transport.closed', false);
if (closed) {
throw new TSError('Elasticsearch Client is closed', {
fatalError: true
});
}

const alive = _.get(client, 'transport.connectionPool._conns.alive');
// so we don't break existing tests with mocked clients, we will default to 1
const aliveCount = alive && Array.isArray(alive) ? alive.length : 1;
if (!aliveCount) {
return false;
}

return true;
}

function _migrate(index, migrantIndexName, mapping, recordType, clusterName) {
const reindexQuery = {
slices: 4,
Expand Down Expand Up @@ -755,7 +862,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
return new Promise((resolve, reject) => {
const attemptToCreateIndex = () => {
_createIndex(newIndex, migrantIndexName, mapping, recordType, clusterName)
.then(() => _isAvailable(newIndex))
.then(() => isAvailable(newIndex))
.catch((err) => {
if (isFatalError(err)) return Promise.reject(err);

Expand Down Expand Up @@ -793,7 +900,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
}
if (bool) {
logger.info('connection to elasticsearch has been established');
return _isAvailable(newIndex);
return isAvailable(newIndex);
}
return Promise.resolve();
})
Expand Down Expand Up @@ -831,6 +938,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
mget,
index: indexFn,
indexWithId,
isAvailable,
create,
update,
remove,
Expand All @@ -845,6 +953,7 @@ module.exports = function elasticsearchApi(client = {}, logger, _opConfig) {
indexRefresh,
indexRecovery,
indexSetup,
verifyClient,
validateGeoParameters,
// The APIs below are deprecated and should be removed.
index_exists: indexExists,
Expand Down
4 changes: 2 additions & 2 deletions packages/elasticsearch-api/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/elasticsearch-api",
"version": "2.0.5",
"version": "2.1.0",
"description": "Elasticsearch client api used across multiple services, handles retries and exponential backoff",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/elasticsearch-api#readme",
"bugs": {
Expand All @@ -19,7 +19,7 @@
"test:watch": "jest --coverage=false --notify --watch --onlyChanged"
},
"dependencies": {
"@terascope/utils": "^0.12.3",
"@terascope/utils": "^0.12.4",
"bluebird": "^3.5.5",
"lodash": "^4.17.11"
},
Expand Down
Loading

0 comments on commit 384f25c

Please sign in to comment.