From 837a0692aa502313ca8163242def9a5796f0c42c Mon Sep 17 00:00:00 2001
From: Pablo Panero
Date: Fri, 21 Jan 2022 14:28:19 +0100
Subject: [PATCH] datastreams: implement asynchronous writer

---
Review notes (this section sits between the "---" separator and the
diffstat; it is not part of the commit message and is ignored when the
patch is applied with `git am`):

* AsyncWriter.write() passes `self._writer` and `stream_entry.entry` to
  `write_entry.delay()`, and write_entry() feeds its first argument to
  `WriterFactory.create(config=...)`. In the tests below the AsyncWriter
  constructor argument is therefore a writer configuration dict
  ({"type": ..., "args": ...}).
* write_entry() receives the plain entry dict and wraps it in StreamEntry
  itself; its docstring gives the reason (StreamEntry is not serializable).
* conftest.py now spreads the package-level VOCABULARIES_DATASTREAM_* dicts
  into the test configuration before adding the test-only entries.
  NOTE(review): presumably this keeps the "yaml" writer type used by the new
  tests registered -- confirm against invenio_vocabularies.config.
* test_async_writer opens writer_test.yaml right after calling write().
  NOTE(review): this presumably relies on the `app` fixture running Celery
  tasks eagerly -- confirm.
* test_write_entry and test_async_writer use the same relative path
  ('writer_test.yaml') as test_yaml_writer and call unlink() only after the
  assertion, so a failing assertion leaves the file in the working directory.
* NOTE(review): line breaks and indentation below were restored from a
  whitespace-collapsed copy of this patch. Hunk line counts match the
  headers, but the position of blank context lines and the indentation depth
  are a best-effort reconstruction -- regenerate with `git format-patch`
  before applying.

 invenio_vocabularies/datastreams/tasks.py   | 25 +++++++++++++
 invenio_vocabularies/datastreams/writers.py | 21 ++++++++++-
 tests/datastreams/conftest.py               |  7 +++-
 tests/datastreams/test_datastreams_tasks.py | 34 +++++++++++++++++
 tests/datastreams/test_writers.py           | 41 ++++++++++++++++++++-
 5 files changed, 124 insertions(+), 4 deletions(-)
 create mode 100644 invenio_vocabularies/datastreams/tasks.py
 create mode 100644 tests/datastreams/test_datastreams_tasks.py

diff --git a/invenio_vocabularies/datastreams/tasks.py b/invenio_vocabularies/datastreams/tasks.py
new file mode 100644
index 00000000..9407c051
--- /dev/null
+++ b/invenio_vocabularies/datastreams/tasks.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2022 CERN.
+#
+# Invenio-Vocabularies is free software; you can redistribute it and/or
+# modify it under the terms of the MIT License; see LICENSE file for more
+# details.
+
+"""Data Streams Celery tasks."""
+
+from celery import shared_task
+
+from ..datastreams import StreamEntry
+from ..datastreams.factories import WriterFactory
+
+
+@shared_task(ignore_result=True)
+def write_entry(writer, entry):
+    """Write an entry.
+
+    :param writer: writer configuration as accepted by the WriterFactory.
+    :param entry: dictionary, StreamEntry is not serializable.
+    """
+    writer = WriterFactory.create(config=writer)
+    writer.write(StreamEntry(entry))
diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py
index f062a21a..1c2f65e5 100644
--- a/invenio_vocabularies/datastreams/writers.py
+++ b/invenio_vocabularies/datastreams/writers.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 #
-# Copyright (C) 2021 CERN.
+# Copyright (C) 2021-2022 CERN.
 #
 # Invenio-Vocabularies is free software; you can redistribute it and/or
 # modify it under the terms of the MIT License; see LICENSE file for more
@@ -18,6 +18,7 @@
 
 from .datastreams import StreamEntry
 from .errors import WriterError
+from .tasks import write_entry
 
 
 class BaseWriter:
@@ -110,3 +111,21 @@ def write(self, stream_entry, *args, **kwargs):
             yaml.safe_dump([stream_entry.entry], file)
 
         return stream_entry
+
+
+class AsyncWriter(BaseWriter):
+    """Writes the entries asynchronously (celery task)."""
+
+    def __init__(self, writer, *args, **kwargs):
+        """Constructor.
+
+        :param writer: writer to use.
+        """
+        self._writer = writer
+        super().__init__(*args, **kwargs)
+
+    def write(self, stream_entry, *args, **kwargs):
+        """Launches a celery task to write an entry."""
+        write_entry.delay(self._writer, stream_entry.entry)
+
+        return stream_entry
diff --git a/tests/datastreams/conftest.py b/tests/datastreams/conftest.py
index e55b2287..688f297b 100644
--- a/tests/datastreams/conftest.py
+++ b/tests/datastreams/conftest.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 #
-# Copyright (C) 2021 CERN.
+# Copyright (C) 2021-2022 CERN.
 #
 # Invenio-Vocabularies is free software; you can redistribute it and/or
 # modify it under the terms of the MIT License; see LICENSE file for more
