Skip to content

Commit

Permalink
Merge pull request #51 from uber-common/fix_lint_error
Browse files Browse the repository at this point in the history
Address all lint issue
  • Loading branch information
dongxiaoman authored Apr 13, 2018
2 parents f7d8144 + ebcfa97 commit 007c4c7
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 205 deletions.
1 change: 1 addition & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
@sinonjs:registry=http://registry.npmjs.org
81 changes: 51 additions & 30 deletions lib/kafka_base_producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@ function KafkaBaseProducer(options, producerType) { // eslint-disable-line
if (self.batching) {
// default 100kb buffer cache per a topic
self.maxBatchSizeBytes = options.maxBatchSizeBytes || 100000;
self.topicToBatchQueue = {}; // map of topic name to MessageBatch
// map of topic name to MessageBatch
self.topicToBatchQueue = {};

self.flushCycleSecs = options.flushCycleSecs || 1; // flush a topic's batch message every second
var flushCache = function flushCache() { // eslint-disable-line
// flush a topic's batch message every second
self.flushCycleSecs = options.flushCycleSecs || 1;
var flushCache = function flushCache() {
// eslint-disable-line
self.flushEntireCache();
};
self.flushInterval = setInterval(flushCache, self.flushCycleSecs * 1000); // eslint-disable-line
Expand All @@ -132,7 +135,8 @@ function KafkaBaseProducer(options, producerType) { // eslint-disable-line
var auditMsg = auditMsgs[i];
self.produce(self.auditTopicName, auditMsg, (Date.now() / 1000));
}
self.topicToMsgcntMaps = {}; // reset the msg count map for next round of auditing
// reset the msg count map for next round of auditing
self.topicToMsgcntMaps = {};
};

self.auditInterval = setInterval(produceAuditMsg, self.auditTimeBucketIntervalInSec * 1000);
Expand All @@ -150,15 +154,17 @@ function KafkaBaseProducer(options, producerType) { // eslint-disable-line
var auditMsgAtProduce = auditMsgsAtProduce[i];
self.produce(self.auditTopicNameC3, auditMsgAtProduce, (Date.now() / 1000));
}
self.topicToMsgcntMapsAtProduce = {}; // reset the msg count map for next round of auditing
// reset the msg count map for next round of auditing
self.topicToMsgcntMapsAtProduce = {};

var auditMsgsAtBatch = self._generateAuditMsgs(self.auditTierAtBatch, self.auditDatacenter,
self.topicToMsgcntMapsAtBatch);
for (var j = 0; j < auditMsgsAtBatch.length; j++) {
var auditMsgAtBatch = auditMsgsAtBatch[j];
self.produce(self.auditTopicNameC3, auditMsgAtBatch, (Date.now() / 1000));
}
self.topicToMsgcntMapsAtBatch = {}; // reset the msg count map for next round of auditing
// reset the msg count map for next round of auditing
self.topicToMsgcntMapsAtBatch = {};
};

self.auditIntervalC3 = setInterval(produceC3AuditMsg, self.auditTimeBucketIntervalInSec * 1000);
Expand Down Expand Up @@ -228,9 +234,10 @@ KafkaBaseProducer.prototype.produce = function produce(topic, message, timeStamp

if (self.closing) {
if (callback) {
callback(new Error('cannot produce to closed KafkaRestProducer'));
} // TODO: else { self.logger.error() }
return;
return callback(new Error('cannot produce to closed KafkaRestProducer'));
}
// TODO: else { self.logger.error() }
return undefined;
}

if (self.restClient) {
Expand All @@ -253,26 +260,27 @@ KafkaBaseProducer.prototype.produce = function produce(topic, message, timeStamp
}

} else if (callback) {
callback(new Error('Kafka Rest Client is not initialized!'));
return callback(new Error('Kafka Rest Client is not initialized!'));
}
return undefined;
};

KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord, callback) {
var self = this;

if (self.closing) {
if (callback) {
callback(new Error('cannot produceSync to closed KafkaDataProducer'));
return callback(new Error('cannot produceSync to closed KafkaDataProducer'));
}
return;
return undefined;
}

var err = producerRecord.validate();
if (err !== null) {
if (callback) {
callback(err);
return callback(err);
}
return;
return undefined;
}

if (self.restClient) {
Expand All @@ -285,8 +293,9 @@ KafkaBaseProducer.prototype.produceSync = function produce(topic, producerRecord
self._auditNewMsg(topic, timeBeginInSecC3, 1, self.topicToMsgcntMapsAtProduce);
}
} else if (callback) {
callback(new Error('Kafka Rest Client is not initialized!'));
return callback(new Error('Kafka Rest Client is not initialized!'));
}
return undefined;
};

