Skip to content

Commit

Permalink
[ENH] Using msgpack instead of json
Browse files Browse the repository at this point in the history
Using msgpack instead of json results in faster (de)serialize and
less memory usage. Redis is also capable of msgpack within its lua
api i.e. https://github.com/kengonakajima/lua-msgpack-native.

====== Benchmark =======
JSON median size: 387
MSGPACK median size: 329
------------------------
Diff: 16.20%

JSON
* Serialize: 39286
* Deserialize: 30713

MSGPACK
* Serialize: 23483
* Deserialize: 12602
---------------------
DIFF
* Serialize: 50.35%
* Deserialize: 83.62%

Data extracted from spamhaus-collector
Measurements based on deduplicator-expert
460 events in total process by deducplicator-expert

Signed-off-by: Sebastian Waldbauer <[email protected]>
  • Loading branch information
waldbauer-certat committed Jun 30, 2021
1 parent 36d7940 commit 40e4ae1
Show file tree
Hide file tree
Showing 15 changed files with 106 additions and 57 deletions.
2 changes: 2 additions & 0 deletions debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Build-Depends: debhelper (>= 4.1.16),
python3-sphinx-rtd-theme,
python3-termstyle,
python3-tz,
python3-msgpack,
quilt,
rsync,
safe-rm
Expand All @@ -42,6 +43,7 @@ Depends: bash-completion,
python3-ruamel.yaml,
python3-termstyle (>= 0.1.10),
python3-tz,
python3-msgpack,
redis-server,
systemd,
${misc:Depends},
Expand Down
4 changes: 3 additions & 1 deletion intelmq/bots/parsers/json/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ def process(self):
for line in lines:
new_event = MessageFactory.unserialize(line,
harmonization=self.harmonization,
default_type='Event')
default_type='Event',
use_packer="json")

event = self.new_event(report)
event.update(new_event)
if 'raw' not in event:
Expand Down
5 changes: 3 additions & 2 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import inspect
import io
import json
import msgpack
import logging
import os
import re
Expand Down Expand Up @@ -320,8 +321,8 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True,
self.logger.error('Pipeline failed.')
self.__disconnect_pipelines()

except exceptions.DecodingError as exc:
self.logger.exception('Could not decode message from pipeline. No retries useful.')
except exceptions.UnserializationError as exc:
self.logger.exception('Could not unserialize message from pipeline. No retries useful.')

