0.0.9.4 (2023-05-01)
Great news! Now Propan can be used as a full part of FastAPI!
from fastapi import FastAPI
from propan.fastapi import RabbitRouter
app = FastAPI()
router = RabbitRouter("amqp://guest:guest@localhost:5672")
@router.event("test")
async def hello(m: dict) -> dict:
return { "response": "Hello Propan!" }
app.include_router(router)
You can find a complete example in documentation
Also, added the ability to test your application without running external dependencies as a broker (for now only for RabbitMQ)!
from propan import RabbitBroker
from propan.test import TestRabbitBroker
broker = RabbitBroker()
@broker.handler("ping")
async def healthcheck(msg: str) -> str:
return "pong"
def test_publish():
async with TestRabbitBroker(broker) as test_broker:
await test_broker.start()
r = await test_broker.publish("ping", queue="ping", callback=True)
assert r == "pong"
Also added support for RPC over MQ (RabbitMQ only for now): return
of your handler function will be sent in response to a message if a response is expected.
Breaking changes:
- Brokers
publish_message
method has been renamed topublish
- removed
declare
argument inRabbitQueue
andRabbitExchange
- now you need to usepassive