Support for several event types in the same topic #71
Conversation
…o use type-topicName as a subject for avro schema
…ng multiple types on one topic.
* Using let/const instead of var * Standardizing logging message format * Using template strings * Using standard nodejs err keyword * Reformatting code with default intellij formatter for js
* Using let/const instead of var * Updating circleCi to 2.0
…ng to kafka and schema-registry)
Wow, great job. But you do a lot of things, instead of only adding the strategies:
- circle2 (thanks!)
- replacing dated `var` with `const`, template strings, typos etc. This "replace" itself - including adding `async/await` and examples in readme.md - is a "feature" that demands one minor release.
- adding the TopicRecordNameStrategy strategy

All this could be 3 PRs - it's very hard to review :/ BTW, are you completely replacing the current `TopicNameStrategy` by ? (or am I missing some point?). If you do, we need to support both strategies since `TopicNameStrategy` is the default.

And a few other comments.
What do you think?
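For context, the Confluent subject name strategies under discussion differ only in how they derive the schema registry subject for a topic's value schema. A minimal sketch (illustrative only, not this library's code; names per Confluent's documentation):

```javascript
// Subject derivation for a topic's *value* schema under each strategy
// (key schemas use "-key" analogously under TopicNameStrategy).
function subjectName(strategy, topic, recordFullName) {
  switch (strategy) {
    case 'TopicNameStrategy':
      return `${topic}-value`;
    case 'RecordNameStrategy':
      return recordFullName;
    case 'TopicRecordNameStrategy':
      return `${topic}-${recordFullName}`;
    default:
      throw new Error(`unknown strategy: ${strategy}`);
  }
}

console.log(subjectName('TopicRecordNameStrategy', 'orders', 'com.example.OrderCreated'));
// → 'orders-com.example.OrderCreated'
```

Because TopicRecordNameStrategy includes the record name in the subject, several record types can share one topic while each keeps its own compatibility history in the registry.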
@@ -101,65 +99,60 @@ Consumer.prototype._onWrapper = function (consumerInstance, eventName, cb) {
    return consumerInstance.__kafkaAvro_on(eventName, cb);
  }

  return consumerInstance.__kafkaAvro_on('data', function(message) {
    if (!this.sr.keySchemas[message.topic]) {
why did you remove this guard clause?
That's because I've moved away from this mechanism in favor of extracting schemaId from the magic byte. There is the same warning being logged a couple of lines below though.
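For readers following the thread: the Confluent wire format prefixes every message with a magic byte (0x00) and a 4-byte big-endian schema id. A minimal sketch of extracting that id (illustrative, not the PR's actual code):

```javascript
// Confluent wire format: [0x00 magic byte][4-byte big-endian schema id][avro payload]
const MAGIC_BYTE = 0;

function extractSchemaId(encodedMessage) {
  if (!encodedMessage || encodedMessage[0] !== MAGIC_BYTE) {
    return null; // message was not serialized with the magic byte
  }
  return encodedMessage.readInt32BE(1); // bytes 1..4 hold the registry schema id
}

const msg = Buffer.alloc(5);      // zero-filled, so msg[0] is already the magic byte
msg.writeInt32BE(42, 1);          // pretend schema id 42
console.log(extractSchemaId(msg)); // 42
```

With the id recovered this way, the consumer no longer needs a per-topic schema lookup to pick the right decoder.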
got it
nvm alias default v8.16
# Each step uses the same `$BASH_ENV`, so we need to modify it
echo 'export NVM_DIR="/opt/circleci/.nvm"' >> $BASH_ENV
echo "[ -s \"$NVM_DIR/nvm.sh\" ] && . \"$NVM_DIR/nvm.sh\"" >> $BASH_ENV
we can simplify this if we don't use nvm - just raw node. What do you think?
We need a v8 version of Node to build node-rdkafka. I'll see if I can use apt-get to install a concrete version of Node.
I tried but I couldn't get apt-get to install the version I wanted. I've moved back to nvm.
command: |
curl -sL https://deb.nodesource.com/setup_8.x | sudo -E bash -
sudo apt install nodejs
node --version
lib/kafka-producer.js
Outdated
  ? this.sr.valueSchemas[topicName]
  : this.sr.keySchemas[topicName];
// implementing io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
// FIXME: It will break if code gets minimized
why would the code be minimized?
What I meant is if you use this library in your project and minify the project and its dependencies. It'll break because the type name will be converted to a random string.
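A small illustration of the minification hazard flagged by the FIXME: this sketch (hypothetical, not the library's code) derives a subject from `constructor.name`, which minifiers mangle.

```javascript
// Hypothetical subject derivation that leans on the class name at runtime.
class UserCreated {}

function subjectFor(topic, event) {
  return `${topic}-${event.constructor.name}`;
}

console.log(subjectFor('events', new UserCreated())); // 'events-UserCreated'
// A minifier may rename the class to e.g. "a", so the subject silently
// becomes 'events-a' and no longer matches the schema registered upstream.
```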
lib/magic-byte.js
Outdated
throw new TypeError('Message not serialized with magic byte');
magicByte.fromMessageBuffer = function (encodedMessage, sr) {
  if (!encodedMessage || encodedMessage[0] !== MAGIC_BYTE) {
    return null;
why are you returning null instead of throwing a TypeError?
That's because it's simpler for me to just check for null instead of using a try-catch block. But it doesn't matter much to me and it's up to your preferred code style. If you'd like, I can convert it back to try-catch.
yep, it is simple, but the entire codebase is based on try-catch; let's try to break as little as possible
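To make the style difference concrete, here is a minimal sketch (illustrative only) of the throwing variant with a try/catch call site, which is the pattern the rest of the codebase reportedly follows:

```javascript
// Throwing variant: the decoder signals a non-Avro message via TypeError.
function fromMessageBuffer(encodedMessage) {
  if (!encodedMessage || encodedMessage[0] !== 0) {
    throw new TypeError('Message not serialized with magic byte');
  }
  return encodedMessage.readInt32BE(1);
}

// Call sites already use try/catch, so the throwing style fits in:
let schemaId;
try {
  schemaId = fromMessageBuffer(Buffer.from([1, 2, 3, 4, 5])); // first byte !== 0
} catch (err) {
  schemaId = null; // fall back when the magic byte is missing
}
console.log(schemaId); // null
```

The return-null variant moves that same check into a plain `if` at each call site; both are equivalent in behavior, so consistency with the surrounding code is the deciding factor.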
lib/magic-byte.js
Outdated
} else {
  decoded = sr.schemaTypeById[schemaKey].decode(encodedMessage, 5);
if (!schemaType) {
  return null;
shouldn't this be `return type.decode(encodedMessage, 5);` to keep the previous behavior?
I have changed the behaviour here. It was unnecessary, since all we need is the magic byte and the `sr.schemaTypeById` map.
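A sketch of the decode path being described, assuming a hypothetical `schemaTypeById` map from schema id to an avsc-style type (the names mirror the discussion, not the actual implementation):

```javascript
// Decode a Confluent-framed message by looking up its schema id.
function decodeMessage(encodedMessage, schemaTypeById) {
  if (!encodedMessage || encodedMessage[0] !== 0) return null; // no magic byte
  const schemaId = encodedMessage.readInt32BE(1);
  const type = schemaTypeById[schemaId];
  if (!type) return null; // schema id not known to the registry cache
  return type.decode(encodedMessage, 5); // payload starts after magic byte + id
}

// Stand-in for an avsc type: decodes the payload as a UTF-8 string.
const fakeType = { decode: (buf, offset) => buf.slice(offset).toString() };

const msg = Buffer.concat([Buffer.alloc(5), Buffer.from('hi')]);
msg.writeInt32BE(7, 1); // schema id 7
console.log(decodeMessage(msg, { 7: fakeType })); // 'hi'
```

Note that because the schema id is carried in every message, this path works regardless of which subject name strategy was used at produce time.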
> Wow, great job. But you do a lot of things, instead of only adding the strategies:

Apologies for that, I needed this functionality and I usually do refactoring while working on something too. I also needed a working CI to keep track of builds. I did hold off on converting to TypeScript though.

> - circle2 (thanks!)
> - replacing dated `var` with `const`, template strings, typos etc. This "replace" itself - including adding `async/await` and examples in readme.md - is a "feature" that demands one minor release.
> - adding the TopicRecordNameStrategy strategy
>
> All this could be 3 PRs - it's very hard to review :/ BTW, are you completely replacing the current `TopicNameStrategy` by ? (or am I missing some point?). If you do, we need to support both strategies since `TopicNameStrategy` is the default.

You are right here, I'll implement all the confluent strategies.

> And a few other comments.
> What do you think?

Thank you for the constructive review. I have implemented most of the comments and I'll implement the other confluent strategies now.
Hmm, thanks for the adjustments. I just replied to some comments.
About TypeScript, there is an issue that may be related to it (#59) - but it would be a complete refactoring, almost a full rewrite. I don't see the advantage of doing this here :/
I bumped 2.0.0 with librdkafka today, please update your branch and check that everything is ok. Then we can test your version with real scenarios before the merge of a possible 3.0.0.
Done.
[Node.js dependencies (e.g. require('util'))]
[Third Party dependencies (e.g. require('axios'))]
[Local dependencies (e.g. require('./local'))]
@ricardohbin I did local testing and it was successful. Hope this can be merged upstream.
@pleszczy nice! I will just do a double check with some large-scale production data to see that everything is ok - if everything goes ok I will ship it this week. Can you add yourself with this feature description in the README CHANGELOG, please? It would probably be 3.0.0 (in concept it could be 2.1.0, but since this PR adds a new feature plus a whole code upgrade, it's better to go with caution and avoid breaks).
Done, thank you.
@pleszczy I tested here and everything is ok in my tests. Thank you very much for this big contribution (feature + code upgrade + CI/CD). Shipping 3.0.0!
Support for several event types in the same topic
The kafka-avro library will now support several event types in the same topic. This requires using the TopicRecordNameStrategy strategy.