
Support for several event types in the same topic #71

Merged: 65 commits, merged Sep 24, 2019
Changes shown from 62 of 65 commits

Commits:
dec045b
Chore(Support multiple types in a single topic): Changing producer t…
Aug 29, 2019
02682d0
Chore(Support multiple types in a single topic): Reading and produci…
Sep 4, 2019
7597194
Chore(Support multiple types in a single topic): Fixing the base tests.
Sep 5, 2019
2f65ca4
refactoring(Support multiple types in a single topic):
Sep 5, 2019
8db6159
refactoring(Support multiple types in a single topic):
Sep 5, 2019
2e4fd93
chore(Support multiple types in a single topic): Enable circleCi
Sep 5, 2019
7204e4c
chore(Support multiple types in a single topic): Enable circleCi
Sep 5, 2019
a18d644
chore(Support multiple types in a single topic): Enable circleCi
Sep 5, 2019
70992bc
chore(Support multiple types in a single topic): Enable circleCi
Sep 5, 2019
de4a90a
chore(Setting up circleCi)
Sep 5, 2019
e58d81b
chore(Setting up circleCi)
Sep 5, 2019
3102dc6
chore(Setting up circleCi)
Sep 5, 2019
920b5d5
chore(Setting up circleCi)
Sep 5, 2019
fe779dc
Enable docker
Sep 5, 2019
807455f
Enable docker
Sep 5, 2019
3d4a4fa
configuration(Setting up tests)
Sep 5, 2019
a6b110c
configuration(Setting up tests)
Sep 5, 2019
af33bd5
configuration(downgrading npm version used by circleCi)
Sep 5, 2019
e578795
configuration(downgrading npm and node versions used by circleCi)
Sep 5, 2019
25a197b
configuration(downgrading npm and node versions used by circleCi)
Sep 5, 2019
51930e5
configuration(downgrading npm and node versions used by circleCi)
Sep 5, 2019
26680e0
configuration(downgrading npm and node versions used by circleCi)
Sep 5, 2019
3b41466
configuration(Setting up tests)
Sep 5, 2019
aa86743
configuration(Setting up tests)
Sep 5, 2019
ab2c06d
configuration(Setting up tests)
Sep 5, 2019
4ac4bc5
configuration(merging build and test workflows into one in circleCi)
Sep 5, 2019
22a8f5f
configuration(setting up kafka hostnames in circleCi)
Sep 5, 2019
b75c9e0
configuration(setting up kafka hostnames in circleCi)
Sep 5, 2019
2fd0801
configuration(circleCi)
Sep 5, 2019
d72e243
configuration(troubleshooting circleCi network problems when connecti…
Sep 5, 2019
8a7bb14
configuration(troubleshooting circleCi network problems when connecti…
Sep 5, 2019
36fc50a
configuration(troubleshooting circleCi network problems when connecti…
Sep 5, 2019
d98ef6c
configuration(troubleshooting circleCi network problems when connecti…
Sep 5, 2019
25f7a6c
configuration(troubleshooting circleCi network problems when connecti…
Sep 5, 2019
bc50656
configuration(troubleshooting circleCi network problems when connecti…
Sep 5, 2019
7812765
configuration(troubleshooting circleCi network problems when connecti…
Sep 5, 2019
8080b19
configuration(troubleshooting circleCi network problems)
Sep 5, 2019
818ba53
configuration(troubleshooting circleCi network problems)
Sep 5, 2019
b59f587
configuration(troubleshooting circleCi network problems)
Sep 5, 2019
a7bfd60
configuration(troubleshooting)
Sep 5, 2019
c44fa92
configuration(troubleshooting)
Sep 5, 2019
6c88861
configuration(troubleshooting)
Sep 5, 2019
040200c
configuration(troubleshooting)
Sep 5, 2019
d105e2f
configuration(troubleshooting)
Sep 6, 2019
7b6e887
configuration(tweaking circleCi build)
Sep 6, 2019
788137a
configuration(simplifying circleCi build)
Sep 6, 2019
05d714f
fix(incorrect implementation of io.confluent.kafka.serializers.subjec…
Sep 6, 2019
a97eccf
documentation(fixing icon to circleci test status)
Sep 6, 2019
b315e16
tests(should produce and consume a multi type message using consume "…
Sep 6, 2019
a1d6066
documentation()
Sep 6, 2019
adb1b0c
pr(fixing a typo)
Sep 6, 2019
e2612d2
pr(implementing comments)
Sep 9, 2019
83c2eec
Merge branch 'master' into master
pleszczy Sep 9, 2019
996327a
pr(implementing comments)
Sep 9, 2019
aca5fa7
Merge remote-tracking branch 'origin/master'
Sep 9, 2019
b3a984c
Changing how nodejs is being installed on circleCi. Switching from nv…
Sep 9, 2019
1c0d969
Changing how nodejs is being installed on circleCi. Switching from nv…
Sep 9, 2019
eaa616f
Changing how nodejs is being installed on circleCi. Switching from nv…
Sep 9, 2019
75b8ca8
Changing how nodejs is being installed on circleCi. Switching from nv…
Sep 9, 2019
7887800
Reverting : Changing how nodejs is being installed on circleCi. Going…
Sep 9, 2019
2788b5d
Updating documentation.
Sep 9, 2019
0c042e1
Implementing pull request comments.
Sep 10, 2019
cff58a2
Formatting requirements according to:
Sep 10, 2019
d33c352
Updating changelog.
Sep 18, 2019
e7b882f
Merge branch 'master' into master
pleszczy Sep 18, 2019
3 changes: 3 additions & 0 deletions .gitignore
@@ -3,3 +3,6 @@ npm-debug.log
dump.rdb
wiki
temp
report.html
coverage
.nyc_output
47 changes: 36 additions & 11 deletions README.md
@@ -31,17 +31,17 @@ You are highly encouraged to read the ["node-rdkafka" documentation](https://bli
The `Kafka.CODES` enumeration of constant values provided by the "node-rdkafka" library is also available as a static var at:

```js
var KafkaAvro = require('kafka-avro');
const KafkaAvro = require('kafka-avro');

console.log(KafkaAvro.CODES);
```

### Initialize kafka-avro

```js
var KafkaAvro = require('kafka-avro');
const KafkaAvro = require('kafka-avro');

var kafkaAvro = new KafkaAvro({
const kafkaAvro = new KafkaAvro({
kafkaBroker: 'localhost:9092',
schemaRegistry: 'http://localhost:8081',
});
@@ -66,6 +66,8 @@ When instantiating kafka-avro you may pass the following options:
* `parseOptions` **Object** Schema parse options to pass to `avro.parse()`. `parseOptions.wrapUnions` is set to `true` by default.
* `httpsAgent` **Object** initialized [https Agent class](https://nodejs.org/api/https.html#https_class_https_agent)
* `shouldFailWhenSchemaIsMissing` **Boolean** Set to true if producing a message for which no AVRO schema can be found should throw an error
* `keySubjectStrategy` **String** A SubjectNameStrategy for key. It is used by the Avro serializer to determine the subject name under which the event record schemas should be registered in the schema registry. The default is TopicNameStrategy. Allowed values are [TopicRecordNameStrategy, TopicNameStrategy, RecordNameStrategy]
* `valueSubjectStrategy` **String** A SubjectNameStrategy for the value. It is used by the Avro serializer to determine the subject name under which the event record schemas should be registered in the schema registry. The default is TopicNameStrategy. Allowed values are [TopicRecordNameStrategy, TopicNameStrategy, RecordNameStrategy]
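The strategies differ in how the schema-registry subject is derived from the topic and the Avro record name. The helper below is a hypothetical pure-JS sketch (it is not part of the kafka-avro API) that mirrors Confluent's documented behaviour:

```javascript
// Sketch of how the three Confluent subject-name strategies derive the
// schema-registry subject. subjectName() is an illustrative helper, not
// an exported kafka-avro function.
function subjectName(strategy, topic, recordFullName, isKey) {
  const suffix = isKey ? 'key' : 'value';
  switch (strategy) {
    case 'TopicNameStrategy':
      // Default: one schema per topic, e.g. "test-value".
      return topic + '-' + suffix;
    case 'RecordNameStrategy':
      // Subject is the record's fully qualified name, topic-independent.
      return recordFullName;
    case 'TopicRecordNameStrategy':
      // Embeds both topic and record name, e.g. "test-com.acme.User",
      // which is what lets several record types share one topic.
      return topic + '-' + recordFullName;
    default:
      throw new Error('Unknown strategy: ' + strategy);
  }
}

console.log(subjectName('TopicNameStrategy', 'test', 'com.acme.User', false));
// → test-value
console.log(subjectName('TopicRecordNameStrategy', 'test', 'com.acme.User', false));
// → test-com.acme.User
```

Because TopicRecordNameStrategy keys the subject on both topic and record name, each event type in a topic gets its own compatibility-checked schema lineage.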

### Producer

@@ -79,17 +81,17 @@ kafkaAvro.getProducer({
})
// "getProducer()" returns a Bluebird Promise.
.then(function(producer) {
var topicName = 'test';
const topicName = 'test';

producer.on('disconnected', function(arg) {
console.log('producer disconnected. ' + JSON.stringify(arg));
});

var value = {name:'John'};
var key = 'key';
const value = {name:'John'};
const key = 'key';

// if partition is set to -1, librdkafka will use the default partitioner
var partition = -1;
const partition = -1;
producer.produce(topicName, partition, value, key);
})
```
@@ -134,7 +136,7 @@ kafkaAvro.getConsumer({
})
.then(function(consumer) {
// Subscribe and consume.
var topicName = 'test';
const topicName = 'test';
consumer.subscribe([topicName]);
consumer.consume();
consumer.on('data', function(rawData) {
@@ -201,6 +203,29 @@ kafka-avro intercepts all incoming messages and augments the object with two mor

The KafkaAvro instance also provides the following methods:

### Support for several event types in the same topic
Kafka Avro can support several event types in the same topic. This requires using the TopicRecordNameStrategy strategy.

```js
const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
kafkaBroker: 'localhost:9092',
schemaRegistry: 'http://localhost:8081',
keySubjectStrategy: "TopicRecordNameStrategy",
valueSubjectStrategy: "TopicRecordNameStrategy",
});

// Query the Schema Registry for all topic schemas,
// fetch them and evaluate them.
kafkaAvro.init()
.then(function() {
console.log('Ready to use');
});
```

You can read more about this here: https://www.confluent.io/blog/put-several-event-types-kafka-topic/
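When consuming from a multi-type topic, you typically dispatch each decoded message to a handler chosen by its record type. The sketch below is illustrative only: the `eventType` field and the handler names are assumptions, and in a real consumer you would derive the type from the decoded record:

```javascript
// Dispatch decoded messages to a handler keyed by their Avro record name.
// The message shape here ({ eventType, value }) is a stand-in for
// whatever your decoded kafka-avro payload actually looks like.
function makeDispatcher(handlers) {
  return function dispatch(message) {
    const handler = handlers[message.eventType];
    if (!handler) {
      throw new Error('No handler registered for event type: ' + message.eventType);
    }
    return handler(message.value);
  };
}

const dispatch = makeDispatcher({
  UserCreated: (v) => 'created ' + v.name,
  UserDeleted: (v) => 'deleted ' + v.name,
});

console.log(dispatch({ eventType: 'UserCreated', value: { name: 'John' } }));
// → created John
```

A handler map like this keeps per-type logic separate while still reading every type from the one topic, preserving ordering across related events.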

### Logging

The Kafka Avro library logs messages using the [Bunyan logger](https://github.com/trentm/node-bunyan/). To enable logging you will have to define at least one of the needed ENV variables:
@@ -219,10 +244,10 @@ The Kafka Avro library logs messages using the [Bunyan logger](https://github.co
**Returns** {Bunyan.Logger} [Bunyan logger](https://github.com/trentm/node-bunyan/) instance.

```js
var KafkaAvro = require('kafka-avro');
var fmt = require('bunyan-format');
const KafkaAvro = require('kafka-avro');
const fmt = require('bunyan-format');

var kafkaLog = KafkaAvro.getLogger();
const kafkaLog = KafkaAvro.getLogger();

kafkaLog.addStream({
type: 'stream',
58 changes: 42 additions & 16 deletions circle.yml
@@ -1,16 +1,42 @@
machine:
pre:
- curl -sSL https://s3.amazonaws.com/circle-downloads/install-circleci-docker.sh | bash -s -- 1.10.0
- pip install docker-compose
services:
- docker
node:
version: 8.2.0
hosts:
kafka: 127.0.0.1

dependencies:
pre:
- docker-compose up -d
- sudo apt-get update;
- sudo apt-get -y install libsasl2-dev libssl-dev
version: 2
jobs:
build:
working_directory: ~/kafka-avro
machine:
image: ubuntu-1604:201903-01
docker_layer_caching: true
steps:
- checkout
- run:
name: Start kafka and schema-registry
command: docker-compose up -d
- run:
name: Install [email protected]
command: |
set +e
curl -o- https://raw.githubusercontent.com/creationix/nvm/v0.33.5/install.sh | bash
export NVM_DIR="/opt/circleci/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh"
nvm install v8.16
nvm alias default v8.16
# Each step uses the same `$BASH_ENV`, so need to modify it
echo 'export NVM_DIR="/opt/circleci/.nvm"' >> $BASH_ENV
echo "[ -s \"$NVM_DIR/nvm.sh\" ] && . \"$NVM_DIR/nvm.sh\"" >> $BASH_ENV
Collaborator:
we can simplify this if we don't use nvm - just raw node. What do you think?

Contributor Author:
We need the v8 version of node to build node-rdkafka. I'll see if I can use apt-get to install a concrete version of node.

Contributor Author (@pleszczy, Sep 9, 2019):
I tried but I couldn't get apt-get to install the version I wanted. I've moved back to nvm.

command: |
            curl -sL https://deb.nodesource.com/setup_8.x | sudo -E bash -
            sudo apt install nodejs
            node --version


https://circleci.com/gh/pleszczy/kafka-avro/63

- restore_cache:
key: dependency-cache-{{ checksum "package.json" }}
- save_cache:
key: dependency-cache-{{ checksum "package.json" }}
paths:
- node_modules
- run:
name: Install projects dependencies
command: 'unset NVM_NODEJS_ORG_MIRROR NVM_IOJS_ORG_MIRROR && npm install'
- run:
name: Add hostnames for kafka
command: echo 127.0.0.1 kafka | sudo tee -a /etc/hosts && echo 127.0.0.1 schema-registry | sudo tee -a /etc/hosts && echo 127.0.0.1 zookeeper | sudo tee -a /etc/hosts
- run:
name: print /etc/hosts content
command: cat /etc/hosts
- run:
name: Test
command: npm run-script coverage
68 changes: 34 additions & 34 deletions lib/kafka-avro.js
@@ -6,29 +6,28 @@
* Copyright © Waldo, Inc.
* Licensed under the MIT license.
*/
var EventEmitter = require('events').EventEmitter;

var Promise = require('bluebird');
var cip = require('cip');
var Kafka = require('node-rdkafka');

var rootLog = require('./log.lib');
var log = rootLog.getChild(__filename);

var SchemaRegistry = require('./schema-registry');
const EventEmitter = require('events').EventEmitter;
const Promise = require('bluebird');
const cip = require('cip');
const Kafka = require('node-rdkafka');
const rootLog = require('./log.lib');
const log = rootLog.getChild(__filename);
const SchemaRegistry = require('./schema-registry');
const { SubjectNameStrategy } = require('./subject-strategy');

//
// Mixins
//
var Producer = require('./kafka-producer');
var Consumer = require('./kafka-consumer');
const Producer = require('./kafka-producer');
const Consumer = require('./kafka-consumer');

var CeventEmitter = cip.cast(EventEmitter);
const CeventEmitter = cip.cast(EventEmitter);

function noop() {}
function noop() {
}

/**
* @fileOverview bootstrap and master exporing module.
* @fileOverview bootstrap and master exporting module.
*/

/**
@@ -37,30 +36,32 @@ function noop() {}
* @param {Object} opts The options.
* @constructor
*/
var KafkaAvro = module.exports = CeventEmitter.extend(function(opts) {
const KafkaAvro = module.exports = CeventEmitter.extend(function (opts) {
/** @type {string} The SR url */
this.kafkaBrokerUrl = opts.kafkaBroker;

var srOpts = {
const srOpts = {
schemaRegistryUrl: opts.schemaRegistry,
selectedTopics: opts.topics || null,
fetchAllVersions: opts.fetchAllVersions || false,
fetchRefreshRate: opts.fetchRefreshRate || 0,
parseOptions: opts.parseOptions,
httpsAgent: opts.httpsAgent
httpsAgent: opts.httpsAgent,
keySubjectStrategy: new SubjectNameStrategy(opts.keySubjectStrategy),
valueSubjectStrategy: new SubjectNameStrategy(opts.valueSubjectStrategy),
};

/** @type {kafka-avro.SchemaRegistry} Instanciated SR. */
/** @type {kafka-avro.SchemaRegistry} Instantiated SR. */
this.sr = new SchemaRegistry(srOpts);

/** @type {boolean} Whether the producer should fail when no schema was found. */
this._shouldFailWhenSchemaIsMissing = opts.shouldFailWhenSchemaIsMissing === true;

/** @type {Array.<node-rdkafka.Producer>} Instanciated producers. */
this._producers = [];
/** @type {Array.<node-rdkafka.Consumer>} Instanciated consumers. */
/** @type {Array.<node-rdkafka.Consumer>} Instantiated consumers. */
this._consumers = [];
/** @type {Array.<node-rdkafka.ConsumerStream>} Instanciated consumers. */
/** @type {Array.<node-rdkafka.ConsumerStream>} Instantiated consumers. */
this._consumersStream = [];
});

@@ -71,9 +72,6 @@ var KafkaAvro = module.exports = CeventEmitter.extend(function(opts) {
*/
KafkaAvro.CODES = Kafka.CODES;

//
// Add Mixins
//
KafkaAvro.mixin(Producer);
KafkaAvro.mixin(Consumer);

@@ -103,21 +101,23 @@ KafkaAvro.prototype.init = Promise.method(function () {
* @return {Promise} A Promise.
*/
KafkaAvro.prototype.dispose = Promise.method(function () {
const disconnectPromises = [];

var disconnectPromises = [];

log.info('dispose() :: Disposing kafka-avro instance. Total consumers:',
this._consumers.length, 'Total producers:', this._producers.length);
log.info('dispose() :: Disposing kafka-avro instance. Total consumers: noOfConsumers, Total producers: noOfProducers',
{
noOfConsumers: this._consumers.length,
noOfProducers: this._producers.length
});

this._consumers.forEach(function(consumer) {
var discon = Promise.promisify(consumer.disconnect.bind(consumer));
var disconProm = discon().catch(noop);
this._consumers.forEach(function (consumer) {
const discon = Promise.promisify(consumer.disconnect.bind(consumer));
const disconProm = discon().catch(noop);

disconnectPromises.push(disconProm);
});
this._producers.forEach(function(producer) {
var discon = Promise.promisify(producer.disconnect.bind(producer));
var disconProm = discon().catch(noop);
this._producers.forEach(function (producer) {
const discon = Promise.promisify(producer.disconnect.bind(producer));
const disconProm = discon().catch(noop);
disconnectPromises.push(disconProm);
});
