Replies: 2 comments 10 replies
-
Can you provide an MCVE for this? |
Beta Was this translation helpful? Give feedback.
4 replies
-
In general, something like this should work: import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from litestar import Litestar, get
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
from litestar.response import ServerSentEvent
@asynccontextmanager
async def lifespan(app: Litestar) -> AsyncGenerator[None, None]:
channels = app.plugins.get(ChannelsPlugin)
stop = asyncio.Event()
async def background_task():
while not stop.is_set():
channels.publish("hello", ["some_channel"])
await asyncio.sleep(1)
task = asyncio.create_task(background_task())
yield
stop.set()
await task
@get("/")
async def sse_handler(channels: ChannelsPlugin) -> ServerSentEvent:
async def generator() -> AsyncGenerator[bytes, None]:
async with channels.start_subscription(["some_channel"]) as subscriber:
async for event in subscriber.iter_events():
yield event
return ServerSentEvent(generator())
app = Litestar(
[sse_handler],
plugins=[ChannelsPlugin(MemoryChannelsBackend(), channels=["some_channel"])],
lifespan=[lifespan],
) I tested this locally and closing the client always correctly seems to terminate the connection and server shutdown does not hang for me. |
Beta Was this translation helpful? Give feedback.
6 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Based on my question here, I also looked into using Channels with ServerSentEvents when I just want to push messages from the server to the client and the full-duplex capability of websockets is not needed. Plus, since SSE is covered by a regular http request I don't need to add custom session/cookie handling as I had to for the websockets.
I ran into a similar problem as for the websockets that is: how to safely iterate over the events ;) Using
subscriber.iter_events()
worked but it would not close when the client connection closed so that was a problem (e.g., shutting down the uvicorn process would actually get stuck and it needed to be killed manually). I had the impression that thelifespan
andasynccontextmanager
approach would work here (https://github.com/orgs/litestar-org/discussions/3084#discussioncomment-8405508)So I came up with my own solution with inspiration from a
starlette-sse
pattern. Since litestar does not have the option to pass adata_sender_callable
to theServerSentEvent
response I instead start the pushing loop as a separate task. Since this task is outside the task group that handles the stream, the disconnect is not registered by this task. So instead, I pass a simple future that is set done in aBackgroundTask
once the sse client disconnects and the loop should be stopped.That seemed to work fine (e.g., log messages indicating completion of task), but I'm not sure if there are better or more efficient ways to do this.
Beta Was this translation helpful? Give feedback.
All reactions