diff --git a/debian/control b/debian/control index 3548403c7..f69dc30ea 100644 --- a/debian/control +++ b/debian/control @@ -20,6 +20,7 @@ Build-Depends: debhelper (>= 4.1.16), python3-sphinx-rtd-theme, python3-termstyle, python3-tz, + python3-msgpack, quilt, rsync, safe-rm @@ -40,6 +41,7 @@ Depends: bash-completion, python3-requests (>= 2.2.1), python3-termstyle (>= 0.1.10), python3-tz, + python3-msgpack, redis-server, systemd, ${misc:Depends}, diff --git a/intelmq/bots/parsers/json/parser.py b/intelmq/bots/parsers/json/parser.py index 1a1cdc824..47eaf97ec 100644 --- a/intelmq/bots/parsers/json/parser.py +++ b/intelmq/bots/parsers/json/parser.py @@ -25,7 +25,9 @@ def process(self): for line in lines: new_event = MessageFactory.unserialize(line, harmonization=self.harmonization, - default_type='Event') + default_type='Event', + use_packer="json") + event = self.new_event(report) event.update(new_event) if 'raw' not in event: diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 80b046414..4d21f947a 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -13,6 +13,7 @@ import inspect import io import json +import msgpack import logging import os import re @@ -318,8 +319,8 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True, self.logger.error('Pipeline failed.') self.__disconnect_pipelines() - except exceptions.DecodingError as exc: - self.logger.exception('Could not decode message from pipeline. No retries useful.') + except exceptions.UnserializationError as exc: + self.logger.exception('Could not unserialize message from pipeline. No retries useful.') # ensure that we do not re-process the faulty message self.__error_retries_counter = self.error_max_retries + 1 diff --git a/intelmq/lib/exceptions.py b/intelmq/lib/exceptions.py index e5d5afd95..0037e4764 100644 --- a/intelmq/lib/exceptions.py +++ b/intelmq/lib/exceptions.py @@ -168,3 +168,12 @@ def __init__(self, encodings=None, exception: UnicodeDecodeError = None, suffixes.append('with reason %r' % exception.reason) suffix = (' ' + ' '.join(suffixes)) if suffixes else '' super().__init__("Could not decode string%s." % suffix) + + +class UnserializationError(IntelMQException, ValueError): + """ + Unrecoverable error during message unserialization + """ + def __init__(self, exception: Exception = None, object: bytes = None): + self.object = object + super().__init__("Could not unserialize message%s." % exception) diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index 2b330ccca..ae16a96be 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -10,6 +10,7 @@ import warnings from collections import defaultdict from typing import Any, Dict, Iterable, Optional, Sequence, Union +import msgpack import intelmq.lib.exceptions as exceptions import intelmq.lib.harmonization @@ -54,8 +55,8 @@ def from_dict(message: dict, harmonization=None, return class_reference(message, auto=True, harmonization=harmonization) @staticmethod - def unserialize(raw_message: str, harmonization: dict = None, - default_type: Optional[str] = None) -> dict: + def unserialize(raw_message: bytes, harmonization: dict = None, + default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict: """ Takes JSON-encoded Message object, returns instance of correct class. @@ -68,12 +69,12 @@ def unserialize(raw_message: str, harmonization: dict = None, MessageFactory.from_dict MessageFactory.serialize """ - message = Message.unserialize(raw_message) + message = Message.unserialize(raw_message, use_packer=use_packer) return MessageFactory.from_dict(message, harmonization=harmonization, default_type=default_type) @staticmethod - def serialize(message): + def serialize(message) -> bytes: """ Takes instance of message-derived class and makes JSON-encoded Message. @@ -121,7 +122,7 @@ def __init__(self, message: Union[dict, tuple] = (), auto: bool = False, elif isinstance(message, tuple): self.iterable = dict(message) else: - raise ValueError("Type %r of message can't be handled, must be dict or tuple.", type(message)) + raise ValueError("Type %r of message can't be handled, must be dict or tuple." % type(message)) for key, value in self.iterable.items(): if not self.add(key, value, sanitize=False, raise_failure=False): self.add(key, value, sanitize=True) @@ -304,18 +305,32 @@ def deep_copy(self): harmonization={self.__class__.__name__.lower(): self.harmonization_config}) def __str__(self): - return self.serialize() + return self.serialize(use_packer="json") - def serialize(self): - self['__type'] = self.__class__.__name__ - json_dump = utils.decode(json.dumps(self)) - del self['__type'] - return json_dump + def serialize(self, use_packer: str = "msgpack"): + delete_type = False + if '__type' not in self: + delete_type = True + self['__type'] = self.__class__.__name__ + + if use_packer == "json": + packed = json.dumps(self) + else: + packed = msgpack.packb(self) + + if delete_type: + del self['__type'] + return packed @staticmethod - def unserialize(message_string: str): - message = json.loads(message_string) - return message + def unserialize(message: bytes, use_packer: str = "msgpack"): + try: + if use_packer == "json": + return json.loads(message) + else: + return msgpack.unpackb(message) + except Exception as exc: + raise exceptions.UnserializationError(exception=exc, object=message) def __is_valid_key(self, key: str): try: @@ -462,7 +477,7 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False, json_dict_fp = json_dict_fp[subkey] for key, value in jsondicts.items(): - new_dict[key] = json.dumps(value, ensure_ascii=False) + new_dict[key] = json.dumps(value) return new_dict @@ -470,6 +485,10 @@ def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False) json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type) return json.dumps(json_dict, ensure_ascii=False, sort_keys=True) + def to_msgpack(self, hierarchical=False, with_type=False): + msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type) + return msgpack.packb(msgpack_dict) + def __eq__(self, other: dict) -> bool: """ Wrapper is necessary as we have additional members diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 496c0235e..3e1decf7d 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -117,14 +117,14 @@ def send(self, message: str, path: str = "_default", path_permissive: bool = False): raise NotImplementedError - def receive(self) -> str: + def receive(self) -> bytes: if self._has_message: raise exceptions.PipelineError("There's already a message, first " "acknowledge the existing one.") retval = self._receive() self._has_message = True - return utils.decode(retval) + return retval def _receive(self) -> bytes: raise NotImplementedError diff --git a/intelmq/lib/test.py b/intelmq/lib/test.py index a6fb8555f..ab0679a5e 100644 --- a/intelmq/lib/test.py +++ b/intelmq/lib/test.py @@ -8,6 +8,7 @@ import io import inspect import json +import msgpack import os import re import unittest @@ -150,8 +151,7 @@ def setUpClass(cls): elif cls.bot_type != 'collector' and cls.default_input_message == '': cls.default_input_message = {'__type': 'Event'} if type(cls.default_input_message) is dict: - cls.default_input_message = \ - utils.decode(json.dumps(cls.default_input_message)) + cls.default_input_message = msgpack.packb(cls.default_input_message) if cls.use_cache and not os.environ.get('INTELMQ_SKIP_REDIS'): password = os.environ.get('INTELMQ_TEST_REDIS_PASSWORD') or \ @@ -168,10 +168,10 @@ def setUpClass(cls): harmonization = utils.load_configuration(pkg_resources.resource_filename('intelmq', 'etc/harmonization.conf')) - def new_report(self, auto=False, examples=False): + def new_report(self, auto=False, examples=False) -> message.Report: return message.Report(harmonization=self.harmonization, auto=auto) - def new_event(self): + def new_event(self) -> message.Event: return message.Event(harmonization=self.harmonization) def get_mocked_logger(self, logger): @@ -240,7 +240,7 @@ def prepare_bot(self, parameters={}, destination_queues=None): self.input_queue = [] for msg in self.input_message: if type(msg) is dict: - self.input_queue.append(json.dumps(msg)) + self.input_queue.append(message.MessageFactory.serialize(msg)) elif issubclass(type(msg), message.Message): self.input_queue.append(msg.serialize()) else: @@ -315,8 +315,8 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False, """ Test if report has required fields. """ if self.bot_type == 'collector': - for report_json in self.get_output_queue(): - report = message.MessageFactory.unserialize(report_json, + for report_data in self.get_output_queue(): + report = message.MessageFactory.unserialize(report_data, harmonization=self.harmonization) self.assertIsInstance(report, message.Report) self.assertIn('raw', report) @@ -324,8 +324,8 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False, """ Test if event has required fields. """ if self.bot_type == 'parser': - for event_json in self.get_output_queue(): - event = message.MessageFactory.unserialize(event_json, + for event_data in self.get_output_queue(): + event = message.MessageFactory.unserialize(event_data, harmonization=self.harmonization) self.assertIsInstance(event, message.Event) self.assertIn('classification.type', event) @@ -383,7 +383,7 @@ def get_output_queue(self, path="_default"): """Getter for items in the output queues of this bot. Use in TestCase scenarios If there is multiple queues in named queue group, we return all the items chained. """ - return [utils.decode(text) for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])] + return [text for text in chain(*[self.pipe.state[x] for x in self.pipe.destination_queues[path]])] # return [utils.decode(text) for text in self.pipe.state["%s-output" % self.bot_id]] def test_bot_name(self, *args, **kwargs): @@ -514,9 +514,9 @@ def assertMessageEqual(self, queue_pos, expected_msg, compare_raw=True, path="_d given queue position. """ event = self.get_output_queue(path=path)[queue_pos] - self.assertIsInstance(event, str) + self.assertIsInstance(event, bytes) - event_dict = json.loads(event) + event_dict = msgpack.unpackb(event) if isinstance(expected_msg, (message.Event, message.Report)): expected = expected_msg.to_dict(with_type=True) else: diff --git a/intelmq/tests/bots/collectors/tcp/test_collector.py b/intelmq/tests/bots/collectors/tcp/test_collector.py index 4ccdb5a62..67b193a79 100644 --- a/intelmq/tests/bots/collectors/tcp/test_collector.py +++ b/intelmq/tests/bots/collectors/tcp/test_collector.py @@ -119,7 +119,10 @@ def test_intelmq_exchange(self): for i, msg in enumerate(self.get_output_queue()): report = MessageFactory.unserialize(msg, harmonization=self.harmonization, default_type='Event') - output = MessageFactory.unserialize(utils.base64_decode(report["raw"]), harmonization=self.harmonization, default_type='Event') + output = MessageFactory.unserialize(utils.base64_decode(report["raw"]), + harmonization=self.harmonization, + default_type='Event', + use_packer="json") self.assertDictEqual(output, INPUT1) del report['time.observation'] diff --git a/intelmq/tests/bots/experts/cymru_whois/test_expert.py b/intelmq/tests/bots/experts/cymru_whois/test_expert.py index b14afd692..17d39e3fb 100644 --- a/intelmq/tests/bots/experts/cymru_whois/test_expert.py +++ b/intelmq/tests/bots/experts/cymru_whois/test_expert.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -import json +import msgpack import unittest import intelmq.lib.test as test @@ -89,7 +89,7 @@ def test_6to4_result(self): """ self.input_message = EXAMPLE_6TO4_INPUT self.run_bot() - actual = json.loads(self.get_output_queue()[0]) + actual = msgpack.loads(self.get_output_queue()[0]) self.assertDictContainsSubset(EXAMPLE_6TO4_INPUT, actual) self.assertIn("source.asn", actual) self.assertIn("source.as_name", actual) diff --git a/intelmq/tests/bots/experts/idea/test_expert.py b/intelmq/tests/bots/experts/idea/test_expert.py index 5ab1b5e11..8a619f61c 100644 --- a/intelmq/tests/bots/experts/idea/test_expert.py +++ b/intelmq/tests/bots/experts/idea/test_expert.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import unittest import json +import msgpack import intelmq.lib.test as test from intelmq.bots.experts.idea.expert import IdeaExpertBot @@ -82,8 +83,8 @@ def test_conversion(self): # The ID in the generated Idea event is random, so we have to extract # the data from the "output" field and compare after removing ID's event = self.get_output_queue()[0] - self.assertIsInstance(event, str) - event_dict = json.loads(event) + self.assertIsInstance(event, bytes) + event_dict = msgpack.loads(event) self.assertIsInstance(event_dict, dict) self.assertTrue("output" in event_dict) idea_event = json.loads(event_dict["output"]) diff --git a/intelmq/tests/bots/parsers/json/test_parser.py b/intelmq/tests/bots/parsers/json/test_parser.py index 017951858..268d3d02c 100644 --- a/intelmq/tests/bots/parsers/json/test_parser.py +++ b/intelmq/tests/bots/parsers/json/test_parser.py @@ -2,6 +2,8 @@ import base64 import os import unittest +import json +import msgpack import intelmq.lib.test as test from intelmq.bots.parsers.json.parser import JSONParserBot @@ -47,7 +49,6 @@ NO_DEFAULT_EVENT = MULTILINE_EVENTS[1].copy() NO_DEFAULT_EVENT['raw'] = base64.b64encode(b'{"source.ip": "127.0.0.2", "classification.type": "c2-server"}\n').decode() - class TestJSONParserBot(test.BotTestCase, unittest.TestCase): """ A TestCase for a MalwareDomainListParserBot. diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index 3c8214e31..244322fa0 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -57,7 +57,7 @@ def test_encoding_error_on_input_message(self): """ self.input_message = b'foo\xc9bar' self.run_bot(iterations=1, allowed_error_count=1) - self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.DecodingError:.*') + self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.UnserializationError:.*') self.assertEqual(self.pipe.state['test-bot-input-internal'], []) self.assertEqual(self.pipe.state['test-bot-input'], []) self.assertEqual(self.pipe.state['test-bot-output'], []) @@ -67,7 +67,7 @@ def test_invalid_value_on_input_message(self): Test if the bot is dumping / not retrying a message which is impossible to parse. https://github.com/certtools/intelmq/issues/1765 """ - self.input_message = b'{"source.asn": 0, "__type": "Event"}' + self.input_message = {"source.asn": 0, "__type": "Event"} self.run_bot(iterations=1, allowed_error_count=1) self.assertLogMatches(r'.*intelmq\.lib\.exceptions\.InvalidValue:.*') self.assertEqual(self.pipe.state['test-bot-input-internal'], []) diff --git a/intelmq/tests/lib/test_message.py b/intelmq/tests/lib/test_message.py index e60ac990b..56e7c2d32 100644 --- a/intelmq/tests/lib/test_message.py +++ b/intelmq/tests/lib/test_message.py @@ -7,6 +7,7 @@ but has a valid Harmonization configuration. """ import json +import msgpack import unittest import pkg_resources @@ -154,12 +155,12 @@ def test_event_ne_different_config(self): def test_invalid_type(self): """ Test if Message raises InvalidArgument for invalid type. """ with self.assertRaises(exceptions.InvalidArgument): - message.MessageFactory.unserialize('{"__type": "Message"}', harmonization=HARM) + message.MessageFactory.unserialize(msgpack.dumps({"__type": "Message"}), harmonization=HARM) def test_invalid_type2(self): """ Test if MessageFactory raises InvalidArgument for invalid type. """ with self.assertRaises(exceptions.InvalidArgument): - message.MessageFactory.unserialize('{"__type": "Invalid"}', harmonization=HARM) + message.MessageFactory.unserialize(msgpack.dumps({"__type": "Invalid"}), harmonization=HARM) def test_report_invalid_key(self): """ Test if report raises InvalidKey for invalid key in add(). """ @@ -361,10 +362,8 @@ def test_factory_serialize(self): report.add('feed.url', URL_SANE) report.add('raw', LOREM_BASE64, sanitize=False) actual = message.MessageFactory.serialize(report) - expected = ('{"raw": "bG9yZW0gaXBzdW0=", "__type": "Report", "feed.url' - '": "https://example.com/", "feed.name": "Example"}') - self.assertDictEqual(json.loads(expected), - json.loads(actual)) + expected = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xa6__type\xa6Report') + self.assertDictEqual(msgpack.unpackb(expected), msgpack.unpackb(actual)) def test_deep_copy_content(self): """ Test if deep_copy does return the same items. """ @@ -514,22 +513,32 @@ def test_event_json_hierarchical(self): '{"observation": "2015-01-01T13:37:00+00:00"}}') self.assertDictEqual(json.loads(expected), json.loads(actual)) + def test_event_msgpack(self): + """ Test event to_msgpack """ + event = self.new_event() + event = self.add_event_examples(event) + actual = event.to_msgpack() + print(actual) + self.assertIsInstance(actual, bytes) + excepted = (b'\x84\xa9feed.name\xa7Example\xa8feed.url\xb4https://example.com/\xa3raw\xb0bG9yZW0gaXBzdW0=\xb0time.observation\xb92015-01-01T13:37:00+00:00') + self.assertDictEqual(msgpack.unpackb(excepted), msgpack.unpackb(actual)) + def test_event_serialize(self): """ Test Event serialize. """ event = self.new_event() - self.assertEqual('{"__type": "Event"}', + self.assertEqual(b'\x81\xa6__type\xa5Event', event.serialize()) def test_event_string(self): """ Test Event serialize. """ event = self.new_event() - self.assertEqual('{"__type": "Event"}', + self.assertEqual(b'\x81\xa6__type\xa5Event', event.serialize()) def test_event_unicode(self): """ Test Event serialize. """ event = self.new_event() - self.assertEqual('{"__type": "Event"}', + self.assertEqual(b'\x81\xa6__type\xa5Event', event.serialize()) def test_event_from_report(self): @@ -595,7 +604,7 @@ def test_event_init_check_tuple(self): def test_event_init(self): """ Test if initialization method checks fields. """ - event = '{"__type": "Event", "source.asn": "foo"}' + event = msgpack.dumps({"__type": "Event", "source.asn": "foo"}) with self.assertRaises(exceptions.InvalidValue): message.MessageFactory.unserialize(event, harmonization=HARM) diff --git a/intelmq/tests/lib/test_pipeline.py b/intelmq/tests/lib/test_pipeline.py index f148e7958..3da9aa9b7 100644 --- a/intelmq/tests/lib/test_pipeline.py +++ b/intelmq/tests/lib/test_pipeline.py @@ -16,6 +16,7 @@ import intelmq.lib.pipeline as pipeline import intelmq.lib.test as test import intelmq.lib.exceptions as exceptions +import intelmq.lib.utils as utils SAMPLES = {'normal': [b'Lorem ipsum dolor sit amet', 'Lorem ipsum dolor sit amet'], @@ -66,7 +67,7 @@ def setUp(self): def test_receive(self): self.pipe.state['test-bot-input'] = [SAMPLES['normal'][0]] - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_send(self): self.pipe.send(SAMPLES['normal'][1]) @@ -75,7 +76,7 @@ def test_send(self): def test_receive_unicode(self): self.pipe.state['test-bot-input'] = [SAMPLES['unicode'][0]] - self.assertEqual(SAMPLES['unicode'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['unicode'][1], utils.decode(self.pipe.receive())) def test_send_unicode(self): self.pipe.send(SAMPLES['unicode'][1]) @@ -106,7 +107,7 @@ def test_reject(self): self.pipe.state['test-bot-input'] = [SAMPLES['normal'][0]] self.pipe.receive() self.pipe.reject_message() - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_acknowledge(self): self.pipe.state['test-bot-input'] = [SAMPLES['normal'][0]] @@ -158,12 +159,12 @@ def test_send_receive(self): """ Sending bytest and receiving unicode. """ self.clear() self.pipe.send(SAMPLES['normal'][0]) - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_send_receive_unicode(self): self.clear() self.pipe.send(SAMPLES['unicode'][1]) - self.assertEqual(SAMPLES['unicode'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['unicode'][1], utils.decode(self.pipe.receive())) def test_count(self): self.clear() @@ -182,7 +183,7 @@ def test_reject(self): self.pipe.send(SAMPLES['normal'][0]) self.pipe.receive() self.pipe.reject_message() - self.assertEqual(SAMPLES['normal'][1], self.pipe.receive()) + self.assertEqual(SAMPLES['normal'][1], utils.decode(self.pipe.receive())) def test_acknowledge(self): self.pipe.send(SAMPLES['normal'][0]) diff --git a/setup.py b/setup.py index fe0abcdd0..91991f668 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ 'pytz>=2012c', 'redis>=2.10', 'requests>=2.2.0', + 'msgpack>=0.5', ] exec(open(os.path.join(os.path.dirname(__file__),