Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BLOCKED] datastreams: implement asynchronous writer #140

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions invenio_vocabularies/datastreams/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Data Streams Celery tasks."""

from celery import shared_task

from ..datastreams import StreamEntry
from ..datastreams.factories import WriterFactory


@shared_task(ignore_result=True)
def write_entry(writer, entry):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def write_entry(writer, entry):
def write_entry(writer_config, entry):

Since it's passed to WriterFactory.create(config=...)

"""Write an entry.

:param writer: writer configuration as accepted by the WriterFactory.
:param entry: dictionary, StreamEntry is not serializable.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this. StreamEntry is not serializable so a bit of juggling is needed.

"""
writer = WriterFactory.create(config=writer)
writer.write(StreamEntry(entry))
21 changes: 20 additions & 1 deletion invenio_vocabularies/datastreams/writers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 CERN.
# Copyright (C) 2021-2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
Expand All @@ -18,6 +18,7 @@

from .datastreams import StreamEntry
from .errors import WriterError
from .tasks import write_entry


class BaseWriter:
Expand Down Expand Up @@ -110,3 +111,21 @@ def write(self, stream_entry, *args, **kwargs):
yaml.safe_dump([stream_entry.entry], file)

return stream_entry


class AsyncWriter(BaseWriter):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand, this approach is to use this writer as a container for other writer configs.

I was thinking if it would be possible to have a delay kwarg in BaseWriter, which if True would achieve the same result... But from what I see that's pretty difficult to handle, given __init__ overrides, etc.

My fear is that we end up with a very complex datastream config structure, where especially in YAML it's easy to make indentation mistakes, which as a result would eventually lead to more difficult UX/DX.

Other approaches to discuss:

  • Move this up to the DataStream, i.e. in BaseDataStream.write(...), do something like if writer.delay: ...call task...
  • Preserve the original writer config/init-params when initializing writers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. However, changes are not trivial.

In both cases we would need to implement a way to either save the configuration (type + args) or make writers JSON serializable. Otherwise, we won't be able to send them to the task. IMO this fits more at data stream level, i.e. the writers writes, but is the data stream who decides how/when.

"""Writes the entries asynchronously (celery task)."""

def __init__(self, writer, *args, **kwargs):
"""Constructor.

:param writer: writer to use.
"""
self._writer = writer
Copy link
Member Author

@ppanero ppanero Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accepts a writer configuration, we cannot accept the object because that would make it difficult to pass via config file/definition. Would only work for a programmatic API.

This config will be sent to the WriterFactory later on. I'm not convinced about the name of this parameter. Alternatives, which I dont fully like either are:

  • writer_conf, writer_config, w_conf...
  • config, conf...

Everything is too generic....

super().__init__(*args, **kwargs)

def write(self, stream_entry, *args, **kwargs):
"""Launches a celery task to write an entry."""
write_entry.delay(self._writer, stream_entry.entry)

return stream_entry
7 changes: 6 additions & 1 deletion tests/datastreams/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 CERN.
# Copyright (C) 2021-2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
Expand All @@ -14,6 +14,8 @@

import pytest

from invenio_vocabularies.config import VOCABULARIES_DATASTREAM_READERS, \
VOCABULARIES_DATASTREAM_TRANSFORMERS, VOCABULARIES_DATASTREAM_WRITERS
from invenio_vocabularies.datastreams import StreamEntry
from invenio_vocabularies.datastreams.errors import TransformerError, \
WriterError
Expand Down Expand Up @@ -65,12 +67,15 @@ def write(self, stream_entry, *args, **kwargs):
def app_config(app_config):
"""Mimic an instance's configuration."""
app_config["VOCABULARIES_DATASTREAM_READERS"] = {
**VOCABULARIES_DATASTREAM_READERS,
"test": TestReader
}
app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {
**VOCABULARIES_DATASTREAM_TRANSFORMERS,
"test": TestTransformer
}
app_config["VOCABULARIES_DATASTREAM_WRITERS"] = {
**VOCABULARIES_DATASTREAM_WRITERS,
"test": TestWriter,
"fail": FailingTestWriter,
}
Expand Down
34 changes: 34 additions & 0 deletions tests/datastreams/test_datastreams_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021-2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Data Streams tasks tests."""

from pathlib import Path

import yaml

from invenio_vocabularies.datastreams import StreamEntry
from invenio_vocabularies.datastreams.tasks import write_entry


def test_write_entry(app):
filepath = 'writer_test.yaml'
yaml_writer_config = {
"type": "yaml",
"args": {
"filepath": filepath
}
}
entry = {"key_one": [{"inner_one": 1}]}
write_entry(yaml_writer_config, entry)

filepath = Path(filepath)
with open(filepath) as file:
assert yaml.safe_load(file) == [entry]

filepath.unlink()
41 changes: 39 additions & 2 deletions tests/datastreams/test_writers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 CERN.
# Copyright (C) 2021-2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
Expand All @@ -16,7 +16,12 @@

from invenio_vocabularies.datastreams import StreamEntry
from invenio_vocabularies.datastreams.errors import WriterError
from invenio_vocabularies.datastreams.writers import ServiceWriter, YamlWriter
from invenio_vocabularies.datastreams.writers import AsyncWriter, \
ServiceWriter, YamlWriter

##
# Service Writer
##


def test_service_writer_non_existing(lang_type, lang_data, service, identity):
Expand Down Expand Up @@ -71,6 +76,10 @@ def test_service_writer_update_non_existing(

assert dict(record, **updated_lang) == record

##
# YAML Writer
##


def test_yaml_writer():
filepath = Path('writer_test.yaml')
Expand All @@ -87,3 +96,31 @@ def test_yaml_writer():
assert yaml.safe_load(file) == test_output

filepath.unlink()

##
# Async Writer
##


def test_async_writer(app):
filepath = 'writer_test.yaml'
yaml_writer_config = {
"type": "yaml",
"args": {
"filepath": filepath
}
}
async_writer = AsyncWriter(yaml_writer_config)

test_output = [
{"key_one": [{"inner_one": 1}]},
{"key_two": [{"inner_two": "two"}]}
]
for output in test_output:
async_writer.write(stream_entry=StreamEntry(output))

filepath = Path(filepath)
with open(filepath) as file:
assert yaml.safe_load(file) == test_output

filepath.unlink()