Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BLOCKED] datastreams: implement asynchronous writer #140

Closed
wants to merge 1 commit into from

Conversation

ppanero
Copy link
Member

@ppanero ppanero commented Jan 21, 2022

closes #139

@ppanero ppanero force-pushed the async branch 3 times, most recently from 63e2d92 to 4e168c5 Compare January 21, 2022 14:29
"""Write an entry.

:param writer: writer configuration as accepted by the WriterFactory.
:param entry: dictionary, StreamEntry is not serializable.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this. StreamEntry is not serializable so a bit of juggling is needed.


:param writer: writer to use.
"""
self._writer = writer
Copy link
Member Author

@ppanero ppanero Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accepts a writer configuration, we cannot accept the object because that would make it difficult to pass via config file/definition. Would only work for a programmatic API.

This config will be sent to the WriterFactory later on. I'm not convinced about the name of this parameter. Alternatives, which I dont fully like either are:

  • writer_conf, writer_config, w_conf...
  • config, conf...

Everything is too generic....

@ppanero ppanero marked this pull request as ready for review January 21, 2022 14:43
@ppanero
Copy link
Member Author

ppanero commented Jan 21, 2022

@slint it is ready for review. note that names is not writen async right now, but if we agree on this implementation is just a matter of changing this config, which would actually close #111.

@ppanero ppanero requested a review from slint January 21, 2022 14:45


@shared_task(ignore_result=True)
def write_entry(writer, entry):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def write_entry(writer, entry):
def write_entry(writer_config, entry):

Since it's passed to WriterFactory.create(config=...)

@@ -110,3 +111,21 @@ def write(self, stream_entry, *args, **kwargs):
yaml.safe_dump([stream_entry.entry], file)

return stream_entry


class AsyncWriter(BaseWriter):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand, this approach is to use this writer as a container for other writer configs.

I was thinking if it would be possible to have a delay kwarg in BaseWriter, which if True would achieve the same result... But from what I see that's pretty difficult to handle, given __init__ overrides, etc.

My fear is that we end up with a very complex datastream config structure, where especially in YAML it's easy to make indentation mistakes, which as a result would eventually lead to more difficult UX/DX.

Other approaches to discuss:

  • Move this up to the DataStream, i.e. in BaseDataStream.write(...), do something like if writer.delay: ...call task...
  • Preserve the original writer config/init-params when initializing writers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. However, changes are not trivial.

In both cases we would need to implement a way to either save the configuration (type + args) or make writers JSON serializable. Otherwise, we won't be able to send them to the task. IMO this fits more at data stream level, i.e. the writers writes, but is the data stream who decides how/when.

@ppanero ppanero changed the title datastreams: implement asynchronous writer [BLOCKED] datastreams: implement asynchronous writer May 6, 2022
@slint
Copy link
Member

slint commented Jul 16, 2024

Rebased and included in #357

@slint slint closed this Jul 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Development

Successfully merging this pull request may close these issues.

writers: support async
3 participants