diff --git a/.travis.yml b/.travis.yml index 03bede2..174b998 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,3 +5,6 @@ node_js: notifications: email: - xiangfu@uber.com + +before_install: + - if [[ `npm -v` != 2* ]]; then npm i -g npm@2; fi \ No newline at end of file diff --git a/lib/kafka_base_producer.js b/lib/kafka_base_producer.js index 6343538..4d193da 100644 --- a/lib/kafka_base_producer.js +++ b/lib/kafka_base_producer.js @@ -307,12 +307,12 @@ KafkaBaseProducer.prototype._produce = function _produce(msg, callback) { /* eslint-disable callback-return */ function onResponse(err, result) { + self.pendingWrites--; + if (callback) { callback(err, result); } - self.pendingWrites--; - if (self.closing) { self._tryClose(); } diff --git a/package.json b/package.json index b94ccca..18f1883 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "dependencies": { "@sinonjs/formatio": "^2.0.0", "error": "^7.0.1", - "lb_pool": "^1.2.0", + "lb_pool": "^1.7.1", "pkginfo": "^0.4.1", "sinon": "^4.5.0" }, @@ -29,7 +29,7 @@ "opn": "^0.1.2", "pre-commit": "0.0.9", "proxyquire": "*", - "rewire": "*", + "rewire": "3.0.2", "should": "*", "sinon": "*", "tape": "^4.2.0", @@ -50,8 +50,12 @@ "view-cover": "opn ./coverage/*/index.html" }, "engine": { - "node": ">= 0.10.x" + "node": ">= 0.10.32" }, + "engines": { + "node": "^0.10.32", + "npm": "2.15.11" + }, "pre-commit": [ "check-licence", "lint", diff --git a/test/test_kafka_producer.js b/test/test_kafka_producer.js index 667d59a..18c43ef 100644 --- a/test/test_kafka_producer.js +++ b/test/test_kafka_producer.js @@ -450,13 +450,15 @@ test('Test generate audit msg', function testKafkaProducerGenerateAuditMsg(asser proxyHost: 'localhost', proxyPort: PORT, enableAudit: true, - auditTimeBucketIntervalInSec: 1 + auditTimeBucketIntervalInSec: 1, + timeout: 0, + maxRetries: 0 }; var producer = new KafkaProducer(configs); producer.connect(onConnect); function onConnect() { assert.equal(producer.producer.restClient.enable, true); - for (var i = 0; i < 5120000; i++) { + for (var i = 0; i < 512000; i++) { producer.produce('testTopic0', 'Important message', Date.now() / 1000.0); } producer.produce('testTopic1', 'Important message', Date.now() / 1000.0); @@ -491,7 +493,7 @@ test('Test generate audit msg', function testKafkaProducerGenerateAuditMsg(asser } /* eslint-enable block-scoped-var */ - assert.equal(cntTestTopic0, 5120000); + assert.equal(cntTestTopic0, 512000); assert.equal(cntTestTopic1, 2); assert.equal(cntTestTopic2, 3); assert.end(); @@ -499,10 +501,10 @@ test('Test generate audit msg', function testKafkaProducerGenerateAuditMsg(asser /* eslint-enable camelcase */ /* eslint-disable no-undef,block-scoped-var */ + server.stop(); setTimeout(function stopTest1() { - server.stop(); producer.close(); - }, 2000); + }, 5000); /* eslint-enable no-undef,block-scoped-var */ } }); @@ -557,7 +559,7 @@ test('kafkaProducer handle no meta data situation', function testKafkaProducerHa function test2(next) { kafkaProducer.produce('hp_testTopic0', 'Important message', Date.now() / 1000.0, function assertErrorThrows2(err) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); next(); }); } diff --git a/test/test_kafka_rest_client.js b/test/test_kafka_rest_client.js index 072b86b..80304eb 100644 --- a/test/test_kafka_rest_client.js +++ b/test/test_kafka_rest_client.js @@ -93,14 +93,14 @@ test('KafkaRestClient handle failed post with retries', function testKafkaRestCl function test1(next) { restClient.produce(getProduceMessage('testTopic0', 'msg0', timeStamp, 'binary'), function assertHttpErrorReason(err) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); next(); }); }, function test2(next) { restClient.produce(getProduceMessage('hp.testTopic1', 'msg1', timeStamp, 'binary'), function assertErrorThrows(err) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); next(); }); } @@ -110,7 +110,7 @@ test('KafkaRestClient handle failed post with retries', function testKafkaRestCl function test3(next) { restClient.produce(getProduceMessage('testTopic0', 'msg1', timeStamp, 'binary'), function assertErrorThrows(err) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); next(); }); }, @@ -149,14 +149,14 @@ test('KafkaRestClient handle not cached topics', function testKafkaRestClientHan function test1(next) { restClient.produce(getProduceMessage('hp-testTopic-not-in-map', 'msg0', timeStamp, 'binary'), function assertErrorThrows(err) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); next(); }); }, function test2(next) { restClient.produce(getProduceMessage('testTopic-not-in-map', 'msg0', timeStamp, 'binary'), function assertHttpErrorReason(err) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); server.start(); restClient.produce(getProduceMessage('testTopic-not-in-map', 'msg0', timeStamp, 'binary'), function assertHttpErrorReason2(err2) { @@ -190,14 +190,14 @@ test('KafkaRestClient handle post with blacklist client', function testKafkaRest function test1(next) { restClient.produce(getProduceMessage('testTopic0', 'msg0', timeStamp, 'binary'), function assertHttpErrorReason(err) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); next(); }); }, function test2(next) { restClient.produce(getProduceMessage('testTopic1', 'msg0', timeStamp, 'binary'), function assertErrorThrows(err, resp) { - assert.equal(err.reason, 'connect ECONNREFUSED'); + assert.true(err.reason.indexOf('connect ECONNREFUSED') >= 0); assert.equal(resp, undefined); next(); });