Skip to content

Commit

Permalink
Merge pull request #3547 from fable-compiler/python-async
Browse files Browse the repository at this point in the history
[Python] Refactor python async trampoline handling
  • Loading branch information
dbrattli authored Oct 16, 2023
2 parents 6b2dabb + a552eea commit 0e8ebb1
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 104 deletions.
25 changes: 16 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ pythonVersion = "3.10"
typeCheckingMode = "strict"

[tool.isort]
profile = "black"
atomic = true
lines_after_imports = 2
lines_between_types = 1
multi_line_output = 3 # corresponds to -m flag
include_trailing_comma = true # corresponds to -tc flag
line_length = 88
known_third_party = ["cognite","pytest"]
py_version=310

[tool.ruff]
# Keep in sync with .pre-commit-config.yaml
line-length = 120
ignore = []
target-version = "py310"
select = ["E", "W", "F", "I", "T", "RUF", "TID", "UP"]
exclude = ["tests", "build", "temp", "src/fable_library", "src/fable_library_rust", "src/fable_library_php"]
include =["*.py"]

[tool.ruff.pydocstyle]
convention = "google"

[tool.ruff.isort]
lines-after-imports = 2


[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
115 changes: 77 additions & 38 deletions src/fable-library-py/fable_library/async_.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
import asyncio
from __future__ import annotations

import asyncio
from asyncio import Future, ensure_future
from collections.abc import Awaitable, Callable, Iterable
from concurrent.futures import ThreadPoolExecutor
from threading import Timer
from typing import (
Any,
Awaitable,
Callable,
Iterable,
List,
Literal,
Optional,
TypeVar,
Union,
)

from .async_builder import (
Continuations,
Async,
CancellationToken,
Continuations,
IAsyncContext,
OperationCanceledError,
Trampoline,
Expand All @@ -30,8 +26,10 @@
)

# F# generated code (from Choice.fs)
from .choice import Choice_makeChoice1Of2 # type: ignore
from .choice import Choice_makeChoice2Of2 # type: ignore
from .choice import (
Choice_makeChoice1Of2, # type: ignore
Choice_makeChoice2Of2, # type: ignore
)
from .task import TaskCompletionSource


Expand All @@ -47,6 +45,7 @@ def cont(ctx: IAsyncContext[Any]):

default_cancellation_token = CancellationToken()


# see AsyncBuilder.Delay
def delay(generator: Callable[[], Async[_T]]):
def cont(ctx: IAsyncContext[_T]):
Expand All @@ -55,7 +54,7 @@ def cont(ctx: IAsyncContext[_T]):
return protected_cont(cont)


def create_cancellation_token(arg: Union[int, bool, None] = None) -> CancellationToken:
def create_cancellation_token(arg: int | bool | None = None) -> CancellationToken:
cancelled = arg if isinstance(arg, bool) else False
token = CancellationToken(cancelled)
if isinstance(arg, int):
Expand All @@ -81,7 +80,6 @@ def is_cancellation_requested(token: CancellationToken) -> bool:
def sleep(millisecondsDueTime: int) -> Async[None]:
def cont(ctx: IAsyncContext[None]):
def cancel():
timer.cancel()
ctx.on_cancel(OperationCanceledError())

token_id = ctx.cancel_token.add_listener(cancel)
Expand All @@ -90,32 +88,31 @@ def timeout():
ctx.cancel_token.remove_listener(token_id)
ctx.on_success(None)

timer = Timer(millisecondsDueTime / 1000.0, timeout)
timer.start()
due_time = millisecondsDueTime / 1000.0
ctx.trampoline.run_later(timeout, due_time)

return protected_cont(cont)


def ignore(computation: Async[Any]) -> Async[None]:
def binder(_: Optional[Any] = None) -> Async[None]:
def binder(_: Any | None = None) -> Async[None]:
return protected_return(None)

return protected_bind(computation, binder)


def parallel(computations: Iterable[Async[_T]]) -> Async[List[_T]]:
def delayed() -> Async[List[_T]]:
def parallel(computations: Iterable[Async[_T]]) -> Async[list[_T]]:
def delayed() -> Async[list[_T]]:
tasks: Iterable[Future[_T]] = map(start_as_task, computations) # type: ignore
all: Future[List[_T]] = asyncio.gather(*tasks)

all: Future[list[_T]] = asyncio.gather(*tasks)
return await_task(all)

return delay(delayed)


def sequential(computations: Iterable[Async[_T]]) -> Async[List[Optional[_T]]]:
def delayed() -> Async[List[Optional[_T]]]:
results: List[_T] = []
def sequential(computations: Iterable[Async[_T]]) -> Async[list[_T | None]]:
def delayed() -> Async[list[_T | None]]:
results: list[_T] = []

def _arrow20(_arg: Async[_T]) -> Async[None]:
cmp: Async[_T] = _arg
Expand All @@ -127,7 +124,7 @@ def _arrow19(_arg_1: _T) -> Async[None]:

return singleton.Bind(cmp, _arrow19)

def _arrow21(__unit: Literal[None] = None) -> Async[List[_T]]:
def _arrow21(__unit: Literal[None] = None) -> Async[list[_T]]:
return singleton.Return(results)

return singleton.Combine(
Expand Down Expand Up @@ -189,17 +186,15 @@ def callback(conts: Continuations[_T]) -> None:
continuation = conts

task.add_done_callback(done)
return from_continuations(callback) # type: ignore
return from_continuations(callback)


def start_with_continuations(
computation: Async[_T],
continuation: Optional[Callable[[_T], None]] = None,
exception_continuation: Optional[Callable[[Exception], None]] = None,
cancellation_continuation: Optional[
Callable[[OperationCanceledError], None]
] = None,
cancellation_token: Optional[CancellationToken] = None,
continuation: Callable[[_T], None] | None = None,
exception_continuation: Callable[[Exception], None] | None = None,
cancellation_continuation: Callable[[OperationCanceledError], None] | None = None,
cancellation_token: CancellationToken | None = None,
) -> None:
"""Runs an asynchronous computation.
Expand All @@ -223,7 +218,7 @@ def start_with_continuations(


def start_as_task(
computation: Async[_T], cancellation_token: Optional[CancellationToken] = None
computation: Async[_T], cancellation_token: CancellationToken | None = None
) -> Awaitable[_T]:
"""Executes a computation in the thread pool.
Expand Down Expand Up @@ -251,24 +246,63 @@ def cancel(_: OperationCanceledError) -> None:
return tcs.get_task()


def start_child(computation: Async[_T], ms: int | None = None) -> Async[Async[_T]]:
if ms:
computation_with_timeout = protected_bind(
parallel(computation, throw_after(ms)), lambda xs: protected_return(xs[0])
)
return start_child(computation_with_timeout)

task = start_as_task(computation)

def cont(ctx: IAsyncContext[Async[_T]]) -> None:
def on_success(_: Async[_T]) -> None:
ctx.on_success(await_task(task))

on_error = ctx.on_error
on_cancel = ctx.on_cancel
trampoline = ctx.trampoline
cancel_token = ctx.cancel_token

ctx_ = IAsyncContext.create(
on_success, on_error, on_cancel, trampoline, cancel_token
)
computation(ctx_)

return protected_cont(cont)


def start_immediate(
computation: Async[Any],
cancellation_token: Optional[CancellationToken] = None,
cancellation_token: CancellationToken | None = None,
) -> None:
"""Start computation immediately.
Runs an asynchronous computation, starting immediately on the
current operating system thread
"""
return start_with_continuations(computation, cancellation_token=cancellation_token)
try:
asyncio.get_event_loop()
except RuntimeError:

async def runner() -> None:
return start_with_continuations(
computation, cancellation_token=cancellation_token
)

return asyncio.run(runner())
else:
return start_with_continuations(
computation, cancellation_token=cancellation_token
)


_executor: Optional[ThreadPoolExecutor] = None
_executor: ThreadPoolExecutor | None = None


def start(
computation: Callable[[IAsyncContext[Any]], None],
cancellation_token: Optional[CancellationToken] = None,
cancellation_token: CancellationToken | None = None,
) -> None:
"""Starts the asynchronous computation.
Expand All @@ -290,16 +324,16 @@ def worker() -> None:

def run_synchronously(
computation: Async[_T],
cancellation_token: Optional[CancellationToken] = None,
) -> Optional[_T]:
cancellation_token: CancellationToken | None = None,
) -> _T | None:
"""Run computation synchronously.
Runs an asynchronous computation and awaits its result on the
calling thread. Propagates an exception should the computation yield
one. This call is blocking.
"""

async def runner() -> Optional[_T]:
async def runner() -> _T | None:
return await start_as_task(computation, cancellation_token=cancellation_token)

return asyncio.run(runner())
Expand All @@ -312,11 +346,16 @@ async def runner() -> Optional[_T]:
"cancellation_token",
"catch_async",
"create_cancellation_token",
"delay",
"from_continuations",
"ignore",
"is_cancellation_requested",
"parallel",
"sequential",
"sleep",
"start",
"start_as_task",
"start_child",
"start_immediate",
"start_with_continuations",
]
Loading

0 comments on commit 0e8ebb1

Please sign in to comment.