From 3196119a9222f22bceb9617c3aea4a7cc07ca87b Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Fri, 6 Apr 2018 16:55:17 -0700 Subject: [PATCH 01/10] Address all lint issue --- lib/kafka_base_producer.js | 80 ++++++----- lib/kafka_rest_client.js | 72 +++++----- lib/message_batch.js | 11 +- lib/utils.js | 4 +- test/test_kafka_producer.js | 3 + test/test_kafka_rest_client.js | 243 ++++++++++++++++----------------- 6 files changed, 214 insertions(+), 199 deletions(-) diff --git a/lib/kafka_base_producer.js b/lib/kafka_base_producer.js index 8f99854..d8ea582 100644 --- a/lib/kafka_base_producer.js +++ b/lib/kafka_base_producer.js @@ -113,10 +113,13 @@ function KafkaBaseProducer(options, producerType) { // eslint-disable-line if (self.batching) { // default 100kb buffer cache per a topic self.maxBatchSizeBytes = options.maxBatchSizeBytes || 100000; - self.topicToBatchQueue = {}; // map of topic name to MessageBatch + // map of topic name to MessageBatch + self.topicToBatchQueue = {}; - self.flushCycleSecs = options.flushCycleSecs || 1; // flush a topic's batch message every second - var flushCache = function flushCache() { // eslint-disable-line + // flush a topic's batch message every second + self.flushCycleSecs = options.flushCycleSecs || 1; + var flushCache = function flushCache() { + // eslint-disable-line self.flushEntireCache(); }; self.flushInterval = setInterval(flushCache, self.flushCycleSecs * 1000); // eslint-disable-line @@ -132,7 +135,8 @@ function KafkaBaseProducer(options, producerType) { // eslint-disable-line var auditMsg = auditMsgs[i]; self.produce(self.auditTopicName, auditMsg, (Date.now() / 1000)); } - self.topicToMsgcntMaps = {}; // reset the msg count map for next round of auditing + // reset the msg count map for next round of auditing + self.topicToMsgcntMaps = {}; }; self.auditInterval = setInterval(produceAuditMsg, self.auditTimeBucketIntervalInSec * 1000); @@ -150,7 +154,8 @@ function KafkaBaseProducer(options, producerType) { // eslint-disable-line var auditMsgAtProduce = auditMsgsAtProduce[i]; self.produce(self.auditTopicNameC3, auditMsgAtProduce, (Date.now() / 1000)); } - self.topicToMsgcntMapsAtProduce = {}; // reset the msg count map for next round of auditing + // reset the msg count map for next round of auditing + self.topicToMsgcntMapsAtProduce = {}; var auditMsgsAtBatch = self._generateAuditMsgs(self.auditTierAtBatch, self.auditDatacenter, self.topicToMsgcntMapsAtBatch); @@ -158,7 +163,8 @@ function KafkaBaseProducer(options, producerType) { // eslint-disable-line var auditMsgAtBatch = auditMsgsAtBatch[j]; self.produce(self.auditTopicNameC3, auditMsgAtBatch, (Date.now() / 1000)); } - self.topicToMsgcntMapsAtBatch = {}; // reset the msg count map for next round of auditing + // reset the msg count map for next round of auditing + self.topicToMsgcntMapsAtBatch = {}; }; self.auditIntervalC3 = setInterval(produceC3AuditMsg, self.auditTimeBucketIntervalInSec * 1000); @@ -228,9 +234,10 @@ KafkaBaseProducer.prototype.produce = function produce(topic, message, timeStamp if (self.closing) { if (callback) { - callback(new Error('cannot produce to closed KafkaRestProducer')); - } // TODO: else { self.logger.error() } - return; + return callback(new Error('cannot produce to closed KafkaRestProducer')); + } + // TODO: else { self.logger.error() } + return null; } if (self.restClient) { @@ -253,8 +260,9 @@ KafkaBaseProducer.prototype.produce = function produce(topic, message, timeStamp } } else if (callback) { - callback(new Error('Kafka Rest Client is not 
initialized!')); + return callback(new Error('Kafka Rest Client is not initialized!')); } + return null; }; KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord, callback) { @@ -262,17 +270,17 @@ KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord if (self.closing) { if (callback) { - callback(new Error('cannot produceSync to closed KafkaDataProducer')); + return callback(new Error('cannot produceSync to closed KafkaDataProducer')); } - return; + return null; } var err = producerRecord.validate(); if (err !== null) { if (callback) { - callback(err); + return callback(err); } - return; + return null; } if (self.restClient) { @@ -285,8 +293,9 @@ KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord self._auditNewMsg(topic, timeBeginInSecC3, 1, self.topicToMsgcntMapsAtProduce); } } else if (callback) { - callback(new Error('Kafka Rest Client is not initialized!')); + return callback(new Error('Kafka Rest Client is not initialized!')); } + return null; }; KafkaBaseProducer.prototype._produce = function _produce(msg, callback) { @@ -297,13 +306,14 @@ KafkaBaseProducer.prototype._produce = function _produce(msg, callback) { function onResponse(err, result) { if (callback) { - callback(err, result); + return callback(err, result); } self.pendingWrites--; if (self.closing) { self._tryClose(); } + return null; } }; @@ -316,16 +326,18 @@ KafkaBaseProducer.prototype.batch = function batch(topic, message, timeStamp, ca if (self.closing) { if (callback) { - callback(new Error('cannot batch() to closed KafkaRestProducer')); - } // TODO: else { self.logger.error() } - return; + return callback(new Error('cannot batch() to closed KafkaRestProducer')); + } + // TODO: else { self.logger.error() } + return null; } if (typeof message !== 'string' && !Buffer.isBuffer(message)) { if (callback) { - callback(new Error('For batching, message must be a string or buffer, not ' + (typeof message))); - } // TODO: else { self.logger.error(); } - return; + return callback(new Error('For batching, message must be a string or buffer, not ' + (typeof message))); + } + // TODO: else { self.logger.error(); } + return null; } var messageBatch; @@ -376,8 +388,9 @@ KafkaBaseProducer.prototype._produceBatch = function _produceBatch(messageBatch, } if (callback) { - callback(err, res); + return callback(err, res); } + return null; } }; @@ -399,7 +412,7 @@ KafkaBaseProducer.prototype.flushEntireCache = function flushEntireCache(callbac } if (pending === 0 && callback) { - callback(null); + return callback(null); } function onProduced(err) { @@ -409,9 +422,11 @@ KafkaBaseProducer.prototype.flushEntireCache = function flushEntireCache(callbac } if (--pending === 0 && callback) { - callback(null); + return callback(null); } + return null; } + return null; }; KafkaBaseProducer.prototype._auditNewMsg = function _auditNewMsg(topic, timeBeginInSec, msgCount, topicToMsgcntMaps) { @@ -435,9 +450,8 @@ KafkaBaseProducer.prototype._getTimeBeginInSec = function _getTimeBeginInSec(now // 1000000000000 (13 digits), as second, means Fri Sep 27 33658 01:46:40.... 
if (nowInSec > 999999999999.0) { return Math.floor((nowInSec / 1000) / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec; - } else { - return Math.floor(nowInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec; } + return Math.floor(nowInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec; }; KafkaBaseProducer.prototype._getTimeBeginInSecFromHp = function _getTimeBeginInSecFromHp(message) { @@ -521,8 +535,9 @@ KafkaBaseProducer.prototype.logLineWithTimeStamp = function logLine(topic, messa var wholeMessage = JSON.stringify(self.getWholeMsg(topic, message, timeStamp)); self.produce(topic, wholeMessage, timeStamp, function handleResponse(err, res) { if (callback) { - callback(err, res); + return callback(err, res); } + return null; }); }; @@ -583,7 +598,8 @@ KafkaBaseProducer.prototype.close = function close(callback) { (Date.now() / 1000), 'binary'); self._produce(produceMessage); } - self.topicToMsgcntMaps = {}; // reset the msg count map for next round of auditing + // reset the msg count map for next round of auditing + self.topicToMsgcntMaps = {}; } if (self.auditIntervalC3) { @@ -595,7 +611,8 @@ KafkaBaseProducer.prototype.close = function close(callback) { (Date.now() / 1000), 'binary'); self._produce(produceMessageAtProduce); } - self.topicToMsgcntMapsAtProduce = {}; // reset the msg count map for next round of auditing + // reset the msg count map for next round of auditing + self.topicToMsgcntMapsAtProduce = {}; var auditMsgsAtBatch = self._generateAuditMsgs(self.auditTierAtBatch, self.auditDatacenter, self.topicToMsgcntMapsAtBatch); @@ -604,7 +621,8 @@ KafkaBaseProducer.prototype.close = function close(callback) { (Date.now() / 1000), 'binary'); self._produce(produceMessageAtBatch); } - self.topicToMsgcntMapsAtBatch = {}; // reset the msg count map for next round of auditing + // reset the msg count map for next round of auditing + self.topicToMsgcntMapsAtBatch = {}; } self._tryClose(); diff --git a/lib/kafka_rest_client.js b/lib/kafka_rest_client.js index e377111..60d129c 100644 --- a/lib/kafka_rest_client.js +++ b/lib/kafka_rest_client.js @@ -25,8 +25,10 @@ var url = require('url'); var http = require('http'); var lbPool = require('lb_pool'); var os = require('os'); -var utils = require('./utils') +var utils = require('./utils'); +/* eslint-disable no-unused-vars */ var pkginfo = require('pkginfo')(module); +/* eslint-enable no-unused-vars */ var supportContentType = { 'binary': 'application/vnd.kafka.binary.v1', @@ -54,7 +56,8 @@ var connectWaitTimeMs = [0, 100, 3000, 10000, 30000, 60000, 300000, 600000]; var defaultReconnectRetryMs = 100; String.isNullOrEmpty = function checkNullOrEmpty(value) { - return !value; // based on https://codereview.stackexchange.com/questions/5572/string-isnullorempty-in-javascript + // based on https://codereview.stackexchange.com/questions/5572/string-isnullorempty-in-javascript + return !value; }; // Kafka 8 Rest Proxy Client. 
// Initialization: takes host and port and send a GET call to get all the @@ -89,11 +92,11 @@ function KafkaRestClient(options, callback) { self.enable = false; totalWaitingTimeMs = 0; self.scheduleConnect(1); - callback(); // Will not return an error since there are retries to rest proxy - } else { - self.enable = true; - callback(); + // Will not return an error since there are retries to rest proxy + return callback(); } + self.enable = true; + return callback(); }); self.scheduleTopicMapRefresh(); self.statsd = (options.statsd && typeof options.statsd.increment === 'function') ? options.statsd : null; @@ -108,19 +111,19 @@ KafkaRestClient.prototype.initHttpClient = function initHttpClient(options) { if (self.clientType === 'AtLeastOnce') { self.mainHttpClient = new lbPool.Pool(http, [self.requestUrl], { /* eslint-disable camelcase */ - /*jshint camelcase: false */ + /* jshint camelcase: false */ keep_alive: true, timeout: self.timeout, max_pending: self.maxPending - /*jshint camelcase: true */ + /* jshint camelcase: true */ /* eslint-enable camelcase */ }); } else { self.mainHttpClient = new lbPool.Pool(http, [self.requestUrl], { /* eslint-disable camelcase */ - /*jshint camelcase: false */ + /* jshint camelcase: false */ keep_alive: true - /*jshint camelcase: true */ + /* jshint camelcase: true */ /* eslint-enable camelcase */ }); } @@ -136,12 +139,13 @@ KafkaRestClient.prototype.initHttpClient = function initHttpClient(options) { KafkaRestClient.prototype.initLineageOption = function initLineageOption(options) { var self = this; if ('serviceName' in options) { - self.serviceName = options.serviceName; // the application name for data lineage + // the application name for data lineage + self.serviceName = options.serviceName; } else { if ('serviceNameEnv' in options) { self.serviceNameEnv = options.serviceNameEnv; } else { - self.serviceNameEnv = defaultAppNameEnvVar; // the Uber default app id + self.serviceNameEnv = defaultAppNameEnvVar; } /* eslint-disable no-process-env */ self.serviceName = process.env[self.serviceNameEnv]; @@ -197,9 +201,8 @@ KafkaRestClient.prototype.getProduceInterval = function getProduceInterval(retry return 1000; } return 600 * Math.pow(5, retry + 1); - } else { - return self.produceInterval; } + return self.produceInterval; }; KafkaRestClient.prototype.scheduleTopicMapRefresh = function scheduleTopicMapRefresh() { @@ -244,14 +247,14 @@ KafkaRestClient.prototype.discoverTopics = function discoverTopics(callback) { this.getTopicRequestBody(self.proxyHost, self.proxyPort, function updateTopicToUrlMapping(err, res) { if (err) { self.connecting = false; - callback(err, false); - } else { - self.connecting = false; - self.enable = true; - self.cachedTopicToUrlMapping = self.getTopicToUrlMapping(res); - self.topicDiscoveryTimes++; - callback(err, true); + return callback(err, false); } + self.connecting = false; + self.enable = true; + self.cachedTopicToUrlMapping = self.getTopicToUrlMapping(res); + self.topicDiscoveryTimes++; + return callback(err, true); + }); }; @@ -263,15 +266,15 @@ KafkaRestClient.prototype.getTopicRequestBody = function getTopicRequestBody(pro } self.mainHttpClient.get(endpoint, function handleGetCall(err, res, body) { if (err) { - callback(err, null); - } else { - callback(null, body); + return callback(err, null); } + callback(null, body); }); }; KafkaRestClient.prototype.getTopicToUrlMapping = function getTopicToUrlMapping(body) { var self = this; + /* eslint-disable no-try-catch */ try { var urlToTopicsJson = JSON.parse(body); var 
topicToUrlMapping = {}; @@ -287,6 +290,7 @@ KafkaRestClient.prototype.getTopicToUrlMapping = function getTopicToUrlMapping(b } catch (e) { return self.cachedTopicToUrlMapping; } + /* eslint-enable no-try-catch */ }; KafkaRestClient.prototype.isHeatPipeTopic = function isHeatPipeTopic(topic) { @@ -345,7 +349,7 @@ KafkaRestClient.prototype.produceWithRetry = function produceWithRetry(produceMe } else if (self.goToLocalAgent(produceMessage.topic)) { self.tryMakeRequest(produceMessage, self.maxRetries - 1, callback); } else { - callback(new Error('Kafka Rest Client is not enabled yet.')); + return callback(new Error('Kafka Rest Client is not enabled yet.')); } }; @@ -365,19 +369,19 @@ KafkaRestClient.prototype.tryMakeRequest = function tryMakeRequest(produceMessag if (self.clientType === 'AtLeastOnce') { self.urlToHttpClientMapping[pathUrl] = new lbPool.Pool(http, [pathUrl], { /* eslint-disable camelcase */ - /*jshint camelcase: false */ + /* jshint camelcase: false */ keep_alive: true, timeout: self.timeout, max_pending: self.maxPending - /*jshint camelcase: true */ + /* jshint camelcase: true */ /* eslint-enable camelcase */ }); } else { self.urlToHttpClientMapping[pathUrl] = new lbPool.Pool(http, [pathUrl], { /* eslint-disable camelcase */ - /*jshint camelcase: false */ + /* jshint camelcase: false */ keep_alive: true - /*jshint camelcase: true */ + /* jshint camelcase: true */ /* eslint-enable camelcase */ }); } @@ -414,10 +418,9 @@ KafkaRestClient.prototype.tryMakeRequest = function tryMakeRequest(produceMessag self.handleError4xx(produceMessage, res, callback); } else if (self.clientType !== 'AtLeastOnce' || (self.clientType === 'AtLeastOnce' && pathUrl !== self.localAgentHost + ':' + self.localAgentPort)) { - callback(null, body); - } else { - callback(new Error('Send to local agent for topic - ' + produceMessage.topic)); + return callback(null, body); } + callback(new Error('Send to local agent for topic - ' + produceMessage.topic)); }); }; @@ -448,11 +451,10 @@ KafkaRestClient.prototype.handleError5xx = function handleError1(produceMessage, // will cause problem when we try to JSON.stringify it err.attempt = null; } - callback(err); - if (self.statsd) { self.statsd.increment(metricPrefix + '.error'); } + return callback(err); } }; @@ -462,7 +464,7 @@ KafkaRestClient.prototype.handleError4xx = function handleError2(produceMessage, if (self.goToLocalAgent(produceMessage.topic) && self.maxRetries > 1) { self.tryMakeRequest(produceMessage, self.maxRetries - 1, callback); } else { - callback(new Error('Got statusCode ' + res.statusCode)); + return callback(new Error('Got statusCode ' + res.statusCode)); } }; diff --git a/lib/message_batch.js b/lib/message_batch.js index 9a2fd79..8304b48 100644 --- a/lib/message_batch.js +++ b/lib/message_batch.js @@ -25,8 +25,8 @@ var Buffer = require('buffer').Buffer; // The batch message payload should follow: // 4 BE bytes number of messages + 4 BE bytes size of message + actual message var KAFKA_PADDINGS = { - MESSAGE_PADDING_BYTES: 4, - BATCH_PADDING_BYTES: 4 + MESSAGE_PADDING_BYTES: 4, + BATCH_PADDING_BYTES: 4 }; Object.freeze(KAFKA_PADDINGS); @@ -71,9 +71,10 @@ MessageBatch.prototype.addMessage = function addMessage(message, callback) { } else { var err = new Error('For batching, message must be a string or buffer, not ' + (typeof message)); if (callback) { - callback(err); - } // TODO: else { self.logger.error(err); } - return; + return callback(err); + } + // TODO: else { self.logger.error(err); } + return null; } 
self.cachedBuf.writeInt32BE(bytesWritten - KAFKA_PADDINGS.MESSAGE_PADDING_BYTES, offset); diff --git a/lib/utils.js b/lib/utils.js index e67da47..0b93f42 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -24,7 +24,6 @@ var clientIDMaxBlockLength = 128; - // Retrieves some meaningful application name for data lineage function getServiceName() { var processName = process.argv[1]; @@ -32,7 +31,6 @@ function getServiceName() { return '[' + getLimitedLength(processName) + ',' + pid + ']'; } - function getLimitedLength(longName) { if (longName.length > clientIDMaxBlockLength) { longName = longName.substring(0, clientIDMaxBlockLength - 1); @@ -43,4 +41,4 @@ function getLimitedLength(longName) { module.exports = { getServiceName: getServiceName, getLimitedLength: getLimitedLength -} \ No newline at end of file +}; diff --git a/test/test_kafka_producer.js b/test/test_kafka_producer.js index 0ea7b2a..667d59a 100644 --- a/test/test_kafka_producer.js +++ b/test/test_kafka_producer.js @@ -473,6 +473,8 @@ test('Test generate audit msg', function testKafkaProducerGenerateAuditMsg(asser var cntTestTopic0 = 0; var cntTestTopic1 = 0; var cntTestTopic2 = 0; + + /* eslint-disable block-scoped-var */ for (i = 0; i < auditMsgs.length; i++) { var auditMsg = auditMsgs[i]; var json = JSON.parse(auditMsg); @@ -487,6 +489,7 @@ test('Test generate audit msg', function testKafkaProducerGenerateAuditMsg(asser } assert.equal(json.version, KafkaVersion); } + /* eslint-enable block-scoped-var */ assert.equal(cntTestTopic0, 5120000); assert.equal(cntTestTopic1, 2); diff --git a/test/test_kafka_rest_client.js b/test/test_kafka_rest_client.js index 3581d11..9321d7e 100644 --- a/test/test_kafka_rest_client.js +++ b/test/test_kafka_rest_client.js @@ -186,15 +186,6 @@ test('KafkaRestClient handle post with blacklist client', function testKafkaRest produceInterval: 100 }); - function getProduceMessage(topic, message, ts, type) { - var produceMessage = {}; - produceMessage.topic = topic; - produceMessage.message = message; - produceMessage.timeStamp = ts; - produceMessage.type = type; - return produceMessage; - } - async.parallel([ function test1(next) { restClient.produce(getProduceMessage('testTopic0', 'msg0', timeStamp, 'binary'), @@ -218,11 +209,11 @@ test('KafkaRestClient handle post with blacklist client', function testKafkaRest }); function verifyHeader(assert, PORT, restClient) { - var topicName = "LOGGING TOPICS"; + var topicName = 'LOGGING TOPICS'; var timeStamp = Date.now() / 1000.0; function OverriddenHttpClient(expectedServiceNameHeader, expectedServiceName, expectedClientVersionHeader, - expectedInstanceNameHeader, expectedInstanceName){ + expectedInstanceNameHeader, expectedInstanceName) { this.expectedServiceNameHeader = expectedServiceNameHeader; this.expectedServiceName = expectedServiceName; this.postMethodCalled = false; @@ -231,147 +222,149 @@ function verifyHeader(assert, PORT, restClient) { this.expectedInstanceName = expectedInstanceName; } - OverriddenHttpClient.prototype.post = function post(reqOpts, msg, cb){ + OverriddenHttpClient.prototype.post = function post(reqOpts, msg, cb) { assert.true(this.expectedServiceNameHeader in reqOpts.headers); assert.true(this.expectedInstanceNameHeader in reqOpts.headers); assert.equal(reqOpts.headers[this.expectedServiceNameHeader], this.expectedServiceName); assert.equal(reqOpts.headers[this.expectedInstanceNameHeader], this.expectedInstanceName); assert.true(reqOpts.headers[this.expectedClientVersionHeader].indexOf('node') >= 0); this.postMethodCalled = true; - 
//cb(); }; var mockedHttpClient = new OverriddenHttpClient(restClient.serviceNameHeader, restClient.serviceName, restClient.clientVersionHeader, restClient.instanceNameHeader, restClient.instanceName); - var urlPath = "localhost:" + PORT.toString(); + var urlPath = 'localhost:' + PORT.toString(); restClient.urlToHttpClientMapping = {}; restClient.urlToHttpClientMapping[urlPath] = mockedHttpClient; restClient.produce(getProduceMessage(topicName, 'bla', timeStamp, 'binary'), function assertErrorThrows(err) { - console.log(err.reason); + assert.true(err !== null); }); assert.true(mockedHttpClient.postMethodCalled); } -test('KafkaRestClient can apply lineage header config correctly', function testKafkaRestClientLineageHeaderConfig(assert) { - //Lets define a port we want to listen to - var PORT=15380; - var configs = { - proxyHost: 'localhost', - proxyPort: PORT, - proxyRefreshTime: 0 - }; +/* eslint-disable max-statements */ +test('KafkaRestClient can apply lineage header config correctly', + function testKafkaRestClientLineageHeaderConfig(assert) { + // Lets define a port we want to listen to + var PORT = 15380; - /* global process */ - /* eslint-disable no-process-env */ + var configs = { + proxyHost: 'localhost', + proxyPort: PORT, + proxyRefreshTime: 0 + }; - // case 1: pass in serviceName directly. use it - process.env.UDEPLOY_APP_ID = 'Pickle!'; - process.env.UDEPLOY_DEPLOYMENT_NAME = 'Beth'; - var restClient = new KafkaRestClient({ - proxyHost: configs.proxyHost, - proxyPort: configs.proxyPort, - refreshTime: configs.proxyRefreshTime, - maxRetries: 3, - serviceName: 'Rick and Morty', - instanceName: 'production #4' - }); - assert.equal(restClient.serviceNameHeader, 'kafka-rest-client-service-name'); - assert.equal(restClient.serviceName, 'Rick and Morty'); - assert.equal(restClient.instanceName, 'production #4'); - assert.true(restClient.clientVersion.indexOf('node') > -1); - verifyHeader(assert, PORT, restClient); + /* global process */ + /* eslint-disable no-process-env */ - // case 2: pass in nothing, and no environment var, - // result: generate default with client version, service name, and instance name - process.env.UDEPLOY_APP_ID = ''; - process.env.UDEPLOY_DEPLOYMENT_NAME = ''; - var restClient2 = new KafkaRestClient({ - proxyHost: configs.proxyHost, - proxyPort: configs.proxyPort, - refreshTime: configs.proxyRefreshTime, - maxRetries: 3 - }); - assert.equal(restClient2.serviceNameHeader, 'kafka-rest-client-service-name'); - assert.equal(restClient2.serviceNameEnv, 'UDEPLOY_APP_ID'); - assert.assert(restClient2.serviceName.indexOf('node-kafka-rest-client') > -1); - assert.assert(restClient2.instanceName.indexOf(os.hostname()) > -1); - assert.true(restClient2.clientVersion.indexOf('node') > -1); - verifyHeader(assert, PORT, restClient2); + // case 1: pass in serviceName directly. 
use it + process.env.UDEPLOY_APP_ID = 'Pickle!'; + process.env.UDEPLOY_DEPLOYMENT_NAME = 'Beth'; + var restClient = new KafkaRestClient({ + proxyHost: configs.proxyHost, + proxyPort: configs.proxyPort, + refreshTime: configs.proxyRefreshTime, + maxRetries: 3, + serviceName: 'Rick and Morty', + instanceName: 'production #4' + }); + assert.equal(restClient.serviceNameHeader, 'kafka-rest-client-service-name'); + assert.equal(restClient.serviceName, 'Rick and Morty'); + assert.equal(restClient.instanceName, 'production #4'); + assert.true(restClient.clientVersion.indexOf('node') > -1); + verifyHeader(assert, PORT, restClient); - // case 3: pass in nothing, but environment var has value, - // result: generate name by environment variable value - process.env.UDEPLOY_APP_ID = 'Pickle!'; - process.env.UDEPLOY_DEPLOYMENT_NAME = 'Planet-Express'; - var restClient3 = new KafkaRestClient({ - proxyHost: configs.proxyHost, - proxyPort: configs.proxyPort, - refreshTime: configs.proxyRefreshTime, - maxRetries: 3 - }); - assert.equal(restClient3.serviceNameHeader, 'kafka-rest-client-service-name'); - assert.equal(restClient3.serviceNameEnv, 'UDEPLOY_APP_ID'); - assert.equal(restClient3.instanceNameEnv, 'UDEPLOY_DEPLOYMENT_NAME'); - assert.equal(restClient3.serviceName, 'Pickle!'); - assert.assert(restClient3.instanceName, 'Planet-Express'); - assert.true(restClient3.clientVersion.indexOf('node') > -1); + // case 2: pass in nothing, and no environment var, + // result: generate default with client version, service name, and instance name + process.env.UDEPLOY_APP_ID = ''; + process.env.UDEPLOY_DEPLOYMENT_NAME = ''; + var restClient2 = new KafkaRestClient({ + proxyHost: configs.proxyHost, + proxyPort: configs.proxyPort, + refreshTime: configs.proxyRefreshTime, + maxRetries: 3 + }); + assert.equal(restClient2.serviceNameHeader, 'kafka-rest-client-service-name'); + assert.equal(restClient2.serviceNameEnv, 'UDEPLOY_APP_ID'); + assert.assert(restClient2.serviceName.indexOf('node-kafka-rest-client') > -1); + assert.assert(restClient2.instanceName.indexOf(os.hostname()) > -1); + assert.true(restClient2.clientVersion.indexOf('node') > -1); + verifyHeader(assert, PORT, restClient2); - verifyHeader(assert, PORT, restClient3); + // case 3: pass in nothing, but environment var has value, + // result: generate name by environment variable value + process.env.UDEPLOY_APP_ID = 'Pickle!'; + process.env.UDEPLOY_DEPLOYMENT_NAME = 'Planet-Express'; + var restClient3 = new KafkaRestClient({ + proxyHost: configs.proxyHost, + proxyPort: configs.proxyPort, + refreshTime: configs.proxyRefreshTime, + maxRetries: 3 + }); + assert.equal(restClient3.serviceNameHeader, 'kafka-rest-client-service-name'); + assert.equal(restClient3.serviceNameEnv, 'UDEPLOY_APP_ID'); + assert.equal(restClient3.instanceNameEnv, 'UDEPLOY_DEPLOYMENT_NAME'); + assert.equal(restClient3.serviceName, 'Pickle!'); + assert.assert(restClient3.instanceName, 'Planet-Express'); + assert.true(restClient3.clientVersion.indexOf('node') > -1); - // case 4: pass in customized environment variable name, and customize header name - // result: generate name by customized environment variable value with new header - process.env.UDEPLOY_APP_ID = 'Pickle!'; - process.env.RICK_APP_ID = 'Meeseek'; - process.env.UDEPLOY_DEPLOYMENT_NAME = 'Planet-Express'; - process.env.MORTY_INSTANCE_ID = 'Citadel'; - var restClient4 = new KafkaRestClient({ - proxyHost: configs.proxyHost, - proxyPort: configs.proxyPort, - refreshTime: configs.proxyRefreshTime, - maxRetries: 3, - serviceNameEnv: 
'RICK_APP_ID', - serviceNameHeader: 'lineage-source', - instanceNameEnv: 'MORTY_INSTANCE_ID', - instanceNameHeader: 'ricklantis-mixup' - }); - assert.equal(restClient4.serviceNameHeader, 'lineage-source'); - assert.equal(restClient4.serviceNameEnv, 'RICK_APP_ID'); - assert.equal(restClient4.serviceName, 'Meeseek'); - assert.equal(restClient4.instanceNameEnv, 'MORTY_INSTANCE_ID'); - assert.equal(restClient4.instanceNameHeader, 'ricklantis-mixup'); - assert.equal(restClient4.instanceName, 'Citadel'); - assert.true(restClient4.clientVersion.indexOf('node') > -1); + verifyHeader(assert, PORT, restClient3); - verifyHeader(assert, PORT, restClient4); + // case 4: pass in customized environment variable name, and customize header name + // result: generate name by customized environment variable value with new header + process.env.UDEPLOY_APP_ID = 'Pickle!'; + process.env.RICK_APP_ID = 'Meeseek'; + process.env.UDEPLOY_DEPLOYMENT_NAME = 'Planet-Express'; + process.env.MORTY_INSTANCE_ID = 'Citadel'; + var restClient4 = new KafkaRestClient({ + proxyHost: configs.proxyHost, + proxyPort: configs.proxyPort, + refreshTime: configs.proxyRefreshTime, + maxRetries: 3, + serviceNameEnv: 'RICK_APP_ID', + serviceNameHeader: 'lineage-source', + instanceNameEnv: 'MORTY_INSTANCE_ID', + instanceNameHeader: 'ricklantis-mixup' + }); + assert.equal(restClient4.serviceNameHeader, 'lineage-source'); + assert.equal(restClient4.serviceNameEnv, 'RICK_APP_ID'); + assert.equal(restClient4.serviceName, 'Meeseek'); + assert.equal(restClient4.instanceNameEnv, 'MORTY_INSTANCE_ID'); + assert.equal(restClient4.instanceNameHeader, 'ricklantis-mixup'); + assert.equal(restClient4.instanceName, 'Citadel'); + assert.true(restClient4.clientVersion.indexOf('node') > -1); - // case 5: pass in everything in config - // result: the instance name and service name provided is used - process.env.UDEPLOY_APP_ID = 'Pickle!'; - process.env.RICK_APP_ID = 'Meeseek'; - process.env.UDEPLOY_DEPLOYMENT_NAME = 'Planet-Express'; - process.env.MORTY_INSTANCE_ID = 'Citadel'; - var restClient5 = new KafkaRestClient({ - proxyHost: configs.proxyHost, - proxyPort: configs.proxyPort, - refreshTime: configs.proxyRefreshTime, - maxRetries: 3, - serviceName: 'Meeseek', - serviceNameEnv: 'RICK_APP_ID', - serviceNameHeader: 'lineage-source', - instanceName: 'production #1', - instanceNameHeader: 'ricklantis-mixup', - clientVersionHeader: 'x-client-id' - }); - assert.equal(restClient5.serviceNameHeader, 'lineage-source'); - assert.equal(restClient5.serviceName, 'Meeseek'); - assert.equal(restClient5.instanceNameHeader, 'ricklantis-mixup'); - assert.equal(restClient5.instanceName, 'production #1'); - assert.equal(restClient5.clientVersionHeader, 'x-client-id'); - assert.true(restClient5.clientVersion.indexOf('node') > -1); + verifyHeader(assert, PORT, restClient4); - verifyHeader(assert, PORT, restClient5); - assert.end(); + // case 5: pass in everything in config + // result: the instance name and service name provided is used + process.env.UDEPLOY_APP_ID = 'Pickle!'; + process.env.RICK_APP_ID = 'Meeseek'; + process.env.UDEPLOY_DEPLOYMENT_NAME = 'Planet-Express'; + process.env.MORTY_INSTANCE_ID = 'Citadel'; + var restClient5 = new KafkaRestClient({ + proxyHost: configs.proxyHost, + proxyPort: configs.proxyPort, + refreshTime: configs.proxyRefreshTime, + maxRetries: 3, + serviceName: 'Meeseek', + serviceNameEnv: 'RICK_APP_ID', + serviceNameHeader: 'lineage-source', + instanceName: 'production #1', + instanceNameHeader: 'ricklantis-mixup', + clientVersionHeader: 
'x-client-id' + }); + assert.equal(restClient5.serviceNameHeader, 'lineage-source'); + assert.equal(restClient5.serviceName, 'Meeseek'); + assert.equal(restClient5.instanceNameHeader, 'ricklantis-mixup'); + assert.equal(restClient5.instanceName, 'production #1'); + assert.equal(restClient5.clientVersionHeader, 'x-client-id'); + assert.true(restClient5.clientVersion.indexOf('node') > -1); + verifyHeader(assert, PORT, restClient5); + assert.end(); -}); + }); +/* eslint-enable max-statements */ From e88b8c04c269ff3f6d5a3826be389af24b9306b0 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Mon, 9 Apr 2018 11:21:41 -0700 Subject: [PATCH 02/10] fix version so there is no exception --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 785c1da..369923a 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "sinon": "*", "tape": "^4.2.0", "uber-licence": "^1.2.0", - "uber-standard": "^3.6.4" + "uber-standard": "^5.1.0" }, "license": "MIT", "scripts": { From a9afaaf289748a801252eb25c129ed7ebab739b7 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Thu, 12 Apr 2018 11:25:14 -0700 Subject: [PATCH 03/10] fix jshint errors --- lib/kafka_base_producer.js | 31 +++++++++++++++++-------------- lib/kafka_rest_client.js | 2 ++ lib/message_batch.js | 2 +- package.json | 2 +- test/test_kafka_rest_client.js | 1 + 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/lib/kafka_base_producer.js b/lib/kafka_base_producer.js index d8ea582..56102ee 100644 --- a/lib/kafka_base_producer.js +++ b/lib/kafka_base_producer.js @@ -237,7 +237,7 @@ KafkaBaseProducer.prototype.produce = function produce(topic, message, timeStamp return callback(new Error('cannot produce to closed KafkaRestProducer')); } // TODO: else { self.logger.error() } - return null; + return undefined; } if (self.restClient) { @@ -262,7 +262,7 @@ KafkaBaseProducer.prototype.produce = function produce(topic, message, timeStamp } else if (callback) { return callback(new Error('Kafka Rest Client is not initialized!')); } - return null; + return undefined; }; KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord, callback) { @@ -272,7 +272,7 @@ KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord if (callback) { return callback(new Error('cannot produceSync to closed KafkaDataProducer')); } - return null; + return undefined; } var err = producerRecord.validate(); @@ -280,7 +280,7 @@ KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord if (callback) { return callback(err); } - return null; + return undefined; } if (self.restClient) { @@ -295,7 +295,7 @@ KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord } else if (callback) { return callback(new Error('Kafka Rest Client is not initialized!')); } - return null; + return undefined; }; KafkaBaseProducer.prototype._produce = function _produce(msg, callback) { @@ -304,17 +304,20 @@ KafkaBaseProducer.prototype._produce = function _produce(msg, callback) { self.pendingWrites++; self.restClient.produce(msg, onResponse); + /* eslint-disable callback-return */ function onResponse(err, result) { + if (callback) { - return callback(err, result); + callback(err, result); } self.pendingWrites--; + if (self.closing) { self._tryClose(); } - return null; } + /* eslint-enable callback-return */ }; // Add message to topic's BatchMessage in cache or add new topic to cache @@ -329,7 +332,7 @@ KafkaBaseProducer.prototype.batch = function 
batch(topic, message, timeStamp, ca return callback(new Error('cannot batch() to closed KafkaRestProducer')); } // TODO: else { self.logger.error() } - return null; + return undefined; } if (typeof message !== 'string' && !Buffer.isBuffer(message)) { @@ -337,7 +340,7 @@ KafkaBaseProducer.prototype.batch = function batch(topic, message, timeStamp, ca return callback(new Error('For batching, message must be a string or buffer, not ' + (typeof message))); } // TODO: else { self.logger.error(); } - return null; + return undefined; } var messageBatch; @@ -390,7 +393,7 @@ KafkaBaseProducer.prototype._produceBatch = function _produceBatch(messageBatch, if (callback) { return callback(err, res); } - return null; + return undefined; } }; @@ -424,9 +427,9 @@ KafkaBaseProducer.prototype.flushEntireCache = function flushEntireCache(callbac if (--pending === 0 && callback) { return callback(null); } - return null; + return undefined; } - return null; + return undefined; }; KafkaBaseProducer.prototype._auditNewMsg = function _auditNewMsg(topic, timeBeginInSec, msgCount, topicToMsgcntMaps) { @@ -508,7 +511,7 @@ KafkaBaseProducer.prototype._generateAuditMsg = function _generateAuditMsg(timeB /* eslint-enable camelcase */ return JSON.stringify(auditMsg); } - return null; + return undefined; }; /* jshint maxparams: 5 */ @@ -537,7 +540,7 @@ KafkaBaseProducer.prototype.logLineWithTimeStamp = function logLine(topic, messa if (callback) { return callback(err, res); } - return null; + return undefined; }); }; diff --git a/lib/kafka_rest_client.js b/lib/kafka_rest_client.js index 60d129c..9a9d66b 100644 --- a/lib/kafka_rest_client.js +++ b/lib/kafka_rest_client.js @@ -27,7 +27,9 @@ var lbPool = require('lb_pool'); var os = require('os'); var utils = require('./utils'); /* eslint-disable no-unused-vars */ +/* jshint -W098 */ var pkginfo = require('pkginfo')(module); +/* jshint +W089 */ /* eslint-enable no-unused-vars */ var supportContentType = { diff --git a/lib/message_batch.js b/lib/message_batch.js index 8304b48..11199a4 100644 --- a/lib/message_batch.js +++ b/lib/message_batch.js @@ -74,7 +74,7 @@ MessageBatch.prototype.addMessage = function addMessage(message, callback) { return callback(err); } // TODO: else { self.logger.error(err); } - return null; + return undefined; } self.cachedBuf.writeInt32BE(bytesWritten - KAFKA_PADDINGS.MESSAGE_PADDING_BYTES, offset); diff --git a/package.json b/package.json index 369923a..b971af3 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ "itape": "^1.5.0", "jscoverage": "*", "jscs": "^1.7.3", - "jshint": "^2.5.2", + "jshint": "^2.9.5", "mocha": "*", "mockery": "*", "nodemock": "*", diff --git a/test/test_kafka_rest_client.js b/test/test_kafka_rest_client.js index 9321d7e..361529a 100644 --- a/test/test_kafka_rest_client.js +++ b/test/test_kafka_rest_client.js @@ -212,6 +212,7 @@ function verifyHeader(assert, PORT, restClient) { var topicName = 'LOGGING TOPICS'; var timeStamp = Date.now() / 1000.0; + /* jshint maxparams: 5 */ function OverriddenHttpClient(expectedServiceNameHeader, expectedServiceName, expectedClientVersionHeader, expectedInstanceNameHeader, expectedInstanceName) { this.expectedServiceNameHeader = expectedServiceNameHeader; From 0ea2aceddb161ecb72653182430e4db3d5848e76 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Thu, 12 Apr 2018 11:32:36 -0700 Subject: [PATCH 04/10] update test --- test/test_kafka_rest_client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_kafka_rest_client.js 
b/test/test_kafka_rest_client.js index 361529a..072b86b 100644 --- a/test/test_kafka_rest_client.js +++ b/test/test_kafka_rest_client.js @@ -240,7 +240,7 @@ function verifyHeader(assert, PORT, restClient) { restClient.urlToHttpClientMapping[urlPath] = mockedHttpClient; restClient.produce(getProduceMessage(topicName, 'bla', timeStamp, 'binary'), function assertErrorThrows(err) { - assert.true(err !== null); + assert.true(err !== null && err !== undefined); }); assert.true(mockedHttpClient.postMethodCalled); } From 2ebc15d30062751046b0678816b2e2b7e7da4cd8 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Thu, 12 Apr 2018 16:51:08 -0700 Subject: [PATCH 05/10] try fixing travis --- lib/kafka_rest_client.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/kafka_rest_client.js b/lib/kafka_rest_client.js index 9a9d66b..6e951c0 100644 --- a/lib/kafka_rest_client.js +++ b/lib/kafka_rest_client.js @@ -255,8 +255,7 @@ KafkaRestClient.prototype.discoverTopics = function discoverTopics(callback) { self.enable = true; self.cachedTopicToUrlMapping = self.getTopicToUrlMapping(res); self.topicDiscoveryTimes++; - return callback(err, true); - + return callback(null, true); }); }; From 85fd42c8de4afceb4306ba6f5418d0140e48d770 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Thu, 12 Apr 2018 16:52:44 -0700 Subject: [PATCH 06/10] try travis fix --- package.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index b971af3..74b2313 100644 --- a/package.json +++ b/package.json @@ -7,9 +7,11 @@ ], "main": "./index.js", "dependencies": { + "@sinonjs/formatio": "^2.0.0", "error": "^7.0.1", "lb_pool": "^1.2.0", - "pkginfo": "^0.4.1" + "pkginfo": "^0.4.1", + "sinon": "^4.5.0" }, "devDependencies": { "async": "^1.5.2", From 767d72e58242d622a7095730f1ef416faf5d4a97 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Thu, 12 Apr 2018 17:37:36 -0700 Subject: [PATCH 07/10] try fix travis build --- .npmrc | 1 + 1 file changed, 1 insertion(+) create mode 100644 .npmrc diff --git a/.npmrc b/.npmrc new file mode 100644 index 0000000..94bf4ee --- /dev/null +++ b/.npmrc @@ -0,0 +1 @@ +@sinonjs:registry=http://registry.npmjs.org From 409e4953028b591f897759b1d3220dca006fb951 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Fri, 13 Apr 2018 11:04:57 -0700 Subject: [PATCH 08/10] fix accidental changes --- lib/kafka_base_producer.js | 2 +- lib/kafka_rest_client.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kafka_base_producer.js b/lib/kafka_base_producer.js index 56102ee..6343538 100644 --- a/lib/kafka_base_producer.js +++ b/lib/kafka_base_producer.js @@ -511,7 +511,7 @@ KafkaBaseProducer.prototype._generateAuditMsg = function _generateAuditMsg(timeB /* eslint-enable camelcase */ return JSON.stringify(auditMsg); } - return undefined; + return null; }; /* jshint maxparams: 5 */ diff --git a/lib/kafka_rest_client.js b/lib/kafka_rest_client.js index 6e951c0..c310a67 100644 --- a/lib/kafka_rest_client.js +++ b/lib/kafka_rest_client.js @@ -465,7 +465,7 @@ KafkaRestClient.prototype.handleError4xx = function handleError2(produceMessage, if (self.goToLocalAgent(produceMessage.topic) && self.maxRetries > 1) { self.tryMakeRequest(produceMessage, self.maxRetries - 1, callback); } else { - return callback(new Error('Got statusCode ' + res.statusCode)); + return void callback(new Error('Got statusCode ' + res.statusCode)); } }; From 4c5375bca5bf77680f07bd83c8540b8e6d2d59d7 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Fri, 13 Apr 2018 
11:11:40 -0700 Subject: [PATCH 09/10] fix unit test failure --- lib/kafka_rest_client.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/kafka_rest_client.js b/lib/kafka_rest_client.js index c310a67..99df529 100644 --- a/lib/kafka_rest_client.js +++ b/lib/kafka_rest_client.js @@ -414,9 +414,9 @@ KafkaRestClient.prototype.tryMakeRequest = function tryMakeRequest(produceMessag } if (err || (res && res.statusCode >= 500 && res.statusCode < 600)) { - self.handleError5xx(produceMessage, retry, metricPrefix, err, callback); + return self.handleError5xx(produceMessage, retry, metricPrefix, err, callback); } else if (res.statusCode >= 400 && res.statusCode < 500) { - self.handleError4xx(produceMessage, res, callback); + return self.handleError4xx(produceMessage, res, callback); } else if (self.clientType !== 'AtLeastOnce' || (self.clientType === 'AtLeastOnce' && pathUrl !== self.localAgentHost + ':' + self.localAgentPort)) { return callback(null, body); @@ -455,7 +455,7 @@ KafkaRestClient.prototype.handleError5xx = function handleError1(produceMessage, if (self.statsd) { self.statsd.increment(metricPrefix + '.error'); } - return callback(err); + return void callback(err); } }; From ebcfa97dff738e19d5f28cf774fdf91923075de9 Mon Sep 17 00:00:00 2001 From: Xiaoman Dong Date: Fri, 13 Apr 2018 11:21:04 -0700 Subject: [PATCH 10/10] remove the else in else-if-return-else-if-return --- lib/kafka_rest_client.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/kafka_rest_client.js b/lib/kafka_rest_client.js index 99df529..60a0c6c 100644 --- a/lib/kafka_rest_client.js +++ b/lib/kafka_rest_client.js @@ -415,9 +415,11 @@ KafkaRestClient.prototype.tryMakeRequest = function tryMakeRequest(produceMessag if (err || (res && res.statusCode >= 500 && res.statusCode < 600)) { return self.handleError5xx(produceMessage, retry, metricPrefix, err, callback); - } else if (res.statusCode >= 400 && res.statusCode < 500) { + } + if (res.statusCode >= 400 && res.statusCode < 500) { return self.handleError4xx(produceMessage, res, callback); - } else if (self.clientType !== 'AtLeastOnce' || + } + if (self.clientType !== 'AtLeastOnce' || (self.clientType === 'AtLeastOnce' && pathUrl !== self.localAgentHost + ':' + self.localAgentPort)) { return callback(null, body); }
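
Note on the MessageBatch payload layout touched in PATCH 01 and PATCH 03: lib/message_batch.js documents the batch framing as 4 big-endian bytes for the number of messages, then for each message 4 big-endian bytes of size followed by the message bytes, with MESSAGE_PADDING_BYTES and BATCH_PADDING_BYTES both fixed at 4. The standalone sketch below illustrates that framing for readers reviewing the addMessage/writeInt32BE changes; frameBatch is a hypothetical helper written for illustration only, and the real MessageBatch grows a cached buffer incrementally (tracking offsets as messages arrive) rather than assembling the payload in one pass.

    var Buffer = require('buffer').Buffer;

    var BATCH_PADDING_BYTES = 4;   // leading big-endian count of messages in the batch
    var MESSAGE_PADDING_BYTES = 4; // big-endian byte length written before each message

    // Hypothetical helper, for illustration only.
    function frameBatch(messages) {
        // addMessage() accepts strings or buffers, so normalize to buffers first.
        var payloads = messages.map(function toBuffer(msg) {
            return Buffer.isBuffer(msg) ? msg : Buffer.from(msg, 'utf8');
        });

        var totalSize = BATCH_PADDING_BYTES;
        payloads.forEach(function accumulate(payload) {
            totalSize += MESSAGE_PADDING_BYTES + payload.length;
        });

        var batch = Buffer.alloc(totalSize);
        var offset = 0;

        batch.writeInt32BE(payloads.length, offset); // 4 BE bytes: number of messages
        offset += BATCH_PADDING_BYTES;

        payloads.forEach(function writeMessage(payload) {
            batch.writeInt32BE(payload.length, offset); // 4 BE bytes: size of this message
            offset += MESSAGE_PADDING_BYTES;
            payload.copy(batch, offset);                // the message bytes themselves
            offset += payload.length;
        });

        return batch;
    }

    // Example: frameBatch(['msg0', 'msg1']) returns a 20-byte buffer that begins
    // with 00 00 00 02 (two messages), then 00 00 00 04 'msg0', 00 00 00 04 'msg1'.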