Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Task methods to make _asyncio more similar to cpython #8647

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 143 additions & 1 deletion extmod/modasyncio.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
(task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
|| (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)

#define IS_CANCELLED_ERROR(error) ( \
mp_obj_is_subclass_fast( \
MP_OBJ_FROM_PTR(mp_obj_get_type(error)), \
mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError)) \
))

typedef struct _mp_obj_task_t {
mp_pairheap_t pairheap;
mp_obj_t coro;
Expand Down Expand Up @@ -202,6 +208,114 @@ STATIC mp_obj_t task_done(mp_obj_t self_in) {
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_done_obj, task_done);

STATIC mp_obj_t task_add_done_callback(mp_obj_t self_in, mp_obj_t callback) {
assert(mp_obj_is_callable(callback));
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (TASK_IS_DONE(self)) {
// In CPython the callbacks are not immediately called and are instead
// called by the event loop. However, MicroPython's event loop doesn't
// support `call_soon` to handle callback processing.
//
// Because of this, it's close enough to call the callback immediately.

mp_call_function_2(callback, self_in, self->data);
return mp_const_none;
}

if (self->state != mp_const_true) {
// Tasks SHOULD support more than one callback per CPython but to reduce
// the surface area of this change tasks can currently only support one.
mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT(">1 callback unsupported"));
}

self->state = callback;
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_add_done_callback_obj, task_add_done_callback);

STATIC mp_obj_t task_remove_done_callback(mp_obj_t self_in, mp_obj_t callback) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (callback != self->state) {
// If the callback isn't a match we can count this as removing 0 callbacks
return mp_obj_new_int(0);
}

self->state = mp_const_true;
return mp_obj_new_int(1);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_remove_done_callback_obj, task_remove_done_callback);

STATIC mp_obj_t task_get_coro(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
return MP_OBJ_FROM_PTR(self->coro);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_get_coro_obj, task_get_coro);

STATIC mp_obj_t task_exception(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}

// If the exception is a cancelled error then we should raise it
if (IS_CANCELLED_ERROR(self->data)) {
nlr_raise(mp_make_raise_obj(self->data));
}

// If it's a StopIteration we should should return none
if (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}

if (!mp_obj_is_exception_instance(self->data)) {
return mp_const_none;
}

return self->data;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_exception_obj, task_exception);

STATIC mp_obj_t task_result(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
mp_obj_t error_type = mp_obj_dict_get(asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_InvalidStateError));
nlr_raise(mp_make_raise_obj(error_type));
}

// If `exception()` returns anything we raise that
mp_obj_t exception_obj = task_exception(self_in);
if (exception_obj != mp_const_none) {
nlr_raise(mp_make_raise_obj(exception_obj));
}

// If not StopIteration, bail early
if (!mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&mp_type_StopIteration))) {
return mp_const_none;
}

return mp_obj_exception_get_value(self->data);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_result_obj, task_result);

STATIC mp_obj_t task_cancelled(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);

if (!TASK_IS_DONE(self)) {
// If task isn't done it can't possibly be cancelled, and would instead
// be considered "cancelling" even if a cancel was requested until it
// has fully completed.
return mp_obj_new_bool(false);
}

return mp_obj_new_bool(IS_CANCELLED_ERROR(self->data));
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancelled_obj, task_cancelled);

STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
// Check if task is already finished.
Expand Down Expand Up @@ -276,6 +390,24 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
} else if (attr == MP_QSTR___await__) {
dest[0] = MP_OBJ_FROM_PTR(&task_await_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_add_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_add_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_remove_done_callback) {
dest[0] = MP_OBJ_FROM_PTR(&task_remove_done_callback_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_get_coro) {
dest[0] = MP_OBJ_FROM_PTR(&task_get_coro_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_result) {
dest[0] = MP_OBJ_FROM_PTR(&task_result_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_exception) {
dest[0] = MP_OBJ_FROM_PTR(&task_exception_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_cancelled) {
dest[0] = MP_OBJ_FROM_PTR(&task_cancelled_obj);
dest[1] = self_in;
}
} else if (dest[1] != MP_OBJ_NULL) {
// Store
Expand All @@ -289,6 +421,15 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
}
}

STATIC mp_obj_t task_unary_op(mp_unary_op_t op, mp_obj_t o_in) {
switch (op) {
case MP_UNARY_OP_HASH:
return MP_OBJ_NEW_SMALL_INT((mp_uint_t)o_in);
default:
return MP_OBJ_NULL; // op not supported
}
}

STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
(void)iter_buf;
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
Expand Down Expand Up @@ -337,7 +478,8 @@ STATIC MP_DEFINE_CONST_OBJ_TYPE(
MP_TYPE_FLAG_ITER_IS_CUSTOM,
make_new, task_make_new,
attr, task_attr,
iter, &task_getiter_iternext
iter, &task_getiter_iternext,
unary_op, task_unary_op
);

/******************************************************************************/
Expand Down
2 changes: 1 addition & 1 deletion frozen/Adafruit_CircuitPython_asyncio
4 changes: 4 additions & 0 deletions locale/circuitpython.pot
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ msgstr ""
msgid "3-arg pow() not supported"
msgstr ""

#: extmod/modasyncio.c
msgid ">1 callback unsupported"
msgstr ""

#: ports/atmel-samd/common-hal/alarm/pin/PinAlarm.c
#: ports/atmel-samd/common-hal/countio/Counter.c
#: ports/atmel-samd/common-hal/rotaryio/IncrementalEncoder.c
Expand Down
1 change: 1 addition & 0 deletions ports/stm/boards/thunderpack_v12/mpconfigboard.mk
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ CIRCUITPY_BLEIO_HCI = 0
CIRCUITPY_BUSDEVICE = 0
CIRCUITPY_NVM = 1
CIRCUITPY_SYNTHIO = 0
CIRCUITPY_ULAB = 0
CIRCUITPY_ZLIB = 0

MCU_SERIES = F4
Expand Down
54 changes: 54 additions & 0 deletions tests/extmod/asyncio_task_add_done_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Test the Task.add_done_callback() method

try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit


async def task(t, exc=None):
if t >= 0:
await asyncio.sleep(t)
if exc:
raise exc


def done_callback(t, er):
print("done", repr(t), repr(er))


async def main():
# Tasks that aren't done only execute done callback after finishing
print("=" * 10)
t = asyncio.create_task(task(-1))
t.add_done_callback(done_callback)
print("Waiting for task to complete")
await asyncio.sleep(0)
print("Task has completed")

# Task that are done run the callback immediately
print("=" * 10)
t = asyncio.create_task(task(-1))
await asyncio.sleep(0)
print("Task has completed")
t.add_done_callback(done_callback)
print("Callback Added")

# Task that starts, runs and finishes without an exception should return None
print("=" * 10)
t = asyncio.create_task(task(0.01))
t.add_done_callback(done_callback)
try:
t.add_done_callback(done_callback)
except RuntimeError as e:
print("Second call to add_done_callback emits error:", repr(e))

# Task that raises immediately should still run done callback
print("=" * 10)
t = asyncio.create_task(task(-1, ValueError))
t.add_done_callback(done_callback)
await asyncio.sleep(0)


asyncio.run(main())
12 changes: 12 additions & 0 deletions tests/extmod/asyncio_task_add_done_callback.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
==========
Waiting for task to complete
done <Task> StopIteration()
Task has completed
==========
Task has completed
done <Task> StopIteration()
Callback Added
==========
Second call to add_done_callback emits error: RuntimeError('>1 callback unsupported',)
==========
done <Task> ValueError()
54 changes: 54 additions & 0 deletions tests/extmod/asyncio_task_cancelled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Test the `Task.cancelled` method

try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit


async def task(t):
await asyncio.sleep(t)


async def main():
# Cancel task immediately doesn't mark the task as cancelled
print("=" * 10)
t = asyncio.create_task(task(2))
t.cancel()
print("Expecting task to not be cancelled because it is not done:", t.cancelled())

# Cancel task immediately and wait for cancellation to complete
print("=" * 10)
t = asyncio.create_task(task(2))
t.cancel()
await asyncio.sleep(0)
print("Expecting Task to be Cancelled:", t.cancelled())

# Cancel task and wait for cancellation to complete
print("=" * 10)
t = asyncio.create_task(task(2))
await asyncio.sleep(0.01)
t.cancel()
await asyncio.sleep(0)
print("Expecting Task to be Cancelled:", t.cancelled())

# Cancel task multiple times after it has started
print("=" * 10)
t = asyncio.create_task(task(2))
await asyncio.sleep(0.01)
for _ in range(4):
t.cancel()
await asyncio.sleep(0.01)

print("Expecting Task to be Cancelled:", t.cancelled())

# Cancel task after it has finished
print("=" * 10)
t = asyncio.create_task(task(0.01))
await asyncio.sleep(0.05)
t.cancel()
print("Expecting task to not be Cancelled:", t.cancelled())


asyncio.run(main())
10 changes: 10 additions & 0 deletions tests/extmod/asyncio_task_cancelled.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
==========
Expecting task to not be cancelled because it is not done: False
==========
Expecting Task to be Cancelled: True
==========
Expecting Task to be Cancelled: True
==========
Expecting Task to be Cancelled: True
==========
Expecting task to not be Cancelled: False
64 changes: 64 additions & 0 deletions tests/extmod/asyncio_task_exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Test the Task.exception() method

try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit


async def task(t, exc=None):
if t >= 0:
await asyncio.sleep(t)
if exc:
raise exc


async def main():
# Task that is not done yet raises an InvalidStateError
print("=" * 10)
t = asyncio.create_task(task(1))
await asyncio.sleep(0)
try:
t.exception()
assert False, "Should not get here"
except Exception as e:
print("Tasks that aren't done yet raise an InvalidStateError:", repr(e))

# Task that is cancelled raises CancelledError
print("=" * 10)
t = asyncio.create_task(task(1))
t.cancel()
await asyncio.sleep(0)
try:
print(repr(t.exception()))
print(t.cancelled())
assert False, "Should not get here"
except asyncio.CancelledError as e:
print("Cancelled tasks cannot retrieve exception:", repr(e))

# Task that starts, runs and finishes without an exception should return None
print("=" * 10)
t = asyncio.create_task(task(0.01))
await t
print("None when no exception:", t.exception())

# Task that raises immediately should return that exception
print("=" * 10)
t = asyncio.create_task(task(-1, ValueError))
try:
await t
assert False, "Should not get here"
except ValueError as e:
pass
print("Returned Exception:", repr(t.exception()))

# Task returns `none` when somehow an exception isn't in data
print("=" * 10)
t = asyncio.create_task(task(-1))
await t
t.data = "Example"
print(t.exception())


asyncio.run(main())
Loading