Runtime error when a long async callback is triggered #962
Comments
Thanks for the bug report! I'm learning a lot from it.
This line is an exception raised by In short It seems Python intends The most helpful links I found were this open cPython pull request, and the stack overflow question that motivated it. rclpy's Lines 54 to 58 in c76a6a1
Looking at I don't understand Maybe we could borrow ideas from other libraries? Another idea not in any of the other libraries would be to make
Interestingly, I've been using a decorator that takes an asyncio function and makes it compatible with rclpy. It goes like this:

    import asyncio

    def coro_guard(function):
        if not asyncio.iscoroutinefunction(function):
            raise Exception('Expected an async function')

        async def f(*args, **kargs):
            coro = function(*args, **kargs)
            try:
                while True:
                    future = coro.send(None)
                    while not future.done():
                        await asyncio.sleep(0)  # yields None
                    future.result()  # raises exceptions if any
            except StopIteration as e:
                return e.value

        return f

This mimics rclpy awaitable behavior:
I was using this as a temporary workaround until an upstream fix comes out, but maybe it could be the upstream fix? That decorator assumes we're only dealing with asyncio awaitables. To handle both rclpy and asyncio awaitables in one, we could improve the decorator to check if future is None:

    import asyncio

    # Allows calling __await__ repeatedly on awaitables that require waiting for a future before doing so (e.g. asyncio).
    # This will make asyncio functions compatible with rclpy implementation of async.
    # See https://github.com/ros2/rclpy/issues/962 for more info
    def coro_guard(function):
        if not asyncio.iscoroutinefunction(function):
            raise Exception('Expected an async function')

        async def f(*args, **kargs):
            coro = function(*args, **kargs)
            try:
                while True:
                    future = coro.send(None)
                    assert asyncio.isfuture(future) or future is None, "Unexpected awaitable behavior. Only use rclpy or asyncio awaitables."
                    if future is None:  # coro is rclpy-style awaitable; await is expected to be called repeatedly
                        await asyncio.sleep(0)
                        continue
                    while not future.done():  # coro is asyncio-style awaitable; stop calling await until future is done.
                        await asyncio.sleep(0)  # yields None
                    future.result()  # raises exceptions if any
            except StopIteration as e:
                return e.value

        return f

Now rclpy.Task can automatically apply this transformation to all coroutine functions, turning them into rclpy-style awaitables. What are your thoughts? Should I update the PR to use this concept?
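To see the guard in isolation, here is a minimal sketch that drives a guarded coroutine by hand, resuming it with send(None) the way an rclpy-style executor would. It only uses asyncio; `slow_callback` and `drive_like_executor` are hypothetical names made up for this illustration, and the driver is a stand-in, not rclpy's executor. It also assumes an asyncio event loop is running, since asyncio.sleep needs one.

```python
import asyncio


def coro_guard(function):
    """Same idea as the decorator above, repeated so the sketch runs on its own."""
    if not asyncio.iscoroutinefunction(function):
        raise TypeError('Expected an async function')

    async def f(*args, **kwargs):
        coro = function(*args, **kwargs)
        try:
            while True:
                future = coro.send(None)
                if future is None:  # rclpy-style awaitable: just poll again
                    await asyncio.sleep(0)
                    continue
                while not future.done():  # asyncio-style: poll until the future resolves
                    await asyncio.sleep(0)
                future.result()  # re-raise any exception stored in the future
        except StopIteration as e:
            return e.value

    return f


async def slow_callback():
    await asyncio.sleep(0.01)
    return 'done'


async def drive_like_executor(coro):
    """Hypothetical stand-in for an executor that resumes a task with send(None)."""
    steps = 0
    while True:
        try:
            coro.send(None)
        except StopIteration as e:
            return e.value, steps
        steps += 1
        await asyncio.sleep(0.001)  # let the asyncio loop advance its timers


result, steps = asyncio.run(drive_like_executor(coro_guard(slow_callback)()))
print(result, steps)
```

Without the guard, the second send(None) would resume the coroutine while its asyncio future is still pending, which is the situation this issue is about. With the guard, every intermediate step yields None and the inner coroutine is only resumed once its future is done.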
The decorator is a cool idea. One problem could be if if There's another idea worth looking at. Say we had an api We could integrate it into It's still important to note somewhere that one must use a single threaded executor, and
This works out of the box:

    import asyncio
    import logging

    import rclpy
    from std_msgs.msg import String

    logging.getLogger("asyncio").setLevel(logging.DEBUG)

    async def async_cb(msg):
        print("Received a message:")
        await asyncio.sleep(0.1)
        print(msg.data)

    def cb(msg):
        # Schedule the coroutine on the asyncio loop instead of letting rclpy drive it.
        asyncio.ensure_future(async_cb(msg))

    async def main():
        rclpy.init()
        node = rclpy.create_node("my_node")
        node.create_subscription(String, "/test", cb, 10)
        while True:
            # Alternate between servicing rclpy and the asyncio loop.
            rclpy.spin_once(node, timeout_sec=0.001)
            await asyncio.sleep(0.001)

    asyncio.run(main())
Maybe this is the wrong place to ask, but are there any plans to add equivalents to some of the asyncio primitives to rclpy if there is only going to be limited support for asyncio? I made an attempt at an async sleep:

    import time

    import rclpy
    import rclpy.task
    import rclpy.executors
    from rclpy.node import Node
    from rclpy.callback_groups import ReentrantCallbackGroup

    async def async_sleep(node: Node,
                          seconds: float,
                          callback_group=ReentrantCallbackGroup()):
        future = rclpy.task.Future()

        def handler():
            future.set_result(None)

        # One-shot use of a timer: the handler resolves the future, then the timer is torn down.
        timer = node.create_timer(seconds, handler, callback_group=callback_group)
        await future
        timer.cancel()
        timer.destroy()
        return None

    class AsyncSleepNode(Node):
        def __init__(self):
            super().__init__('timer_node')
            self.cb_group = ReentrantCallbackGroup()
            # Zero-period timer: fires right away and is cancelled in the callback, so it runs once.
            self.timer = self.create_timer(0.0, self.timer_callback, callback_group=self.cb_group)

        async def timer_callback(self):
            self.timer.cancel()
            self.timer.destroy()
            print(f"started at {time.strftime('%X')}")
            await async_sleep(self, 1.0)
            print('hello')
            await async_sleep(self, 1.0)
            print('world')
            print(f"finished at {time.strftime('%X')}")

    def main(args=None):
        rclpy.init(args=args)
        node = AsyncSleepNode()
        executor = rclpy.executors.SingleThreadedExecutor()
        executor.add_node(node)
        executor.spin()
        rclpy.shutdown()

    if __name__ == '__main__':
        main()
@samiamlabs Using a ReentrantCallbackGroup with a SingleThreadedExecutor has no effect. Using a MultiThreadedExecutor doesn't play well with asyncio. Additionally, if you recreate the callback group each time (which happens because you don't supply it explicitly to the Running your code, the timer only fires once (even if you change the timer to 1 second, instead of 0) and fix the above suggestions.
Thank you for the feedback @Achllle :) Using The timer callback was only supposed to fire once in the example. I just wanted to trigger an async callback to run by the "rclpy event loop". I understand that naming it timer_callback is confusing and that you would expect it to fire multiple times though. The comment about recreating the callback group every time is a good one! I wanted to limit the number of required input arguments to
I was not aware that using a I don't think recreating the callback group here is an issue given you're trying to have it execute in parallel, but I believe if you recreate the group each time with a Though the PR to make
output:
@sloretz Curious if you agree with this approach or would suggest an alternative. I'd be happy to create a docs guide around this.
Folks, how would you implement asyncio-style timeouts with an rclpy Future, to be able to detect a timeout if something never returns (like a service or action async call)?
Did you mean something like this @charlielito?

    import rclpy
    import rclpy.task
    import rclpy.executors
    from rclpy.node import Node
    from rclpy.callback_groups import ReentrantCallbackGroup
    import example_interfaces.srv

    async def async_sleep(node: Node, seconds: float):
        future = rclpy.task.Future()

        def handler():
            future.set_result(None)

        timer = node.create_timer(seconds, handler)
        await future
        timer.cancel()
        timer.destroy()
        return None

    async def future_with_timeout(node: Node, future: rclpy.task.Future, timeout_sec: float):
        # Resolved by whichever happens first: the timer firing or the wrapped future completing.
        first_done_future = rclpy.task.Future()

        def handler(arg=None):
            first_done_future.set_result(None)

        timer = node.create_timer(timeout_sec, handler)
        future.add_done_callback(handler)
        await first_done_future
        # Tear the timer down on both paths so it does not keep firing after a timeout.
        timer.cancel()
        timer.destroy()
        if not future.done():
            raise TimeoutError(f"Future timed out after {timeout_sec} seconds")
        return future.result()

    class AsyncFutureWithTimeoutNode(Node):
        def __init__(self):
            super().__init__('timer_node')
            self._default_callback_group = ReentrantCallbackGroup()
            self.add_two_inst_service = self.create_service(
                example_interfaces.srv.AddTwoInts,
                'test_service',
                self.test_service_callback
            )
            self.test_client = self.create_client(
                example_interfaces.srv.AddTwoInts,
                'test_service')
            # Zero-period timer: fires right away and is cancelled in the callback, so it runs once.
            self.timer = self.create_timer(0.0, self.timer_callback)

        async def test_service_callback(self, request, response):
            response.sum = request.a + request.b
            await async_sleep(self, 1.0)
            return response

        async def timer_callback(self):
            self.timer.cancel()
            request = example_interfaces.srv.AddTwoInts.Request()
            request.a = 38
            request.b = 4
            # The service takes about 1 second, so a 0.5 second timeout should expire.
            try:
                future = self.test_client.call_async(request)
                response = await future_with_timeout(self, future, 0.5)
                print(f"AddTwoInts service call succeeded, the result is {response.sum}!")
            except TimeoutError as e:
                print(e)
            # A 1.5 second timeout leaves enough time for the response.
            try:
                future = self.test_client.call_async(request)
                response = await future_with_timeout(self, future, 1.5)
                print(f"AddTwoInts service call succeeded, the result is {response.sum}!")
            except TimeoutError as e:
                print(e)

    def main(args=None):
        rclpy.init(args=args)
        node = AsyncFutureWithTimeoutNode()
        executor = rclpy.executors.SingleThreadedExecutor()
        executor.add_node(node)
        executor.spin()
        rclpy.shutdown()

    if __name__ == '__main__':
        main()

The output should be:

    Future timed out after 0.5 seconds
    AddTwoInts service call succeeded, the result is 42!
That works! Thanks!
I have been using an extended version of https://github.com/samiamlabs/ros2_vscode/blob/main/src/vscode_py/vscode_py/async_primitives.py in some of my work projects. What I have so far is:
Are there any plans to add primitives like these to rclpy? Do you know anything about this @sloretz?
Cool work! We implemented similar utils for our project. I think more work should go into expanding async capabilities.
Haha, I can't believe I missed that. Thanks for pointing it out. It should be easy to fix.
I suspected it would not be very performant. I have only been using it to sleep for relatively long periods myself.
That would be nice! Good to hear that more people than me see the utility of this. I brought it up in a meeting about executors at the last ROSCon in Odense. They suggested I post about it in some specific place that I promptly forgot about... By the way, I'm concerned that something about this solution does not quite work. A colleague of mine found my code, stress-tested it, and concluded it did not work (without my knowledge). It hasn't been locking up or anything like that in your testing?
Callback groups in rclpy are really interesting. By default, all callbacks are configured with the same mutually exclusive callback group.
The second option is useful for services, but I mostly use tasks these days. Under the hood, the rclpy executor is implemented using async, and every callback is handled as a task. rclpy/rclpy/rclpy/executors.py Line 760 in 53d7760
rclpy/rclpy/rclpy/executors.py Lines 553 to 557 in 53d7760
Each task is yielded in a loop. rclpy/rclpy/rclpy/executors.py Line 525 in 53d7760
The callback group is exited only when the callback returns rclpy/rclpy/rclpy/executors.py Lines 539 to 543 in 53d7760
Awaiting in a callback requires a correct configuration of callback groups.
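To make the interaction between awaiting and callback groups concrete, here is a toy model in plain Python. It is not rclpy code: `ToyFuture`, `ToyGroup` and `toy_spin` are hypothetical names, and the scheduling is heavily simplified. It only assumes what is described above: tasks are resumed in a loop, a polling future yields None until it has a result, and a group is released only when its callback returns.

```python
class ToyFuture:
    """Polling awaitable: yields None until a result is set."""

    def __init__(self):
        self._done = False
        self._result = None

    def set_result(self, result):
        self._result = result
        self._done = True

    def __await__(self):
        while not self._done:
            yield
        return self._result


class ToyGroup:
    """Hypothetical callback group: at most one running callback unless reentrant."""

    def __init__(self, reentrant):
        self.reentrant = reentrant
        self.running = 0

    def can_start(self):
        return self.reentrant or self.running == 0


def toy_spin(group, callbacks, max_steps=10):
    """Start callbacks when the group admits them, then resume each running one once per step."""
    waiting = list(callbacks)
    running = []
    finished = []
    for _ in range(max_steps):
        for cb in list(waiting):
            if group.can_start():
                waiting.remove(cb)
                group.running += 1
                running.append((cb.__name__, cb()))
        for name, coro in list(running):
            try:
                coro.send(None)
            except StopIteration:
                running.remove((name, coro))
                group.running -= 1  # released only when the callback returns
                finished.append(name)
        if not waiting and not running:
            break
    for _, coro in running:
        coro.close()  # discard callbacks that never completed
    return finished


def make_callbacks():
    future = ToyFuture()

    async def waiter():
        await future

    async def setter():
        future.set_result(42)

    return [waiter, setter]


exclusive = toy_spin(ToyGroup(reentrant=False), make_callbacks())
reentrant = toy_spin(ToyGroup(reentrant=True), make_callbacks())
print(exclusive)
print(reentrant)
```

In the exclusive case the waiting callback holds the group, so the callback that would resolve the future never starts and nothing finishes. In the reentrant case both callbacks complete.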
Bug report
Required Info:
Steps to reproduce issue
Run the subscriber and then run:
Expected behavior
When a message is published to /test, the node should print "Received a message:", wait for 0.1 seconds, and then print the message data.
From #166 (comment):
Actual behavior
Additional information
Observation
Reducing the timeout in the asyncio.sleep line makes the issue disappear. In the example provided, on my system, a timeout of ~0.002 seconds or smaller works fine.
Guess
This piece might play a role:
rclpy/rclpy/rclpy/task.py
Lines 234 to 256 in c76a6a1
My theory is that self._handler.send(None) is non-blocking, causing the line self._executing = False to be executed before the async callback is finished (while self.done is still False). As a result, the function is assumed to be paused (kind of like a yielded generator), so the already-running coro object gets called again by this piece:
rclpy/rclpy/rclpy/executors.py
Lines 470 to 482 in c76a6a1
This causes some kind of clash, hence RuntimeError: await wasn't used with future.
So as a quick test, I commented out the self._executing = False line. This caused the second call to be avoided (so no runtime error anymore), but the callback didn't finish (the message wasn't printed out).
Another minimal example
rclpy practically does something like this:
Which also throws RuntimeError: await wasn't used with future.
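A minimal sketch of how this error can arise, using only asyncio. It assumes the driver simply resumes the coroutine with send(None) again without waiting for the future it was handed; `step_twice` is a hypothetical helper written for this illustration and is not taken from rclpy.

```python
import asyncio


async def async_cb():
    await asyncio.sleep(0.1)  # suspends on an asyncio future


async def step_twice():
    """Hypothetical driver: resume the coroutine twice without waiting for its future."""
    coro = async_cb()
    pending = coro.send(None)  # first step: the coroutine hands back the future it waits on
    try:
        coro.send(None)  # second step while that future is still pending
    except RuntimeError as e:
        return asyncio.isfuture(pending), str(e)
    return asyncio.isfuture(pending), None


is_future, error = asyncio.run(step_twice())
print(is_future, error)
```

The first send(None) returns the pending asyncio future rather than None, which is asyncio's way of telling its own event loop to wait. Resuming the coroutine before that future is done is what the asyncio future rejects.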