# ensure that we do not re-process the faulty message
self.__error_retries_counter = self.error_max_retries + 1
Expand Down
9 changes: 9 additions & 0 deletions intelmq/lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,12 @@ def __init__(self, encodings=None, exception: UnicodeDecodeError = None,
suffixes.append('with reason %r' % exception.reason)
suffix = (' ' + ' '.join(suffixes)) if suffixes else ''
super().__init__("Could not decode string%s." % suffix)


class UnserializationError(IntelMQException, ValueError):
"""
Unrecoverable error during message unserialization
"""
def __init__(self, exception: Exception = None, object: bytes = None):
self.object = object
super().__init__("Could not unserialize message%s." % exception)
49 changes: 34 additions & 15 deletions intelmq/lib/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import warnings
from collections import defaultdict
from typing import Any, Dict, Iterable, Optional, Sequence, Union
import msgpack

import intelmq.lib.exceptions as exceptions
import intelmq.lib.harmonization
Expand Down Expand Up @@ -58,8 +59,8 @@ def from_dict(message: dict, harmonization=None,
return class_reference(message, auto=True, harmonization=harmonization)

@staticmethod
def unserialize(raw_message: str, harmonization: dict = None,
default_type: Optional[str] = None) -> dict:
def unserialize(raw_message: bytes, harmonization: dict = None,
default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict:
"""
Takes JSON-encoded Message object, returns instance of correct class.
Expand All @@ -72,12 +73,12 @@ def unserialize(raw_message: str, harmonization: dict = None,
MessageFactory.from_dict
MessageFactory.serialize
"""
message = Message.unserialize(raw_message)
message = Message.unserialize(raw_message, use_packer=use_packer)
return MessageFactory.from_dict(message, harmonization=harmonization,
default_type=default_type)

@staticmethod
def serialize(message):
def serialize(message) -> bytes:
"""
Takes instance of message-derived class and makes JSON-encoded Message.
Expand Down Expand Up @@ -125,7 +126,7 @@ def __init__(self, message: Union[dict, tuple] = (), auto: bool = False,
elif isinstance(message, tuple):
self.iterable = dict(message)
else:
raise ValueError("Type %r of message can't be handled, must be dict or tuple.", type(message))
raise ValueError("Type %r of message can't be handled, must be dict or tuple." % type(message))
for key, value in self.iterable.items():
if not self.add(key, value, sanitize=False, raise_failure=False):
self.add(key, value, sanitize=True)
Expand Down Expand Up @@ -308,18 +309,32 @@ def deep_copy(self):
harmonization={self.__class__.__name__.lower(): self.harmonization_config})

def __str__(self):
return self.serialize()
return self.serialize(use_packer="json")

def serialize(self):
self['__type'] = self.__class__.__name__
json_dump = utils.decode(json.dumps(self))
del self['__type']
return json_dump
def serialize(self, use_packer: str = "msgpack"):
delete_type = False
if '__type' not in self:
delete_type = True
self['__type'] = self.__class__.__name__

if use_packer == "json":
packed = json.dumps(self)
else:
packed = msgpack.packb(self)

if delete_type:
del self['__type']
return packed

@staticmethod
def unserialize(message_string: str):
message = json.loads(message_string)
return message
def unserialize(message: bytes, use_packer: str = "msgpack"):
try:
if use_packer == "json":
return json.loads(message)
else:
return msgpack.unpackb(message, raw=False)
except Exception as exc:
raise exceptions.UnserializationError(exception=exc, object=message)

def __is_valid_key(self, key: str):
try:
Expand Down Expand Up @@ -466,14 +481,18 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False,
json_dict_fp = json_dict_fp[subkey]

for key, value in jsondicts.items():
new_dict[key] = json.dumps(value, ensure_ascii=False)
new_dict[key] = json.dumps(value)

return new_dict

def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False):
json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return json.dumps(json_dict, ensure_ascii=False, sort_keys=True)

def to_msgpack(self, hierarchical=False, with_type=False):
msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
return msgpack.packb(msgpack_dict)

def __eq__(self, other: dict) -> bool:
"""
Wrapper is necessary as we have additional members
Expand Down
4 changes: 2 additions & 2 deletions intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ def send(self, message: str, path: str = "_default",
path_permissive: bool = False):
raise NotImplementedError

def receive(self) -> str:
def receive(self) -> bytes:
if self._has_message:
raise exceptions.PipelineError("There's already a message, first "
"acknowledge the existing one.")

retval = self._receive()
self._has_message = True
return utils.decode(retval)
return retval

def _receive(self) -> bytes:
raise NotImplementedError
Expand Down
24 changes: 12 additions & 12 deletions intelmq/lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io
import inspect
import json
import msgpack
import os
import re
import unittest
Expand Down Expand Up @@ -152,8 +153,7 @@ def setUpClass(cls):
elif cls.bot_type != 'collector' and cls.default_input_message == '':
cls.default_input_message = {'__type': 'Event'}
if type(cls.default_input_message) is dict:
cls.default_input_message = \
utils.decode(json.dumps(cls.default_input_message))
cls.default_input_message = msgpack.packb(cls.default_input_message)

if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'):
password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \
Expand All @@ -170,10 +170,10 @@ def setUpClass(cls):
harmonization = utils.load_configuration(pkg_resources.resource_filename('intelmq',
'etc/harmonization.conf'))

def new_report(self, auto=False, examples=False):
def new_report(self, auto=False, examples=False) -> message.Report:
return message.Report(harmonization=self.harmonization, auto=auto)

def new_event(self):
def new_event(self) -> message.Event:
return message.Event(harmonization=self.harmonization)

def get_mocked_logger(self, logger):
Expand Down Expand Up @@ -242,7 +242,7 @@ def prepare_source_queue(self):
self.input_queue = []
for msg in self.input_message:
if type(msg) is dict:
self.input_queue.append(json.dumps(msg))
self.input_queue.append(message.MessageFactory.serialize(msg))
elif issubclass(type(msg), message.Message):
self.input_queue.append(msg.serialize())
else:
Expand Down Expand Up @@ -320,17 +320,17 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,

""" Test if report has required fields. """
if self.bot_type == 'collector':
for report_json in self.get_output_queue():
report = message.MessageFactory.unserialize(report_json,
for report_data in self.get_output_queue():
report = message.MessageFactory.unserialize(report_data,
harmonization=self.harmonization)
self.assertIsInstance(report, message.Report)
self.assertIn('raw', report)
self.assertIn('time.observation', report)

""" Test if event has required fields. """
if self.bot_type == 'parser':
for event_json in self.get_output_queue():
event = message.MessageFactory.unserialize(event_json,
for event_data in self.get_output_queue():
event = message.MessageFactory.unserialize(event_data,
harmonization=self.harmonization)
self.assertIsInstance(event, message.Event)
self.assertIn('classification.type', event)
Expand Down Expand Up @@ -389,7 +389,7 @@ def get_output_queue(self, path="_default"):
"""Getter for items in the output queues of this bot. Use in TestCase scenarios
If there is multiple queues in named queue group, we return all the items chained.
"""
return [utils.decode(text) for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])]
return [text for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])]
# return [utils.decode(text) for text in self.pipe.state["%s-output" % self.bot_id]]

def test_bot_name(self, *args, **kwargs):
Expand Down Expand Up @@ -520,9 +520,9 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d
given queue position.
"""
event = self.get_output_queue(path=path)[queue_pos]
self.assertIsInstance(event, str)
self.assertIsInstance(event, bytes)

event_dict = json.loads(event)
event_dict = msgpack.unpackb(event, raw=False)
if isinstance(expected_msg, (message.Event, message.Report)):
expected = expected_msg.to_dict(with_type=True)
else:
Expand Down
5 changes: 4 additions & 1 deletion intelmq/tests/bots/collectors/tcp/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ def test_intelmq_exchange(self):
for i, msg in enumerate(self.get_output_queue()):
report = MessageFactory.unserialize(msg, harmonization=self.harmonization, default_type='Event')

output = MessageFactory.unserialize(utils.base64_decode(report["raw"]), harmonization=self.harmonization, default_type='Event')
output = MessageFactory.unserialize(utils.base64_decode(report["raw"]),
harmonization=self.harmonization,
default_type='Event',
use_packer="json")
self.assertDictEqual(output, INPUT1)

del report['time.observation']
Expand Down
4 changes: 2 additions & 2 deletions intelmq/tests/bots/experts/cymru_whois/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import json
import msgpack
import unittest

import intelmq.lib.test as test
Expand Down Expand Up @@ -93,7 +93,7 @@ def test_6to4_result(self):
"""
self.input_message = EXAMPLE_6TO4_INPUT
self.run_bot()
actual = json.loads(self.get_output_queue()[0])
actual = msgpack.loads(self.get_output_queue()[0])
self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual)
self.assertIn("source.asn", actual)
self.assertIn("source.as_name", actual)
Expand Down
8 changes: 5 additions & 3 deletions intelmq/tests/bots/experts/idea/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
# -*- coding: utf-8 -*-
import unittest
import json
import msgpack

