Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Examples of use? #11

Open
inklesspen opened this issue Dec 28, 2022 · 1 comment
Open

Examples of use? #11

inklesspen opened this issue Dec 28, 2022 · 1 comment
Assignees
Labels
documentation Improvements or additions to documentation

Comments

@inklesspen
Copy link

Hi, I think slurry is good for my use case, but I cannot tell for sure, because there are not enough examples of using the various parts of the library in combination.

I would particularly like to know:

  • the best way to have a pipeline take input from a Trio memory channel
  • the best way to have a pipeline tap output into a Trio memory channel
  • how to enable/disable sections of a pipeline on the fly
@andersea
Copy link
Owner

Hi! Thanks for taking a look at slurry. I agree that there isn't enough examples. I will keep this issue open, as a reminder to expand the documentation in this regard.

In the meantime, I can provide you with these simple examples:

  1. A pipeline takes any async iterable as input. Since a trio memory channel is an async iterable, it can be used as an input directly.
import trio
from slurry import Pipeline
from slurry.sections import Map

async def generate_ints(input: trio.MemorySendChannel):
    i = 0
    while True:
        await input.send(i)
        await trio.sleep(1)
        i += 1

async def main():
    send, receive = trio.open_memory_channel(1)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(generate_ints, send)
        async with Pipeline.create(
            receive,
            Map(lambda x: x+1)
        ) as pipeline, pipeline.tap() as tap:
            async for i in tap:
                print(i)
                if i > 5:
                    nursery.cancel_scope.cancel()

trio.run(main)

Note that the Map section can also take its input directly, so the above pipeline is equivalent to:

    async with Pipeline.create(
        Map(lambda x: x+1, receive)
    ) as pipeline, pipeline.tap() as tap:
  1. When feeding an output to a memory channel, create a new tap, iterate it and use memorychannel.send() to send it along. Keep in mind also, that a tap is actually a trio MemorySendChannel, underneath the hood, and can be treated as such, meaning instead of iterating it, you can call receive on it, if you like that pattern better.
    async with Pipeline.create(
        Map(lambda x: x+1, receive)
    ) as pipeline, pipeline.tap() as tap:
        async for item in tap:
            await mymemorychannel.send(item)
    ### This is equivalent to -
    async with Pipeline.create(
        Map(lambda x: x+1, receive)
    ) as pipeline, pipeline.tap() as tap:
        while True:
            await mymemorychannel.send(await tap.receive())
  1. There is no direct way to enable or disable subsections of a pipeline. However there is a possible solution, which is to rewire the pipeline dynamically at runtime, using the pipeline.extend() method. You have to be careful to extend the pipeline, while the previous configuration is still running and only discard the old 'extension' when you are receiving items on the new one. This is because the way the pipeline is designed right now, if there is a time when an item is ready to be sent, but noone is listening, slurry will close the pipeline down.

Hope this helps! Happy holidays!

@andersea andersea added the documentation Improvements or additions to documentation label Dec 30, 2022
@andersea andersea self-assigned this Dec 30, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation
Projects
None yet
Development

No branches or pull requests

2 participants