diff --git a/README.md b/README.md index ccb774a..ebd8db1 100644 --- a/README.md +++ b/README.md @@ -80,3 +80,68 @@ async def main(): await generate_greeting("hello") asyncio.run(main()) ``` +## Periodic Tasks +Sometimes we need to run tasks periodically such as polling an external database or an +internal queue. We also want to make sure that when our application shuts down, it +gracefully tells the periodic tasks to stop starting new tasks but it waits for the +existing task to complete. +```python +import asyncio +from asyncio_signal_bus import SignalBus + +BUS = SignalBus() +@BUS.periodic_task(period_seconds=0.5) +async def print_foos(): + print("foo") + +async def main(): + async with BUS: + await asyncio.sleep(2) +asyncio.run(main()) +``` +### Period Tasks As Publishers +Given that the periodic task is the furthest upstream in a process, we may use it to +drive downstream subscribers. We can combine the periodic task with the injectors and +the publisher to periodically fetch data using configuration information and send it +to subscribers for processing. + +```python +import asyncio +from typing import List, Dict +from asyncio_signal_bus import SignalBus +import os + +BUS = SignalBus() + +async def get_url(): + return os.environ.get("URL") + +async def get_secret(): + return os.environ.get("SECRET") + +@BUS.periodic_task(period_seconds=1) +@BUS.publisher(topic_name="new-data") +@BUS.inject("url", get_url) +@BUS.inject("secret", get_secret) +async def get_data(url: str, secret: str): + # Perform some sort of IO here to get your data. + return [ + {"id": 0, "value": "cats"}, + {"id": 1, "value": "dogs"} + ] + +@BUS.subscriber(topic_name="new-data") +async def process_values(data: List[Dict]): + for row in data: + print(row["value"].upper()) + +@BUS.subscriber(topic_name="new-data") +async def process_ids(data: List[Dict]): + for row in data: + print(row["id"] + 1) + +async def main(): + async with BUS: + await asyncio.sleep(5) +asyncio.run(main()) +``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 1b696e1..396764e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "asyncio-signal-bus" -version = "1.5.0" +version = "1.5.1" description = "Internal application publisher/subscriber bus using asyncio queues." authors = ["DustinMoriarty "] readme = "README.md"