
Timeout does not stop execution #682

Open
matt3o opened this issue Jul 18, 2024 · 5 comments

@matt3o

matt3o commented Jul 18, 2024

Describe the bug
What I wanted: the timeout should immediately stop the execution of the currently running state. For example, I have blocking database calls that should simply be ended in that case. At least, that is what I expected the timeout to do.

However, in the default implementation the timeout does not interrupt the execution of the state. By now I know that the timeout runs in a separate asyncio task.
I tried to raise a TimeoutError in that task (i.e. in the timeout_handler() function); however, that does not work, because this exception is never caught in the main loop. If I am not mistaken, it would at least be possible to catch the exception via event.state.runner.
This approach would not work with a blocked main event loop anyway, which is exactly where I would have wanted this feature.
My workaround will probably be to use asyncio.wait_for / asyncio.timeout.
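A minimal sketch of that workaround in plain asyncio, assuming the slow work is awaitable (slow_database_call and query_with_deadline are hypothetical names). On Python 3.11+, asyncio.timeout() offers the same behaviour as an async context manager. Catching asyncio.TimeoutError rather than the builtin TimeoutError keeps the code working on Python 3.10 (the version in the traceback below), where the two are distinct classes.

```python
import asyncio


async def slow_database_call() -> str:
    # Hypothetical stand-in for a slow but awaitable database query.
    await asyncio.sleep(10)
    return "rows"


async def query_with_deadline(timeout: float) -> str:
    try:
        # wait_for cancels the inner coroutine once the deadline has passed.
        return await asyncio.wait_for(slow_database_call(), timeout)
    except asyncio.TimeoutError:
        # On Python 3.11+ this is the builtin TimeoutError; on 3.10 it is a
        # separate class, so catching asyncio.TimeoutError works on both.
        return "timed out"
```

For example, asyncio.run(query_with_deadline(0.1)) returns "timed out" after roughly 0.1 seconds instead of waiting the full 10.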

Minimal working example

from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout
import asyncio
from enum import Enum
from transitions.extensions.states import add_state_features, Tags
from transitions.extensions import GraphMachine
import logging
import time

logger = logging.getLogger("tmp")

logging.getLogger('transitions').setLevel(logging.INFO)
logging.getLogger('asyncio').setLevel(logging.WARNING)


class States(str, Enum):
    start = "Start handling of request"
    parse_input = "Parse Input"
    finished = "Finished handling of request"


states = [
    States.start,
    {"name": States.parse_input, 'timeout': 5, "on_timeout": "timeout_handler"},
    {"name":States.finished, "final": True},
]


@add_state_features(Tags, AsyncTimeout)
class CustomGraphMachine(GraphMachine):
    pass

@add_state_features(Tags, AsyncTimeout)
class CustomAsyncMachine(AsyncMachine):
    pass



class ChatStateMachine:
    def __init__(self):
        self.machine = CustomAsyncMachine(
            model=self, 
            states=states,
            auto_transitions=True,
            initial=States.start,
            send_event=True, 
            on_exception='handle_error',
            name="ChatStateMachine",
            before_state_change=[self.default_on_exit], 
            after_state_change=[self.default_on_enter], 
            queued=True,
            )
        self.start = time.time()


    async def default_on_enter(self, event):  
        # print(event)
        state = event.state.name  
        print(f"Entering state: {state}")  
        # Fire and forget the async database tracking logic  
        asyncio.create_task(self.update_database(state))  
        # await asyncio.sleep(0.01)
  
    async def default_on_exit(self, event):  
        # print(event)
        state = event.state.name  
        print(f"Exiting state: {state}")  
        # Fire and forget the async database tracking logic  
        # asyncio.create_task(self.update_database(state))  

    async def update_database(self, state_name):
        # Simulate async database update  
        await asyncio.sleep(0.1)  
        print(f"Database updated with state: {state_name}")


    async def on_enter_parse_input(self, event):
        print(f"on_enter_parse_input {time.time() - self.start:.2f} seconds")
        await asyncio.sleep(20)
        print(f"resuming after sleep {time.time() - self.start:.2f} seconds")
        # await asyncio.wait_for(asyncio.sleep(30), 6)

    async def handle_error(self, event):
        print(f"Received error: {event.error=}")
        # if not event.state.name == States.fatal_error:
        #     await self.to_fatal_error_raised()
        # raise event.error
        del event.error  

    async def timeout_handler(self, event):
        print(f"timeout_handler() after {time.time() - self.start:.2f} seconds")
        print("TIMEOUT")
        raise TimeoutError
    
    async def run(self):  
        await self.to_parse_input()  
        await self.to_finished()
        await self.to_finished()

async def test_state_machine():  
    chat_sm = ChatStateMachine()  
    await chat_sm.run()

  
if __name__ == "__main__":
    asyncio.run(test_state_machine())  

Output of that snippet (filenames censored):

Exiting state: start
on_enter_parse_input 0.00 seconds
timeout_handler() after 5.00 seconds
TIMEOUT
resuming after sleep 20.01 seconds
Entering state: parse_input
Exiting state: parse_input
Entering state: finished
Exiting state: finished
Entering state: finished
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<AsyncTimeout.create_timer.<locals>._timeout() done, defined at /home//venvs/lib/python3.10/site-packages/transitions/extensions/asyncio.py:671> exception=TimeoutError()>
Traceback (most recent call last):
  File "/home//venvs/clean_backend/lib/python3.10/site-packages/transitions/extensions/asyncio.py", line 674, in _timeout
    await asyncio.shield(self._process_timeout(event_data))
  File "/home//venvs/clean_backend/lib/python3.10/site-packages/transitions/extensions/asyncio.py", line 682, in _process_timeout
    await event_data.machine.callbacks(self.on_timeout, event_data)
  File "/home//venvs/clean_backend/lib/python3.10/site-packages/transitions/extensions/asyncio.py", line 341, in callbacks
    await self.await_all([partial(event_data.machine.callback, func, event_data) for func in funcs])
  File "/home//venvs/clean_backend/lib/python3.10/site-packages/transitions/extensions/asyncio.py", line 371, in await_all
    return await asyncio.gather(*[func() for func in callables])
  File "/home//venvs/clean_backend/lib/python3.10/site-packages/transitions/extensions/asyncio.py", line 359, in callback
    await res
  File "//state_machine_timeout2.py", line 96, in timeout_handler
    raise TimeoutError
TimeoutError

Expected behavior
If you uncomment the line await asyncio.wait_for(asyncio.sleep(30), 6), you can see the expected behaviour: the code then raises a TimeoutError, which can be caught and handled in the error handler.
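One caveat for the blocking database calls mentioned above: neither AsyncTimeout nor asyncio.wait_for can interrupt a synchronous call that blocks the event loop, because no other task gets to run until that call returns. A sketch of one common mitigation, assuming a synchronous driver (blocking_database_call and query_in_thread are hypothetical): run the call in a worker thread with asyncio.to_thread (Python 3.9+) so the deadline can fire. Only the await is abandoned; the thread itself keeps running until the call returns.

```python
import asyncio
import time


def blocking_database_call() -> str:
    # Hypothetical synchronous driver call that blocks its thread.
    time.sleep(0.5)
    return "rows"


async def query_in_thread(timeout: float) -> str:
    try:
        # to_thread keeps the event loop responsive, so the deadline can fire.
        return await asyncio.wait_for(asyncio.to_thread(blocking_database_call), timeout)
    except asyncio.TimeoutError:
        # The caller stops waiting, but the worker thread is not killed;
        # blocking_database_call still runs to completion in the background.
        return "timed out"
```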

@aleneum
Member

aleneum commented Aug 13, 2024

Hello @matt3o,

this issue can be boiled down to this:

import asyncio

from transitions.extensions import AsyncMachine
from transitions.extensions.asyncio import AsyncTimeout
from transitions.extensions.states import add_state_features


@add_state_features(AsyncTimeout)
class MyMachine(AsyncMachine):

    async def on_enter_B(self):
        print("ENTERED")
        await asyncio.sleep(2)
        print("DONE")

    async def handle_timeout(self):
        print("CANCEL")
        await self.to_A()


async def run():
    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"}], initial='A')
    await m.to_B()

asyncio.run(run())

Obviously, self.to_A() should cancel the execution of on_enter_B. I need to check why this is not the case. I suspect there are some false assumptions about the context in which the tasks run.
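For reference, the expected behaviour (a timeout interrupting a running on_enter callback) looks like this in plain asyncio: a timer task cancels the task that runs the callback, and the callback receives a CancelledError at its current await. This is an illustrative sketch with hypothetical names, not how AsyncTimeout is implemented.

```python
import asyncio

log = []


async def on_enter_b() -> None:
    # Long-running state callback, similar to on_enter_B above.
    log.append("ENTERED")
    try:
        await asyncio.sleep(2)
        log.append("DONE")  # not reached when the task is cancelled
    except asyncio.CancelledError:
        log.append("CANCELLED")
        raise  # re-raise so the task is really marked as cancelled


async def cancel_after(task: asyncio.Task, seconds: float) -> None:
    # Hypothetical timer: after `seconds`, interrupt the running callback.
    await asyncio.sleep(seconds)
    log.append("CANCEL")
    task.cancel()


async def main() -> None:
    state_task = asyncio.create_task(on_enter_b())
    timer = asyncio.create_task(cancel_after(state_task, 0.1))
    try:
        await state_task
    except asyncio.CancelledError:
        pass  # the callback was interrupted, as intended
    await timer
```

Running asyncio.run(main()) leaves log as ["ENTERED", "CANCEL", "CANCELLED"]; "DONE" is never appended.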

@aleneum
Member

aleneum commented Aug 13, 2024

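Without queuing, one could transition away from the current state and thereby cause a CancelledError. The workaround is to prevent the new task from copying the current context by using create_task(..., context=Context()). Note that the context argument of asyncio.create_task requires Python 3.11 or newer. Your adapted code: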

from contextvars import Context

from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout
import asyncio
from enum import Enum
from transitions.extensions.states import add_state_features, Tags
from transitions.extensions import GraphMachine
import logging
import time

logger = logging.getLogger("tmp")

logging.getLogger('transitions').setLevel(logging.INFO)
logging.getLogger('asyncio').setLevel(logging.WARNING)


class States(str, Enum):
    start = "Start handling of request"
    parse_input = "Parse Input"
    finished = "Finished handling of request"
    error = "Error"


states = [
    States.start,
    {"name": States.parse_input, 'timeout': 5, "on_timeout": "timeout_handler"},
    {"name": States.finished, "final": True},
    States.error
]


@add_state_features(Tags, AsyncTimeout)
class CustomGraphMachine(GraphMachine):
    pass


@add_state_features(Tags, AsyncTimeout)
class CustomAsyncMachine(AsyncMachine):
    pass


class ChatStateMachine:
    def __init__(self):
        self.machine = CustomAsyncMachine(
            model=self,
            states=states,
            auto_transitions=True,
            initial=States.start,
            send_event=True,
            on_exception='handle_error',
            name="ChatStateMachine",
            before_state_change=[self.default_on_exit],
            after_state_change=[self.default_on_enter],
        )
        self.start = time.time()

    async def default_on_enter(self, event):
        # print(event)
        state = event.state.name
        print(f"Entering state: {state}")
        # Fire and forget the async database tracking logic
        asyncio.create_task(self.update_database(state))
        # await asyncio.sleep(0.01)

    async def default_on_exit(self, event):
        # print(event)
        state = event.state.name
        print(f"Exiting state: {state}")
        # Fire and forget the async database tracking logic
        # asyncio.create_task(self.update_database(state))

    async def update_database(self, state_name):
        # Simulate async database update
        await asyncio.sleep(0.1)
        print(f"Database updated with state: {state_name}")

    async def on_enter_parse_input(self, event):
        print(f"on_enter_parse_input {time.time() - self.start:.2f} seconds")
        await asyncio.sleep(20)
        print(f"resuming after sleep {time.time() - self.start:.2f} seconds")
        # await asyncio.wait_for(asyncio.sleep(30), 6)

    async def handle_error(self, event):
        print(f"Received error: {event.error=}")
        # if not event.state.name == States.fatal_error:
        #     await self.to_fatal_error_raised()
        # raise event.error
        del event.error

    async def timeout_handler(self, event):
        print(f"timeout_handler() after {time.time() - self.start:.2f} seconds")
        print("TIMEOUT")
        await asyncio.create_task(self.to_error(), context=Context())

    async def run(self):
        await self.to_parse_input()
        await asyncio.sleep(0.5)  # let the update database task finish
        await self.to_finished()


async def run_state_machine():
    chat_sm = ChatStateMachine()
    await chat_sm.run()


if __name__ == "__main__":
    asyncio.run(run_state_machine())

This is just a workaround. Part of a proper fix could be for AsyncTimeout to handle this 'context switch' itself.

@aleneum
Member

aleneum commented Aug 13, 2024

I don't see an easy way to make raise TimeoutError work in handle_timeout, since a timeout should not block a transition. It may fire after the transition has completed (outside of the try/except block that forwards exceptions to handle_error) while the model/machine is still in a time-critical state. With an empty context, however, the created timeout handler can either transition away directly (when queued=False) or explicitly force a context switch. I added this change to a branch as mentioned above. I hope this example illustrates how this could work.

import asyncio

from transitions.extensions import AsyncMachine
from transitions.extensions.asyncio import AsyncTimeout, AsyncEventData
from transitions.extensions.states import add_state_features


@add_state_features(AsyncTimeout)
class MyMachine(AsyncMachine):

    async def on_enter_A(self, event_data: AsyncEventData):
        print("Enter A...")

    async def on_enter_B(self, event_data: AsyncEventData):
        print("Enter B...")
        await asyncio.sleep(event_data.kwargs.get("sleep", 0))
        print("... Done in B.")

    async def handle_timeout(self, event_data: AsyncEventData):
        print("Timeout!")
        if event_data.machine.has_queue:
            await event_data.machine.switch_model_context(self)
        await self.to_A()

    async def handle_error(self, event_data):
        print(f"Error: {event_data.error=}")


async def run():
    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"}],
                  transitions=[{"trigger": "try_something", "source": "*",
                                "dest": "A", "conditions": [lambda event_data: False]}],
                  initial='A', queued=True, send_event=True, on_exception="handle_error")
    print("First round")
    await m.to_B(sleep=2)
    print("Second round")
    await m.to_B(sleep=0.5)
    while not m.is_A():
        await m.try_something()  # when send_event=True, this will always return True
        print("Try something else...")
        await asyncio.sleep(0.2)


asyncio.run(run())

Output:

First round
Enter B...
Timeout!
Error: event_data.error=CancelledError()
Enter A...
Second round
Enter B...
... Done in B.
Try something else...
Try something else...
Try something else...
Timeout!
Enter A...

If you have some feedback, let me know.
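The point that a timeout may fire after the transition has already completed, outside the try/except that forwards errors to handle_error, can be shown in plain asyncio. An exception raised in a separate task never reaches the code that awaited the transition; unless something retrieves it, asyncio only logs "Task exception was never retrieved", as in the original report. The names below are hypothetical.

```python
import asyncio

seen = []


async def timeout_handler() -> None:
    # Raised inside a separate task, like the on_timeout callback above.
    raise TimeoutError("state took too long")


def on_timer_done(task: asyncio.Task) -> None:
    # Retrieving the exception explicitly prevents the
    # "Task exception was never retrieved" log message.
    if not task.cancelled() and task.exception() is not None:
        seen.append(repr(task.exception()))


async def main() -> str:
    timer = asyncio.create_task(timeout_handler())
    timer.add_done_callback(on_timer_done)
    try:
        await asyncio.sleep(0.05)  # the "transition" finishes normally
        result = "transition completed"
    except TimeoutError:
        result = "caught by caller"  # never happens: the error lives in `timer`
    await asyncio.sleep(0)  # give the done-callback a chance to run
    return result
```

asyncio.run(main()) returns "transition completed", while the TimeoutError only shows up in seen via the done-callback.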

aleneum added a commit that referenced this issue Aug 20, 2024
- see #682 for details
- renamed `AsyncMachine.switch_model_context` to `cancel_running_transitions`
@aleneum
Member

aleneum commented Aug 20, 2024

I updated the development branch with another approach which hopefully covers most scenarios in which errors are (re-)raised, and which keeps internals internal since it does not require cancelling ongoing transitions manually.

I am trying to wrap my head around all the possible outcomes of an async timeout. When a timeout raises an exception and handle_error is not present OR raises an exception itself, a running transition should be halted and the transition queue, if present, should be cleared. Halting an ongoing transition raises a CancelledError, which means that handle_error can be called twice, each time with a different error. If a CancelledError is 're-raised', it will be caught by transitions, since CancelledErrors are expected when transitions interrupt each other. However, handle_error may also raise another error (e.g. RuntimeError). In this case it will be forwarded to the event caller. In both cases, the event queue will be cleared.

This means an exception raised in a timeout cannot be 'forwarded' to the initial event caller. However, a CancelledError can be intercepted and, if the cancellation has been caused by a timeout, raised as another error (e.g. TimeoutError). How to determine whether the cancellation has been triggered by the timeout depends on the design of the state machine. Relying on the sequential order TimeoutError -> CancelledError may not always be accurate. Another approach is to check the CancelledError's message: transitions sets it either to the name of the event that caused the current transition to be cancelled or to 'timeout' when the cancellation was caused by a timeout.
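A sketch of the message-based check in plain asyncio, with hypothetical names: Task.cancel(msg) (Python 3.9+) attaches a message to the CancelledError raised inside the task, and an error handler can translate a cancellation whose message is 'timeout' into a TimeoutError while passing other cancellations through. In transitions this check would live in handle_error; the message values it receives are as described above and are not exercised here.

```python
import asyncio


def translate_error(error: BaseException) -> BaseException:
    # A cancellation whose message is "timeout" becomes a TimeoutError;
    # anything else (e.g. a cancellation named after an event) is passed through.
    if isinstance(error, asyncio.CancelledError) and error.args[:1] == ("timeout",):
        return TimeoutError("state timed out")
    return error


async def state_callback(errors: list) -> None:
    try:
        await asyncio.sleep(10)  # stands in for a long-running on_enter_* callback
    except asyncio.CancelledError as err:
        errors.append(translate_error(err))
        raise  # keep the task cancelled


async def cancel_with(message: str) -> BaseException:
    errors: list = []
    task = asyncio.create_task(state_callback(errors))
    await asyncio.sleep(0)  # let the callback reach its await
    task.cancel(message)  # the msg argument exists since Python 3.9
    try:
        await task
    except asyncio.CancelledError:
        pass
    return errors[0]
```

cancel_with("timeout") yields a TimeoutError, whereas cancel_with("to_A") yields the original CancelledError.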

Diagram

flowchart TD
    A[TimeoutState] --[timeout]--> B
    B[on_timeout] --> L["trigger event?"]
    L --"yes & queue"--> X
    L --"yes & !queue"--> D
    L --"no"--> X[continue...]
    B[on_timeout] --[error]--> C
    C["on_exception(TimeoutError)"] --[error]--> D
    C --> L
    D[cancel_running_transitions] --> E
    E[running tasks?] --yes--> G
    E --"no"--> X
    G["on_exception(CancelledError)"] --> X
    G --error--> I
    I --"other"-->  M["raised to event caller"]
    I[clear_queue] --"CancelledError"--> X

Sample code

import asyncio
from asyncio import CancelledError

from transitions.extensions import AsyncMachine
from transitions.extensions.asyncio import AsyncTimeout, AsyncEventData
from transitions.extensions.states import add_state_features

@add_state_features(AsyncTimeout)
class MyMachine(AsyncMachine):

    async def on_enter_A(self, event_data: AsyncEventData):
        print("Enter A...")

    async def on_enter_B(self, event_data: AsyncEventData):
        self.timeout_called = False
        await self.to_C()  # this will not be called when B is blocking
        seconds = event_data.kwargs.get("sleep", 100)
        await asyncio.sleep(seconds)

    async def on_enter_C(self, event_data: AsyncEventData):
        print("We are in C now. B was (hopefully!) non-blocking")

    async def handle_timeout(self, event_data: AsyncEventData):
        print("Trigger Timeout!")
        self.timeout_called = True
        raise TimeoutError()

    async def handle_error(self, event_data):
        print(f"Handle Error: {event_data.error=}")
        if isinstance(event_data.error, CancelledError) and self.timeout_called:
            print("CancelledError after a timeout!")
            raise TimeoutError()
        raise event_data.error


async def run():
    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"},
                               {"name": "C", "timeout": 1, "on_timeout": "handle_timeout"}],
                  transitions=[{"trigger": "try_something", "source": "*",
                                "dest": "A", "conditions": [lambda event_data: False]}],
                  initial='A', queued=True, send_event=True, on_exception="handle_error")
    print("# Scenario A: No queue and blocking to_B...")
    try:
        await m.to_B()
    except TimeoutError:
        print("Caught a timeout without queue.")
    print("# Scenario B: No queue and non-blocking to_B...")
    await m.to_B(sleep=0.1)
    await asyncio.sleep(2)

    m = MyMachine(states=['A', {"name": "B", "timeout": 1, "on_timeout": "handle_timeout"},
                               {"name": "C", "timeout": 1, "on_timeout": "handle_timeout"}],
                  transitions=[{"trigger": "try_something", "source": "*",
                                "dest": "A", "conditions": [lambda event_data: False]}],
                  initial='A', queued=True, send_event=True, on_exception="handle_error")
    print("# Scenario C: queue and blocking to_B...")
    try:
        await m.to_B()
    except TimeoutError:
        print("Caught a timeout without queue.")
    print("# Scenario D: queue and non-blocking to_B...")
    await m.to_B(sleep=0.1)
    await asyncio.sleep(2)
asyncio.run(run())

Output

# Scenario A: No queue and blocking to_B...
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()
Handle Error: event_data.error=CancelledError('timeout')
CancelledError after a timeout!
Caught a timeout without queue.
# Scenario B: No queue and non-blocking to_B...
We are in C now. B was (hopefully!) non-blocking
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()
# Scenario C: queue and blocking to_B...
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()
Handle Error: event_data.error=CancelledError('timeout')
CancelledError after a timeout!
Caught a timeout without queue.
# Scenario D: queue and non-blocking to_B...
We are in C now. B was (hopefully!) non-blocking
Trigger Timeout!
Handle Error: event_data.error=TimeoutError()

aleneum added a commit that referenced this issue Aug 20, 2024
@matt3o
Author

matt3o commented Oct 1, 2024

Sorry for the very late response @aleneum. I'll try it this week or next and get back to you.
Also, many thanks for the quick response; I lost track of it before my holidays ❤️