@@ -14,6 +14,8 @@
 
 import pytest
 
+from invenio_vocabularies.config import VOCABULARIES_DATASTREAM_READERS, \
+    VOCABULARIES_DATASTREAM_TRANSFORMERS, VOCABULARIES_DATASTREAM_WRITERS
 from invenio_vocabularies.datastreams import StreamEntry
 from invenio_vocabularies.datastreams.errors import TransformerError, \
     WriterError
@@ -65,12 +67,15 @@ def write(self, stream_entry, *args, **kwargs):
 def app_config(app_config):
     """Mimic an instance's configuration."""
     app_config["VOCABULARIES_DATASTREAM_READERS"] = {
+        **VOCABULARIES_DATASTREAM_READERS,
         "test": TestReader
     }
     app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {
+        **VOCABULARIES_DATASTREAM_TRANSFORMERS,
         "test": TestTransformer
     }
     app_config["VOCABULARIES_DATASTREAM_WRITERS"] = {
+        **VOCABULARIES_DATASTREAM_WRITERS,
         "test": TestWriter,
         "fail": FailingTestWriter,
     }
diff --git a/tests/datastreams/test_datastreams_tasks.py b/tests/datastreams/test_datastreams_tasks.py
new file mode 100644
index 00000000..d907422f
--- /dev/null
+++ b/tests/datastreams/test_datastreams_tasks.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2021-2022 CERN.
+#
+# Invenio-Vocabularies is free software; you can redistribute it and/or
+# modify it under the terms of the MIT License; see LICENSE file for more
+# details.
+
+"""Data Streams tasks tests."""
+
+from pathlib import Path
+
+import yaml
+
+from invenio_vocabularies.datastreams import StreamEntry
+from invenio_vocabularies.datastreams.tasks import write_entry
+
+
+def test_write_entry(app):
+    filepath = 'writer_test.yaml'
+    yaml_writer_config = {
+        "type": "yaml",
+        "args": {
+            "filepath": filepath
+        }
+    }
+    entry = {"key_one": [{"inner_one": 1}]}
+    write_entry(yaml_writer_config, entry)
+
+    filepath = Path(filepath)
+    with open(filepath) as file:
+        assert yaml.safe_load(file) == [entry]
+
+    filepath.unlink()
diff --git a/tests/datastreams/test_writers.py b/tests/datastreams/test_writers.py
index 576dba4c..28e10b5e 100644
--- a/tests/datastreams/test_writers.py
+++ b/tests/datastreams/test_writers.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 #
-# Copyright (C) 2021 CERN.
+# Copyright (C) 2021-2022 CERN.
 #
 # Invenio-Vocabularies is free software; you can redistribute it and/or
 # modify it under the terms of the MIT License; see LICENSE file for more
@@ -16,7 +16,12 @@
 
 from invenio_vocabularies.datastreams import StreamEntry
 from invenio_vocabularies.datastreams.errors import WriterError
-from invenio_vocabularies.datastreams.writers import ServiceWriter, YamlWriter
+from invenio_vocabularies.datastreams.writers import AsyncWriter, \
+    ServiceWriter, YamlWriter
+
+##
+# Service Writer
+##
 
 
 def test_service_writer_non_existing(lang_type, lang_data, service, identity):
@@ -71,6 +76,10 @@ def test_service_writer_update_non_existing(
 
     assert dict(record, **updated_lang) == record
 
+##
+# YAML Writer
+##
+
 
 def test_yaml_writer():
     filepath = Path('writer_test.yaml')
@@ -87,3 +96,31 @@ def test_yaml_writer():
         assert yaml.safe_load(file) == test_output
 
     filepath.unlink()
+
+##
+# Async Writer
+##
+
+
+def test_async_writer(app):
+    filepath = 'writer_test.yaml'
+    yaml_writer_config = {
+        "type": "yaml",
+        "args": {
+            "filepath": filepath
+        }
+    }
+    async_writer = AsyncWriter(yaml_writer_config)
+
+    test_output = [
+        {"key_one": [{"inner_one": 1}]},
+        {"key_two": [{"inner_two": "two"}]}
+    ]
+    for output in test_output:
+        async_writer.write(stream_entry=StreamEntry(output))
+
+    filepath = Path(filepath)
+    with open(filepath) as file:
+        assert yaml.safe_load(file) == test_output
+
+    filepath.unlink()