Using channels_rabbitmq makes group_send non-reenterable #14
I'm on vacation, so this will have to wait until July to fix. In the meantime, we can debug a bit: is your WebSockets client actually consuming the messages? ... I'm developing a hunch: I suspect the send is stalling because no consumers are consuming the messages. That's a bug, and I'll look into it after vacation. In the meantime, v1.1.3 should "fix" this by discarding stale messages (it'll warn when it does). Do you experience this with 1.1.3? Aside from discarding stale messages, 1.1.3 also adds a config param and more docs that can help you avoid issues in production. (To be clear: send() should never stall. But with 1.1.3, if my hunch is correct, the stall will clear up once the messages expire.)
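The hunch can be illustrated with a toy model. Assume, hypothetically, that outgoing messages wait in a bounded buffer. If nothing drains it, a send blocks forever once the buffer is full, and only discarding stale messages frees space. This is not channels_rabbitmq's actual code; ExpiringBuffer, capacity, expiry and the demo functions are made-up names used only for illustration:

```python
import asyncio
import time
from collections import deque


class ExpiringBuffer:
    """Hypothetical bounded buffer whose messages expire after `expiry` seconds."""

    def __init__(self, capacity, expiry):
        self.capacity = capacity
        self.expiry = expiry
        self._items = deque()  # (enqueue_time, message) pairs, oldest first
        self.discarded = 0

    def _discard_stale(self):
        # Drop messages older than `expiry` (the "discard stale messages" idea)
        now = time.monotonic()
        while self._items and now - self._items[0][0] > self.expiry:
            self._items.popleft()
            self.discarded += 1

    async def send(self, message, poll=0.01):
        # Wait for free space; with no consumer and no expiry this never returns
        while True:
            self._discard_stale()
            if len(self._items) < self.capacity:
                self._items.append((time.monotonic(), message))
                return
            await asyncio.sleep(poll)


async def demo_with_expiry():
    """Send 5 messages into a 2-slot buffer that nobody drains."""
    buf = ExpiringBuffer(capacity=2, expiry=0.1)
    completed = 0
    for i in range(5):
        await asyncio.wait_for(buf.send(i), timeout=1.0)
        completed += 1
    return completed, buf.discarded


async def demo_without_expiry():
    """Return True if the second send stalls (times out) with expiry disabled."""
    buf = ExpiringBuffer(capacity=1, expiry=float("inf"))
    await buf.send("a")
    try:
        await asyncio.wait_for(buf.send("b"), timeout=0.05)
    except asyncio.TimeoutError:
        return True
    return False


if __name__ == "__main__":
    print(asyncio.run(demo_without_expiry()))  # True: the send stalls
    print(asyncio.run(demo_with_expiry()))     # all 5 sends complete; older ones were discarded
```

In this model expiry only turns a permanent stall into a delay of roughly expiry seconds. Whether a consumer is present makes no difference, because this buffer is never drained at all.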
Hi @adamhooper,
I've started a Daphne server and connected to the WebSocket from the browser, so the consumer is present. Moreover, when I disconnect the WebSocket client, the issue disappears.
I haven't tested it yet; 1.1.2 is installed now. I will try.
Tested against 1.1.3 and found the same issue. Lowering the
I've changed the stress-test code slightly to make it more intuitive:

from channels.layers import get_channel_layer
import asyncio
from datetime import timedelta
from django.utils import timezone


class SpamGenerator:
    def __init__(self):
        self.counter = 0  # number of group_send calls still in flight
        self.running = True

    async def test_event_generator(self, id):
        """(async) Spam task sending an event to the group every second"""
        while self.running:
            self.counter += 1
            print("!" + ">" * self.counter)
            await self.async_group_send('simulation_event', {'Hello': 'World!'})
            self.counter -= 1
            print("!" + "<" * self.counter)
            await asyncio.sleep(1)

    async def test_time_print(self, seconds):
        """(async) Task printing the time every second, then stopping the spam tasks"""
        ts = timezone.now()
        while timezone.now() - ts < timedelta(seconds=seconds):
            print(">>>%s" % timezone.now())
            await asyncio.sleep(1)
        self.running = False
        await asyncio.sleep(2)
        # Every send should have finished by now; a nonzero counter means some stalled
        if self.counter:
            raise Exception("Something went wrong!")

    async def async_group_send(self, kind, data):
        """(async) Function sending a message to the group"""
        layer = get_channel_layer()
        await layer.group_send(
            kind,
            {
                'type': kind,
                'data': data
            }
        )

    async def run_all(self):
        """Start 10 spam tasks and the timer task simultaneously"""
        jobs = [self.test_event_generator('!%03d' % i) for i in range(10)]
        jobs.append(self.test_time_print(30))
        await asyncio.gather(*jobs)

    def test_spam_generator(self):
        """Entry point: run all tasks on a fresh event loop and report the counter"""
        try:
            asyncio.run(self.run_all())
        except Exception as ex:
            print("???", ex)
        print("COUNTER:", self.counter)

That's the result (consumer is connected):
Thanks for testing! With 1.1.3 and local_expiry=2s, does this still happen? Separate test: if you make your WebSockets client print each message as it's received, does this still happen?
The same.
I am using a Chrome extension to test WebSockets (chrome://extensions/?id=omalebghpgejjiaoknljcfmglgbpocdp). It receives each message and displays it in a dedicated widget. I am also watching the Chrome network log and can see the packets arriving. Note that the number of received packets matches the number of send calls that did not stall. There is also some Daphne debug output from processing the messages above:
Thank you so much for all this testing. One last question: if you use an object-scoped asyncio.Lock around your group_send call, does the problem persist?
Hi @adamhooper, when the lock (update: placed around the group_send call) is used, the issue disappears.
Thank you for this awesome testing! I look forward to fixing this when I return, in the second week of July. In the meantime, asyncio.Lock seems like a tolerable workaround :)
A lock is the actual solution, so I'm adding it to channels_rabbitmq. I'm also filing Polyconseil/aioamqp#205 to clear up the ambiguity. This is fixed in v1.1.4 (which Travis should publish momentarily).
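The lock workaround can also be applied on the application side while waiting for a fixed release. Below is a minimal sketch. It assumes only that the layer object exposes an async group_send(group, message) method, as the one returned by get_channel_layer() does. LockedGroupSender and FakeLayer are hypothetical names, and FakeLayer stands in for a real channel layer so the example runs without RabbitMQ:

```python
import asyncio


class LockedGroupSender:
    """Hypothetical wrapper that lets only one group_send run at a time."""

    def __init__(self, layer):
        self.layer = layer
        self._lock = None  # created lazily, on first use inside the running loop

    async def group_send(self, group, message):
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:  # concurrent callers queue up here
            await self.layer.group_send(group, message)


class FakeLayer:
    """Stand-in layer that records the maximum number of overlapping sends."""

    def __init__(self):
        self.active = 0
        self.max_active = 0
        self.sent = []

    async def group_send(self, group, message):
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        await asyncio.sleep(0.01)  # simulate network I/O
        self.sent.append((group, message))
        self.active -= 1


async def demo_locked():
    layer = FakeLayer()
    sender = LockedGroupSender(layer)
    await asyncio.gather(*(
        sender.group_send("simulation_event", {"type": "simulation_event", "data": i})
        for i in range(10)
    ))
    return layer


if __name__ == "__main__":
    layer = asyncio.run(demo_locked())
    print(layer.max_active, len(layer.sent))  # 1 10
```

The lock is created on first use so that, on older Python versions where asyncio.Lock binds to an event loop when constructed, it binds to the loop that actually runs the sends. The trade-off is throughput: sends through one wrapper never overlap.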
When calling layer.group_send() from several coroutines simultaneously, there is a high probability that some of these concurrent awaits never finish.
I have a JsonWebsocketConsumer like this (some details are removed):
To reproduce the issue, I wrote code like this:
I started a Daphne server and connected to the WebSocket from the browser.
Then I started my test code:
As you can see in the code, all counters should be 0 at the end.
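The counter check shows that something stalled, but not which call. A variant wraps each send in asyncio.wait_for so a stalled group_send surfaces as a timeout. This is a sketch under assumptions: the 0.1 s timeout, count_stalled_sends and HangingLayer are all hypothetical, and the fake layer deliberately never completes every third call to mimic the reported symptom:

```python
import asyncio


class HangingLayer:
    """Hypothetical fake layer: every third group_send never completes."""

    def __init__(self):
        self.calls = 0

    async def group_send(self, group, message):
        self.calls += 1
        if self.calls % 3 == 0:
            await asyncio.Event().wait()  # never set: simulates a stalled send
        await asyncio.sleep(0)


async def count_stalled_sends(layer, n, timeout):
    """Fire n concurrent group_send calls and count those that time out."""

    async def one(i):
        try:
            await asyncio.wait_for(
                layer.group_send("simulation_event",
                                 {"type": "simulation_event", "data": i}),
                timeout,
            )
            return False
        except asyncio.TimeoutError:
            return True

    results = await asyncio.gather(*(one(i) for i in range(n)))
    return sum(results)


if __name__ == "__main__":
    print(asyncio.run(count_stalled_sends(HangingLayer(), 10, timeout=0.1)))  # 3
```

Against a real layer, the timeout should be set well above normal send latency so that slow but healthy sends are not counted as stalls.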
It's interesting that if I disconnect the browser from the websocket (so the consumer is unregistered), the issue disappears.
Some other tests showed me that await group_send really does hang for some concurrent calls. (Update:) The channels_redis backend works fine under the same circumstances.
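The following hypothetical toy model shows how an await can hang forever when a client is called concurrently. It is not aioamqp's or channels_rabbitmq's actual code, and ToyClient, request and count_hung are made-up names. The client keeps a single slot for the reply it is waiting on, so a second concurrent caller overwrites the first caller's future and the first caller is never resolved. Serializing callers with asyncio.Lock, the same idea as the lock added in channels_rabbitmq v1.1.4, removes the overlap:

```python
import asyncio


class ToyClient:
    """Toy client with ONE slot for the pending reply: not safe for concurrent use."""

    def __init__(self, use_lock=False):
        self._waiter = None
        self._tasks = set()  # keep references to background reply tasks
        # Construct inside a running coroutine so the lock uses that loop
        self._lock = asyncio.Lock() if use_lock else None

    async def _server_reply(self):
        # Simulated broker reply: resolves whatever future is in the slot right now
        await asyncio.sleep(0.01)
        if not self._waiter.done():
            self._waiter.set_result("ok")

    async def _request(self):
        waiter = asyncio.get_running_loop().create_future()
        self._waiter = waiter  # a concurrent caller overwrites this slot
        task = asyncio.create_task(self._server_reply())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return await waiter

    async def request(self):
        if self._lock is None:
            return await self._request()
        async with self._lock:  # serialize callers so the slot is never shared
            return await self._request()


async def count_hung(use_lock, timeout=0.5):
    """Issue two concurrent requests; count how many hang until the timeout."""
    client = ToyClient(use_lock)
    calls = [asyncio.wait_for(client.request(), timeout) for _ in range(2)]
    results = await asyncio.gather(*calls, return_exceptions=True)
    return sum(isinstance(r, asyncio.TimeoutError) for r in results)


if __name__ == "__main__":
    print(asyncio.run(count_hung(use_lock=False)))  # 1
    print(asyncio.run(count_hung(use_lock=True)))   # 0
```

Without the lock, exactly one of the two callers hangs, which matches the pattern in the stress test: most sends complete and a few never return.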
Software versions: