-
Notifications
You must be signed in to change notification settings - Fork 296
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Kafka/AVRO output bot. #1251
base: develop
Are you sure you want to change the base?
Conversation
First of my questions: in order to make avro work, i've got to define an avro schema. For IntelMQ, this is a large data type mapping that should be stored as a configuration template. An example can be seen Here. Another thing to note is that for output, especially to avro, fields need to be flattened, and all |
Codecov Report
@@ Coverage Diff @@
## develop #1251 +/- ##
===========================================
- Coverage 75.14% 74.81% -0.33%
===========================================
Files 260 261 +1
Lines 12131 12211 +80
Branches 1623 1637 +14
===========================================
+ Hits 9116 9136 +20
- Misses 2662 2721 +59
- Partials 353 354 +1
|
Where's the question? :D
This is also the case for the elastic search bot, where a parameter has been used to specify the replacement character and |
So the question is simply, where should i put the files in the source so they can be referenced? Should i add them in contrib? For my instance i put config files in /opt/intelmq/var/lib/bots/kafka-output/[key_file.avsc, topics.conf, value_file.avsc] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good
except ImportError: | ||
avro = None | ||
|
||
if avro is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this and the two lines above be removed?
except ImportError:
from confluent_kafka.avro import AvroProducer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just using this to throw better errors in init() for external dependencies. I can remove them if you'd like.
avro = None | ||
|
||
if avro is not None: | ||
from confluent_kafka.avro import AvroProducer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this for a different library version or why is it necessary? Please document the reason inline.
And this line is not in a try-except block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did this again for better error handling as i was writing this. It can probably be removed as there's multiple python packages which fit within the kafka namespace. many of them have Producer class definitions, but no avro class definitions. confluent_kafka is a more specific namespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did this again for better error handling as i was writing this.
But an ImportError is not catched here.
if self.enable_avro is False: | ||
self.producer.produce(self.kafka_topic, dumps(event_dict).encode('utf-8'), callback=self.delivery_report) | ||
self.acknowledge_message() | ||
self.kafka.flush() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is self.kafka
defined?
self.logger.debug('Shipped %s to topic: %s', format(submit_key), format(event_topic)) | ||
self.producer.produce(topic=event_topic, value=event_dict, key=submit_key) | ||
self.acknowledge_message() | ||
self.producer.poll(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better do this check before acknowledging and raise an error if that happened so the message does not get lost.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't look like the AvroProducer has much in terms of error handling, looking at it now, i don't think that poll() does anything. I think i'm just going to have to catch serialization errors on produce.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a message could not be successfully delivered, self.acknowledge()
should not be called. For error handling (and logging), an Exception should be raised.
AFAIU this is still WIP, right? Changing the Milestone then to the next version, 1.1.0 is now coming soon. |
Yep, still a WIP. Sorry as always for the delay.
…On Thu, Jun 21, 2018, 8:28 AM Wagner ***@***.***> wrote:
AFAIU this is still WIP, right?
Changing the Milestone then to the next version, 1.1.0 is now coming soon.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#1251 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/ABWTcJQ9LLUqBtWT6h_k2BeIlb21xCWwks5t-5GLgaJpZM4UZlQp>
.
|
@z0r0 are you still working on this? The PR is in a pretty good shape and it would be a pity if we'd not continue the work. |
Hello, we ended up not implementing it and went with something else at work. That being said, I'm going to finish this one up on my own time in the coming weeks, so stay tuned. |
Sorry to hear that and thanks for your offer to finish it! Please let us know how we can support you. |
This is my first output bot contribution, and should be considered a work in progress.
This output bot goes a bit farther than simply outputting all threat intel to a kafka topic. The idea is that the intelligence can get routed to a topic that's dedicated to its source intelligence type. I'm opening this PR to solicit feedback on ironing out this bot.