Skip to content

Commit

Permalink
Feature/doc (#12)
Browse files Browse the repository at this point in the history
* Updated README.md

* Updated version.
  • Loading branch information
DustinMoriarty authored Aug 25, 2023
1 parent be7919d commit 473d15c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
65 changes: 65 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,68 @@ async def main():
await generate_greeting("hello")
asyncio.run(main())
```
## Periodic Tasks
Sometimes we need to run tasks periodically such as polling an external database or an
internal queue. We also want to make sure that when our application shuts down, it
gracefully tells the periodic tasks to stop starting new tasks but it waits for the
existing task to complete.
```python
import asyncio
from asyncio_signal_bus import SignalBus

BUS = SignalBus()
@BUS.periodic_task(period_seconds=0.5)
async def print_foos():
print("foo")

async def main():
async with BUS:
await asyncio.sleep(2)
asyncio.run(main())
```
### Period Tasks As Publishers
Given that the periodic task is the furthest upstream in a process, we may use it to
drive downstream subscribers. We can combine the periodic task with the injectors and
the publisher to periodically fetch data using configuration information and send it
to subscribers for processing.

```python
import asyncio
from typing import List, Dict
from asyncio_signal_bus import SignalBus
import os

BUS = SignalBus()

async def get_url():
return os.environ.get("URL")

async def get_secret():
return os.environ.get("SECRET")

@BUS.periodic_task(period_seconds=1)
@BUS.publisher(topic_name="new-data")
@BUS.inject("url", get_url)
@BUS.inject("secret", get_secret)
async def get_data(url: str, secret: str):
# Perform some sort of IO here to get your data.
return [
{"id": 0, "value": "cats"},
{"id": 1, "value": "dogs"}
]

@BUS.subscriber(topic_name="new-data")
async def process_values(data: List[Dict]):
for row in data:
print(row["value"].upper())

@BUS.subscriber(topic_name="new-data")
async def process_ids(data: List[Dict]):
for row in data:
print(row["id"] + 1)

async def main():
async with BUS:
await asyncio.sleep(5)
asyncio.run(main())
```
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "asyncio-signal-bus"
version = "1.5.0"
version = "1.5.1"
description = "Internal application publisher/subscriber bus using asyncio queues."
authors = ["DustinMoriarty <[email protected]>"]
readme = "README.md"
Expand Down

0 comments on commit 473d15c

Please sign in to comment.