import intelmq.lib.test as test
from intelmq.lib.message import MessageFactory
from intelmq.bots.experts.idea.expert import IdeaExpertBot
from intelmq.lib.harmonization import ClassificationType

Expand Down Expand Up @@ -86,10 +88,10 @@ def test_conversion(self):
# The ID in the generated Idea event is random, so we have to extract
# the data from the "output" field and compare after removing ID's
event = self.get_output_queue()[0]
self.assertIsInstance(event, str)
event_dict = json.loads(event)
self.assertIsInstance(event, bytes)
event_dict = MessageFactory.unserialize(event)
self.assertIsInstance(event_dict, dict)
self.assertTrue("output" in event_dict)
self.assertTrue(b"output" in event_dict)
idea_event = json.loads(event_dict["output"])
self.assertIsInstance(idea_event, dict)
del TEST_OUTPUT1["ID"]
Expand Down
3 changes: 2 additions & 1 deletion intelmq/tests/bots/parsers/json/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import base64
import os
import unittest
import json
import msgpack

import intelmq.lib.test as test
from intelmq.bots.parsers.json.parser import JSONParserBot
Expand Down Expand Up @@ -51,7 +53,6 @@
NO_DEFAULT_EVENT = MULTILINE_EVENTS[1].copy()
NO_DEFAULT_EVENT['raw'] = base64.b64encode(b'{"source.ip": "127.0.0.2", "classification.type": "c2-server"}\n').decode()


class TestJSONParserBot(test.BotTestCase, unittest.TestCase):
"""
A TestCase for a MalwareDomainListParserBot.
Expand Down
4 changes: 2 additions & 2 deletions intelmq/tests/lib/test_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_encoding_error_on_input_message(self):
"""
self.input_message = b'foo\xc9bar'
self.run_bot(iterations=1, allowed_error_count=1)
self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.DecodingError:.*')
self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.UnserializationError:.*')
self.assertEqual(self.pipe.state['test-bot-input-internal'], [])
self.assertEqual(self.pipe.state['test-bot-input'], [])
self.assertEqual(self.pipe.state['test-bot-output'], [])
Expand All @@ -71,7 +71,7 @@ def test_invalid_value_on_input_message(self):
Test if the bot is dumping / not retrying a message which is impossible to parse.
https://github.com/certtools/intelmq/issues/1765
"""
self.input_message = b'{"source.asn": 0, "__type": "Event"}'
self.input_message = {"source.asn": 0, "__type": "Event"}
self.run_bot(iterations=1, allowed_error_count=1)
self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.InvalidValue:.*')
self.assertEqual(self.pipe.state['test-bot-input-internal'], [])
Expand Down
Loading

0 comments on commit 40e4ae1

Please sign in to comment.