From 40e4ae167be8f840add9bab33159f5cb6096b5a2 Mon Sep 17 00:00:00 2001 From: Sebastian Waldbauer Date: Thu, 24 Jun 2021 13:25:45 +0200 Subject: [PATCH] [ENH] Using msgpack instead of json Using msgpack instead of json results in faster (de)serialize and less memory usage. Redis is also capable of msgpack within its lua api i.e. https://github.com/kengonakajima/lua-msgpack-native. ====== Benchmark ======= JSON median size: 387 MSGPACK median size: 329 ------------------------ Diff: 16.20% JSON * Serialize: 39286 * Deserialize: 30713 MSGPACK * Serialize: 23483 * Deserialize: 12602 --------------------- DIFF * Serialize: 50.35% * Deserialize: 83.62% Data extracted from spamhaus-collector Measurements based on deduplicator-expert 460 events in total process by deducplicator-expert Signed-off-by: Sebastian Waldbauer --- debian/control | 2 + intelmq/bots/parsers/json/parser.py | 4 +- intelmq/lib/bot.py | 5 +- intelmq/lib/exceptions.py | 9 ++++ intelmq/lib/message.py | 49 +++++++++++++------ intelmq/lib/pipeline.py | 4 +- intelmq/lib/test.py | 24 ++++----- .../bots/collectors/tcp/test_collector.py | 5 +- .../bots/experts/cymru_whois/test_expert.py | 4 +- .../tests/bots/experts/idea/test_expert.py | 8 +-- .../tests/bots/parsers/json/test_parser.py | 3 +- intelmq/tests/lib/test_bot.py | 4 +- intelmq/tests/lib/test_message.py | 28 +++++++---- intelmq/tests/lib/test_pipeline.py | 13 ++--- setup.py | 1 + 15 files changed, 106 insertions(+), 57 deletions(-) diff --git a/debian/control b/debian/control index f602a4348..17620c1cc 100644 --- a/debian/control +++ b/debian/control @@ -21,6 +21,7 @@ Build-Depends: debhelper (>= 4.1.16), python3-sphinx-rtd-theme, python3-termstyle, python3-tz, + python3-msgpack, quilt, rsync, safe-rm @@ -42,6 +43,7 @@ Depends: bash-completion, python3-ruamel.yaml, python3-termstyle (>= 0.1.10), python3-tz, + python3-msgpack, redis-server, systemd, ${misc:Depends}, diff --git a/intelmq/bots/parsers/json/parser.py b/intelmq/bots/parsers/json/parser.py index d2b98e89c..4382fc58c 100644 --- a/intelmq/bots/parsers/json/parser.py +++ b/intelmq/bots/parsers/json/parser.py @@ -28,7 +28,9 @@ def process(self): for line in lines: new_event = MessageFactory.unserialize(line, harmonization=self.harmonization, - default_type='Event') + default_type='Event', + use_packer="json") + event = self.new_event(report) event.update(new_event) if 'raw' not in event: diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 4b2844384..9c7ab4eb3 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -17,6 +17,7 @@ import inspect import io import json +import msgpack import logging import os import re @@ -320,8 +321,8 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True, self.logger.error('Pipeline failed.') self.__disconnect_pipelines() - except exceptions.DecodingError as exc: - self.logger.exception('Could not decode message from pipeline. No retries useful.') + except exceptions.UnserializationError as exc: + self.logger.exception('Could not unserialize message from pipeline. No retries useful.') # ensure that we do not re-process the faulty message self.__error_retries_counter = self.error_max_retries + 1 diff --git a/intelmq/lib/exceptions.py b/intelmq/lib/exceptions.py index e3e6d710f..7c149ed85 100644 --- a/intelmq/lib/exceptions.py +++ b/intelmq/lib/exceptions.py @@ -172,3 +172,12 @@ def __init__(self, encodings=None, exception: UnicodeDecodeError = None, suffixes.append('with reason %r' % exception.reason) suffix = (' ' + ' '.join(suffixes)) if suffixes else '' super().__init__("Could not decode string%s." % suffix) + + +class UnserializationError(IntelMQException, ValueError): + """ + Unrecoverable error during message unserialization + """ + def __init__(self, exception: Exception = None, object: bytes = None): + self.object = object + super().__init__("Could not unserialize message%s." % exception) diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index bdde860bf..0de529e2f 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -14,6 +14,7 @@ import warnings from collections import defaultdict from typing import Any, Dict, Iterable, Optional, Sequence, Union +import msgpack import intelmq.lib.exceptions as exceptions import intelmq.lib.harmonization @@ -58,8 +59,8 @@ def from_dict(message: dict, harmonization=None, return class_reference(message, auto=True, harmonization=harmonization) @staticmethod - def unserialize(raw_message: str, harmonization: dict = None, - default_type: Optional[str] = None) -> dict: + def unserialize(raw_message: bytes, harmonization: dict = None, + default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict: """ Takes JSON-encoded Message object, returns instance of correct class. @@ -72,12 +73,12 @@ def unserialize(raw_message: str, harmonization: dict = None, MessageFactory.from_dict MessageFactory.serialize """ - message = Message.unserialize(raw_message) + message = Message.unserialize(raw_message, use_packer=use_packer) return MessageFactory.from_dict(message, harmonization=harmonization, default_type=default_type) @staticmethod - def serialize(message): + def serialize(message) -> bytes: """ Takes instance of message-derived class and makes JSON-encoded Message. @@ -125,7 +126,7 @@ def __init__(self, message: Union[dict, tuple] = (), auto: bool = False, elif isinstance(message, tuple): self.iterable = dict(message) else: - raise ValueError("Type %r of message can't be handled, must be dict or tuple.", type(message)) + raise ValueError("Type %r of message can't be handled, must be dict or tuple." % type(message)) for key, value in self.iterable.items(): if not self.add(key, value, sanitize=False, raise_failure=False): self.add(key, value, sanitize=True) @@ -308,18 +309,32 @@ def deep_copy(self): harmonization={self.__class__.__name__.lower(): self.harmonization_config}) def __str__(self): - return self.serialize() + return self.serialize(use_packer="json") - def serialize(self): - self['__type'] = self.__class__.__name__ - json_dump = utils.decode(json.dumps(self)) - del self['__type'] - return json_dump + def serialize(self, use_packer: str = "msgpack"): + delete_type = False + if '__type' not in self: + delete_type = True + self['__type'] = self.__class__.__name__ + + if use_packer == "json": + packed = json.dumps(self) + else: + packed = msgpack.packb(self) + + if delete_type: + del self['__type'] + return packed @staticmethod - def unserialize(message_string: str): - message = json.loads(message_string) - return message + def unserialize(message: bytes, use_packer: str = "msgpack"): + try: + if use_packer == "json": + return json.loads(message) + else: + return msgpack.unpackb(message, raw=False) + except Exception as exc: + raise exceptions.UnserializationError(exception=exc, object=message) def __is_valid_key(self, key: str): try: @@ -466,7 +481,7 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False, json_dict_fp = json_dict_fp[subkey] for key, value in jsondicts.items(): - new_dict[key] = json.dumps(value, ensure_ascii=False) + new_dict[key] = json.dumps(value) return new_dict @@ -474,6 +489,10 @@ def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False) json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type) return json.dumps(json_dict, ensure_ascii=False, sort_keys=True) + def to_msgpack(self, hierarchical=False, with_type=False): + msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type) + return msgpack.packb(msgpack_dict) + def __eq__(self, other: dict) -> bool: """ Wrapper is necessary as we have additional members diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 6975bf181..ecae4351c 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -123,14 +123,14 @@ def send(self, message: str, path: str = "_default", path_permissive: bool = False): raise NotImplementedError - def receive(self) -> str: + def receive(self) -> bytes: if self._has_message: raise exceptions.PipelineError("There's already a message, first " "acknowledge the existing one.") retval = self._receive() self._has_message = True - return utils.decode(retval) + return retval def _receive(self) -> bytes: raise NotImplementedError diff --git a/intelmq/lib/test.py b/intelmq/lib/test.py index e577f408f..632a18cbf 100644 --- a/intelmq/lib/test.py +++ b/intelmq/lib/test.py @@ -12,6 +12,7 @@ import io import inspect import json +import msgpack import os import re import unittest @@ -152,8 +153,7 @@ def setUpClass(cls): elif cls.bot_type != 'collector' and cls.default_input_message == '': cls.default_input_message = {'__type': 'Event'} if type(cls.default_input_message) is dict: - cls.default_input_message = \ - utils.decode(json.dumps(cls.default_input_message)) + cls.default_input_message = msgpack.packb(cls.default_input_message) if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'): password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \ @@ -170,10 +170,10 @@ def setUpClass(cls): harmonization = utils.load_configuration(pkg_resources.resource_filename('intelmq', 'etc/harmonization.conf')) - def new_report(self, auto=False, examples=False): + def new_report(self, auto=False, examples=False) -> message.Report: return message.Report(harmonization=self.harmonization, auto=auto) - def new_event(self): + def new_event(self) -> message.Event: return message.Event(harmonization=self.harmonization) def get_mocked_logger(self, logger): @@ -242,7 +242,7 @@ def prepare_source_queue(self): self.input_queue = [] for msg in self.input_message: if type(msg) is dict: - self.input_queue.append(json.dumps(msg)) + self.input_queue.append(message.MessageFactory.serialize(msg)) elif issubclass(type(msg), message.Message): self.input_queue.append(msg.serialize()) else: @@ -320,8 +320,8 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False, """ Test if report has required fields. """ if self.bot_type == 'collector': - for report_json in self.get_output_queue(): - report = message.MessageFactory.unserialize(report_json, + for report_data in self.get_output_queue(): + report = message.MessageFactory.unserialize(report_data, harmonization=self.harmonization) self.assertIsInstance(report, message.Report) self.assertIn('raw', report) @@ -329,8 +329,8 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False, """ Test if event has required fields. """ if self.bot_type == 'parser': - for event_json in self.get_output_queue(): - event = message.MessageFactory.unserialize(event_json, + for event_data in self.get_output_queue(): + event = message.MessageFactory.unserialize(event_data, harmonization=self.harmonization) self.assertIsInstance(event, message.Event) self.assertIn('classification.type', event) @@ -389,7 +389,7 @@ def get_output_queue(self, path="_default"): """Getter for items in the output queues of this bot. Use in TestCase scenarios If there is multiple queues in named queue group, we return all the items chained. """ - return [utils.decode(text) for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])] + return [text for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])] # return [utils.decode(text) for text in self.pipe.state["%s-output" % self.bot_id]] def test_bot_name(self, *args, **kwargs): @@ -520,9 +520,9 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d given queue position. """ event = self.get_output_queue(path=path)[queue_pos] - self.assertIsInstance(event, str) + self.assertIsInstance(event, bytes) - event_dict = json.loads(event) + event_dict = msgpack.unpackb(event, raw=False) if isinstance(expected_msg, (message.Event, message.Report)): expected = expected_msg.to_dict(with_type=True) else: diff --git a/intelmq/tests/bots/collectors/tcp/test_collector.py b/intelmq/tests/bots/collectors/tcp/test_collector.py index 449d41391..954b9f084 100644 --- a/intelmq/tests/bots/collectors/tcp/test_collector.py +++ b/intelmq/tests/bots/collectors/tcp/test_collector.py @@ -123,7 +123,10 @@ def test_intelmq_exchange(self): for i, msg in enumerate(self.get_output_queue()): report = MessageFactory.unserialize(msg, harmonization=self.harmonization, default_type='Event') - output = MessageFactory.unserialize(utils.base64_decode(report["raw"]), harmonization=self.harmonization, default_type='Event') + output = MessageFactory.unserialize(utils.base64_decode(report["raw"]), + harmonization=self.harmonization, + default_type='Event', + use_packer="json") self.assertDictEqual(output, INPUT1) del report['time.observation'] diff --git a/intelmq/tests/bots/experts/cymru_whois/test_expert.py b/intelmq/tests/bots/experts/cymru_whois/test_expert.py index 8d109e694..70343e701 100644 --- a/intelmq/tests/bots/experts/cymru_whois/test_expert.py +++ b/intelmq/tests/bots/experts/cymru_whois/test_expert.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- -import json +import msgpack import unittest import intelmq.lib.test as test @@ -93,7 +93,7 @@ def test_6to4_result(self): """ self.input_message = EXAMPLE_6TO4_INPUT self.run_bot() - actual = json.loads(self.get_output_queue()[0]) + actual = msgpack.loads(self.get_output_queue()[0]) self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual) self.assertIn("source.asn", actual) self.assertIn("source.as_name", actual) diff --git a/intelmq/tests/bots/experts/idea/test_expert.py b/intelmq/tests/bots/experts/idea/test_expert.py index 53c1f392d..5a02ef7fe 100644 --- a/intelmq/tests/bots/experts/idea/test_expert.py +++ b/intelmq/tests/bots/experts/idea/test_expert.py @@ -5,8 +5,10 @@ # -*- coding: utf-8 -*- import unittest import json +import msgpack import intelmq.lib.test as test +from intelmq.lib.message import MessageFactory from intelmq.bots.experts.idea.expert import IdeaExpertBot from intelmq.lib.harmonization import ClassificationType @@ -86,10 +88,10 @@ def test_conversion(self): # The ID in the generated Idea event is random, so we have to extract # the data from the "output" field and compare after removing ID's event = self.get_output_queue()[0] - self.assertIsInstance(event, str) - event_dict = json.loads(event) + self.assertIsInstance(event, bytes) + event_dict = MessageFactory.unserialize(event) self.assertIsInstance(event_dict, dict) - self.assertTrue("output" in event_dict) + self.assertTrue(b"output" in event_dict) idea_event = json.loads(event_dict["output"]) self.assertIsInstance(idea_event, dict) del TEST_OUTPUT1["ID"] diff --git a/intelmq/tests/bots/parsers/json/test_parser.py b/intelmq/tests/bots/parsers/json/test_parser.py index c18d18dd0..2c83658ed 100644 --- a/intelmq/tests/bots/parsers/json/test_parser.py +++ b/intelmq/tests/bots/parsers/json/test_parser.py @@ -6,6 +6,8 @@ import base64 import os import unittest +import json +import msgpack import intelmq.lib.test as test from intelmq.bots.parsers.json.parser import JSONParserBot @@ -51,7 +53,6 @@ NO_DEFAULT_EVENT = MULTILINE_EVENTS[1].copy() NO_DEFAULT_EVENT['raw'] = base64.b64encode(b'{"source.ip": "127.0.0.2", "classification.type": "c2-server"}\n').decode() - class TestJSONParserBot(test.BotTestCase, unittest.TestCase): """ A TestCase for a MalwareDomainListParserBot. diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index b7b16192a..b8c71ec81 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -61,7 +61,7 @@ def test_encoding_error_on_input_message(self): """ self.input_message = b'foo\xc9bar' self.run_bot(iterations=1, allowed_error_count=1) - self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.DecodingError:.*') + self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.UnserializationError:.*') self.assertEqual(self.pipe.state['test-bot-input-internal'], []) self.assertEqual(self.pipe.state['test-bot-input'], []) self.assertEqual(self.pipe.state['test-bot-output'], []) @@ -71,7 +71,7 @@ def test_invalid_value_on_input_message(self): Test if the bot is dumping / not retrying a message which is impossible to parse. https://github.com/certtools/intelmq/issues/1765 """ - self.input_message = b'{"source.asn": 0, "__type": "Event"}' + self.input_message = {"source.asn": 0, "__type": "Event"} self.run_bot(iterations=1, allowed_error_count=1) self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.InvalidValue:.*') self.assertEqual(self.pipe.state['test-bot-input-internal'], []) diff --git a/intelmq/tests/lib/test_message.py b/intelmq/tests/lib/test_message.py index b3f5c649d..4c079b959 100644 --- a/intelmq/tests/lib/test_message.py +++ b/intelmq/tests/lib/test_message.py @@ -11,6 +11,7 @@ but has a valid Harmonization configuration. """ import json +import msgpack import unittest import pkg_resources @@ -158,12 +159,12 @@ def test_event_ne_different_config(self): def test_invalid_type(self): """ Test if Message raises InvalidArgument for invalid type. """ with self.assertRaises(exceptions.InvalidArgument): - message.MessageFactory.unserialize('{"__type": "Message"}', harmonization=HARM) + message.MessageFactory.unserialize(msgpack.dumps({"__type": "Message"}), harmonization=HARM) def test_invalid_type2(self): """ Test if MessageFactory raises InvalidArgument for invalid type. """ with self.assertRaises(exceptions.InvalidArgument): - message.MessageFactory.unserialize('{"__type": "Invalid"}', harmonization=HARM) + message.MessageFactory.unserialize(msgpack.dumps({"__type": "Invalid"}), harmonization=HARM) def test_report_invalid_key(self): """ Test if report raises InvalidKey for invalid key in add(). """ @@ -365,10 +366,8 @@ def test_factory_serialize(self): report.add('feed.url', URL_SANE) report.add('raw', LOREM_BASE64, sanitize=False) actual = message.MessageFactory.serialize(report) - expected = ('{"raw": "bG9yZW0gaXBzdW0=", "__type": "Report", "feed.url' - '": "https://example.com/", "feed.name": "Example"}') - self.assertDictEqual(json.loads(expected), - json.loads(actual)) + expected = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xa6__type\xa6Report') + self.assertDictEqual(msgpack.unpackb(expected), msgpack.unpackb(actual)) def test_deep_copy_content(self): """ Test if deep_copy does return the same items. """ @@ -518,22 +517,31 @@ def test_event_json_hierarchical(self): '{"observation": "2015-01-01T13:37:00+00:00"}}') self.assertDictEqual(json.loads(expected), json.loads(actual)) + def test_event_msgpack(self): + """ Test event to_msgpack """ + event = self.new_event() + event = self.add_event_examples(event) + actual = event.to_msgpack() + self.assertIsInstance(actual, bytes) + excepted = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xb0time.observation\xb92015-01-01T13:37:00+00:00') + self.assertDictEqual(msgpack.unpackb(excepted), msgpack.unpackb(actual)) + def test_event_serialize(self): """ Test Event serialize. """ event = self.new_event() - self.assertEqual('{"__type": "Event"}', + self.assertEqual(b'\x81\xa6__type\xa5Event', event.serialize()) def test_event_string(self): """ Test Event serialize. """ event = self.new_event() - self.assertEqual('{"__type": "Event"}', + self.assertEqual(b'\x81\xa6__type\xa5Event', event.serialize()) def test_event_unicode(self): """ Test Event serialize. """ event = self.new_event() - self.assertEqual('{"__type": "Event"}', + self.assertEqual(b'\x81\xa6__type\xa5Event', event.serialize()) def test_event_from_report(self): @@ -599,7 +607,7 @@ def test_event_init_check_tuple(self): def test_event_init(self): """ Test if initialization method checks fields. """ - event = '{"__type": "Event", "source.asn": "foo"}' + event = msgpack.dumps({"__type": "Event", "source.asn": "foo"}) with self.assertRaises(exceptions.InvalidValue): message.MessageFactory.unserialize(event, harmonization=HARM) diff --git a/intelmq/tests/lib/test_pipeline.py b/intelmq/tests/lib/test_pipeline.py index 39f75eb0f..08adb7c0e 100644 --- a/intelmq/tests/lib/test_pipeline.py +++ b/intelmq/tests/lib/test_pipeline.py @@ -20,6 +20,7 @@ import intelmq.lib.pipeline as pipeline import intelmq.lib.test as test import intelmq.lib.exceptions as exceptions +import intelmq.lib.utils as utils SAMPLES = {'normal': [b'Lorem ipsum dolor sit amet', 'Lorem ipsum dolor sit amet'], @@ -67,7 +68,7 @@ def setUp(self): def test_receive(self): self.pipe.state['test-bot-input'] = [SAMPLES['normal'][0]] - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_send(self): self.pipe.send(SAMPLES['normal'][1]) @@ -76,7 +77,7 @@ def test_send(self): def test_receive_unicode(self): self.pipe.state['test-bot-input'] = [SAMPLES['unicode'][0]] - self.assertEqual(SAMPLES['unicode'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['unicode'][1], utils.decode(self.pipe.receive())) def test_send_unicode(self): self.pipe.send(SAMPLES['unicode'][1]) @@ -107,7 +108,7 @@ def test_reject(self): self.pipe.state['test-bot-input'] = [SAMPLES['normal'][0]] self.pipe.receive() self.pipe.reject_message() - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_acknowledge(self): self.pipe.state['test-bot-input'] = [SAMPLES['normal'][0]] @@ -161,12 +162,12 @@ def test_send_receive(self): """ Sending bytest and receiving unicode. """ self.clear() self.pipe.send(SAMPLES['normal'][0]) - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_send_receive_unicode(self): self.clear() self.pipe.send(SAMPLES['unicode'][1]) - self.assertEqual(SAMPLES['unicode'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['unicode'][1], utils.decode(self.pipe.receive())) def test_count(self): self.clear() @@ -185,7 +186,7 @@ def test_reject(self): self.pipe.send(SAMPLES['normal'][0]) self.pipe.receive() self.pipe.reject_message() - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_acknowledge(self): self.pipe.send(SAMPLES['normal'][0]) diff --git a/setup.py b/setup.py index 32469384e..df2111a83 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ 'redis>=2.10', 'requests>=2.2.0', 'ruamel.yaml', + 'msgpack>=0.5', ] exec(open(os.path.join(os.path.dirname(__file__),