KafkaBaseProducer.prototype._produce = function _produce(msg, callback) {
Expand All @@ -295,16 +304,20 @@ KafkaBaseProducer.prototype._produce = function _produce(msg, callback) {
self.pendingWrites++;
self.restClient.produce(msg, onResponse);

/* eslint-disable callback-return */
function onResponse(err, result) {

if (callback) {
callback(err, result);
}

self.pendingWrites--;

if (self.closing) {
self._tryClose();
}
}
/* eslint-enable callback-return */
};

// Add message to topic's BatchMessage in cache or add new topic to cache
Expand All @@ -316,16 +329,18 @@ KafkaBaseProducer.prototype.batch = function batch(topic, message, timeStamp, ca

if (self.closing) {
if (callback) {
callback(new Error('cannot batch() to closed KafkaRestProducer'));
} // TODO: else { self.logger.error() }
return;
return callback(new Error('cannot batch() to closed KafkaRestProducer'));
}
// TODO: else { self.logger.error() }
return undefined;
}

if (typeof message !== 'string' && !Buffer.isBuffer(message)) {
if (callback) {
callback(new Error('For batching, message must be a string or buffer, not ' + (typeof message)));
} // TODO: else { self.logger.error(); }
return;
return callback(new Error('For batching, message must be a string or buffer, not ' + (typeof message)));
}
// TODO: else { self.logger.error(); }
return undefined;
}

var messageBatch;
Expand Down Expand Up @@ -376,8 +391,9 @@ KafkaBaseProducer.prototype._produceBatch = function _produceBatch(messageBatch,
}

if (callback) {
callback(err, res);
return callback(err, res);
}
return undefined;
}
};

Expand All @@ -399,7 +415,7 @@ KafkaBaseProducer.prototype.flushEntireCache = function flushEntireCache(callbac
}

if (pending === 0 && callback) {
callback(null);
return callback(null);
}

function onProduced(err) {
Expand All @@ -409,9 +425,11 @@ KafkaBaseProducer.prototype.flushEntireCache = function flushEntireCache(callbac
}

if (--pending === 0 && callback) {
callback(null);
return callback(null);
}
return undefined;
}
return undefined;
};

KafkaBaseProducer.prototype._auditNewMsg = function _auditNewMsg(topic, timeBeginInSec, msgCount, topicToMsgcntMaps) {
Expand All @@ -435,9 +453,8 @@ KafkaBaseProducer.prototype._getTimeBeginInSec = function _getTimeBeginInSec(now
// 1000000000000 (13 digits), as second, means Fri Sep 27 33658 01:46:40....
if (nowInSec > 999999999999.0) {
return Math.floor((nowInSec / 1000) / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
} else {
return Math.floor(nowInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
}
return Math.floor(nowInSec / self.auditTimeBucketIntervalInSec) * self.auditTimeBucketIntervalInSec;
};

KafkaBaseProducer.prototype._getTimeBeginInSecFromHp = function _getTimeBeginInSecFromHp(message) {
Expand Down Expand Up @@ -521,8 +538,9 @@ KafkaBaseProducer.prototype.logLineWithTimeStamp = function logLine(topic, messa
var wholeMessage = JSON.stringify(self.getWholeMsg(topic, message, timeStamp));
self.produce(topic, wholeMessage, timeStamp, function handleResponse(err, res) {
if (callback) {
callback(err, res);
return callback(err, res);
}
return undefined;
});
};

Expand Down Expand Up @@ -583,7 +601,8 @@ KafkaBaseProducer.prototype.close = function close(callback) {
(Date.now() / 1000), 'binary');
self._produce(produceMessage);
}
self.topicToMsgcntMaps = {}; // reset the msg count map for next round of auditing
// reset the msg count map for next round of auditing
self.topicToMsgcntMaps = {};
}

if (self.auditIntervalC3) {
Expand All @@ -595,7 +614,8 @@ KafkaBaseProducer.prototype.close = function close(callback) {
(Date.now() / 1000), 'binary');
self._produce(produceMessageAtProduce);
}
self.topicToMsgcntMapsAtProduce = {}; // reset the msg count map for next round of auditing
// reset the msg count map for next round of auditing
self.topicToMsgcntMapsAtProduce = {};

var auditMsgsAtBatch = self._generateAuditMsgs(self.auditTierAtBatch, self.auditDatacenter,
self.topicToMsgcntMapsAtBatch);
Expand All @@ -604,7 +624,8 @@ KafkaBaseProducer.prototype.close = function close(callback) {
(Date.now() / 1000), 'binary');
self._produce(produceMessageAtBatch);
}
self.topicToMsgcntMapsAtBatch = {}; // reset the msg count map for next round of auditing
// reset the msg count map for next round of auditing
self.topicToMsgcntMapsAtBatch = {};
}

self._tryClose();
Expand Down
Loading

0 comments on commit 007c4c7

Please sign in to comment.